Three WordCount Programs in Flink
Environment Versions
JDK: 1.8, Flink: 1.13.6, Scala: 2.12. GitHub: https://github.com/xiaozhutec/FlinkProject1.13.6.git
Basic Configuration
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.6</flink.version>
    <scala.version>2.12</scala.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>

<repositories>
    <repository>
        <id>central</id>
        <name>aliyun maven</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <layout>default</layout>
        <!-- whether to download release artifacts -->
        <releases>
            <enabled>true</enabled>
        </releases>
        <!-- whether to download snapshot artifacts -->
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
Streaming WordCount
Use the netcat command nc -l {port} to simulate a data source (the -k flag used later keeps netcat listening after a client disconnects). First create the execution environment and connect to the socket:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "127.0.0.1";
int port = 8899;
String delimiter = "\n";
// connect to the socket to read data
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
A POJO class WC (word with count) holds each word and its count. The keyBy() function groups the stream by the word. The window() function assigns each element to sliding windows: SlidingProcessingTimeWindows.of(size, slide) takes the window size as its first argument and the slide interval as its second, so counting the past 2s every 1s is of(Time.seconds(2), Time.seconds(1)).
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWCJava {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "127.0.0.1";
        int port = 8899;
        String delimiter = "\n";
        // socket source; words are comma-separated
        DataStreamSource<String> source = env.socketTextStream(hostname, port, delimiter);
        SingleOutputStreamOperator<WC> res = source.flatMap(new FlatMapFunction<String, WC>() {
            @Override
            public void flatMap(String value, Collector<WC> out) throws Exception {
                String[] splits = value.split(",");
                for (String split : splits) {
                    out.collect(new WC(split, 1));
                }
            }
        }).keyBy(x -> x.word)
          // every 1 second, count the data of the past 2 seconds (size = 2s, slide = 1s)
          .window(SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
          // .sum("count");
          .reduce(new ReduceFunction<WC>() {
              @Override
              public WC reduce(WC t1, WC t2) throws Exception {
                  return new WC(t1.word, t1.count + t2.count);
              }
          });
        res.print().setParallelism(1);
        env.execute("SocketWindowWCJava");
    }

    public static class WC {
        public String word;
        public int count;

        public WC() {}

        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
First start the netcat program, then start the Flink program:
$ nc -lk 8899
flink,flink,spark
hadoop,flink
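With a 2-second window sliding every 1 second, each record falls into two overlapping windows, so a word is typically printed twice with counts that depend on which records landed in the same window. Assuming both input lines arrive within one 2-second span, the console output would look roughly like this:

WC{word='flink', count=3}
WC{word='spark', count=1}
WC{word='hadoop', count=1}
WC{word='flink', count=3}
WC{word='spark', count=1}
WC{word='hadoop', count=1}

The Scala version is equivalent: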
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWCScala {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val hostname = "localhost"
    val port = 8899
    val delimiter = '\n'
    val source = env.socketTextStream(hostname, port, delimiter)
    // implicit TypeInformation conversions
    import org.apache.flink.api.scala._
    // input format: word1,word2,word3
    val res = source.flatMap(line => line.split(','))   // split each line on commas
      .map(word => WC(word, 1))
      .keyBy(x => x.word)
      // every 1 second, count the data of the past 2 seconds (size = 2s, slide = 1s)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
      .reduce((v1, v2) => WC(v1.word, v1.count + v2.count))
    res.print("data: ").setParallelism(1)
    env.execute("SocketWindowWCScala")
  }

  case class WC(word: String, count: Long)
}
$ nc -lk 8899
flink,flink,spark
hadoop,flink
Batch WordCount
The sample input file ./datas/dm.csv:

Java,Flink
Scala
Streaming
Flink,Java
Scala
Batch,Scala
Batch jobs use the DataSet API's ExecutionEnvironment and read the file with readTextFile:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> text = env.readTextFile(filePath);

Splitting each line into words is done by implementing the FlatMapFunction interface (the JGFlatMapFunction class in the code below).
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountJava {
    public static void main(String[] args) throws Exception {
        String filePath = "./datas/dm.csv";
        String resultPath = "./datas/wc_rst.csv";
        // set up the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> source = env.readTextFile(filePath);
        AggregateOperator<Tuple2<String, Integer>> res = source.flatMap(new JGFlatMapFunction())
                .groupBy(0)   // group by the word (field 0)
                .sum(1);      // sum the counts (field 1)
        // note: in the DataSet API, print() itself triggers execution
        res.print();
        // writeAsCsv is lazy; env.execute() below runs the job again to write the file
        res.writeAsCsv(resultPath).setParallelism(1);
        env.execute("WordCountJava");
    }

    public static class JGFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] splits = value.split(",");
            for (String split : splits) {
                out.collect(Tuple2.of(split, 1));
            }
        }
    }
}
The program reads the data in ./datas/dm.csv, prints the word counts to the console, and writes them to ./datas/wc_rst.csv.
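For the sample dm.csv above, the printed result should contain counts like the following (the order of groups is not deterministic, and note that this Java version does not lowercase the words):

(Batch,1)
(Flink,2)
(Java,2)
(Scala,3)
(Streaming,1)

The Scala version lowercases the words, and it must call execute() explicitly for the CSV sink, since print() only triggers its own execution: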
import org.apache.flink.api.scala.ExecutionEnvironment

object WordCountScala {
  def main(args: Array[String]): Unit = {
    val filePath = "./datas/dm.csv"
    val resultPath = "./datas/wc_rst.csv"
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(filePath)
    // implicit TypeInformation conversions
    import org.apache.flink.api.scala._
    val counts = text.flatMap { _.toLowerCase.split(",").filter { _.nonEmpty } }
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    // print() triggers execution and shows the result on the console
    counts.print()
    // writeAsCsv is lazy, so an explicit execute() is required for the file sink
    counts.writeAsCsv(resultPath, "\n", " ")
    env.execute("WordCountScala")
  }
}
Flink SQL WordCount
The SQL version boils WordCount down to a single aggregation query (WC is the view name registered below):

SELECT word, COUNT(*) FROM WC GROUP BY word;
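On an unbounded stream this is a continuous query: every incoming row updates the aggregation result. If the words spark, flink, spark arrive one at a time, the result table evolves from (spark, 1), to (spark, 1), (flink, 1), to (spark, 2), (flink, 1); the previous row for spark is retracted and replaced, which is why the result is converted with toRetractStream / toChangelogStream below. The Table API & SQL programs need some extra dependencies: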
<!-- use the Table API & SQL for defining pipelines -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- run Table API & SQL programs locally within the IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- common Table API classes -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
$ nc -lk 8899
spark,flink,spark
spark,flink,spark
...
The program converts the socket stream into WC objects, registers it as a temporary view, and runs the SQL query on it:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
SingleOutputStreamOperator<WC> dataStream = env.socketTextStream("localhost", 8899)
        .flatMap(new FlatMapFunction<String, WC>() {
            @Override
            public void flatMap(String value, Collector<WC> out) throws Exception {
                String[] splits = value.split(",");
                for (String split : splits) {
                    out.collect(new WC(split, 1L));
                }
            }
        });
Table wordCountTable = tableEnv.fromDataStream(dataStream);
tableEnv.createTemporaryView("WC", wordCountTable);
Table resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC GROUP BY word");
tableEnv.toRetractStream(resultTable, Row.class).print().setParallelism(1);
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class WordCountWithSQLJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        SingleOutputStreamOperator<WC> dataStream = env.socketTextStream("localhost", 8899)
                .flatMap(new FlatMapFunction<String, WC>() {
                    @Override
                    public void flatMap(String value, Collector<WC> out) throws Exception {
                        String[] splits = value.split(",");
                        for (String split : splits) {
                            out.collect(new WC(split, 1L));
                        }
                    }
                });
        // convert the DataStream to a table, register it as a view, and query it with SQL
        Table wordCountTable = tableEnv.fromDataStream(dataStream);
        tableEnv.createTemporaryView("WC", wordCountTable);
        Table resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC GROUP BY word");
        // convert the result back to a DataStream (toRetractStream / toAppendStream);
        // an aggregation updates earlier results, so a retract stream is required here
        tableEnv.toRetractStream(resultTable, Row.class).print().setParallelism(1);
        env.execute("WCSQLJava");
    }

    public static class WC {
        public String word;
        public long count;

        public WC() {}

        public WC(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
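toRetractStream wraps each result row in a Tuple2<Boolean, Row>: true marks an insert, false the retraction of a previous result. Assuming a single input line spark,flink,spark, the printed output would look roughly like this (the exact Row formatting varies by Flink version):

(true,+I[spark, 1])
(true,+I[flink, 1])
(false,-U[spark, 1])
(true,+U[spark, 2])

The Scala version below uses the newer toChangelogStream instead: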
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object WordCountSQLScala {
  def main(args: Array[String]): Unit = {
    // set up the execution environments
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    // implicit TypeInformation conversions
    import org.apache.flink.api.scala._
    // read from nc; input format: word1,word2,word3
    val dataStream = env.socketTextStream("localhost", 8899, '\n')
      .flatMap(line => line.split(','))
      .map(word => WC(word, 1L))
    // convert the stream to a table, register it as a view, and query it
    val inputTable = tableEnv.fromDataStream(dataStream)
    tableEnv.createTemporaryView("WC", inputTable)
    val resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC GROUP BY word")
    // toChangelogStream emits rows tagged with a RowKind (insert / retract / update)
    val resValue = tableEnv.toChangelogStream(resultTable)
    resValue.print().setParallelism(1)
    env.execute("WordCountSQLScala")
  }

  case class WC(word: String, count: Long)
}
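Unlike the retract stream, toChangelogStream encodes the change type in each Row's RowKind rather than in a boolean flag, so the same spark,flink,spark input would print something like:

+I[spark, 1]
+I[flink, 1]
-U[spark, 1]
+U[spark, 2]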
Summary

The complete code for all three WordCount programs (Streaming, Batch, and Flink SQL) is on GitHub:
https://github.com/xiaozhutec/FlinkProject1.13.6.git