快捷搜索:  手机  明星

eureka有几个节点(Eureka源码系列疑问三级缓存)

eureka有几个节点(Eureka源码系列疑问三级缓存)if (this.shouldUseReadOnlyResponseCache) { this.timer.schedule(this.getCacheUpdateTask() new Date(System.currentTimeMillis() / responseCacheUpdateIntervalMs * responseCacheUpdateIntervalMs responseCacheUpdateIntervalMs) responseCacheUpdateIntervalMs); }private TimerTask getCacheUpdateTask() { return new TimerTask() { public void run() { ResponseCacheImpl.logger.debu

EurekaClient在向EurekaServer注册服务和在向EurekaServer拉取注册表信息时所涉及的其实是不同的缓存策略,Eureka Server 为了提供响应效率,提供了三层的缓存结构,将 Eureka Client 所需要的注册信息,直接存储在缓存结构中,实现原理如下图所示

eureka有几个节点(Eureka源码系列疑问三级缓存)(1)

一级缓存(注册表)ConcurrentHashMap
二级缓存(ReadWriteMap)guava#LoadingCache
三级缓存(ReadOnlyMap)ConcurrentHashMap

首先,EurekaClient在向EurekaServer注册时调用了EurekaServer的注册表服务接口,将注册信息添加到一级缓存上,在添加时会使用锁,会在一定程度上影响性能。

@POST @Consumes({"application/json" "application/xml"}) public Response addInstance(InstanceInfo info @HeaderParam("x-netflix-discovery-replication") String isReplication) { ***** this.registry.register(info "true".equals(isReplication)); return Response.status(204).build(); } }

public void register(InstanceInfo info boolean isReplication) { ***** super.register(info leaseDuration isReplication); this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register info.getAppName() info.getId() info (InstanceStatus)null isReplication); }

public void register(InstanceInfo registrant int leaseDuration boolean isReplication) { this.read.lock(); try { Map<String Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName()); EurekaMonitors.REGISTER.increment(isReplication); //先判断是否已经注册过了 if (gMap == null) { ConcurrentHashMap<String Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap(); gMap = (Map)this.registry.putIfAbsent(registrant.getAppName() gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId()); if (existingLease != null && existingLease.getHolder() != null) { } else { synchronized(this.lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { this.expectedNumberOfClientsSendingRenews; this.updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } //新建一个instanceInfo Lease<InstanceInfo> lease = new Lease(registrant leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //添加到map上 也就时一级缓存上 ((Map)gMap).put(registrant.getId() lease); } finally { this.read.unlock(); } }

EurekaClient在向EurekaServer拉取注册表信息时,其实是调用的三级缓存ReadOnlyMap,本质上是 ConcurrentHashMap,依赖定时从 readWriteCacheMap 同步数据,默认时间为 30 秒。

@GET public Response getApplication(@PathParam("version") String version @HeaderParam("Accept") String acceptHeader @HeaderParam("X-Eureka-Accept") String eurekaAccept) { ***** Key cacheKey = new Key(EntityType.Application this.appName keyType CurrentRequestVersion.get() EurekaAccept.fromString(eurekaAccept)); String payLoad = this.responseCache.get(cacheKey); ***** } }

public String get(Key key) { return this.get(key this.shouldUseReadOnlyResponseCache); }

@VisibleForTesting String get(Key key boolean useReadOnlyCache) { ResponseCacheImpl.Value payload = this.getValue(key useReadOnlyCache); return payload != null && !payload.getPayload().equals("") ? payload.getPayload() : null; }

为了供客户端获取注册信息时使用,其缓存更新,依赖于定时器的更新,通过和 readWriteCacheMap 的值做对比,如果数据不一致,则以 readWriteCacheMap 的数据为准。

@VisibleForTesting ResponseCacheImpl.Value getValue(Key key boolean useReadOnlyCache) { ResponseCacheImpl.Value payload = null; try { if (useReadOnlyCache) { ResponseCacheImpl.Value currentPayload = (ResponseCacheImpl.Value)this.readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key); this.readOnlyCacheMap.put(key payload); } } else { payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key); } } catch (Throwable var5) { logger.error("Cannot get value for key : {}" key var5); } return payload; }

ReadWriteMap缓存初始化,readWriteCacheMap使用的是LoadingCache对象,它是guava中提供的用来实现内存缓存的一个api

this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()).expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds() TimeUnit.SECONDS).removalListener(new RemovalListener<Key ResponseCacheImpl.Value>() { public void onRemoval(RemovalNotification<Key ResponseCacheImpl.Value> notification) { Key removedKey = (Key)notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); ResponseCacheImpl.this.regionSpecificKeys.remove(cloneWithNoRegions removedKey); } } }).build(new CacheLoader<Key ResponseCacheImpl.Value>() { public ResponseCacheImpl.Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); ResponseCacheImpl.this.regionSpecificKeys.put(cloneWithNoRegions key); } ResponseCacheImpl.Value value = ResponseCacheImpl.this.generatePayload(key); return value; } });

同步缓存

if (this.shouldUseReadOnlyResponseCache) { this.timer.schedule(this.getCacheUpdateTask() new Date(System.currentTimeMillis() / responseCacheUpdateIntervalMs * responseCacheUpdateIntervalMs responseCacheUpdateIntervalMs) responseCacheUpdateIntervalMs); }

private TimerTask getCacheUpdateTask() { return new TimerTask() { public void run() { ResponseCacheImpl.logger.debug("Updating the client cache from response cache"); Iterator var1 = ResponseCacheImpl.this.readOnlyCacheMap.keySet().iterator(); while(var1.hasNext()) { Key key = (Key)var1.next(); if (ResponseCacheImpl.logger.isDebugEnabled()) { ResponseCacheImpl.logger.debug("Updating the client cache from response cache for key : {} {} {} {}" new Object[]{key.getEntityType() key.getName() key.getVersion() key.getType()}); } try { CurrentRequestVersion.set(key.getVersion()); ResponseCacheImpl.Value cacheValue = (ResponseCacheImpl.Value)ResponseCacheImpl.this.readWriteCacheMap.get(key); ResponseCacheImpl.Value currentCacheValue = (ResponseCacheImpl.Value)ResponseCacheImpl.this.readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { ResponseCacheImpl.this.readOnlyCacheMap.put(key cacheValue); } } catch (Throwable var8) { ResponseCacheImpl.logger.error("Error while updating the client cache from response cache for key {}" key.toStringCompact() var8); } finally { CurrentRequestVersion.remove(); } } } }; }

当然在注册时,原有的二级缓存应该会有一些信息上的变化,这个时时候EurekaClient在向EurekaServer注册过后会显示清理二级缓存

public void register(InstanceInfo registrant int leaseDuration boolean isReplication) { this.read.lock(); try { ****** this.invalidateCache(registrant.getAppName() registrant.getVIPAddress() registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})" new Object[]{registrant.getAppName() registrant.getId() registrant.getStatus() isReplication}); } finally { this.read.unlock(); } }

猜您喜欢: