[Big Data Development] Spark Streaming: foreachRDD and External Integration Example (9)

Part 9  Spark Streaming: foreachRDD and External Integration Example

Ⅰ  What the foreachRDD Function Means
Ⅱ  foreachRDD External Integration Example

Ⅰ  What the foreachRDD Function Means
foreachRDD is an output operator on a DStream: you pass it a function, and that function is invoked once per batch interval with the RDD generated for that batch. Inside it you typically call foreachPartition, which hands you an iterator over the records of one partition, here the concrete (word, count) key-value pairs, so you can write them to an external store.
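Two obvious variants of this pattern are wrong: creating the database connection on the driver (a connection is not serializable and cannot be shipped to the executors inside the closure) and creating one connection per record (far too expensive). A minimal sketch of the recommended shape, one connection per partition, follows; the class name, the createConnection helper, and the JDBC URL and credentials are placeholders for illustration, not part of the original post:

import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ForeachRddPattern {

    // Placeholder connection factory; in practice use a real pool.
    static Connection createConnection() throws SQLException {
        return DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/spark", "user", "password");
    }

    // One connection per partition: created on the executor, and its cost
    // is amortized over all records in that partition.
    static void saveCounts(JavaPairDStream<String, Integer> counts) {
        counts.foreachRDD(rdd -> rdd.foreachPartition(records -> {
            Connection conn = createConnection();
            try {
                while (records.hasNext()) {
                    Tuple2<String, Integer> record = records.next();
                    // ... write record to the database via conn ...
                }
            } finally {
                conn.close();
            }
        }));
    }
}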
Ⅱ  foreachRDD External Integration Example

Example: count the words sent through an nc (netcat) session and write the running counts into a MySQL database.
//Java
import kfk.spark.common.CommSparkContext;
import kfk.spark.common.ConnectionPool;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Arrays;

public class ForeachPersistMySQL {

    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc = CommSparkContext.getJssc();

        // updateStateByKey requires a checkpoint directory to persist state.
        jssc.checkpoint("hdfs://bigdata-pro-m01.kfk.com:9000/user/kfk/datas/sparkCheckpoint");

        // Read lines from the nc server listening on port 9999.
        JavaReceiverInputDStream<String> lines =
                jssc.socketTextStream("bigdata-pro-m01.kfk.com", 9999);

        // Split each line into words and map each word to (word, 1).
        JavaDStream<String> words =
                lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> pair =
                words.mapToPair(word -> new Tuple2<>(word, 1));

        // Use Spark state to maintain a global running count for each word.
        JavaPairDStream<String, Integer> wordCount = pair.updateStateByKey((values, state) -> {
            Integer newValues = 0;
            if (state.isPresent()) {
                newValues = state.get();
            }
            for (Integer value : values) {
                newValues += value;
            }
            return Optional.of(newValues);
        });

        // Write each batch's counts to MySQL: one connection per partition,
        // one PreparedStatement reused for all records in that partition.
        wordCount.foreachRDD(rdd -> rdd.foreachPartition(tuple -> {
            Connection conn = ConnectionPool.getConnection();
            String sql = "insert into spark.wordcount(word, count) values(?, ?)";
            PreparedStatement stmt = conn.prepareStatement(sql);
            while (tuple.hasNext()) {
                Tuple2<String, Integer> wordcount = tuple.next();
                stmt.setString(1, wordcount._1);
                stmt.setInt(2, wordcount._2);
                stmt.executeUpdate();
            }
            stmt.close();
            ConnectionPool.returnConnection(conn);
        }));

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
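The example imports two helper classes, CommSparkContext and ConnectionPool, that the post does not show, and it assumes a MySQL table along the lines of create table spark.wordcount(word varchar(100), count int). The following is only a plausible sketch of what those helpers might look like; the master URL, batch interval, JDBC URL, and credentials are all guesses, not the author's code. Both classes are shown in one listing for brevity; in a real project each would live in its own file.

package kfk.spark.common;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;

public class CommSparkContext {

    // Assumed: a local two-thread master and a 5-second batch interval.
    public static JavaStreamingContext getJssc() {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("ForeachPersistMySQL");
        return new JavaStreamingContext(conf, Durations.seconds(5));
    }
}

// A deliberately naive pool sketch: hands out a new connection when empty,
// otherwise reuses a returned one.
class ConnectionPool {

    private static final LinkedList<Connection> pool = new LinkedList<>();

    public static synchronized Connection getConnection() throws SQLException {
        if (pool.isEmpty()) {
            // JDBC URL and credentials are placeholders.
            return DriverManager.getConnection(
                    "jdbc:mysql://bigdata-pro-m01.kfk.com:3306/spark", "root", "123456");
        }
        return pool.removeFirst();
    }

    public static synchronized void returnConnection(Connection conn) {
        pool.addLast(conn);
    }
}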
Input typed into nc (start the listener first with nc -lk 9999, then type words separated by spaces):

Output: each batch writes the latest cumulative count of every word into the spark.wordcount table. For example, typing hadoop spark hadoop yields rows (hadoop, 2) and (spark, 1). Note that the job issues a plain INSERT per batch, so subsequent batches append new rows with updated counts rather than updating the existing ones; given a unique key on word, an INSERT ... ON DUPLICATE KEY UPDATE would keep a single up-to-date row per word instead.