vlambda Blog

Integrating the Vert.x Asynchronous Database Driver with Flink

I have also introduced Vert.x in earlier articles.


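Scenario: when Flink processes offline log data, it may need to query a database before aggregating. For example, the logs contain only a userId, and fields such as userName have to be looked up in the database.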



Writing the code

Create a new Maven project and add these dependencies to pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.1</version>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-mysql-client</artifactId>
    <version>4.1.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

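The first three dependencies are what a Java project needs to use Flink, the fourth, vertx-mysql-client, provides the Vert.x asynchronous MySQL client, and the fifth is Lombok. Lombok is declared without a version here, so its version must come from a parent POM's dependency management or be added explicitly.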


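To query an external MySQL database from Flink without blocking the stream, use Flink's async I/O API, which is built around a RichAsyncFunction. Below we create one that uses the Vert.x-based asynchronous MySQL driver.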
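The contract of `asyncInvoke` can be modeled with plain JDK classes: each record starts a lookup and returns immediately, and the result is delivered later through a callback (Flink's `ResultFuture`). The sketch below is an analogy, not Flink code: `FAKE_USER_TABLE` is a hypothetical in-memory stand-in for the MySQL table, a `Consumer` stands in for `ResultFuture`, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncLookupSketch {

    // Hypothetical in-memory stand-in for the MySQL user table: name -> age.
    static final Map<String, Integer> FAKE_USER_TABLE = Map.of("tom", 18, "jerry", 20);

    // Plays the role of asyncInvoke(input, resultFuture): start the lookup, return at once,
    // and hand the results to the callback (stand-in for ResultFuture) when they arrive.
    static void asyncInvoke(String name, ExecutorService io, Consumer<Collection<String>> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> FAKE_USER_TABLE.get(name), io)
                .thenAccept(age -> resultFuture.accept(
                        age == null
                                ? Collections.<String>emptyList()   // no match: still complete, with nothing
                                : Collections.singletonList(name + ":" + age)));
    }

    // Fires one lookup per input without waiting in between, then waits for all callbacks.
    static List<String> lookupAll(List<String> names) {
        ExecutorService io = Executors.newFixedThreadPool(2);
        List<String> output = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(names.size());
        try {
            for (String name : names) {
                asyncInvoke(name, io, results -> {
                    output.addAll(results);
                    done.countDown();
                });
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        // Results are collected in completion order, which may differ between runs.
        System.out.println(lookupAll(List.of("tom", "jerry", "spike")));
    }
}
```

The loop never waits on an individual lookup, which is the point of async I/O: many queries can be in flight at once instead of one record blocking the operator while its query runs.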

Create the class MysqlAsyncFunc:

package com.hm.function;

import com.hm.beans.User;
import com.hm.beans.UserVO;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.ArrayList;
import java.util.List;

public class MysqlAsyncFunc extends RichAsyncFunction<UserVO, User> {

    private static final long serialVersionUID = 1L;

    // The pool is not serializable, so it is transient and created in open() on each parallel instance
    private transient MySQLPool client;

    @Override
    public void open(Configuration parameters) throws Exception {
        MySQLConnectOptions connectOptions = new MySQLConnectOptions()
                .setPort(3306)
                .setHost("localhost")
                .setDatabase("flink_db")
                .setUser("root")
                .setPassword("root");
        // Pool options: at most 5 connections per parallel instance
        PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
        // Create the client pool
        client = MySQLPool.pool(connectOptions, poolOptions);
    }

    @Override
    public void close() throws Exception {
        // Close the pool once, when the operator shuts down
        if (client != null) {
            client.close();
        }
    }

    @Override
    public void asyncInvoke(UserVO userVO, ResultFuture<User> resultFuture) throws Exception {
        // Bind the name as a parameter instead of concatenating it into the SQL (avoids SQL injection)
        client
                .preparedQuery("SELECT id, name, age FROM user WHERE name LIKE ?")
                .execute(Tuple.of("%" + userVO.getName() + "%"), ar -> {
                    List<User> users = new ArrayList<>();
                    if (ar.succeeded()) {
                        RowSet<Row> result = ar.result();
                        for (Row row : result) {
                            User user = new User();
                            user.setId(row.getInteger("id"));
                            user.setName(row.getString("name"));
                            user.setAge(row.getInteger("age"));
                            users.add(user);
                        }
                    } else {
                        System.out.println("Failure: " + ar.cause().getMessage());
                    }
                    // Always complete the future (with an empty list on failure);
                    // a future that is never completed only ends when the async timeout fires
                    resultFuture.complete(users);
                });
    }
}
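Whether the name is concatenated into the SQL text or bound with a `?` placeholder through `preparedQuery(...).execute(Tuple.of(...), handler)`, any `%` or `_` it contains still acts as a LIKE wildcard. A small helper can escape them. This is a sketch assuming MySQL's default LIKE escape character `\` (it does not apply when the `NO_BACKSLASH_ESCAPES` SQL mode is enabled); `LikePattern` and `containsPattern` are hypothetical names.

```java
public class LikePattern {

    // Builds a "contains" pattern for MySQL LIKE, escaping the wildcards % and _
    // (and the escape character \ itself) so the input is matched literally.
    static String containsPattern(String input) {
        StringBuilder sb = new StringBuilder("%");
        for (char c : input.toCharArray()) {
            if (c == '\\' || c == '%' || c == '_') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append('%').toString();
    }

    public static void main(String[] args) {
        System.out.println(containsPattern("tom"));   // %tom%
        System.out.println(containsPattern("50%_x")); // %50\%\_x%
    }
}
```

With a prepared query, the bound value would then be `Tuple.of(LikePattern.containsPattern(userVO.getName()))`.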

For details of the Vert.x asynchronous MySQL client, see the official documentation: https://vertx.io/docs/vertx-mysql-client/java/

UserVO is the input type and User is the output type:

package com.hm.beans;

import lombok.Data;

@Data
public class User {
    private Integer id;
    private String name;
    private Integer age;
}

package com.hm.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor // lets Flink treat UserVO as a POJO instead of a generic (Kryo) type
public class UserVO {
    private String name;
    private Integer age;
}
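For readers new to Lombok: `@Data` generates getters, setters, `equals`, `hashCode` and `toString`, and `@AllArgsConstructor` adds a constructor taking every field. The hand-written class below (hypothetically named `UserVOPlain`) approximates what the annotations produce for `UserVO`; Lombok's real `equals` differs slightly (it adds a `canEqual` check). It also has a public no-argument constructor, which Flink's POJO rules require; `@AllArgsConstructor` alone does not provide one.

```java
import java.util.Objects;

// Roughly what @Data + @AllArgsConstructor + @NoArgsConstructor generate for UserVO.
public class UserVOPlain {
    private String name;
    private Integer age;

    // Public no-arg constructor: needed for Flink to treat the class as a POJO.
    public UserVOPlain() {
    }

    // What @AllArgsConstructor generates: one parameter per field, in declaration order.
    public UserVOPlain(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof UserVOPlain)) return false;
        UserVOPlain other = (UserVOPlain) o;
        return Objects.equals(name, other.name) && Objects.equals(age, other.age);
    }

    @Override
    public int hashCode() { return Objects.hash(name, age); }

    // Same format as Lombok's @ToString for the original class name.
    @Override
    public String toString() { return "UserVO(name=" + name + ", age=" + age + ")"; }
}
```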

Create a class with a main method:

package com.hm;

import com.hm.beans.UserVO;
import com.hm.function.MysqlAsyncFunc;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class MysqlAsyncTest {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the data
        DataStream<String> inputStream = env.readTextFile("D:\\dev\\java\\micro-service\\flink\\src\\main\\resources\\user.txt");
        // Convert each "name,age" line to a UserVO
        DataStream<UserVO> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new UserVO(fields[0], Integer.parseInt(fields[1]));
        });
        dataStream.print(); // print to the console
        // Query the database by name: 10-second timeout per request, at most 20 requests in flight
        AsyncDataStream.unorderedWait(
                dataStream,
                new MysqlAsyncFunc(),
                10, TimeUnit.SECONDS,
                20).print();
        // Unlike batch processing, the code above only defines the streaming job; it still has to be executed
        env.execute();
    }
}
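`unorderedWait` emits each result as soon as its query returns. Flink also offers `AsyncDataStream.orderedWait` with the same parameters, which preserves input order at the cost of latency: a slow query holds back every record behind it. The simulation below shows the difference without Flink. It is a simplification that ignores the capacity limit and watermarks (in event time, unordered mode still does not let results cross a watermark); the class name, record names and latencies are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class AsyncOrderSketch {

    // A simulated in-flight request: the input record and the time its lookup completes.
    static final class Request {
        final String name;
        final int finishMillis;

        Request(String name, int finishMillis) {
            this.name = name;
            this.finishMillis = finishMillis;
        }
    }

    // unorderedWait-style: each result leaves as soon as its own lookup completes.
    // Returns "name@emitTime" entries in emission order.
    static List<String> unordered(List<Request> requests) {
        List<Request> byFinish = new ArrayList<>(requests);
        byFinish.sort(Comparator.comparingInt(r -> r.finishMillis)); // stable: ties keep input order
        List<String> out = new ArrayList<>();
        for (Request r : byFinish) {
            out.add(r.name + "@" + r.finishMillis);
        }
        return out;
    }

    // orderedWait-style: results are buffered and released in input order,
    // so a record cannot be emitted before its predecessor.
    static List<String> ordered(List<Request> requests) {
        List<String> out = new ArrayList<>();
        int lastEmit = 0;
        for (Request r : requests) {
            lastEmit = Math.max(lastEmit, r.finishMillis);
            out.add(r.name + "@" + lastEmit);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Request> reqs = Arrays.asList(
                new Request("tom", 300), new Request("jerry", 50), new Request("spike", 100));
        System.out.println(unordered(reqs)); // [jerry@50, spike@100, tom@300]
        System.out.println(ordered(reqs));   // [tom@300, jerry@300, spike@300]
    }
}
```

For this job, where each name is looked up independently, unordered mode is the lower-latency choice; ordered mode only pays off when downstream logic depends on the input order.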

Data source
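The contents of user.txt are not reproduced here; judging by the `map` function in `main`, each line has the form `name,age`. That `map` throws on a line with a missing or non-numeric age, and an exception thrown inside a Flink map function fails the task. A more defensive parser might look like this sketch; `UserLineParser`, `ParsedUser` and the sample lines are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class UserLineParser {

    // Hypothetical holder for one parsed line; in the job this would be a UserVO.
    static final class ParsedUser {
        final String name;
        final int age;

        ParsedUser(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() { return name + "," + age; }
    }

    // Parses "name,age" with surrounding whitespace trimmed; returns empty instead of throwing.
    static Optional<ParsedUser> parse(String line) {
        if (line == null) return Optional.empty();
        String[] fields = line.split(",");
        if (fields.length != 2) return Optional.empty();
        String name = fields[0].trim();
        if (name.isEmpty()) return Optional.empty();
        try {
            return Optional.of(new ParsedUser(name, Integer.parseInt(fields[1].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Keeps only the lines that parse successfully.
    static List<ParsedUser> parseAll(List<String> lines) {
        List<ParsedUser> out = new ArrayList<>();
        for (String line : lines) {
            parse(line).ifPresent(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("tom,18", " jerry , 20 ", "bad line", "spike,abc");
        System.out.println(parseAll(lines)); // [tom,18, jerry,20]
    }
}
```

Inside the job, a `flatMap` could emit a `UserVO` only when parsing succeeds; with a lambda, Flink needs an explicit `.returns(UserVO.class)` hint because the `Collector` type parameter is erased.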


Database

Result