vlambda博客
学习文章列表

Direct exchange模拟日志系统

我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。



消息的发送方

private static final String EXCHANGE_NAME="direct_logs"; public static void main(String[] args) throws Exception { //1.获取信道 Channel channel = RabbitMqUtil.getChannel(); //2.声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //创建多个 bindingKey Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info","普通 info 信息"); bindingKeyMap.put("warning","警告 warning 信息"); bindingKeyMap.put("error","错误 error 信息"); //debug 没有消费这接收这个消息 所有就丢失了 bindingKeyMap.put("debug","调试 debug 信息");
Set<Map.Entry<String, String>> entrySet = bindingKeyMap.entrySet(); for (Map.Entry<String, String> entry:entrySet) { String bindingKey = entry.getKey(); String message=entry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes()); System.out.println(message); } }


消息接收方infowarning

private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception { //1.获取信道 Channel channel = RabbitMqUtil.getChannel(); //2.声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //3.声明队列 String queueName="console"; channel.queueDeclare(queueName, false, false, false, null); //4.绑定 channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("控制台打印接收到的消息:"+message); };
//取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback=(consumerTag)->{ System.out.println("消息消费被中断");        }; //5.消费消息        channel.basicConsume(queueName, true, deliverCallback, cancelCallback); }


消息的接收方error

private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception { //1.获取信道 Channel channel = RabbitMqUtil.getChannel(); //2.声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //3.声明队列 String queueName="disk"; channel.queueDeclare(queueName, false, false, false, null); //4.绑定 channel.queueBind(queueName, EXCHANGE_NAME, "error");

DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("控制台打印接收到的消息:"+message); };
//取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback=(consumerTag)->{ System.out.println("消息消费被中断"); };
//5.消费消息 channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}