vlambda博客
学习文章列表

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)

架构师学习路线
PHP架构师之路,专注于分享交流PHP进阶、性能调优、微服务架构、golang微服务、高并发分布式技术、大厂笔试面试技巧等相关文章、视频、程序等。
12篇原创内容
Official Account


RPC(Remote Procedure Call, 远程过程调用),是一种计算机通信协议。对于两台机器而言,就是 A 服务器上的应用程序调用 B 服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。


1. RPC 框架

我们首先通过一张图理解 RPC 的工作流程:

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)

因此,实现一个最简单的 RPC 服务,只需要 Client、Server 和 Network,本文就是利用消息中间件 RabbitMQ 作为 Network 载体传输信息,实现简单的 RPC 服务。简单原理可如下图所示:

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)

即:当 Client 发送 RPC 请求时,Client 端是消息生产者,Server 端是消息消费者;当 Server 返回结果时,Server 端是消息生产者,Client 是消息消费者;发送和返回使用不同的队列。

接下来我们通过代码,详细展示一个计算斐波那契数列的 RPC 服务。


2. RPCServer 实现

2.1 Server 初始化

1.  `/**`
2. `* 队列名、交换机名、路由键`
3. `*/`
4. `private static final String EXCHANGE_NAME = "rpc_exchange";`
5. `private static final String QUEUE_NAME = "request_rpc_queue";`
6. `private static final String ROUTING_KEY = "rpc_routing_key";`
7. `private Connection connection = null;`
8. `private Channel channel = null;`
9. `private QueueingConsumer consumer = null;`
10. `/**`
11. `* Server的构造函数`
12. `*/`
13. `private RPCServer() {`
14. `try {`
15. `//创建链接`
16. `ConnectionFactory factory = new ConnectionFactory();`
17. `factory.setHost(Config.HOST);`
18. `factory.setPort(Config.PORT);`
19. `factory.setUsername(Config.USER);`
20. `factory.setPassword(Config.PASSWORD);`
21. `connection = factory.newConnection();`
22. `//创建信道`
23. `channel = connection.createChannel();`
24. `//设置AMQP的通信结构`
25. `channel.exchangeDeclare(EXCHANGE_NAME, "direct");`
26. `channel.queueDeclare(QUEUE_NAME, false, false, false, null);`
27. `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);`
28. `//设置消费者`
29. `consumer = new QueueingConsumer(channel);`
30. `channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);`
31. `} catch (Exception e) {`
32. `LOG.error("build connection failed!", e);`
33. `}`
34. `}`

初始化就是声明 RabbitMQ 的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了 AMQP 的通信结构。

2.2 监听队列并反馈

1.  `/**`
2. `* 开启server`
3. `*/`
4. `private void startServer() {`
5. `try {`
6. `LOG.info("Waiting for RPC calls.....");`
7. `while (true) {`
8. `//获得文本消息`
9. `QueueingConsumer.Delivery delivery = consumer.nextDelivery();`
10. `BasicProperties props = delivery.getProperties();`
11. `//返回消息的属性`
12. `BasicProperties replyProps = new BasicProperties.Builder()`
13. `.correlationId(props.getCorrelationId())`
14. `.build();`
15. `long receiveTime = System.currentTimeMillis();`
16. `JSONObject json = new JSONObject();`
17. `try {`
18. `String message = new String(delivery.getBody(), "UTF-8");`
19. `int n = Integer.parseInt(message);`
20. `LOG.info("Got a request: fib(" + message + ")");`
21. `json.put("status", "success");`
22. `json.put("result", fib(n));`
23. `} catch (Exception e) {`
24. `json.put("status", "fail");`
25. `json.put("reason", "Not a Number!");`
26. `LOG.error("receive message failed!", e);`
27. `} finally {`
28. `long responseTime = System.currentTimeMillis();`
29. `json.put("calculateTime", (responseTime - receiveTime));`
30. `channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));`
31. `channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);`
32. `}`
33. `}`
34. `} catch (Exception e) {`
35. `LOG.error("server failed!", e);`
36. `} finally {`
37. `if (connection != null) {`
38. `try {`
39. `connection.close();`
40. `} catch (Exception e) {`
41. `LOG.error("close failed!", e);`
42. `}`
43. `}`
44. `}`
45. `}`

在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的 nextDelivery 方法来获得 RabbitMQ 队列的最新一条消息。同时通过 getProperties 获取到消息中的反馈信息属性,用于标记客户端 Client 的属性。然后计算斐波那契数列的结果。
最后通过 basicAck 使用消息信封向 RabbitMQ 确认了该消息。

到这里就实现了计算斐波那契数列 RPC 服务的 Server 端。


3. RPCClient 实现

3.1 初始化 CLient

1.  `/**`
2. `* 消息请求的队列名、交换机名、路由键`
3. `*/`
4. `private static final String EXCHANGE_NAME = "rpc_exchange";`
5. `private static final String QUEUE_NAME = "request_rpc_queue";`
6. `private static final String ROUTING_KEY = "rpc_routing_key";`
7. `/**`
8. `* 消息返回的队列名、交换机名、路由键`
9. `*/`
10. `private static final String RESPONSE_QUEUE = "response_rpc_queue";`
11. `private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";`
12. `/**`
13. `* RabbitMQ的实体`
14. `*/`
15. `private Connection connection = null;`
16. `private Channel channel = null;`
17. `private QueueingConsumer consumer = null;`
18. `/**`
19. `* 构造客户端`
20. `* @throws Exception`
21. `*/`
22. `private RPCClient() throws Exception {`
23. `ConnectionFactory factory = new ConnectionFactory();`
24. `factory.setHost(Config.HOST);`
25. `factory.setPort(Config.PORT);`
26. `factory.setUsername(Config.USER);`
27. `factory.setPassword(Config.PASSWORD);`
28. `connection = factory.newConnection();`
29. `channel = connection.createChannel();`
30. `channel.exchangeDeclare(EXCHANGE_NAME, "direct");`
31. `channel.queueDeclare(QUEUE_NAME, false, false, false, null);`
32. `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);`
33. `channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);`
34. `channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);`
35. `consumer = new QueueingConsumer(channel);`
36. `channel.basicConsume(RESPONSE_QUEUE, true, consumer);`
37. `}`

这里声明 AMQP 结构体的方式和 Server 端类似,只不过 Client 端需要多声明一个队列,用于 RPC 的 response。

3.2 发送 / 接收消息

1.  `/**`
2. `* 请求server`
3. `* @param message`
4. `* @return`
5. `* @throws Exception`
6. `*/`
7. `private String requestMessage(String message) throws Exception {`
8. `String response = null;`
9. `String corrId = UUID.randomUUID().toString();`
10. `BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();`
11. `channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));`
12. `while (true) {`
13. `QueueingConsumer.Delivery delivery = consumer.nextDelivery();`
14. `if (delivery.getProperties().getCorrelationId().equals(corrId)) {`
15. `response = new String(delivery.getBody(),"UTF-8");`
16. `break;`
17. `}`
18. `}`
19. `return response;`
20. `}`

BasicProperties 用于存储你请求消息的属性,这里我设置了 correlationId 和 replyTo 属性,用于 Server 端的返回识别。


4. 运行测试

Client 端发送:

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)

Server 端接收并处理:

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)

Client 收到计算结果:

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)

由于我运行 RabbitMQ 的服务器是租用的阿里云的,差不多传输时延在 60ms 左右,如果把 RPC 服务和消息中间件同机房部署的话延时基本上就在 ms 级别。


5. FAQ

5.1 说明

需要体验完整的过程,你需要如下环境:

1. `JDK1.6以上 + Maven+ RabbitMQ`

5.2 源代码

完整代码代码请戳:github:https://github.com/chadwick521/rabbitmqdemo

其中 Server 的代码在:

1. `rpc.RPCServer`

Client 端的代码位置:

1. `rpc.RPCClient`

以上内容就是关于基于消息中间件 RabbitMQ 实现简单的 RPC 服务的全部内容了,谢谢你阅读到了这里!



原文:https://zhuanlan.zhihu.com/p/136532347


每日更新互联网大厂技术
加薇薇老师了解更多技术前沿和定制规划职业路线