一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)
RPC(Remote Procedure Call, 远程过程调用),是一种计算机通信协议。对于两台机器而言,就是 A 服务器上的应用程序调用 B 服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。
1. RPC 框架
我们首先通过一张图理解 RPC 的工作流程:
因此,实现一个最简单的 RPC 服务,只需要 Client、Server 和 Network,本文就是利用消息中间件 RabbitMQ 作为 Network 载体传输信息,实现简单的 RPC 服务。简单原理可如下图所示:
即:当 Client 发送 RPC 请求时,Client 端是消息生产者,Server 端是消息消费者;当 Server 返回结果时,Server 端是消息生产者,Client 是消息消费者;发送和返回使用不同的队列。
接下来我们通过代码,详细展示一个计算斐波那契数列的 RPC 服务。
2. RPCServer 实现
2.1 Server 初始化
1. `/**`
2. `* 队列名、交换机名、路由键`
3. `*/`
4. `private static final String EXCHANGE_NAME = "rpc_exchange";`
5. `private static final String QUEUE_NAME = "request_rpc_queue";`
6. `private static final String ROUTING_KEY = "rpc_routing_key";`
7. `private Connection connection = null;`
8. `private Channel channel = null;`
9. `private QueueingConsumer consumer = null;`
10. `/**`
11. `* Server的构造函数`
12. `*/`
13. `private RPCServer() {`
14. `try {`
15. `//创建链接`
16. `ConnectionFactory factory = new ConnectionFactory();`
17. `factory.setHost(Config.HOST);`
18. `factory.setPort(Config.PORT);`
19. `factory.setUsername(Config.USER);`
20. `factory.setPassword(Config.PASSWORD);`
21. `connection = factory.newConnection();`
22. `//创建信道`
23. `channel = connection.createChannel();`
24. `//设置AMQP的通信结构`
25. `channel.exchangeDeclare(EXCHANGE_NAME, "direct");`
26. `channel.queueDeclare(QUEUE_NAME, false, false, false, null);`
27. `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);`
28. `//设置消费者`
29. `consumer = new QueueingConsumer(channel);`
30. `channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);`
31. `} catch (Exception e) {`
32. `LOG.error("build connection failed!", e);`
33. `}`
34. `}`
初始化就是声明 RabbitMQ 的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了 AMQP 的通信结构。
2.2 监听队列并反馈
1. `/**`
2. `* 开启server`
3. `*/`
4. `private void startServer() {`
5. `try {`
6. `LOG.info("Waiting for RPC calls.....");`
7. `while (true) {`
8. `//获得文本消息`
9. `QueueingConsumer.Delivery delivery = consumer.nextDelivery();`
10. `BasicProperties props = delivery.getProperties();`
11. `//返回消息的属性`
12. `BasicProperties replyProps = new BasicProperties.Builder()`
13. `.correlationId(props.getCorrelationId())`
14. `.build();`
15. `long receiveTime = System.currentTimeMillis();`
16. `JSONObject json = new JSONObject();`
17. `try {`
18. `String message = new String(delivery.getBody(), "UTF-8");`
19. `int n = Integer.parseInt(message);`
20. `LOG.info("Got a request: fib(" + message + ")");`
21. `json.put("status", "success");`
22. `json.put("result", fib(n));`
23. `} catch (Exception e) {`
24. `json.put("status", "fail");`
25. `json.put("reason", "Not a Number!");`
26. `LOG.error("receive message failed!", e);`
27. `} finally {`
28. `long responseTime = System.currentTimeMillis();`
29. `json.put("calculateTime", (responseTime - receiveTime));`
30. `channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));`
31. `channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);`
32. `}`
33. `}`
34. `} catch (Exception e) {`
35. `LOG.error("server failed!", e);`
36. `} finally {`
37. `if (connection != null) {`
38. `try {`
39. `connection.close();`
40. `} catch (Exception e) {`
41. `LOG.error("close failed!", e);`
42. `}`
43. `}`
44. `}`
45. `}`
在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的 nextDelivery 方法来获得 RabbitMQ 队列的最新一条消息。同时通过 getProperties 获取到消息中的反馈信息属性,用于标记客户端 Client 的属性。然后计算斐波那契数列的结果。
最后通过 basicAck 使用消息信封向 RabbitMQ 确认了该消息。
到这里就实现了计算斐波那契数列 RPC 服务的 Server 端。
3. RPCClient 实现
3.1 初始化 CLient
1. `/**`
2. `* 消息请求的队列名、交换机名、路由键`
3. `*/`
4. `private static final String EXCHANGE_NAME = "rpc_exchange";`
5. `private static final String QUEUE_NAME = "request_rpc_queue";`
6. `private static final String ROUTING_KEY = "rpc_routing_key";`
7. `/**`
8. `* 消息返回的队列名、交换机名、路由键`
9. `*/`
10. `private static final String RESPONSE_QUEUE = "response_rpc_queue";`
11. `private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";`
12. `/**`
13. `* RabbitMQ的实体`
14. `*/`
15. `private Connection connection = null;`
16. `private Channel channel = null;`
17. `private QueueingConsumer consumer = null;`
18. `/**`
19. `* 构造客户端`
20. `* @throws Exception`
21. `*/`
22. `private RPCClient() throws Exception {`
23. `ConnectionFactory factory = new ConnectionFactory();`
24. `factory.setHost(Config.HOST);`
25. `factory.setPort(Config.PORT);`
26. `factory.setUsername(Config.USER);`
27. `factory.setPassword(Config.PASSWORD);`
28. `connection = factory.newConnection();`
29. `channel = connection.createChannel();`
30. `channel.exchangeDeclare(EXCHANGE_NAME, "direct");`
31. `channel.queueDeclare(QUEUE_NAME, false, false, false, null);`
32. `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);`
33. `channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);`
34. `channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);`
35. `consumer = new QueueingConsumer(channel);`
36. `channel.basicConsume(RESPONSE_QUEUE, true, consumer);`
37. `}`
这里声明 AMQP 结构体的方式和 Server 端类似,只不过 Client 端需要多声明一个队列,用于 RPC 的 response。
3.2 发送 / 接收消息
1. `/**`
2. `* 请求server`
3. `* @param message`
4. `* @return`
5. `* @throws Exception`
6. `*/`
7. `private String requestMessage(String message) throws Exception {`
8. `String response = null;`
9. `String corrId = UUID.randomUUID().toString();`
10. `BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();`
11. `channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));`
12. `while (true) {`
13. `QueueingConsumer.Delivery delivery = consumer.nextDelivery();`
14. `if (delivery.getProperties().getCorrelationId().equals(corrId)) {`
15. `response = new String(delivery.getBody(),"UTF-8");`
16. `break;`
17. `}`
18. `}`
19. `return response;`
20. `}`
BasicProperties 用于存储你请求消息的属性,这里我设置了 correlationId 和 replyTo 属性,用于 Server 端的返回识别。
4. 运行测试
Client 端发送:
Server 端接收并处理:
Client 收到计算结果:
由于我运行 RabbitMQ 的服务器是租用的阿里云的,差不多传输时延在 60ms 左右,如果把 RPC 服务和消息中间件同机房部署的话延时基本上就在 ms 级别。
5. FAQ
5.1 说明
需要体验完整的过程,你需要如下环境:
1. `JDK1.6以上 + Maven+ RabbitMQ`
5.2 源代码
完整代码代码请戳:github:https://github.com/chadwick521/rabbitmqdemo
其中 Server 的代码在:
1. `rpc.RPCServer`
Client 端的代码位置:
1. `rpc.RPCClient`
以上内容就是关于基于消息中间件 RabbitMQ 实现简单的 RPC 服务的全部内容了,谢谢你阅读到了这里!
原文:https://zhuanlan.zhihu.com/p/136532347