vlambda博客
学习文章列表

实现集成Kafka与Storm结合案例--文末送书

点击卡片进入数据仓库与Python大数据主页

然后点击右上角 “设为星标

比别人更快接收好文章



到目前为止,我们已经知道Storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用Storm做实时计算的话可能因为数据拥堵而导致服务器挂掉,应对这种情况,使用Kafka作为消息队列是非常合适的选择,Kafka可以将不均匀的数据转换成均匀的消息流,从而和Storm比较完善的结合,这样才可以实现稳定的流式计算,那么接下来开发一个简单的案例来实现StormKafka的结合。

  StormKafka结合,实质上无非是之前我们说过的计算模式结合起来,就是数据先进入Kafka生产者,然后Storm作为消费者进行消费,最后将消费后的数据输出或者保存到文件、数据库、分布式存储等等,具体框如下图1所示:

实现集成Kafka与Storm结合案例--文末送书

                   图1    集成KafkaStorm的框架


 Storm从Kafka中接收数据

  为了实现Kafka与Storm的集成,我们需要添加以下的依赖信息。

01 <dependency>02 <groupId>org.apache.kafka</groupId>03 <artifactId>kafka_2.9.2</artifactId>04 <version>0.8.2.2</version>05 <exclusions>06 <exclusion>07 <groupId>org.apache.zookeeper</groupId>08 <artifactId>zookeeper</artifactId>09 </exclusion>10 <exclusion>11 <groupId>log4j</groupId>12 <artifactId>log4j</artifactId>13 </exclusion>14 </exclusions>15 </dependency>16 <dependency>17 <groupId>org.apache.storm</groupId>18 <artifactId>storm-kafka</artifactId>19 <version>1.0.3</version>20   </dependency>

   修改之前的WordCountTopology主程序,创建一个新的KafkaSpout组件,并将其作为WordCountTopology任务的输入。这KafkaSpout将从Kafka中接收消息,作为Kafka消息的Consumer使用。下面是创建这个KafkaSpout组件的核心代码程序。

01 //支持从Kakfa消息系统中读取数据02 private static KafkaSpout createKafkaSpout() {03 //指定ZooKeeper的地址信息04 BrokerHosts brokerHosts = new ZkHosts("kafka101:2181");05 06 //创建KafkaSpout的配置信息。07 SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, 08 "mytopic1", "/mytopic1", 09 UUID.randomUUID().toString());10 spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());11 spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();12 //返回一个KafkaSpout13 return new KafkaSpout(spoutConfig);14   }
conf.startOffsetTime =   kafka.api.OffsetRequest.LatestTime();

   另一方面,由于应用程序将从Kafka中接收消息,由于Kafka发送来的消息中并没有“sentence”字段,所以需要修改一下WordCountSplitBolt的代码,如下:

01 //进行单词拆分02 public class WordCountSplitBolt extends BaseRichBolt{03 04 private OutputCollector collector;05 06 @Override07 public void execute(Tuple tuple) {08 //如何处理上一级组件发来的Tuple09 //取出数据: I love Beijing10 //String data = tuple.getStringByField("sentence"); 11 String data = tuple.getString(0);12 13 String[] words = data.split(" ");14 15 for(String w:words) { //k2 v216 this.collector.emit(new Values(w,1));17 }18 }19 20 @Override21 public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {22 // OutputCollector该级组件的输出流23 this.collector = collector;24 }25 26 @Override27 public void declareOutputFields(OutputFieldsDeclarer declare) {28 declare.declare(new Fields("word","count"));29 }30   }

注意:这里的第10行代码和第11行代码的区别。

为了测试的方便,我们将WordCountTopology任务运行在本地模式下。下面为大家展示了完整的代码程序。

01 import org.apache.storm.Config;02 import org.apache.storm.LocalCluster;03 import org.apache.storm.StormSubmitter;04 import org.apache.storm.generated.AlreadyAliveException;05 import org.apache.storm.generated.AuthorizationException;06 import org.apache.storm.generated.InvalidTopologyException;07 import org.apache.storm.generated.StormTopology;08 import org.apache.storm.kafka.BrokerHosts;09 import org.apache.storm.kafka.KafkaSpout;10 import org.apache.storm.kafka.SpoutConfig;11 import org.apache.storm.kafka.StringScheme;12 import org.apache.storm.kafka.ZkHosts;13 import org.apache.storm.spout.SchemeAsMultiScheme;14 import org.apache.storm.topology.IRichBolt;15 import org.apache.storm.topology.IRichSpout;16 import org.apache.storm.topology.TopologyBuilder;17 import org.apache.storm.tuple.Fields;18 import org.apache.storm.tuple.ITuple;19 20 public class WordCountTopology {21 22 public static void main(String[] args) {23 //创建一个Topology24 TopologyBuilder builder = new TopologyBuilder();25 26 //指定任务的spout组件27 //builder.setSpout("myspout", new WordCountSpout());28 builder.setSpout("myspout", createKafkaSpout());29 30 //指定拆分单词的bolt,是随机分组31 builder.setBolt("mysplit", 32 new WordCountSplitBolt())33 .shuffleGrouping("myspout");34 35 //指定单词计数的bolt36 builder.setBolt("mytotal", 37 new WordCountTotalBolt())38 .fieldsGrouping("mysplit", 39 new Fields("word"));40 41 //创建任务42 StormTopology topology = builder.createTopology();43 44 //配置参数 45 Config conf = new Config();46 47 //本地模式48 LocalCluster cluster = new LocalCluster();49 cluster.submitTopology("MyWC", conf, topology);50 }51 52 private static IRichSpout createKafkaSpout() {53 54 BrokerHosts zkHost = new ZkHosts("kafka101:2181");55 SpoutConfig conf = new SpoutConfig(zkHost, 56 "mytopic1", 57 "/mytopic1", 58 "mygroupID");59 60 //指定反序列化机制61 conf.scheme = new SchemeAsMultiScheme(new StringScheme());62 63 conf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();64 65 return new KafkaSpout(conf);66 }67   }



 测试Kafka与Storm的集成

(1)  在kafka101、kafka102和kafka103上启动ZooKeeper集群,执行下面的命令 :    

zkServer.sh start

(2)   启动Kafka集群

(3)  执行下面的命令,启动Kafka Producer Console

bin/kafka-console-producer.sh --broker-list kafka101:9092--topic mytopic1

(4)  启动Storm应用程序,如下图2所示:


实现集成Kafka与Storm结合案例--文末送书

图2  启动Storm 应用程序

(5)在Kafka Producer Console输入一些字符串,并回车发送。观察Storm应用程序的结果。可以看到当在Kafka Producer Console上输入了数据,在Storm的应用程序中将实时接收消息,并处理消息。如下图3所示:

实现集成Kafka与Storm结合案例--文末送书

                 图3    Storm 应用程序接收Kafka的消息



 Storm将数据输出到Kafka

在前面的例子中,我们集成了Storm和Kafka。将Kafka作为Storm的Spout,Storm将从Kafka中接收数据,并处理数据。其实还有另一种情况,就是Storm处理完成后,也可以将数据输出到Kafka中,下图描述了这一过程。

实现集成Kafka与Storm结合案例--文末送书

图4 Storm 应用程序发送消息到Kafka


在了解上面的过程后,我们可以改造一下之前的Topology主程序的代码,创建一个新的Bolt任务,将任务处理完成的数据输出到Kafka中,下面列出了改造后的代码程序。


01 private static IRichBolt createKafkaBolt() {02 Properties props = new Properties();03 //broker的地址04 props.put("bootstrap.servers", "kafka101:9092");05 //说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。06 props.put("key.serializer", 07 "org.apache.kafka.common.serialization.StringSerializer");08 props.put("value.serializer", 09 "org.apache.kafka.common.serialization.StringSerializer");10 11 props.put("acks", "1");
13 KafkaBolt<String, String> bolt = new KafkaBolt<String, String>();14 15 //指定Kafka的配置信息16 bolt.withProducerProperties(props);17 18 //指定Topic的名字19 bolt.withTopicSelector(new DefaultTopicSelector("mytopic1"));20 21 //指定将上一级Bolt处理完成后的Key和Value//22 //KafkaBolt将会按照这里指定的Key和Value将数据发送到Kakfa23 bolt.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper24 <String, String>("word","total"));25 26 return bolt;27   }


创建好了新的Bolt组件后,可以将其添加进入Topology任务中,下面列出了改造后的主程序代码。

public static void main(String[] args) {02 //创建一个Topology03 TopologyBuilder builder = new TopologyBuilder();04 05 //指定任务的spout组件06 //builder.setSpout("myspout", new WordCountSpout());07 builder.setSpout("myspout", createKafkaSpout());08 09 //指定拆分单词的bolt,是随机分组10 builder.setBolt("mysplit", 11 new WordCountSplitBolt())12 .shuffleGrouping("myspout");13 14 //指定单词计数的bolt15 builder.setBolt("mytotal", 16 new WordCountTotalBolt())17 .fieldsGrouping("mysplit", 18 new Fields("word"));19 20 //创建KafkaBolt,将结果发送到Kafka中21 builder.setBolt("kafkabolt", createKafkaBolt())22 .shuffleGrouping("mytotal");23 24 //创建任务25 StormTopology topology = builder.createTopology();26 27 //配置参数 28 Config conf = new Config();29 30 //本地模式31 LocalCluster cluster = new LocalCluster();32 cluster.submitTopology("MyWC", conf, topology);}

注意:这里的第21行代码和第22行代码,我们使用了之前的createKafkaBolt方法,将KafkaBolt任务添加进了Topology任务中。

              本文选自电子工业出版社的《kafka进阶》一书,略有修改,经出版社授权刊登于此。


赠书福利


实现集成Kafka与Storm结合案例--文末送书

      本书基于作者多年的教学与实践进行编写,重点介绍Kafka消息系统的核心原理与架构,内容涉及开发、运维、管理与架构。全书共11章,第1章,介绍Kafka体系架构基础,包括消息系统的基本知识、Kafka的体系架构与ZooKeeper;第2章,介绍Kafka的环境部署,以及基本的应用程序开发;第3章,介绍Kafka的生产者及其运行机制,包括生产者的创建和执行过程、生产者的消息发送模式和生产者的高级特性等;第4章,介绍Kafka的消费者及其运行机制,包括消费者的消费模式、消费者组与消费者、消费者的偏移量与提交及消费者的高级特性等;第5章,介绍Kafka服务器端的核心原理,包括主题与分区、消息的持久性与传输保障、Kafka配额与日志的管理;第6章,介绍Kafka的流处理引擎Kafka Stream;第7章,介绍使用不同的工具监控Kafka,包括Kafka Manager、Kafka Tool、KafkaOffsetMonitor和JConsole;第8章至第11章,介绍Kafka与外部系统的集成,包括集成Flink、集成Storm、集成Spark和集成Flume。


赠送规则: