
[Big Data Development] Spark Streaming: ForeachRDD and External Integration Example (Part 9)

Part 9: ForeachRDD and External Integration in Spark Streaming

The meaning of the foreachRDD function

ForeachRDD with external integration: a worked example



The meaning of the foreachRDD function

foreachRDD takes a function that is applied to every RDD the DStream produces, one per batch interval. Inside that function you typically call foreachPartition, whose own function receives an iterator over the records of a single partition, here key/value pairs, that is, the actual data. This lets you open an expensive resource such as a database connection once per partition instead of once per record, as in the sketch below.
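A minimal sketch of this pattern, showing only what each callback receives; the class name ForeachRddSketch and the parameter pair are placeholders for illustration and are not part of the case study that follows.

//Java
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public class ForeachRddSketch {
    static void attach(JavaPairDStream<String, Integer> pair) {
        // foreachRDD: its function receives the whole RDD produced for one batch interval
        pair.foreachRDD(rdd ->
                // foreachPartition: its function receives an iterator over one partition's records
                rdd.foreachPartition(records -> {
                    // a per-partition resource (e.g. a DB connection) would be opened here
                    while (records.hasNext()) {
                        Tuple2<String, Integer> kv = records.next(); // the actual (word, count) pair
                        // ... write kv to the external store ...
                    }
                    // ... and the resource would be returned/closed here
                }));
    }
}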



ForeachRDD with external integration: a worked example

Example: count the words received from an nc (netcat) service and write the running word counts to a MySQL database.

//Java
import kfk.spark.common.CommSparkContext;
import kfk.spark.common.ConnectionPool;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;

public class ForeachPersistMySQL {

    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc = CommSparkContext.getJssc();
        // updateStateByKey needs a checkpoint directory to store state between batches
        jssc.checkpoint("hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/sparkCheckpoint");

        // read lines from the nc socket, split them into words, and map each word to (word, 1)
        JavaReceiverInputDStream<String> lines =
                jssc.socketTextStream("bigdata-pro-m01.kfk.com", 9999);
        JavaDStream<String> words =
                lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> pair =
                words.mapToPair(word -> new Tuple2<>(word, 1));

        // have Spark maintain a global running count for every word
        JavaPairDStream<String, Integer> wordCount = pair.updateStateByKey((values, state) -> {
            Integer newValues = 0;
            if (state.isPresent()) {
                newValues = state.get();
            }
            for (Integer value : values) {
                newValues += value;
            }
            return Optional.of(newValues);
        });

        // persist each partition to MySQL, reusing one connection and one Statement per partition
        wordCount.foreachRDD(rdd -> rdd.foreachPartition(tuple -> {
            Connection conn = ConnectionPool.getConnection();
            Statement stm = conn.createStatement();
            while (tuple.hasNext()) {
                Tuple2<String, Integer> wordcount = tuple.next();
                String sql = "insert into spark.wordcount(word,count) "
                        + " values('" + wordcount._1 + "', " + wordcount._2 + ") ";
                stm.executeUpdate(sql);
            }
            stm.close();
            ConnectionPool.returnConnection(conn);
        }));

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
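The example relies on two project helpers, kfk.spark.common.CommSparkContext and kfk.spark.common.ConnectionPool, whose source is not shown in this post. The two sketches below are assumptions about what they might look like, written only so the example is self-contained; the master URL, batch interval, JDBC URL, credentials, and pool behavior are placeholders, and the target table spark.wordcount(word, count) is assumed to exist already.

//Java
// Hypothetical sketch of CommSparkContext (not from the original post).
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CommSparkContext {
    public static JavaStreamingContext getJssc() {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")               // assumed master; use your cluster URL in practice
                .setAppName("ForeachPersistMySQL");
        return new JavaStreamingContext(conf, Durations.seconds(5)); // assumed 5-second batch interval
    }
}

//Java
// Hypothetical sketch of ConnectionPool (not from the original post): a very small static pool
// of JDBC connections. Assumes the MySQL driver is on the classpath and that a table like
// spark.wordcount(word VARCHAR(100), count INT) already exists.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;

public class ConnectionPool {
    private static final LinkedList<Connection> pool = new LinkedList<>();
    private static final String URL =
            "jdbc:mysql://bigdata-pro-m01.kfk.com:3306/spark";    // placeholder URL
    private static final String USER = "root";                    // placeholder credentials
    private static final String PASSWORD = "123456";

    public static synchronized Connection getConnection() throws SQLException {
        // hand out a pooled connection if one is available, otherwise open a new one
        if (!pool.isEmpty()) {
            return pool.removeFirst();
        }
        return DriverManager.getConnection(URL, USER, PASSWORD);
    }

    public static synchronized void returnConnection(Connection conn) {
        // put the connection back for reuse instead of closing it
        if (conn != null) {
            pool.addLast(conn);
        }
    }
}

In a real job, a PreparedStatement with batched inserts would be preferable to the string-concatenated SQL used in the example above.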

  Words typed into nc (start the service with nc -lk 9999 on bigdata-pro-m01.kfk.com, then type words into it):



Output (rows written to the spark.wordcount table):