Flink reading from Kafka and writing to MySQL or Oracle: the complete workflow
1. Dependencies
<!-- json -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
</dependency>
<!-- MySQL connection pool -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.1.1</version>
</dependency>
<!-- Oracle connection pool; the two version properties below belong in the POM's <properties> block -->
<oracle.version>11.2.0.3</oracle.version>
<druid.version>1.1.13</druid.version>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>${oracle.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
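Note that the POM above does not list the Flink Kafka connector or the MySQL JDBC driver, both of which the code below relies on. A sketch of the missing entries (the artifact coordinates are the usual ones, but the versions and the Scala suffix are assumptions; match them to your own Flink and MySQL installation):
<!-- Flink Kafka connector (version and Scala suffix are assumptions) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.2</version>
</dependency>
<!-- MySQL JDBC driver; 5.1.x matches the com.mysql.jdbc.Driver class used below -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>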
2. Write test data to Kafka
package com.kfk.flink.util;
import com.entity.Student;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Date;
import java.util.Properties;
public class KafkaUtil {
public static final String broker_list = "bigdata-pro01.kfk.com:9092";
public static final String topic = "protest"; // the Kafka topic; it must match the topic used by the Flink job below
public static void writeToKafka() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", broker_list);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Gson gson = new Gson();
for (int i = 1; i <= 100000; i++) {
String name = "zhisheng"+(i%2==0?i-1:i);
Student student = new Student(i, name, "password" + i, 18 + i,new Date().getTime());
ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, gson.toJson(student));
producer.send(record);
System.out.println("sending record: " + gson.toJson(student));
Thread.sleep(500); // pause 500 ms after each record, i.e. roughly two records per second
}
producer.flush();
producer.close();
}
public static void main(String[] args) throws InterruptedException {
writeToKafka();
}
}
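Both the producer above and the Flink job below import com.entity.Student, which the original post does not show. A minimal sketch, inferred from the constructor and getters the code actually calls (a no-arg constructor and setters are added so Flink can treat the class as a POJO; everything beyond that is an assumption):

package com.entity;

public class Student {
    private int id;
    private String name;
    private String password;
    private int age;
    private long eventTime; // epoch milliseconds, used as the Flink event time

    public Student() {
    }

    public Student(int id, String name, String password, int age, long eventTime) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
        this.eventTime = eventTime;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public long getEventTime() { return eventTime; }
    public void setEventTime(long eventTime) { this.eventTime = eventTime; }
}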
3. Consume the Kafka data and batch-insert it into the database
package com.kfk.flink.datastream.kafkaToMysql;
import com.entity.Student;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Master {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // from Flink 1.12 onwards event time is the default and this call is deprecated
Properties props = new Properties();
props.put("bootstrap.servers", "bigdata-pro01.kfk.com:9092");
props.put("zookeeper.connect", "bigdata-pro01.kfk.com:2181"); // not needed by the Kafka consumer API used here; it only triggers a warning
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
DataStream<String> protest = env.addSource(new FlinkKafkaConsumer<>(
"protest", // this Kafka topic must match the one used by the producer utility above
new SimpleStringSchema(),
props)).setParallelism(1);
DataStream<Student> map = protest.map(new MapFunction<String, Student>() {
@Override
public Student map(String json) throws Exception {
return new Gson().fromJson(json, Student.class);
}
});
/**
 * Assign event timestamps and the allowed out-of-orderness (watermark delay)
 */
DataStream<Student> dataStream = map.assignTimestampsAndWatermarks(WatermarkStrategy.<Student>forBoundedOutOfOrderness(Duration.ofSeconds(0L))
.withTimestampAssigner((event, timestamp) -> event.getEventTime()));
/**
 * Set the window size;
 * from Flink 1.11 onwards a WatermarkStrategy must be assigned before using event-time windows
 */
SingleOutputStreamOperator<List<Student>> apply = dataStream.keyBy("name")
.window(TumblingEventTimeWindows.of(Time.seconds(20L)))
.apply(new WindowFunction<Student, List<Student>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow,
Iterable<Student> iterable, Collector<List<Student>> collector) throws Exception {
ArrayList<Student> students = Lists.newArrayList(iterable);
collector.collect(students);
}
});
apply.addSink(new SinkToMySQL());
env.execute("flink learning connectors kafka");
}
}
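A side note on the keyBy("name") call above: field-expression keyBy is deprecated in more recent Flink releases. Below is a sketch of the same window pipeline with a KeySelector instead (requires org.apache.flink.api.java.functions.KeySelector; the key type seen by the window function then becomes String rather than Tuple):

SingleOutputStreamOperator<List<Student>> apply = dataStream
        // key by the student name via a KeySelector instead of the deprecated field expression
        .keyBy(new KeySelector<Student, String>() {
            @Override
            public String getKey(Student student) {
                return student.getName();
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(20L)))
        .apply(new WindowFunction<Student, List<Student>, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                              Iterable<Student> input, Collector<List<Student>> out) {
                // collect the whole window into one batch, same as above
                out.collect(Lists.newArrayList(input));
            }
        });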
4. Sink-to-MySQL code
package com.kfk.flink.datastream.kafkaToMysql;
import com.entity.Student;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
/**
 * Desc: batch-sink data to MySQL
 */
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
PreparedStatement ps;
BasicDataSource dataSource;
private Connection connection;
/**
 * Open the connection in open() so it does not have to be created and released on every invoke() call
 * @param parameters
 * @throws Exception
 */
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
dataSource = new BasicDataSource();
connection = getConnection(dataSource);
String sql = "insert into student(id, name, password, age) values(?, ?, ?, ?)";
ps = connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
// close the statement and the connection to release resources
if (ps != null) {
ps.close();
}
if (connection != null) {
connection.close();
}
}
/**
 * invoke() is called once for every window's batch of records
 * @param value
 * @param context
 * @throws Exception
 */
@Override
public void invoke(List<Student> value, Context context) throws Exception {
// iterate over the batch and add every record to the prepared statement
for (Student student : value) {
ps.setInt(1, student.getId());
ps.setString(2, student.getName());
ps.setString(3, student.getPassword());
ps.setInt(4, student.getAge());
ps.addBatch();
}
int[] count = ps.executeBatch(); // execute the whole batch at once
System.out.println("successfully inserted " + count.length + " rows");
}
private static Connection getConnection(BasicDataSource dataSource) {
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
// NOTE: replace with your own MySQL host, database, username and password
dataSource.setUrl("jdbc:mysql://bigdata-pro01.kfk.com:3306/flink");
dataSource.setUsername("root");
dataSource.setPassword("Root@123");
// size the connection pool according to your database limits;
// values that are too large cause: Error preloading the connection pool
dataSource.setInitialSize(5);
dataSource.setMaxTotal(10);
dataSource.setMinIdle(2);
Connection con = null;
try {
con = dataSource.getConnection();
System.out.println("connection pool created successfully: " + con);
} catch (Exception e) {
e.printStackTrace();
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
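For reference, the INSERT statement above assumes a student table along these lines; the exact column types and lengths are assumptions:

CREATE TABLE student (
  id       INT PRIMARY KEY,
  name     VARCHAR(50),
  password VARCHAR(50),
  age      INT
);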
5. Sink-to-Oracle code
package com.kfk.flink.datastream.kafkaToMysql;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
/**
 * Desc: batch-sink data to Oracle
 */
public class SinkToORACLE extends RichSinkFunction<List<String>> {
private PreparedStatement ps;
private BasicDataSource dataSource;
private Connection connection;
/**
 * Open the connection in open() so it does not have to be created and released on every invoke() call
 */
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
dataSource = new BasicDataSource();
connection = getConnection(dataSource);
// with Oracle, keep table and column names upper-case to avoid errors such as "table or view does not exist" or "invalid column"
String sql = "insert into STUDENT(ID,NAME,PASSWORD,AAAA) values(?, ?, ?, ?)";
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
// close the statement and the connection to release resources
if (ps != null) {
ps.close();
}
if (connection != null) {
connection.close();
}
}
/**
 * invoke() is called once for every batch of records
 */
@Override
public void invoke(List<String> value, Context context) throws Exception {
// iterate over the batch and add every record to the prepared statement
for (String student : value) {
ps.setInt(1, 1);
ps.setString(2, student);
ps.setString(3, "123456");
ps.setInt(4, 18);
ps.addBatch();
}
int[] count = ps.executeBatch(); // execute the whole batch at once
System.out.println("successfully inserted " + count.length + " rows");
}
private static Connection getConnection(BasicDataSource dataSource) {
dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
// NOTE: replace with your own Oracle connection string, username and password
dataSource.setUrl("jdbc:oracle:thin:@localhost:1521:xe");
dataSource.setUsername("hr");
dataSource.setPassword("hr");
// pool sizing parameters
dataSource.setInitialSize(10);
dataSource.setMaxTotal(50);
dataSource.setMinIdle(2);
Connection con = null;
try {
con = dataSource.getConnection();
System.out.println("connection pool created successfully: " + con);
} catch (Exception e) {
e.printStackTrace();
System.out.println("-----------oracle get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
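One caveat when reusing the job from step 3 with this sink: that job emits List<Student>, while SinkToORACLE expects List<String>. A minimal wiring sketch that maps each window batch to a list of names before sinking (uses org.apache.flink.api.common.typeinfo.Types so Flink can infer the lambda's return type):

apply.map(students -> {
            // extract the name of every student in the window batch
            List<String> names = new ArrayList<>();
            for (Student s : students) {
                names.add(s.getName());
            }
            return names;
        })
        .returns(Types.LIST(Types.STRING))
        .addSink(new SinkToORACLE());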