vlambda博客
学习文章列表

设计一个具有等待队列的连接池

        说到连接池相关很多人都使用过,常见的有数据连接池,HttpClient连接池等。连接池的作用是保持一定量的连接让交互过程复用这些连接,从而大大节省连接创建过程或过多的损耗。在连接池策略中往往当池没有连接的情况都会抛出异常告诉使用者资源无法使用,正常来说这种做法比较普遍,但并发峰值往往都是瞬间存在的,只要没资源就拒绝这种情况服务上或多或少有些不太友好。

应用需求

        在构建组件网关转发和HttpClient设计中大量加入了连接池,在最开始的设计中只要连接池的连接没有的情况下直接抛出异常拒绝;但实际并发时的峰值往往是瞬间出现,在下一刻就会回落到比较低的水平;在这么短暂的时间内拒绝当前请求的确是没有必要,因此在连接池前置加个等待队列。

设计

        针对等待设计一般两种情况:一种是自旋等待,而另一种则是回调;前者不用说比较占用cpu资源,但好处就是代码结构好;后者则cpu资源利用好,但基于异步回调函数方式代码结构并不友好,集成逻辑比较麻烦。但在.net集成了async/await功能后异步处理再也不需要通讯函数回调的方式编写,整体代码结构和自旋等待方式并没差异。

实现

        接下来讲述BeetleX.Http.Clients是如何实现具有等待队列的连接池的,从而通过这机制保障并发峰值时可以接管更多的请求。针对连接池主要提供两个方法分别是Pop和Push,前者是从池中获取连接,后者则是把连接回收到池里.等待队列基本就围绕着这两个方法处理即可。接下来看一下BeetleX.Http.Clients的HttpClientHandlerPool是怎样实现的。

  • Pop

        该方法是从连接池中获取连接

public Task<HttpClientHandler> Pop(){ lock (this) { if (mPools.Count > 0) { var result = mPools.Pop(); result.Using = true; result.TimeOut = BeetleX.TimeWatch.GetElapsedMilliseconds() + TimeOut; return Task.FromResult(result); } if (Clients.Count < MaxConnections) { var result = Create(); result.Using = true; result.TimeOut = BeetleX.TimeWatch.GetElapsedMilliseconds() + TimeOut; return Task.FromResult(result); } if (mWaitQueue.Count < MaxWaitLength) { TaskCompletionSource<HttpClientHandler> completionSource = new TaskCompletionSource<HttpClientHandler>(); mWaitQueue.Enqueue(completionSource); return completionSource.Task; } else { throw new HttpClientException($"Request {Host} connections limit"); }
}}
  • 第一步是判断连接池有没有连接,如果有则直接返回可用连接。

  • 第二步如果连接池没有连接,则判断创建的连接数是否超过最大值,如果没有则创建一个新的连接并返回。

  • 第三步判断当前等待队列是否超出最大值,如果不是则创建对应的TaskCompletionSource存储到队列中返回相应的Task对象。

  • Push

    由于涉及到等待队列问题,所以当连接回收的时候也要做特别的处理

public void Push(HttpClientHandler client){ TaskCompletionSource<HttpClientHandler> result = null; lock (this) { if (mWaitQueue.Count > 0) { result = mWaitQueue.Dequeue(); client.Using = true; client.TimeOut = BeetleX.TimeWatch.GetElapsedMilliseconds() + TimeOut; } else { client.Using = false; mPools.Push(client); }    } if (result != null) { Task.Run(() => result.SetResult(client)); }}

    当连接回收后需要判断一下是否存在队列,如果存在则获取队列中等待的TaskCompletionSource并设置返回值;这里为何通过Task.Run来调用呢?主要原因是希望启用新线程来回调,不让当前线程处理回调任务影响其后续的工作。

使用

    针对连接池的使用只需要在调用Pop时加个await即可

client = await HttpHost.Pool.Pop();





BeetleX

开源跨平台通讯框架(支持TLS)
轻松实现高性能:tcp、http、websocket、redis、rpc和网关等服务应用

https://beetlex.io

如果你想了解某方面的知识或文章可以把想法发送到

[email protected]|[email protected]