vlambda博客
学习文章列表

通过rabbitmq将logback日志用websocket显示到前台 _

概述

这回想的是把后台的日志实时显示到前端页面上。有时候我们是会有这种需求,不需要通过服务器控制台查看日志,或者将日志存入ES中进行进一步分析等等。我查了一些资料,逐渐形成了自己的一个想法,就是利用rabbitmq,后台每打一条日志,就同时往消息队列里发一条消息,消息本身包含这条日志的内容。然后,消费者将消息通过websocket协议发送到前台进行显示。

经过查询资料发现,实际上spring对logback日志包已经做了amqp的实现。

后端

首先写这个logback配置文件,这个配置文件写好以后,每打一条日志就会往消息队列发一条消息:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <!--rabbitmq配置读取-->
    <springProperty name="rabbitmqHost" source="spring.rabbitmq.host"/>
    <springProperty name="rabbitmqPort" source="spring.rabbitmq.port"/>
    <springProperty name="rabbitmqUsername" source="spring.rabbitmq.username"/>
    <springProperty name="rabbitmqPassword" source="spring.rabbitmq.password"/>

    <!-- 控制台日志(颜色、编码) -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%yellow(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) [%class:%line] %highlight(%-5level) - %cyan(%msg%n)
            </pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

<!--    amqp的相关配置,会自动发到队列里-->
    <appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">

        <!--Layout(纯文本)而不是格式化的JSON -->
        <layout>
            <pattern>
                <![CDATA[%d{yyyy-MM-dd HH:mm:ss} [%thread] [%class:%line] %-5level %logger - %msg%n ]]>
            </pattern>
        </layout>
        <host>${rabbitmqHost}</host>
        <port>${rabbitmqPort}</port>
        <username>${rabbitmqUsername}</username>
        <password>${rabbitmqPassword}</password>

        <declareExchange>false</declareExchange>
        <exchangeType>direct</exchangeType>
        <exchangeName>log_exchange</exchangeName>
        <routingKeyPattern>log_exchange</routingKeyPattern>
        <generateId>true</generateId>
        <charset>UTF-8</charset>
        <durable>false</durable>
        <deliveryMode>NON_PERSISTENT</deliveryMode>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="AMQP"/>
    </root>
</configuration>

另外,虽然说这么安排消费者,可以把日志实时安排到前台,但是由于消费的过程是源源不断的,监控页面关闭的时候消费也会继续进行,所以仍然无法监控到监控页面打开之外时间的日志。此时需要将mq监听器手动的打开和关闭,当进入监控页面的时候,mq监听器打开,然后队列中积累的所有消息进行消费,之后再实时消费。关闭监控页面的时候,将mq监听器关闭,于是此后产生的消息都积累在消息队列中,等待下一次消费:

import com.markerhub.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * logback日志内容塞到rabbitmq队列里
 * @author mingchiuli
 * @create 2022-01-03 8:56 PM
 */

@Slf4j
@Component
public class LogMessageHandler {

    SimpMessagingTemplate simpMessagingTemplate;

    @Autowired
    public void setSimpMessagingTemplate(SimpMessagingTemplate simpMessagingTemplate) {
        this.simpMessagingTemplate = simpMessagingTemplate;
    }

    @RabbitListener(id = "log", queues = RabbitConfig.LOG_QUEUE, autoStartup = "false"//第二次启动的配置
//    @RabbitListener(id = "log", queues = RabbitConfig.LOG_QUEUE) 第一次启动用这个配置
    public void processMessage(String msg) {
        simpMessagingTemplate.convertAndSend("/logs/log", msg);
    }
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;


/**
 * @author mingchiuli
 * @create 2022-01-04 4:35 PM
 */

@Controller
@Slf4j
public class LogController {

    RabbitListenerEndpointRegistry registry;

    @Autowired
    public void setRegistry(RabbitListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @MessageMapping("/startMQ")
    public void start() {
        MessageListenerContainer logContainer = registry.getListenerContainer("log");

        if (!logContainer.isRunning()) {
            logContainer.start();
        }

        log.info("消息队列日志监听器已开启");
    }

    @MessageMapping("/stopMQ")
    public void stop() {
        MessageListenerContainer logContainer = registry.getListenerContainer("log");

        if (logContainer.isRunning()) {
            logContainer.stop();
        }

        log.info("消息队列日志监听器已关闭");
    }
}

前端

前端还是stompjs和sockjs。Vue通过href去跳转页面的时候,是不执行destoryed钩子的,所以还是尽量不要用href。这里跳转离开监控页面的时候,destoryed钩子是必须执行的,否则rabbitmq监听器没有正常关闭,消费者还会继续消费,从而丢失消息。

<template>
  <el-card class="box-card">
    <div slot="header" class="clearfix">
      <span style="font-size: large">Spring-logback后台日志</span>
      <el-button id="tag-copy" :data-clipboard-text="msg" style="float: right; padding: 0.5% 0" type="text" @click="copy($event,msg)">复制日志</el-button>
    </div>
    <div v-html="msg" class="text-item">
      {{ msg }}
    </div>
  </el-card>
</template>

<script>

let stompClient

import Clipboard from 'clipboard';
import SockJS from 'sockjs-client';
import {Client} from '@stomp/stompjs';

export default {
  name"SystemLogs",
  data() {
    return {
      msg''
    }
  },

  created() {
    this.initWebSocket();
  },

  methods: {

    copy() {
      const clipboard = new Clipboard("#tag-copy")
      clipboard.on('success', e => {
        this.$message({ type'success'message'复制成功' })
        // 释放内存
        clipboard.destroy()
      })
      clipboard.on('error', e => {
        // 不支持复制
        this.$message({ type'warning'message'该浏览器不支持自动复制' })
        // 释放内存
        clipboard.destroy()
      })
    },


    initWebSocket() {
      this.connectWebSocket();

      this.webSocketTimer = setInterval(() => {
        if (!stompClient.connected) {
          console.log("websocket重连中 ...");
          this.connectWebSocket();
        }
      }, 10000);
    },

    connectWebSocket() {
      const _this = this
      stompClient = new Client({
        connectHeaders: {"Authorization": sessionStorage.getItem("myToken")},
        debugfunction (str{
          //debug日志,调试时候开启
          console.log(str);
        },
        reconnectDelay10000,//重连时间
        heartbeatIncoming4000,
        heartbeatOutgoing4000,
      });

      stompClient.webSocketFactory = function ({
        //因为服务端监听的是/sysLog路径下面的请求,所以跟服务端保持一致
        // return new SockJS("http://127.0.0.1:8081" + '/sysLog', null, {
          return new SockJS("http://119.91.233.182:8081" + '/sysLog'null, {
          timeout10000
        });
      };


      stompClient.onConnect = (frame) => {

        stompClient.publish({
          destination'/app/startMQ'
        })

        stompClient.subscribe('/logs/log', (res) => {

          _this.msg += res.body + '<br>'

        });
      };


      stompClient.onStompError = function (frame{
        // Will be invoked in case of error encountered at Broker
        // Bad login/passcode typically will cause an error
        // Complaint brokers will set `message` header with a brief message. Body may contain details.
        // Compliant brokers will terminate the connection after any error
        console.log('Broker reported error: ' + frame.headers['message']);
        console.log('Additional details: ' + frame.body);
      };


      stompClient.activate();
    },

    disconnectWebSocket() {
      if (stompClient !== null) {

        stompClient.publish({
          destination'/app/stopMQ'
        })

        stompClient.deactivate()
      }
    },

  },

  destroyedfunction ({
    clearInterval(this.webSocketTimer);
    this.disconnectWebSocket();
  },


}
</script>

<style scoped>

</style>