Direct exchange模拟日志系统
我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。
消息的发送方
private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception {
//1.获取信道
Channel channel = RabbitMqUtil.getChannel();
//2.声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info","普通 info 信息");
bindingKeyMap.put("warning","警告 warning 信息");
bindingKeyMap.put("error","错误 error 信息");
//debug 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug","调试 debug 信息");
Set<Map.Entry<String, String>> entrySet = bindingKeyMap.entrySet();
for (Map.Entry<String, String> entry:entrySet) {
String bindingKey = entry.getKey();
String message=entry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes());
System.out.println(message);
}
}
消息接收方info和warning
private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception {
//1.获取信道
Channel channel = RabbitMqUtil.getChannel();
//2.声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//3.声明队列
String queueName="console";
channel.queueDeclare(queueName, false, false, false, null);
//4.绑定
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制台打印接收到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
//5.消费消息
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
消息的接收方error
private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception {
//1.获取信道
Channel channel = RabbitMqUtil.getChannel();
//2.声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//3.声明队列
String queueName="disk";
channel.queueDeclare(queueName, false, false, false, null);
//4.绑定
channel.queueBind(queueName, EXCHANGE_NAME, "error");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制台打印接收到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
//5.消费消息
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}