上世纪的技术“长轮询”,为什么被Nacos和Apollo看上?
数据交互模式
长轮询与轮询
-
推送延迟。客户端每隔 5s 拉取一次配置,若配置变更发生在第 6s,则配置推送的延迟会达到 4s。 -
服务端压力。配置一般不会发生变化,频繁的轮询会给服务端造成很大的压力。 -
推送延迟和服务端压力无法中和。降低轮询的间隔,延迟降低,压力增加;增加轮询的间隔,压力降低,延迟增高。
-
推送延迟。服务端数据发生变更后,长轮询结束,立刻返回响应给客户端。 -
服务端压力。长轮询的间隔期一般很长,例如 30s、60s,并且服务端 hold 住连接不会消耗太多服务端资源。
配置中心长轮询设计
-
客户端发起长轮询
-
服务端监听数据变化
服务端会维护 dataId 和长轮询的映射关系,如果配置发生变化,服务端会找到对应的连接,为响应写入更新后的配置内容。如果超时内配置未发生变化,服务端找到对应的超时长轮询连接,写入 304 响应。
304 在 HTTP 响应码中代表“未改变”,并不代表错误。比较契合长轮询时,配置未发生变更的场景。
-
客户端接收长轮询响应
首先查看响应码是 200 还是 304,以判断配置是否变更,做出相应的回调。之后再次发起下一次长轮询。
-
服务端设置配置写入的接入点
主要用配置控制台和 client 发布配置,触发配置变更
Servlet3.0 并不是一个特别新的规范,它跟 Java 6 是同一时期的产物。例如 SpringBoot 内嵌的 Tomcat 很早就支持了 Servlet3.0,你无需担心 AsyncContext 机制不起作用。
客户端实现
@Slf4j
public class ConfigClient {
private CloseableHttpClient httpClient;
private RequestConfig requestConfig;
public ConfigClient() {
this.httpClient = HttpClientBuilder.create().build();
// ① httpClient 客户端超时时间要大于长轮询约定的超时时间
this.requestConfig = RequestConfig.custom().setSocketTimeout( 40000).build();
}
@SneakyThrows
public void longPolling(String url, String dataId) {
String endpoint = url + "?dataId=" + dataId;
HttpGet request = new HttpGet(endpoint);
CloseableHttpResponse response = httpClient.execute(request);
switch (response.getStatusLine().getStatusCode()) {
case 200: {
BufferedReader rd = new BufferedReader( new InputStreamReader(response.getEntity()
.getContent()));
StringBuilder result = new StringBuilder();
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
response.close();
String configInfo = result.toString();
log.info( "dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
longPolling(url, dataId);
break;
}
// ② 304 响应码标记配置未变更
case 304: {
log.info( "longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
longPolling(url, dataId);
break;
}
default: {
throw new RuntimeException( "unExcepted HTTP status code");
}
}
}
public static void main(String[] args) {
// httpClient 会打印很多 debug 日志,关闭掉
Logger logger = (Logger)LoggerFactory.getLogger( "org.apache.http");
logger.setLevel(Level.INFO);
logger.setAdditive( false);
ConfigClient configClient = new ConfigClient();
// ③ 对 dataId: user 进行配置监听
configClient.longPolling( "http://127.0.0.1:8080/listener", "user");
}
}
-
RequestConfig.custom().setSocketTimeout(40000).build()
。httpClient 客户端超时时间要大于长轮询约定的超时时间。很好理解,不然还没等服务端返回,客户端会自行断开 HTTP 连接。 -
response.getStatusLine().getStatusCode() == 304
。前文介绍过,约定使用 304 响应码来标识配置未发生变更,客户端继续发起长轮询。 -
configClient.longPolling("http://127.0.0.1:8080/listener", "user")
。在示例中,我们处于简单考虑,仅仅启动一个客户端,对单一的 dataId:user 进行监听(注意,需要先启动 server 端)。
服务端实现
@RestController
@Slf4j
@SpringBootApplication
public class ConfigServer {
@Data
private static class AsyncTask {
// 长轮询请求的上下文,包含请求和响应体
private AsyncContext asyncContext;
// 超时标记
private boolean timeout;
public AsyncTask(AsyncContext asyncContext, boolean timeout) {
this.asyncContext = asyncContext;
this.timeout = timeout;
}
}
// guava 提供的多值 Map,一个 key 可以对应多个 value
private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat( "longPolling-timeout-checker-%d")
.build();
private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor( 1, threadFactory);
// 配置监听接入点
@RequestMapping( "/listener")
public void addListener(HttpServletRequest request, HttpServletResponse response) {
String dataId = request.getParameter( "dataId");
// 开启异步
AsyncContext asyncContext = request.startAsync(request, response);
AsyncTask asyncTask = new AsyncTask(asyncContext, true);
// 维护 dataId 和异步请求上下文的关联
dataIdContext.put(dataId, asyncTask);
// 启动定时器,30s 后写入 304 响应
timeoutChecker.schedule(() -> {
if (asyncTask.isTimeout()) {
dataIdContext.remove(dataId, asyncTask);
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
asyncContext.complete();
}
}, 30000, TimeUnit.MILLISECONDS);
}
// 配置发布接入点
@RequestMapping( "/publishConfig")
@SneakyThrows
public String publishConfig(String dataId, String configInfo) {
log.info( "publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
for (AsyncTask asyncTask : asyncTasks) {
asyncTask.setTimeout( false);
HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(configInfo);
asyncTask.getAsyncContext().complete();
}
return "success";
}
public static void main(String[] args) {
SpringApplication.run(ConfigServer .class, args);
}
}
@RequestMapping("/listener")
,配置监听接入点,也是长轮询的入口。在获取 dataId 之后,使用
request.startAsync
将请求设置为异步,这样在方法结束后,不会占用 Tomcat 的线程池。
dataIdContext.put(dataId, asyncTask)
会将 dataId 和异步请求上下文给关联起来,方便配置发布时,拿到对应的上下文。注意这里使用了一个 guava 提供的数据结构
Multimap<String, AsyncTask> dataIdContext
,它是一个多值 Map,一个 key 可以对应多个 value,你也可以理解为
Map<String,List<AsyncTask>>
,但使用
Multimap
维护起来可以更方便地处理一些并发逻辑。至于为什么会有多值,很好理解,因为配置中心的 Server 端会接受来自多个客户端对同一个 dataId 的监听。
timeoutChecker.schedule()
启动定时器,30s 后写入 304 响应。再结合之前客户端的逻辑,接收到 304 之后,会重新发起长轮询,形成一个循环。
@RequestMapping("/publishConfig")
,配置发布的入口。配置变更后,根据 dataId 一次拿出所有的长轮询,为之写入变更的响应,同时不要忘记取消定时任务。至此,完成了一个配置变更后推送的流程。
启动配置监听
22:18:09.185 [main] INFO moe.cnkirito.demo.ConfigClient - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again
22:18:39.197 [main] INFO moe.cnkirito.demo.ConfigClient - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again
curl -X GET "localhost:8080/publishConfig?dataId=user&configInfo=helloworld"
2021-01-24 22:18:50.801 INFO 73301 --- [nio-8080-exec-6] moe.cnkirito.demo.ConfigServer : publish configInfo dataId: [user], configInfo: helloworld
22:18:50.806 [main] INFO moe.cnkirito.demo.ConfigClient - dataId: [user] changed, receive configInfo: helloworld
实现细节思考
为什么需要定时器返回 304
-
和真正的客户端超时区分开。 -
仅仅使用异常(Exception)来表达异常流,而不应该用异常来表达正常的业务流。304 不是超时异常,而是长轮询中配置未变更的一种正常流程,不应该使用超时异常来表达。
长轮询包含多组 dataId
长轮询和长连接
-
长轮询实现起来比较容易,完全依赖于 HTTP 便可以实现全部逻辑,而 HTTP 是最能够被大众接受的通信方式。 -
长轮询使用 HTTP,便于多语言客户端的编写,大多数语言都有 HTTP 的客户端。
总结
-
本文介绍了长轮询、轮询、长连接这几种数据交互模型的差异性。 -
分析了 Nacos 和 Apollo 等主流配置中心均是通过长轮询的方式实现配置的实时推送的。实时感知建立在客户端拉的基础上,因为本质上还是通过 HTTP 进行的数据交互,之所以有“推”的感觉,是因为服务端 hold 住了客户端的响应体,并且在配置变更后主动写入了返回 response 对象再进行返回。 -
通过一个简单的 demo,实现了长轮询配置实时推送的过程演示,本文的 demo 示例存放在:https://github.com/lexburner/longPolling-demo
占小狼的博客
Java进阶技术干货、实践分享,跟着狼哥一起学习JVM、性能调优,欢迎关注。
Official Account
最近面试BAT,整理一份面试资料 《Java面试BAT通关手册》 ,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。