[Big Data Development] Spark Streaming: foreachRDD and an External Integration Example (Part 9)
Ⅰ  The Purpose of foreachRDD
Ⅱ  foreachRDD and an External Integration Example
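The Purpose of foreachRDD
foreachRDD takes a function and applies it to every RDD the DStream produces, one RDD per batch interval. The function itself runs on the driver, while the RDD actions called inside it (such as foreachPartition) run on the executors. With foreachPartition, the inner function receives an iterator over the records of a single partition; for a pair DStream each record is a key-value (kv) tuple, that is, the actual data to be written out.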
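To make the partition-level behaviour concrete, here is a minimal plain-Java sketch that does not use Spark at all. It models one batch as a list of partitions and compares opening a connection per record with opening one per partition, which is the usual reason for wrapping the write in foreachPartition. The names PartitionWriteSketch and FakeConnection, and the sample data, are hypothetical and exist only for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PartitionWriteSketch {

    /** Hypothetical stand-in for a database connection; it only counts usage. */
    static class FakeConnection {
        static int opened = 0;   // number of connections created so far
        int written = 0;         // number of records written through this connection

        FakeConnection() { opened++; }

        void write(Map.Entry<String, Integer> record) { written++; }
    }

    /** Naive approach: a new connection for every single record. */
    public static int writePerRecord(List<List<Map.Entry<String, Integer>>> partitions) {
        FakeConnection.opened = 0;
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            for (Map.Entry<String, Integer> record : partition) {
                new FakeConnection().write(record);
            }
        }
        return FakeConnection.opened;
    }

    /** foreachPartition-style approach: one connection per partition iterator. */
    public static int writePerPartition(List<List<Map.Entry<String, Integer>>> partitions) {
        FakeConnection.opened = 0;
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            FakeConnection conn = new FakeConnection();
            Iterator<Map.Entry<String, Integer>> records = partition.iterator();
            while (records.hasNext()) {
                conn.write(records.next());
            }
        }
        return FakeConnection.opened;
    }

    /** Made-up sample batch: two partitions holding three (word, count) records. */
    public static List<List<Map.Entry<String, Integer>>> sampleBatch() {
        return Arrays.asList(
                Arrays.asList(entry("spark", 2), entry("java", 1)),
                Arrays.asList(entry("hive", 3)));
    }

    private static Map.Entry<String, Integer> entry(String word, int count) {
        return new SimpleEntry<>(word, count);
    }

    public static void main(String[] args) {
        System.out.println("connections, per record:    " + writePerRecord(sampleBatch()));
        System.out.println("connections, per partition: " + writePerPartition(sampleBatch()));
    }
}
```

With the sample batch above, the per-record variant opens three connections and the per-partition variant opens two. On real data, where a partition holds far more records, the gap grows accordingly.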
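foreachRDD and an External Integration Example
Example: count how often each space-separated word appears in the text sent by an nc service, and write the running counts to a MySQL database.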
// Java
import kfk.spark.common.CommSparkContext;
import kfk.spark.common.ConnectionPool;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Arrays;

public class ForeachPersistMySQL {

    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc = CommSparkContext.getJssc();
        // updateStateByKey requires a checkpoint directory.
        jssc.checkpoint("hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/sparkCheckpoint");

        JavaReceiverInputDStream<String> lines =
                jssc.socketTextStream("bigdata-pro-m01.kfk.com", 9999);
        JavaDStream<String> words =
                lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> pair =
                words.mapToPair(word -> new Tuple2<>(word, 1));

        // Let Spark maintain a running global count for every word.
        JavaPairDStream<String, Integer> wordCount = pair.updateStateByKey((values, state) -> {
            Integer newValue = 0;
            if (state.isPresent()) {
                newValue = state.get();
            }
            for (Integer value : values) {
                newValue += value;
            }
            return Optional.of(newValue);
        });

        // Write each batch to MySQL, using one connection per partition.
        // Note: updateStateByKey emits the full state in every batch, so each
        // batch inserts one row per known word.
        wordCount.foreachRDD(rdd -> rdd.foreachPartition(records -> {
            Connection conn = ConnectionPool.getConnection();
            // A parameterized statement avoids broken SQL when a word contains a quote.
            String sql = "insert into spark.wordcount(word, count) values (?, ?)";
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                while (records.hasNext()) {
                    Tuple2<String, Integer> wordcount = records.next();
                    stmt.setString(1, wordcount._1);
                    stmt.setInt(2, wordcount._2);
                    stmt.executeUpdate();
                }
            } finally {
                // Hand the connection back even if a write fails.
                ConnectionPool.returnConnection(conn);
            }
        }));

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
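The example relies on kfk.spark.common.ConnectionPool, whose source is not shown here. As a rough idea of what such a helper might look like, the following is a minimal sketch with the same two static methods, getConnection and returnConnection. It is not the author's class: the class name ConnectionPoolSketch, the idleCount helper, and the JDBC URL, user, and password are all placeholder assumptions, and a MySQL JDBC driver would need to be on the classpath for new connections to open.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;

public class ConnectionPoolSketch {

    // Placeholder settings (assumptions); replace with the real database details.
    private static final String URL = "jdbc:mysql://localhost:3306/spark";
    private static final String USER = "user";
    private static final String PASSWORD = "password";

    // Connections that have been returned and are ready for reuse.
    private static final Deque<Connection> IDLE = new ArrayDeque<>();

    /** Reuses an idle connection if one is available, otherwise opens a new one. */
    public static synchronized Connection getConnection() throws SQLException {
        Connection conn = IDLE.poll();
        if (conn == null || conn.isClosed()) {
            conn = DriverManager.getConnection(URL, USER, PASSWORD);
        }
        return conn;
    }

    /** Puts a connection back so that a later getConnection() call can reuse it. */
    public static synchronized void returnConnection(Connection conn) {
        if (conn != null) {
            IDLE.push(conn);
        }
    }

    /** Number of idle connections currently held; handy for checking the pool. */
    public static synchronized int idleCount() {
        return IDLE.size();
    }
}
```

Because the pool is a set of static fields, each executor JVM ends up with its own pool, and connections are shared among the partitions processed on that executor. A production setup would more likely use an established pooling library and cap the pool size; this sketch does neither.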
[Figure: nc input]
[Figure: output result]
