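Integrating the Vert.x Asynchronous Database Driver with Flink
I covered Vert.x in an earlier post.
Scenario: when Flink processes offline log data, it may need to query a database before aggregating. For example, the log records contain only a userId, and fields such as userName have to be looked up in the database.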
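To make the scenario concrete, here is a minimal plain-Java sketch of this kind of enrichment, with no Flink or database involved. Everything in it is an assumption for illustration: the class UserNameLookup, the in-memory map standing in for the database, and the "userId,action" log format are hypothetical and not part of the project built in this post.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the database: maps userId to userName.
final class UserNameLookup {
    private final Map<Integer, String> table = new HashMap<>();

    UserNameLookup() {
        table.put(1, "alice");
        table.put(2, "bob");
    }

    // Returns immediately; the name is delivered later through the future.
    CompletableFuture<String> findName(int userId) {
        return CompletableFuture.supplyAsync(() -> table.getOrDefault(userId, "unknown"));
    }

    // Turns a log line "userId,action" into "userId,userName,action".
    CompletableFuture<String> enrich(String logLine) {
        String[] fields = logLine.split(",", 2);
        int userId = Integer.parseInt(fields[0].trim());
        return findName(userId).thenApply(name -> userId + "," + name + "," + fields[1]);
    }
}
```

The caller gets a future back instead of blocking on the lookup, which is the same idea the asynchronous function later in this post applies inside Flink.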
Let's start coding.
Create a new Maven project and add these dependencies to the pom:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.1</version>
    <!-- <scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-mysql-client</artifactId>
    <version>4.1.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
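A quick explanation: the first three dependencies are what a Java project needs for Flink, the fourth (vertx-mysql-client) is the Vert.x MySQL client, and the fifth is Lombok. The Lombok dependency has no version element here, so the version has to come from a parent pom or a dependencyManagement section.
To query an external MySQL database from Flink, use RichAsyncFunction. Below we create one that wraps the Vert.x-based asynchronous MySQL driver.
Create the class MysqlAsyncFunc: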
package com.hm.function;
import com.hm.beans.User;
import com.hm.beans.UserVO;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import io.vertx.sqlclient.Tuple;
import java.util.ArrayList;
import java.util.List;

public class MysqlAsyncFunc extends RichAsyncFunction<UserVO, User> {
    private static final long serialVersionUID = 1L;
    // Transient: the pool is created in open() on each task, not serialized with the function
    private transient MySQLPool client;

    @Override
    public void open(Configuration parameters) throws Exception {
        MySQLConnectOptions connectOptions = new MySQLConnectOptions()
                .setPort(3306)
                .setHost("localhost")
                .setDatabase("flink_db")
                .setUser("root")
                .setPassword("root");
        // Pool options
        PoolOptions poolOptions = new PoolOptions()
                .setMaxSize(5);
        // Create the client pool
        client = MySQLPool.pool(connectOptions, poolOptions);
    }

    @Override
    public void close() throws Exception {
        // open() may have failed before the pool was created
        if (client != null) {
            client.close();
        }
    }

    @Override
    public void asyncInvoke(UserVO userVO, ResultFuture<User> resultFuture) throws Exception {
        client
                // Bind the name as a parameter instead of concatenating it into the SQL string
                .preparedQuery("SELECT * FROM user WHERE name LIKE ?")
                .execute(Tuple.of("%" + userVO.getName() + "%"), ar -> {
                    List<User> users = new ArrayList<>();
                    if (ar.succeeded()) {
                        RowSet<Row> result = ar.result();
                        for (Row row : result) {
                            User user = new User();
                            user.setId(row.getInteger("id"));
                            user.setName(row.getString("name"));
                            user.setAge(row.getInteger("age"));
                            users.add(user);
                        }
                    } else {
                        // A failed query is logged and yields no results; to fail the job
                        // instead, call resultFuture.completeExceptionally(ar.cause())
                        System.err.println("Failure: " + ar.cause().getMessage());
                    }
                    // Do not close the pool here: it is shared across records and closed in close()
                    resultFuture.complete(users);
                });
    }
}
For details on the Vert.x asynchronous MySQL client, see the official documentation: https://vertx.io/docs/vertx-mysql-client/java/
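One detail of the LIKE lookup in MysqlAsyncFunc is worth a small sketch: the name is wrapped in % on both sides, so any % or _ inside the name itself also acts as a wildcard. The helper below is hypothetical (LikePattern is not part of the Vert.x API or of this project) and assumes MySQL's default LIKE escape character, the backslash. It escapes those characters before wrapping the value, and the result is meant to be bound as a query parameter.

```java
// Hypothetical helper, not part of Vert.x: builds a "contains" pattern for LIKE.
final class LikePattern {
    private LikePattern() {
    }

    // Escapes \, % and _ so they match literally, then wraps the value in %.
    // Assumes the backslash is the LIKE escape character (the MySQL default).
    static String contains(String raw) {
        StringBuilder sb = new StringBuilder("%");
        for (char c : raw.toCharArray()) {
            if (c == '\\' || c == '%' || c == '_') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append('%').toString();
    }
}
```

With this, a name such as 50%_off is searched for literally rather than matching any value that merely contains 50.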
UserVO is the input type and User is the output type:
package com.hm.beans;
import lombok.Data;

// @Data generates the getters and setters that MysqlAsyncFunc calls
@Data
public class User {
    private Integer id;
    private String name;
    private Integer age;
}
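package com.hm.beans;
import lombok.AllArgsConstructor;
import lombok.Data;

// @AllArgsConstructor generates the (name, age) constructor used in the main class
@Data
@AllArgsConstructor
public class UserVO {
    private String name;
    private Integer age;
}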
Create a class with a main method:
package com.hm;
import com.hm.beans.UserVO;
import com.hm.function.MysqlAsyncFunc;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class MysqlAsyncTest {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the input data
        DataStream<String> inputStream = env.readTextFile("D:\\dev\\java\\micro-service\\flink\\src\\main\\resources\\user.txt");
        // Convert each line into a UserVO
        DataStream<UserVO> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new UserVO(fields[0], Integer.parseInt(fields[1]));
        });
        dataStream.print(); // print to the console
        // Look up each name in the database and print the matching users
        AsyncDataStream.unorderedWait(
                dataStream,
                new MysqlAsyncFunc(),
                10,                // timeout for each async request
                TimeUnit.SECONDS,
                20)                // maximum number of in-flight requests
                .print();
        // Unlike batch processing, everything above only defines the job; it still has to be executed
        env.execute();
    }
}
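The main class uses AsyncDataStream.unorderedWait, which emits each result as soon as its request finishes, so the output order can differ from the input order (orderedWait keeps the input order instead). The sketch below only illustrates that completion-order behavior with plain CompletableFuture; it is not how Flink implements it, and UnorderedEmitDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo: results are recorded in completion order, not submission order.
final class UnorderedEmitDemo {
    static List<String> run() {
        List<String> emitted = new ArrayList<>();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        // Submit three "requests"; each one emits its result when it completes.
        for (int i = 0; i < 3; i++) {
            CompletableFuture<String> request = new CompletableFuture<>();
            request.thenAccept(emitted::add);
            pending.add(request);
        }
        // Simulate the database answering the third request first.
        pending.get(2).complete("third");
        pending.get(0).complete("first");
        pending.get(1).complete("second");
        return emitted;
    }
}
```

If downstream logic depends on the original record order, orderedWait is the variant to reach for, at the cost of holding back results that finish early.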
Data source
Database
Result