How many nodes does Eureka have? (Eureka source code series: questions about the three-level cache)
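EurekaClient goes through different caches when it registers a service with EurekaServer and when it fetches registry information from EurekaServer. To improve response efficiency, Eureka Server provides a three-level cache structure and serves the registry information that Eureka Client needs directly from it. The principle is shown in the figure below.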
Level-1 cache (registry): ConcurrentHashMap
Level-2 cache (ReadWriteMap): guava#LoadingCache
Level-3 cache (ReadOnlyMap): ConcurrentHashMap
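First, when EurekaClient registers with EurekaServer, it calls EurekaServer's registration endpoint, which adds the registration information to the level-1 cache. The write is done while holding a lock (in the code below, the read side of a read-write lock), which affects performance to some extent.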
// ApplicationResource
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
    // ...
    this.registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();
}
// PeerAwareInstanceRegistryImpl
public void register(InstanceInfo info, boolean isReplication) {
    // ...
    super.register(info, leaseDuration, isReplication);
    this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
}
// AbstractInstanceRegistry
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    this.read.lock();
    try {
        Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
        EurekaMonitors.REGISTER.increment(isReplication);
        // First check whether this application already has an entry in the registry
        if (gMap == null) {
            ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
            gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
        if (existingLease != null && existingLease.getHolder() != null) {
        } else {
            synchronized(this.lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // One more client is now expected to send renewals
                    ++this.expectedNumberOfClientsSendingRenews;
                    this.updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // Create a new Lease that wraps the InstanceInfo
        Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Put it into the map, i.e. the level-1 cache
        ((Map)gMap).put(registrant.getId(), lease);
    } finally {
        this.read.unlock();
    }
}
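When EurekaClient fetches registry information from EurekaServer, it actually reads from the level-3 cache, ReadOnlyMap. This is a plain ConcurrentHashMap that a timer refreshes from readWriteCacheMap, every 30 seconds by default.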
// ApplicationResource
@GET
public Response getApplication(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("X-Eureka-Accept") String eurekaAccept) {
    // ...
    Key cacheKey = new Key(EntityType.Application, this.appName, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept));
    String payLoad = this.responseCache.get(cacheKey);
    // ...
}
// ResponseCacheImpl
public String get(Key key) {
    return this.get(key, this.shouldUseReadOnlyResponseCache);
}

@VisibleForTesting
String get(Key key, boolean useReadOnlyCache) {
    ResponseCacheImpl.Value payload = this.getValue(key, useReadOnlyCache);
    return payload != null && !payload.getPayload().equals("") ? payload.getPayload() : null;
}
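The read-only cache exists to serve clients that fetch registry information. Existing entries are refreshed only by the timer task, which compares each entry with the value in readWriteCacheMap and, if the two differ, takes the readWriteCacheMap value as authoritative. On a miss, getValue falls back to readWriteCacheMap and copies the result into readOnlyCacheMap.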
@VisibleForTesting
ResponseCacheImpl.Value getValue(Key key, boolean useReadOnlyCache) {
    ResponseCacheImpl.Value payload = null;
    try {
        if (useReadOnlyCache) {
            ResponseCacheImpl.Value currentPayload = (ResponseCacheImpl.Value)this.readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
                this.readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
        }
    } catch (Throwable var5) {
        logger.error("Cannot get value for key : {}", key, var5);
    }
    return payload;
}
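The lookup order in getValue can be tried out in isolation. The following is a minimal sketch, not Eureka's implementation: the class ReadPathSketch, its String keys and values, and the loader function are hypothetical, and a plain ConcurrentHashMap with computeIfAbsent stands in for Guava's LoadingCache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class ReadPathSketch {
    // Stand-in for readOnlyCacheMap.
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    // Stand-in for readWriteCacheMap (a Guava LoadingCache in the real code).
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    // Hypothetical loader playing the role of generatePayload(key).
    private final Function<String, String> loader;

    public ReadPathSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    public String getValue(String key, boolean useReadOnlyCache) {
        if (!useReadOnlyCache) {
            // Bypass the read-only level entirely.
            return readWrite.computeIfAbsent(key, loader);
        }
        String current = readOnly.get(key);
        if (current != null) {
            return current; // hit in the read-only level
        }
        // Miss: load through the read-write level, then copy into read-only.
        String loaded = readWrite.computeIfAbsent(key, loader);
        if (loaded != null) {
            readOnly.put(key, loaded);
        }
        return loaded;
    }

    public boolean isInReadOnly(String key) {
        return readOnly.containsKey(key);
    }

    public static void main(String[] args) {
        ReadPathSketch cache = new ReadPathSketch(k -> "payload-for-" + k);
        System.out.println(cache.getValue("APP-A", true));
        System.out.println(cache.isInReadOnly("APP-A"));
    }
}
```

Once a key is in the read-only level, later reads never reach the loader, which is why a separate refresh task is needed to keep that level up to date.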
Initialization of the ReadWriteMap cache: readWriteCacheMap is a LoadingCache, an API that Guava provides for in-memory caching.
this.readWriteCacheMap = CacheBuilder.newBuilder()
    .initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
    .removalListener(new RemovalListener<Key, ResponseCacheImpl.Value>() {
        public void onRemoval(RemovalNotification<Key, ResponseCacheImpl.Value> notification) {
            Key removedKey = (Key)notification.getKey();
            if (removedKey.hasRegions()) {
                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                ResponseCacheImpl.this.regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
            }
        }
    }).build(new CacheLoader<Key, ResponseCacheImpl.Value>() {
        public ResponseCacheImpl.Value load(Key key) throws Exception {
            if (key.hasRegions()) {
                Key cloneWithNoRegions = key.cloneWithoutRegions();
                ResponseCacheImpl.this.regionSpecificKeys.put(cloneWithNoRegions, key);
            }
            ResponseCacheImpl.Value value = ResponseCacheImpl.this.generatePayload(key);
            return value;
        }
    });
Synchronizing the caches
if (this.shouldUseReadOnlyResponseCache) {
    // First run at the next multiple of the interval, then repeat every interval
    this.timer.schedule(this.getCacheUpdateTask(), new Date(System.currentTimeMillis() / responseCacheUpdateIntervalMs * responseCacheUpdateIntervalMs + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs);
}
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        public void run() {
            ResponseCacheImpl.logger.debug("Updating the client cache from response cache");
            Iterator var1 = ResponseCacheImpl.this.readOnlyCacheMap.keySet().iterator();
            while(var1.hasNext()) {
                Key key = (Key)var1.next();
                if (ResponseCacheImpl.logger.isDebugEnabled()) {
                    ResponseCacheImpl.logger.debug("Updating the client cache from response cache for key : {} {} {} {}", new Object[]{key.getEntityType(), key.getName(), key.getVersion(), key.getType()});
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    ResponseCacheImpl.Value cacheValue = (ResponseCacheImpl.Value)ResponseCacheImpl.this.readWriteCacheMap.get(key);
                    ResponseCacheImpl.Value currentCacheValue = (ResponseCacheImpl.Value)ResponseCacheImpl.this.readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        ResponseCacheImpl.this.readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable var8) {
                    ResponseCacheImpl.logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), var8);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}
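The first-run time passed to timer.schedule is the current time rounded down to a multiple of responseCacheUpdateIntervalMs, plus one interval, so the task starts on an interval boundary. The arithmetic can be checked with a small sketch; the class and method names here are hypothetical, and 30 000 ms is used as the interval to match the 30-second default mentioned above.

```java
public class AlignedStartSketch {
    // Integer division truncates nowMs down to a multiple of intervalMs;
    // adding one interval yields the next boundary strictly after nowMs.
    static long nextAlignedStart(long nowMs, long intervalMs) {
        return nowMs / intervalMs * intervalMs + intervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 30_000L;
        System.out.println(nextAlignedStart(65_000L, intervalMs)); // prints 90000
        System.out.println(nextAlignedStart(60_000L, intervalMs)); // prints 90000
    }
}
```

When the current time already sits exactly on a boundary, the first run is still a full interval later.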
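Of course, a registration changes information that the level-2 cache may already hold, so after EurekaClient registers, EurekaServer explicitly invalidates the affected level-2 cache entries: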
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    this.read.lock();
    try {
        // ...
        this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication});
    } finally {
        this.read.unlock();
    }
}
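One consequence of invalidating only the level-2 cache is that clients reading through the level-3 cache keep seeing the old data until the next timer run. The sketch below illustrates that window under simplifying assumptions: InvalidateSketch and all of its members are hypothetical, the registry is reduced to an instance count per application, plain maps replace Guava's LoadingCache, and sync() is called by hand instead of by a timer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InvalidateSketch {
    // Level 1: application name -> number of registered instances (simplified registry).
    private final Map<String, Integer> registry = new ConcurrentHashMap<>();
    // Level 2: stand-in for readWriteCacheMap.
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    // Level 3: stand-in for readOnlyCacheMap.
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();

    public void register(String app) {
        registry.merge(app, 1, Integer::sum);
        // Explicit invalidation touches level 2 only; level 3 is left as it is.
        readWrite.remove(app);
    }

    // Plays the role of generatePayload: builds the response from level 1.
    private String load(String app) {
        return app + ":" + registry.getOrDefault(app, 0);
    }

    public String get(String app) {
        String cached = readOnly.get(app);
        if (cached != null) {
            return cached;
        }
        String loaded = readWrite.computeIfAbsent(app, this::load);
        readOnly.put(app, loaded);
        return loaded;
    }

    // What the timer task does on each run. The real task compares references
    // with !=, while this sketch compares the String contents.
    public void sync() {
        for (String app : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(app, this::load);
            if (!fresh.equals(readOnly.get(app))) {
                readOnly.put(app, fresh);
            }
        }
    }

    public static void main(String[] args) {
        InvalidateSketch server = new InvalidateSketch();
        server.register("APP-A");
        System.out.println(server.get("APP-A")); // prints APP-A:1
        server.register("APP-A");
        System.out.println(server.get("APP-A")); // prints APP-A:1 (stale until sync)
        server.sync();
        System.out.println(server.get("APP-A")); // prints APP-A:2
    }
}
```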