vlambda博客
学习文章列表

物联网架构成长之路(49)-SpringBoot集成KafKa中间件

0.前言

  今天(2020-02-24)是开工的第一天,来到公司后,服务器出现问题,网管正在处理。没有服务器的后端,就像没有武器的剑客。没办法进行开发,就看看资料学习一点技术。
  疫情期间,虽然没有上班,但是自己的物联网平台还是在慢慢的优化中。下面这个图是规划后的V2版本架构图。

  架构图里面用到Kafka中间件,是作为数据流来处理。由于MQTT(EMQ)无法进行数据的持久化,所以需要引入Kafka来实现处理。EMQ用来保证通信的实时性和高效性。Kafka利用消息队列特性用来进行非实时与离线处理。
  比如架构图所示,可以利用EMQ的Kafka插件或者订阅MQTT根Topic的方式,把通信内容按照规则发往Kafka,作为生产者。而后面的支付服务,离线大数据处理服务,数据存储服务等,作为消费者。

物联网架构成长之路(49)-SpringBoot集成KafKa中间件


1. 利用Docker-Compose搭建kafka

  docker-compose.yml

version: '3'services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: "wurstmeister/kafka:2.12-2.4.0" ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.0.106 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock - /root/workspace/kafka/data:/kafka - /etc/localtime:/etc/localtime kafka-manager: image: "sheepkiller/kafka-manager" restart: always container_name: kafka-manger ports: - "9091:9000" links: - zookeeper - kafka environment: ZK_HOSTS: zookeeper:2181 KAFKA_BROKERS: kafka:9092 KM_ARGS: -Djava.net.preferIPv4Stack=true KM_USERNAME: admin KM_PASSWORD: admin

  下面是一些基础操作

#进入容器docker exec -it ${CONTAINER ID} /bin/bash #进入目录cd opt/kafka_2.11-0.10.1.1#创建Topicbin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test#查询Topicbin/kafka-topics.sh --list --bootstrap-server localhost:9092
#运行一个生产者bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test>This is Message
#运行一个消费者bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

物联网架构成长之路(49)-SpringBoot集成KafKa中间件

 

2. Kafka Manager界面
  访问192.168.0.106:9091,这个界面就是Kafka Manager管理界面,我们增加一个Cluster。然后可以查看对应的kafka信息
物联网架构成长之路(49)-SpringBoot集成KafKa中间件

 

3. SpringBoot集成Kafka

3.1 pom.xml

        <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency>

3.2 KafkaController.java

package com.wunaozai.demo.kafka;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;
@RestController@RequestMapping(value="/kafka")public class KafkaController {
@Autowired private KafkaSender kafkaSender; @RequestMapping(value="/send") public String send(String msg) { boolean flag = kafkaSender.send(msg); return flag + ""; }}

3.3 KafkaMessage.java

package com.wunaozai.demo.kafka;
import java.sql.Timestamp;
public class KafkaMessage { private Long id; private String msg; private Timestamp ts; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public Timestamp getTs() { return ts; } public void setTs(Timestamp ts) { this.ts = ts; } }

3.4 KafkaReceiver.java

package com.wunaozai.demo.kafka;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;
@Componentpublic class KafkaReceiver {
private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);
@KafkaListener(topics= {"iot"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> message = Optional.ofNullable(record.value()); if(message.isPresent()) { Object msg = message.get(); log.info("record : " + record); log.info("message : " + msg); } }}

3.5 KafkaSender.java

package com.wunaozai.demo.kafka;
import java.sql.Timestamp;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;
import com.google.gson.Gson;import com.google.gson.GsonBuilder;
@Componentpublic class KafkaSender {
@Autowired private KafkaTemplate<String, String> kafkaTemplate; private Gson gson = new GsonBuilder().create(); public boolean send(String msg) { KafkaMessage message = new KafkaMessage(); message.setId(System.currentTimeMillis()); message.setTs(new Timestamp(System.currentTimeMillis())); message.setMsg(msg); kafkaTemplate.send("iot", gson.toJson(message)); return true; }}

3.6 application.properties

#指定kafka 代理地址,可多个spring.kafka.bootstrap-servers=192.168.0.106:9092
#providerspring.kafka.producer.retries=0#每次批量发送消息的数量spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432#指定消息key和消息体body的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#consumer# 指定默认消费者group idspring.kafka.consumer.group-id=0spring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息体的编解码方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 运行效果图

物联网架构成长之路(49)-SpringBoot集成KafKa中间件

  经过测试,通过java方式与console方式都是可以互相通信。

  假设在KafkaReceiver.java 第23行增加 Thread.sleep(10000); 用来模拟实际业务延时。类似抢票或者秒杀的应用场景。突然高峰,然后把所有订单数据都放到Kafka,然后在慢慢消费。生成实际业务订单,再通知用户付款。

物联网架构成长之路(49)-SpringBoot集成KafKa中间件

  按照架构设计,平台部分会订阅EMQ的Topic,过滤部分数据或者完整数据报都发往Kafka,然后让Kafka后面的消费者根据需要自己进行消费。比如对所有 iot/product-uuid/device-uuid/device/+/property 所有属性日志相关的消息都通过Kafka的property这个Topic发送。消费者订阅property后,就可以消费。至于消费后可以存入influxdb进行持久化,也可以预处理后显示。MQTT设计中还有一类是 iot/product-uuid/device-uuid/device/+/event 事件类。这些相关的消息,会发往Kafka的event主题。然后由后面的消费者来消费event。进而进行报警等处理。

参考资料:
  http://kafka.apachecn.org/documentation.html#operations
  https://hub.docker.com/r/wurstmeister/kafka