提升RabbitMQ消费速度的一些实践
来源:
https://blog.bossma.cn/rabbitmq/practices-on-improving-the-speed-of-rabbitmq-consumption/
RabbitMQ是一个开源的消息中间件,自带管理界面友好、开发语言支持广泛、没有对其它中间件的依赖,而且社区非常活跃,特别适合中小型企业拿来就用。这篇文章主要探讨提升RabbitMQ消费速度的一些方法和实践,比如增加消费者、提高Prefetch count、多线程处理、批量Ack等。
增加消费者
提高Prefetch count
RabbitMQ关于吞吐量,延迟和带宽的一些理论
参考文档:https://blog.csdn.net/gbbqrglvir3dyi82/article/details/78663828
多线程处理
来看一个例子:
consumer.Received += (o, e) =>
{
ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessSingleContextMessage), e);
};
// 接收消息存入列表,当接收数量达到prefetchCount/2时就加入处理队列;
// 1/2是考虑了消息从RabbitMQ到消费者的传输时间,不需要等所有的消息都到达了才开始处理。
consumer.Received += (o, e) =>
{
lock(receiveLocker){
basicDeliverEventArgsList.Add(e);
if (basicDeliverEventArgsList.Count >= prefetchCount/2)
{
var deliverEventArgs = basicDeliverEventArgsList.ToArray();
basicDeliverEventArgsList.Clear();
EnProcessQueue(deliverEventArgs);
}
}
};
// 此处省略数据出队列的代码,请自行脑补
....
// 然后这个方法是用来处理消息的,将消息根据数据Key分成若干组,放到多个任务中并行处理;
// 相同数据Key的消息将分配到一个组中,在这个组中数据被顺序处理
private void Process(BasicDeliverEventArgs[] args)
{
if (args.Length <= 0)
{
return;
}
try
{
var tasks = CreateParallelProcessTasksByDataKey(args);
Task.WaitAll(tasks);
}
catch (Exception ex)
{
ToLog("处理任务发生异常", ex);
}
}
// 创建并行处理多条消息的任务
private Task[] CreateParallelProcessTasksByDataKey(BasicDeliverEventArgs[] args)
{
// 根据dataKey进行分组,dataKey可以放到消息的header中进行传输,这里就不给出具体的分组方法了
Dictionary<string, List<DeliverObject>> eDic = GetMessgeGroupByDataKey(args);
// 任务数量
var paralleTaskNum = this.parallelNum;
if (paralleTaskNum > eDic.Count)
{
paralleTaskNum = eDic.Count;
}
// 每个任务处理的消息数量
var perTaskNum = (int)Math.Ceiling(args.Length / (double)paralleTaskNum);
// 任务数组
List<Task> tasks = new List<Task>();
var taskArgs = new List<DeliverObject>();
for (int j = eDic.Count - 1; j >= 0; j--)
{
var currentElement = eDic.ElementAt(j);
taskArgs.AddRange(currentElement.Value);
eDic.Remove(currentElement.Key);
if (taskArgs.Count >= perTaskNum || j == 0)
{
// 创建任务,并处理分配的消息
var taskList = taskArgs.Select(d => d).ToList();
taskArgs.Clear();
var task = Task.Factory.StartNew(() =>
{
// 这这里处理分组中的消息
...
});
tasks.Add(task);
}
}
return tasks.ToArray();
}
批量Ack
channel.BasicAck(e.DeliveryTag, true);
总结
-
启用Prefetch count设置; -
先1个消费者,1次只接收1条,处理完毕后再传输下一条,这样可以避免并发冲突和消息顺序问题; -
如果消费速度不满足要求,则1次接收多条,按接收顺序处理; -
如果消费速度还是不满足要求,则1次接收多条,并行处理; -
如果消费速度还是不满足要求,则启动多个消费者,并行处理。 -
如果消费速度还是不满足要求,改需求,或者换别的中间件。
RabbitMQ • 长按关注
喜欢文章,点个在看