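Integrating the Vert.x Asynchronous Database Driver with Flink
I covered Vert.x in one of my earlier posts.
Scenario: when Flink processes offline log data, it may need to query a database before aggregating. For example, the logs contain only a userId, so fields such as userName have to be looked up in the database.
Let's write the code.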
Create a Maven project and add the following dependencies to the pom:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.1</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.1</version>
    </dependency>
    <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-mysql-client</artifactId>
        <version>4.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <!-- a version is required unless a parent POM manages it -->
        <version>1.18.20</version>
        <optional>true</optional>
    </dependency>
</dependencies>
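The first three dependencies are needed to use Flink from Java (flink-clients is what lets the job run locally from the IDE). The fourth, vertx-mysql-client, is the Vert.x asynchronous MySQL client, and the fifth is Lombok, which generates getters, setters and constructors.
To query an external MySQL database without blocking the stream, Flink offers the Async I/O API: you extend RichAsyncFunction and apply it with AsyncDataStream. Below we create such a function backed by the Vert.x asynchronous MySQL driver.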
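As a rough mental model of that contract: `asyncInvoke` should return immediately and hand over its result later through `ResultFuture`, calling `complete(...)` or `completeExceptionally(...)` exactly once, typically from inside the driver's callback. The sketch below imitates this with JDK classes only. `CallbackLookup`, `fakeDb` and `lookupAsync` are hypothetical names, the in-memory map stands in for the database, and `CompletableFuture` stands in for Flink's `ResultFuture`; it is not Flink or Vert.x code.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

public class AsyncContractSketch {

    // Hypothetical callback-style client, standing in for Vert.x's execute(handler).
    interface CallbackLookup {
        void find(String name, BiConsumer<List<String>, Throwable> handler);
    }

    // Simulated database: the callback fires later on a worker thread.
    static CallbackLookup fakeDb(ExecutorService pool, Map<String, List<String>> table) {
        return (name, handler) -> pool.execute(() -> {
            if (name == null) {
                handler.accept(null, new IllegalArgumentException("name is null"));
            } else {
                handler.accept(table.getOrDefault(name, List.of()), null);
            }
        });
    }

    // Mirrors asyncInvoke: returns at once; the future is completed exactly once in the callback.
    static CompletableFuture<Collection<String>> lookupAsync(CallbackLookup db, String name) {
        CompletableFuture<Collection<String>> result = new CompletableFuture<>();
        db.find(name, (rows, err) -> {
            if (err != null) {
                result.completeExceptionally(err); // plays the role of ResultFuture.completeExceptionally
            } else {
                result.complete(rows);             // plays the role of ResultFuture.complete
            }
        });
        return result;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CallbackLookup db = fakeDb(pool, Map.of("tom", List.of("tom", "tommy")));
        System.out.println(lookupAsync(db, "tom").join());   // [tom, tommy]
        System.out.println(lookupAsync(db, "alice").join()); // []
        pool.shutdown();
    }
}
```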
Create the class MysqlAsyncFunc:
package com.hm.function;

import com.hm.beans.User;
import com.hm.beans.UserVO;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.ArrayList;
import java.util.List;

public class MysqlAsyncFunc extends RichAsyncFunction<UserVO, User> {

    private static final long serialVersionUID = 1L;

    // The pool is not serializable, so it is created in open() on each parallel instance
    private transient MySQLPool client;

    @Override
    public void open(Configuration parameters) throws Exception {
        MySQLConnectOptions connectOptions = new MySQLConnectOptions()
                .setPort(3306)
                .setHost("localhost")
                .setDatabase("flink_db")
                .setUser("root")
                .setPassword("root");
        // Pool options: at most 5 connections per parallel instance
        PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
        // Create the client pool
        client = MySQLPool.pool(connectOptions, poolOptions);
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }

    @Override
    public void asyncInvoke(UserVO userVO, ResultFuture<User> resultFuture) throws Exception {
        // Bind the name as a parameter instead of concatenating it into the SQL (avoids SQL injection)
        client.preparedQuery("SELECT id, name, age FROM user WHERE name LIKE ?")
                .execute(Tuple.of("%" + userVO.getName() + "%"), ar -> {
                    if (ar.succeeded()) {
                        RowSet<Row> result = ar.result();
                        List<User> users = new ArrayList<>();
                        for (Row row : result) {
                            User user = new User();
                            user.setId(row.getInteger("id"));
                            user.setName(row.getString("name"));
                            user.setAge(row.getInteger("age"));
                            users.add(user);
                        }
                        resultFuture.complete(users);
                    } else {
                        // Report the failure to Flink instead of silently emitting nothing
                        resultFuture.completeExceptionally(ar.cause());
                    }
                    // Do not close the pool here: it is shared by all requests and closed in close()
                });
    }
}
For details of the Vert.x asynchronous MySQL client, see the official documentation: https://vertx.io/docs/vertx-mysql-client/java/
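Binding the name as a parameter (Vert.x 4's `preparedQuery(sql).execute(Tuple, handler)`) prevents SQL injection, but `%` and `_` inside the name still act as `LIKE` wildcards. A minimal sketch of a helper that escapes them before wrapping the value in `%...%`; `containsPattern` is a hypothetical name, and the code assumes MySQL's default `LIKE` escape character, the backslash.

```java
public class LikePattern {

    // Hypothetical helper: builds a "contains" pattern for SQL LIKE.
    // Backslash, % and _ are each prefixed with a backslash so the text matches literally
    // (assumes '\' is the LIKE escape character, MySQL's default).
    static String containsPattern(String raw) {
        StringBuilder sb = new StringBuilder("%");
        for (char c : raw.toCharArray()) {
            if (c == '\\' || c == '%' || c == '_') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append('%').toString();
    }

    public static void main(String[] args) {
        System.out.println(containsPattern("tom"));   // %tom%
        System.out.println(containsPattern("50%_a")); // %50\%\_a%
    }
}
```

The result would be bound in place of a plain `"%" + name + "%"` string, for example `Tuple.of(containsPattern(userVO.getName()))`.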
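UserVO is the input type and User is the output type:
package com.hm.beans;

import lombok.Data;

@Data
public class User {
    private Integer id;
    private String name;
    private Integer age;
}

package com.hm.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor // a public no-arg constructor lets Flink treat UserVO as a POJO
public class UserVO {
    private String name;
    private Integer age;
}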
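For readers not using Lombok, the class below is roughly what `@Data`, `@AllArgsConstructor` and `@NoArgsConstructor` produce for UserVO (equals, hashCode and toString omitted for brevity). The `fromCsvLine` method is a hypothetical addition, not part of the original code: it parses a `name,age` line and returns an empty `Optional` for malformed input instead of throwing.

```java
import java.util.Optional;

public class UserVO {
    private String name;
    private Integer age;

    public UserVO() { }                       // public no-arg constructor (Flink POJO rule)

    public UserVO(String name, Integer age) { // all-args constructor used by the main class
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    // Hypothetical helper: parses "name,age"; malformed lines yield Optional.empty()
    public static Optional<UserVO> fromCsvLine(String line) {
        String[] fields = line.split(",");
        if (fields.length != 2) {
            return Optional.empty();
        }
        try {
            return Optional.of(new UserVO(fields[0].trim(), Integer.parseInt(fields[1].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(fromCsvLine("tom,20").map(UserVO::getName).orElse("?")); // tom
        System.out.println(fromCsvLine("broken line").isPresent());                 // false
    }
}
```

Inside a Flink job, such a helper would typically sit in a `flatMap` that emits only the present values, so one bad line does not fail the whole job.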
Create a class with a main method:
package com.hm;

import com.hm.beans.UserVO;
import com.hm.function.MysqlAsyncFunc;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class MysqlAsyncTest {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the input file
        DataStream<String> inputStream = env.readTextFile("D:\\dev\\java\\micro-service\\flink\\src\\main\\resources\\user.txt");
        // Convert each "name,age" line into a UserVO
        DataStream<UserVO> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new UserVO(fields[0], Integer.parseInt(fields[1].trim()));
        });
        dataStream.print(); // print to the console
        // Look up users by name: timeout 10 s, at most 20 requests in flight
        AsyncDataStream.unorderedWait(dataStream, new MysqlAsyncFunc(), 10, TimeUnit.SECONDS, 20).print();
        // Unlike the batch (DataSet) API, where print() triggers execution, the code above
        // only defines the streaming pipeline; execute() actually runs it
        env.execute();
    }
}
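In `unorderedWait(dataStream, new MysqlAsyncFunc(), 10, TimeUnit.SECONDS, 20)`, the capacity 20 caps the number of lookups in flight, the 10-second timeout bounds each lookup, and results are emitted in completion order rather than input order (`orderedWait` would keep input order). The following is a simplified JDK-only model of those three parameters, not Flink's actual operator: `unorderedWait` here is a hypothetical local method, a `Semaphore` stands in for the capacity limit and `CompletableFuture.orTimeout` (Java 9+) for the timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class UnorderedWaitModel {

    // Simplified model of AsyncDataStream.unorderedWait(input, func, timeout, unit, capacity).
    static <I, O> List<O> unorderedWait(List<I> input, Function<I, CompletableFuture<O>> func,
                                        long timeout, TimeUnit unit, int capacity) {
        Semaphore slots = new Semaphore(capacity);     // bounds the requests in flight
        List<O> output = new CopyOnWriteArrayList<>(); // filled in completion order
        List<CompletableFuture<O>> pending = new ArrayList<>();
        for (I element : input) {
            slots.acquireUninterruptibly();            // waits (back-pressure) when capacity is used up
            pending.add(func.apply(element)
                    .orTimeout(timeout, unit)          // each request is bounded by the timeout
                    .whenComplete((result, error) -> {
                        if (error == null) {
                            output.add(result);        // emitted as soon as it finishes
                        }
                        // Failed or timed-out elements are just dropped here; Flink by default
                        // fails the job instead (AsyncFunction#timeout completes exceptionally).
                        slots.release();
                    }));
        }
        // Wait for every request, ignoring individual failures
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .handle((ignored, error) -> null)
                .join();
        return output;
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(4);
        // Hypothetical lookup whose latency shrinks as n grows, so later inputs tend to finish first
        Function<Integer, CompletableFuture<String>> lookup = n -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            timer.schedule(() -> { f.complete("user-" + n); }, (4 - n) * 100L, TimeUnit.MILLISECONDS);
            return f;
        };
        // Same timeout and capacity as the Flink job: 10 seconds, at most 20 in flight
        System.out.println(unorderedWait(List.of(1, 2, 3), lookup, 10, TimeUnit.SECONDS, 20));
        timer.shutdown();
    }
}
```

With these delays the list usually prints in reverse input order, but the exact order depends on thread timing, which is precisely what "unordered" allows.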
Data source
Database
Result
