Eureka注册中心功能及源码分析
近年来微服务不断的被应用,然而随着公司业务的增加,服务也不断的增加,从而导致服务实例失败与成功也难以维护。所以此时将众多的服务实例交由注册中心统一进行管理。而服务治理的最基本需求就是服务注册于服务发现,同时还需要服务治理的高可用。然而今天就从Netfilx的Eureka说起。 真的是太多了,不想写,最后附上一个详细的调用流程图吧,心烦,衰。
服务注册:EurekaClient会通过Http请求将自身的元信息上报给EurekaServer
服务发现:EurekaClient在启动时会通过Http请求到EurekaServer中获取注册表的信息,同时会将其保存在本地,并且在本地会开启一个定时任务从EurekaServer中获取最新的注册信息,而EurekaServer中的注册信息会每隔30秒进行更新。
服务续约:EurekaClient会通过Http请求进行服务续约,防止自身被误判从而导致剔除服务。
服务同步:主要用于EurekaServer进行集群部署,每个EurekaServer都会进行Http请求同步,以保证各自信息一致性。
服务调用:消费者(EurekaClient)从EurekaServer中获取到注册信息后,会通过Ribbon进行负载算法选择其中一个服务通过Http请求进行调用。
源码解析将略过SpringBoot源码,直接从SpringCloud Eureka源码进行讲解
EurekaServer源码解析
EurekaServer更像是一个Http服务器,说白了就是我们平时使用的SpringMVC,只不过说NetFilx的Eureka处理Http请求不是使用的SpringMVC而是使用Jersey,当然其原理都是一样的。 然而EurekaServer有两大功能,从集群节点获取注册表信息以及通过定时任务剔除服务。EurekaServer使用的 spring-cloud-starter-netflix-eureka-server 包,同时还需要在启动类上使用 @EnableEurekaServer 注解。
@EnableEurekaServer
然而这个注解里面其实并没有什么东西,也只是通过@Import注解导入了一个EurekaServerMarkerConfiguration,其主要所用是用于做一个标记配合Spring的@ConditionalOnBean注解使用的,当该类EurekaServerMarkerConfiguration存在就会对EurekaServerAutoConfiguration类进行加载以及初始化。
EurekaServerAutoConfiguration
EurekaServerAutoConfiguration类会在SpringBoot启动时会通过自动装配加载 spring-cloud-starter-netflix-eureka-server包中META-INF目录下的spring.factories文件中的类,那么也就是加载EurekaServerAutoConfiguration类 ,而此类主要为了实例化一些基本类
/**
* @author Gunnar Hillert
* @author Biju Kunjummen
* @author Fahim Farook
*/
@Configuration
@Import(EurekaServerInitializerConfiguration.class) //此类很重要主要y用于初始化EurekaServer配置
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
//这个就是主要用于dashboard
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
//EurekaServer上下文
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
//EurekaServer启动类-》此类很重要
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
/**
* 主要是Jersey使用
* Register the Jersey filter.
* @param eurekaJerseyApp an {@link Application} for the filter to be registered
* @return a jersey {@link FilterRegistrationBean}
*/
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean bean = new FilterRegistrationBean();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
}
EurekaServerInitializerConfiguration
这个类主要是由 EurekaServerAutoConfiguration类通过@Import进行导入的,然而此类实现了Spring的 SmartLifecycle生命周期类,也就是说会在Spring将容器中的Bean进行初始化完成后,就会调用SmartLifecycle类中start()方法。那么下面就来分析下这个类
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
//这里英文很清晰,EurekaServer启动从这里开始
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
}
上面介绍的两个类就是EurekaServer源码的主要入口类, 那么下面将直接从EurekaServer的两大功能说起
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
//从集群节点同步注册表信息
int registryCount = this.registry.syncUp();
//剔除服务
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
从集群节点同步注册表信息
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
//这里就是获取到所有的注册实例
Applications apps = eurekaClient.getApplications();
//拿到所有的注册实例
for (Application app : apps.getRegisteredApplications()) {
//循环所有的注册实例
for (InstanceInfo instance : app.getInstances()) {
try {
//判断是否可注册
if (isRegisterable(instance)) {
/**
* 注册条件:根据是否有租约,如果有则保留最新时间
* 否则更新为最初时间
* 同时修改实例的状态为UP,以及Type为ADDED
**/
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
定时任务剔除服务
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
//剔除服务的任务,主要就是看定时任务EvictionTask
evictionTaskRef.set(new EvictionTask());
//定时器schedule初始化方法
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
剔除服务任务
其实我觉得这里设计的很巧妙,在一线大厂BATJ,或者说NetFilx这些公司肯定动辄就是上千的服务,所以并不会一次性直接把这些服务直接剔除,而是先找到这些需要剔除的服务,然后通过随机的方式剔除服务,为什么要这样做? 我觉得在大型公司EurekaServer肯定是集群方式,也就是说把需要剔除的服务通过随机的方式让每个EurekaServer都能够剔除服务,而不是说让其中一个EurekaServer去剔除服务。
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
//这里就是把所有需要剔除的服务找出来
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
//本地注册表大小
int registrySize = (int) getLocalRegistrySize();
//注册表大小阈值
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
//这里计算出当前EurekaServer需要剔除的服务数量,然后取最小值
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
//
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
//此方法用于交换位置,也就是说会将集合中的i和next元素进行换位
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// 然后就会将该实例从注册表中删除,并修改该实例的类型为DELETED,
// 其实就是为了告知其他节点,这个服务需要删除
internalCancel(appName, id, false);
}
}
}
EurekaClient源码解析
EurekaClient需要将自身的元信息告知到EurekaServer,同时还需要到EurekaServer拉取所有的注册信息并缓存到本地然后Ribbon通过负载算法选择一个服务进行调用。如果需要将服务声明为EurekaClient就需要加上@EnableEurekaClient或者@EnableDiscoveryClient,以及添加spring-cloud-starter-netflix-eureka-client包。然后SpringBoot会在启动时通过自动装配加载器META-INF/spring.factories文件中的 EurekaClientAutoConfiguration类。
EurekaClientAutoConfiguration类
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
/**
* 其实这个类最主要的地方就是实例话NetFilx原生的EurekaClient
**/
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
/**
* 这个是SpringCloud对EurekaClient的封装,并将原生的EurekaClient注入到SpringCloud的EurekaClient
**/
@Bean
public DiscoveryClient discoveryClient(EurekaClient client,
EurekaClientConfig clientConfig) {
return new EurekaDiscoveryClient(client, clientConfig);
}
}
DiscoveryClient类源码
这个类的主要作用就对定时任务的初始化以及任务执行,比如服务注册,服务发现,服务续约等.
public class DiscoveryClient implements EurekaClient {
/**
* 只截取最重要部分
**/
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
//服务续约的线程池
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//服务发现的线程池
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
//最终,初始化这些定时任务
initScheduledTasks();
}
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread() //服务发现任务
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()//服务续约任务
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//服务注册任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
服务发现
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
//如果是第一次启动或者禁用了增量跟新注册信息,就会全量拉取注册信息
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();//全量拉取注册信息
} else {
getAndUpdateDelta(applications);//增量拉取注册信息
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
return true;
}
详细调用流程图