vlambda博客
学习文章列表

Flink 的三种WordCount(文末领取Flink书籍)

Hi,大家好,我是 Johngo 呀!
今天是 Flink 从 0 到 1 系列的第 2 篇:《WordCount及FlinkSQL》。
目标:通过每天一小会儿,熟悉 Flink 大大小小知识点。

环境版本

JDK:1.8
Flink:1.13.6
Scala:2.12
github:https://github.com/xiaozhutec/FlinkProject1.13.6.git
创建Flink 工程网上已经很多说明方法了,这里先不赘述,以下全部的代码使用 IDEA 进行编码。
本文讲解的 WordCount 程序是大数据的入门程序。
WordCount 程序是在不同上下文环境下实现的,是一个入门版本,可以跟着一步一步实现起来。包括 Streaming 和 Batch 以及 SQL 的简单案例。
上述所有的 Flink 语义都会在后面分篇章详细赘述。

基础配置

首先pom.xml 中要配置的依赖是:
provided 选项在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用。
版本依赖
 
   
   
 
<properties>
     <maven.compiler.source>8 </maven.compiler.source>
     <maven.compiler.target>8 </maven.compiler.target>
     <project.build.sourceEncoding>UTF-8 </project.build.sourceEncoding>
     <flink.version>1.13.6 </flink.version>
     <scala.version>2.12 </scala.version>
</properties>
java 相关依赖:
 
   
   
 
<dependency>
     <groupId>org.apache.flink </groupId>
     <artifactId>flink-java </artifactId>
     <version>${flink.version} </version>
   <!-- <scope>provided</scope> -->
</dependency>
<dependency>
     <groupId>org.apache.flink </groupId>
     <artifactId>flink-streaming-java_${scala.version} </artifactId>
     <version>${flink.version} </version>
   <!-- <scope>provided</scope>-->
</dependency>
scala 相关依赖:
 
   
   
 
<dependency>
     <groupId>org.apache.flink </groupId>
     <artifactId>flink-streaming-scala_${scala.binary.version} </artifactId>
     <version>${flink.version} </version>
   <!-- <scope>provided</scope> -->
</dependency>

<dependency>
     <groupId>org.apache.flink </groupId>
     <artifactId>flink-clients_${scala.binary.version} </artifactId>
     <version>${flink.version} </version>
   <!-- <scope>provided</scope>-->
</dependency>
另外,pom文件中镜像文件建议配置阿里云的maven仓库,国内下载速度会快,如果找不到对应的镜像文件,需要切换到国外仓库。
 
   
   
 
<repositories>
     <repository>
         <id>central </id>
         <name>aliyun maven </name>
         <url>http://maven.aliyun.com/nexus/content/groups/public/ </url>
         <layout>default </layout>
         <!-- 是否开启发布版构件下载 -->
         <releases>
             <enabled>true </enabled>
         </releases>
         <!-- 是否开启快照版构件下载 -->
         <snapshots>
             <enabled>false </enabled>
         </snapshots>
     </repository>
</repositories>
语言界的 hello word,大数据界的 WordCount,都是一个入门Demo。
今天咱们也按照这个入门的 Demo,把 Flink 相关代码捋顺。
包括 Streaming、Batch 以及 Flink Sql 三方面分别来实现。

Streaming WordCount

先来分析一个 Streaming WordCount。
为了模仿流式计算,咱们在本地利用  netcat 命令  nc -l {port}来进行模仿数据产出。
同时,咱们实现的功能是:每隔 1s 计算过去 2s 内产出数据各单词的个数,也就是实现每隔1s计算过去 2s 的 WordCount 程序。
将窗口内接收到的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。
Flink 的三种WordCount(文末领取Flink书籍)
大致处理的流程如上所示,现在来一步一步实现这个案例。
先开始创建 Flink 的运行环境:
 
   
   
 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
然后指定了数据 Source 源,以及 Source 源的一些配置:
 
   
   
 
String hostname =  "127.0.0.1";
int port =  8899;
String delimiter =  "\n";
// 链接 socket 获取数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
之后就进行了数据的平铺,分组,窗口计算等操作。
另外,程序中实现了一个内部类 WordWithCount,用来表示单词的 key 和 count。
利用  keyBy()函数对  key进行分组。
window函数表示每一个滑动窗口, SlidingProcessingTimeWindows实现每隔 1s 对过去 2s 进行计数。
后面的教程会详细讲解 Windows 相关知识,这里仅做入门学习。
下面整体看下代码:
 
   
   
 
public  class SocketWindowWCJava {
     public static void main(String[] args) throws Exception {
         // 获取流式运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname =  "127.0.0.1";
         int port =  8899;
        String delimiter =  "\n";
         // 获取数据源(Socket数据源,单词以逗号分割)
        DataStreamSource<String> source = env.socketTextStream(hostname, port, delimiter);
        SingleOutputStreamOperator<WC> res = source.flatMap( new FlatMapFunction<String, WC>() {

                     @Override
                     public void flatMap(String value, Collector<WC> out) throws Exception {
                        String[] splits = value.split( ",");
                         for (String split : splits) {
                            out.collect( new WC(split,  1));
                        }
                    }
                }).keyBy(x -> x.word)
                .window(SlidingProcessingTimeWindows.of(Time.seconds( 1), Time.seconds( 2)))   // 每隔1秒,统计过去2秒的数据
                 // .sum("count");
                .reduce( new ReduceFunction<WC>() {
                     @Override
                     public WC reduce(WC t1, WC t2) throws Exception {
                         return  new WC(t1.word, t1.count+t2.count);
                    }
                });
        
        res.print().setParallelism( 1);
        env.execute( "SocketWindowWCJava");
    }

     public  static  class WC {
         public String word;
         public  int count;

         public WC() {}
         public WC(String word, int count) {
             this.word = word;
             this.count = count;
        }

         @Override
         public String toString() {
             return  "WC{" +
                     "word='" + word +  '\'' +
                     ", count=" + count +
                     '}';
        }
    }
}
现在把程序执行起来,先在本地起一个 netcat程序,然后启动 Flink程序:
 
   
   
 
$  nc -lk 8899
flink,flink,spark
hadoop,flink
之后,控制台进行了相应的打印:
Flink 的三种WordCount(文末领取Flink书籍)
用 java 实现完,接下来用 scala 也实现一下相同的逻辑,有兴趣的朋友可作参考:
 
   
   
 
object SocketWindowWCScala {
   def main(args:  Array[ String]):  Unit = {
     // 获取运行环境
     val env =  StreamExecutionEnvironment.getExecutionEnvironment

     val hostname =  "localhost"
     val port =  8899
     val delimiter = '\n'
     val source = env.socketTextStream(hostname, port, delimiter)

     import org.apache.flink.api.scala._
     // 数据格式:word,word2,word3
     val res = source.flatMap(line => line.split(','))  // 将每一行按照逗号打平
      .map(word =>  WC(word,  1))
      .keyBy(x => x.word)
      .window( SlidingProcessingTimeWindows.of( Time.seconds( 1),  Time.seconds( 2)))
      .reduce((v1, v2) =>  WC(v1.word, v1.count + v2.count))

    res.print( "data: ").setParallelism( 1)

    env.execute( "SocketWindowWCScala")
  }

   case  class WC(word: String, count: Long)
}
依然是启动 flink 程序和 nc:
 
   
   
 
nc -lk 8888
flink,flink,spark
hadoop,flink
再看控制台的打印结果,是和咱们想实现的一致:
Flink 的三种WordCount(文末领取Flink书籍)
再次注意:窗口的使用方式在新版本中有较大的区别,这个咱们在后面会详细把这部分进行讲解。

Batch WordCount

批处理程序,这里用一个文本来作为数据源。
将文本中的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。
Flink 的三种WordCount(文末领取Flink书籍)
处理逻辑依然如图所示,然后下面咱们也创建一个文本如图里的内容(src/main/datas/dm.csv):
 
   
   
 
Java,Fink
Scala
Streaming
Flink,Java
Scala
Batch,Scala
首先创建 Flink 运行环境
 
   
   
 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
之后进行读取文件
 
   
   
 
DataSource text = env.readTextFile(filePath);
然后通过实现  FlatMapFunction 接口进行数据的打平操作(上面类 Tokenizer 的实现)。
最后进行分组求和,Batch WordCount 全部完成!
下面看 Batch 整体代码:
 
   
   
 
public  class WordCountJava {
     public static void main(String[] args) throws Exception {
        String filePath =  "./datas/dm.csv";
        String resultPath =  "./datas/wc_rst.csv";

         // 获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> source = env.readTextFile(filePath);

        AggregateOperator<Tuple2<String, Integer>> res = source.flatMap( new JGFlatMapFunction())
                .groupBy( 0)
                .sum( 1);
        res.print();
        res.writeAsCsv(resultPath).setParallelism( 1);

        env.execute( "WordCountJava");
    }

     public  static  class JGFlatMapFunction implements FlatMapFunction<StringTuple2<StringInteger>> {
 
         @Override
         public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] splits = value.split( ",");
             for (String split : splits) {
                out.collect(Tuple2.of(split,  1));
            }
        }
    }
}
程序中,通过读取 ./datas/dm.csv中的数据,最后计算结果打印到控制台以及存储结果数据到 ./datas/wc_rst.csv
执行起来,看打印结果:
Flink 的三种WordCount(文末领取Flink书籍)
求得给定文件的 WordCount 的结果。
下面用 Scala 实现一次:
 
   
   
 
object WordCountScala {
   def main(args:  Array[ String]):  Unit = {
     val filePath =  "./datas/dm.csv"
     val resultPath =  "./datas/wc_rst.csv"

     // 获取运行环境
     val env =  ExecutionEnvironment.getExecutionEnvironment

     val text = env.readTextFile(filePath)

     //引入隐式转换
     import org.apache.flink.api.scala._
     val counts = text.flatMap { _.toLowerCase.split( ",") filter { _.nonEmpty } }
      .map((_,  1))
      .groupBy( 0)
      .sum( 1)
    counts.print()
    counts.writeAsCsv(resultPath,  "\n"" ")
  }
}

用 Scala 实现起来就很简单了。
注意:这块如果代码出错的话,试着找找导入的包是否正确。

Flink SQL WordCount

尤其是有过 MapReduce 和 Hive 经历的朋友,就可以和它们放在一起做比较,一个复杂,一个简单。
比如说下面的 SQL 语句,就一句就可以省去上面那么多的代码工作量。
 
   
   
 
SELECT word,  COUNT(*)  FROM  table  GROUP  BY word;
下面利用 FlinkSQL 实现 WordCount 功能。
首先,pom 文件必须要添加的依赖:
 
   
   
 
<!-- use the Table API & SQL for defining pipelines.-->
<dependency>
     <groupId>org.apache.flink </groupId>
     <artifactId>flink-table-api-java-bridge_${scala.version} </artifactId>
     <version>${flink.version} </version>
</dependency>
<dependency>
     <groupId>org.apache.flink </groupId>
     <artifactId>flink-table-api-scala-bridge_${scala.version} </artifactId>
     <version>${flink.version} </version>
</dependency>
<!-- run the Table API & SQL programs locally within your IDE,-->
<dependency>
     <groupId>org.apache.flink </groupId>
     <artifactId>flink-table-planner-blink_${scala.version} </artifactId>
     <version>${flink.version} </version>
</dependency>

<!-- SQL Client-->
<dependency>
     <groupId>org.apache.flink </groupId>
     <artifactId>flink-table-common </artifactId>
     <version>${flink.version} </version>
     <!-- <scope>provided</scope>-->
</dependency>
先用 Java 来实现 FlinkSQL,将 nc 程序起来,进行按照逗号分割进行测试。
 
   
   
 
$  nc -lk 8899
spark,flink,spark
spark,flink,spark
...
a. 首先创建 Flink 的运行环境以及 SQL api 环境:
 
   
   
 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
b. nc 输出,将字符串转换为 (word, 1L) 的格式:
 
   
   
 
SingleOutputStreamOperator<WC> dataStream = env.socketTextStream( "localhost"8899)
        .flatMap( new FlatMapFunction<String, WC>() {
     @Override
     public void flatMap(String value, Collector<WC> out) throws Exception {
        String[] splits = value.split( ",");
         for (String split : splits) {
            out.collect( new WC(split,  1L));
        }
    }
});
c. 注册成表,转为视图&查询
 
   
   
 
Table WordCountTable = tableEnv.fromDataStream(dataStream);
tableEnv.createTemporaryView( "WC", WordCountTable);
Table resultTable = tableEnv.sqlQuery( "SELECT word, SUM(`count`) FROM WC group by word");
d. 转为 Stream 并且打印出来
 
   
   
 
tableEnv.toRetractStream(resultTable, Row .class).print().setParallelism(1);
下面看整体代码:
 
   
   
 
public  class WordCountWithSQLJava {
     public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<WC> dataStream = env.socketTextStream( "localhost"8899)
                .flatMap( new FlatMapFunction<String, WC>() {
             @Override
             public void flatMap(String value, Collector<WC> out) throws Exception {
                String[] splits = value.split( ",");
                 for (String split : splits) {
                    out.collect( new WC(split,  1L));
                }
            }
        });

         //DataStream 转sql & 查询
        Table WordCountTable = tableEnv.fromDataStream(dataStream);
        tableEnv.createTemporaryView( "WC", WordCountTable);
        Table resultTable = tableEnv.sqlQuery( "SELECT word, SUM(`count`) FROM WC group by word");

         // 将结果数据转换为DataStream toRetractStream toAppendStream
        tableEnv.toRetractStream(resultTable, Row .class).print().setParallelism(1);
        env.execute( "WCSQLJava");
    }

     public  static  class WC {
         public String word;
         public  long count;

         public  WC() {}
         public WC(String word, long count) {
             this.word = word;
             this.count = count;
        }
         @Override
         public String toString() {
             return  "WC {" +
                     "word='" + word +  '\'' +
                     ", count=" + count +
                     '}';
        }
    }
}

整体代码执行结果:
Flink 的三种WordCount(文末领取Flink书籍)
其中,+是操作后,I 是插入,U是更新,D是删除。例如:-U是撤回前的数据,+U是更新后的数据
true代表数据插入,false代表数据的撤回
Java 实现后,下面再用 Scala 来实现一次,代码逻辑一致,可以参考:
 
   
   
 
object WordCountSQLScala {
   def main(args:  Array[ String]):  Unit = {
     // 创建运行环境
     val env =  StreamExecutionEnvironment.getExecutionEnvironment
     val tableEnv =  StreamTableEnvironment.create(env)

     import org.apache.flink.api.scala._
     // 从 nc 接入数据, 数据格式:word,word2,word3
     val dataStream = env.socketTextStream( "localhost"8899, '\n')
      .flatMap(line => line.split(','))
      .map(word =>  WC(word,  1L))

     // 转换为一个表(table) & 查询
     val inputTable = tableEnv.fromDataStream(dataStream)
    tableEnv.createTemporaryView( "WC", inputTable)
     val resultTable = tableEnv.sqlQuery( "SELECT word, SUM(`count`) FROM WC GROUP BY word")

     // toAppendStream toRetractStream
     val resValue = tableEnv.toChangelogStream(resultTable)
    resValue.print().setParallelism( 1)

    env.execute( "WordCountSQLScala")
  }

   case  class WC(word: String, count: Long)
}
代码执行的结果也一致:
Flink 的三种WordCount(文末领取Flink书籍)

总结

今天实现了大数据的经典案例 WordCount,然后在不同场景下的实现。包括 Streaming 和 Batch,以及 Flink SQL 的实现。
该篇文章还只是一个入门级的程序,后面将会各重要点进行详细阐述。

Flink汇总

GitHub[阅读原文]直达 👉

https://github.com/xiaozhutec/FlinkProject1.13.6.git




🔥火热书籍:给大家准备了目前比较火的 Flink 电子书

静候 各位朋友,备注“Flink”即可很快通过!

我私发给大家!!