vlambda博客
学习文章列表

flink读取Kafka存储到mysql或oracle整套流程

1.相关的依赖

<!-- json --><dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.2.4</version></dependency><!-- mysql连接池 --><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-dbcp2</artifactId> <version>2.1.1</version></dependency><!-- oracle连接池 --><oracle.version>11.2.0.3</oracle.version><druid.version>1.1.13</druid.version>
<dependency> <groupId>com.oracle</groupId> <artifactId>ojdbc6</artifactId> <version>${oracle.version}</version></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>${druid.version}</version></dependency>

2.写入数据到kafka

package com.kfk.flink.util;
import com.entity.Student;import com.google.gson.Gson;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Date;import java.util.Properties;
public class KafkaUtil {
public static final String broker_list = "bigdata-pro01.kfk.com:9092"; public static final String topic = "protest"; //kafka topic 需要和 flink 程序用同一个 topic
public static void writeToKafka() throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", broker_list); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer producer = new KafkaProducer<String, String>(props); Gson gson = new Gson(); for (int i = 1; i <= 100000; i++) { String name = "zhisheng"+(i%2==0?i-1:i); Student student = new Student(i, name, "password" + i, 18 + i,new Date().getTime()); ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, gson.toJson(student)); producer.send(record); System.out.println("发送数据: " + gson.toJson(student)); Thread.sleep(1 * 500); //发送一条数据 sleep 10s,相当于 1 分钟 6 条 } producer.flush();    } public static void main(String[] args) throws InterruptedException { writeToKafka(); }}

3.接收kafka数据,并批量入库

package com.kfk.flink.datastream.kafkaToMysql;
import com.entity.Student;import com.google.common.collect.Lists;import com.google.gson.Gson;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.tuple.Tuple;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.util.Collector;import java.time.Duration;import java.util.ArrayList;import java.util.List;import java.util.Properties;
public class Master {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); Properties props = new Properties(); props.put("bootstrap.servers", "bigdata-pro01.kfk.com:9092"); props.put("zookeeper.connect", "bigdata-pro01.kfk.com:2181"); props.put("group.id", "metric-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest");
DataStream<String> protest = env.addSource(new FlinkKafkaConsumer<>( "protest", //这个 kafka topic 需要和上面的工具类的 topic 一致 new SimpleStringSchema(), props)).setParallelism(1); DataStream<Student> map = protest.map(new MapFunction<String, Student>() { @Override public Student map(String json) throws Exception { return new Gson().fromJson(json, Student.class); } }); /** * 指派eventtime和延迟时间 */ DataStream<Student> dataStream = map.assignTimestampsAndWatermarks(WatermarkStrategy.<Student>forBoundedOutOfOrderness(Duration.ofSeconds(0L)) .withTimestampAssigner((event, timestamp) -> event.getEventTime())); /** * 指定窗口大小 * flink版本11后必须加wm */ SingleOutputStreamOperator<List<Student>> apply = dataStream.keyBy("name") .window(TumblingEventTimeWindows.of(Time.seconds(20L))) .apply(new WindowFunction<Student, List<Student>, Tuple, TimeWindow>() { @Override public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Student> iterable, Collector<List<Student>> collector) throws Exception {                        ArrayList<Student> students = Lists.newArrayList(iterable); collector.collect(students); } }); apply.addSink(new SinkToMySQL()); env.execute("flink learning connectors kafka"); }}

4.sink to mysql 代码

package com.kfk.flink.datastream.kafkaToMysql;
import com.entity.Student;import org.apache.commons.dbcp2.BasicDataSource;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;import java.sql.PreparedStatement;import java.util.List;
/** * Desc: 数据批量 sink 数据到 mysql */public class SinkToMySQL extends RichSinkFunction<List<Student>> {
PreparedStatement ps; BasicDataSource dataSource; private Connection connection;
/** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); dataSource = new BasicDataSource(); connection = getConnection(dataSource); String sql = "insert into student(id, name, password, age) values(?, ?, ?, ?);"; ps = connection.prepareStatement(sql); }
@Override public void close() throws Exception { super.close(); //关闭连接和释放资源 if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } }
/** * 每条数据的插入都要调用一次 invoke() 方法 * @param value * @param context * @throws Exception */ @Override public void invoke(List<Student> value, Context context) throws Exception {        //遍历数据集合 for (Student student : value) { ps.setInt(1, student.getId()); ps.setString(2, student.getName()); ps.setString(3, student.getPassword()); ps.setInt(4, student.getAge()); ps.addBatch(); } int[] count = ps.executeBatch();//批量后执行 System.out.println("成功了插入了" + count.length + "行数据"); }
private static Connection getConnection(BasicDataSource dataSource) { dataSource.setDriverClassName("com.mysql.jdbc.Driver"); //注意,替换成自己本地的 mysql 数据库地址和用户名、密码 dataSource.setUrl("jdbc:mysql://bigdata-pro01.kfk.com:3306/flink"); dataSource.setUsername("root"); dataSource.setPassword("Root@123");        //设置连接池的一些参数,此处的参数根据数据库设置        //否则过大会报错:Error preloading the connection pool dataSource.setInitialSize(5); dataSource.setMaxTotal(10); dataSource.setMinIdle(2); Connection con = null; try { con = dataSource.getConnection(); System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>创建连接池成功:" + con); } catch (Exception e) { e.printStackTrace(); System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; }}

5.sink to oracle代码

package com.kfk.flink.datastream.kafkaToMysql;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.apache.commons.dbcp2.BasicDataSource;import org.apache.flink.configuration.Configuration;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.util.List;
/** * 程序功能: 数据批量 sink 数据到 oracle * */public class SinkToORACLE extends RichSinkFunction<List<String>> { private PreparedStatement ps; private BasicDataSource dataSource; private Connection connection;
/** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); dataSource = new BasicDataSource(); connection = getConnection(dataSource);        //oracle的使用建议表名以及字段名全部使用大写,以免报错:如表或视图不存在或者无效的列。 String sql = "insert into STUDENT(ID,NAME,PASSWORD,AAAA) values(?, ?, ?, ?)"; ps = this.connection.prepareStatement(sql); }
@Override public void close() throws Exception { if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } }
/** * 每批数据的插入都要调用一次 invoke() 方法 */ @Override public void invoke(List<String> value, Context context) throws Exception { //遍历数据集合 for (String student : value) { ps.setInt(1, 1); ps.setString(2, student); ps.setString(3, "123456"); ps.setInt(4, 18); ps.addBatch(); } int[] count = ps.executeBatch(); //批量后执行 System.out.println("成功了插入了" + count.length + "行数据"); }
private static Connection getConnection(BasicDataSource dataSource) { //设置连接池的一些参数 dataSource.setInitialSize(10); dataSource.setMaxTotal(50); dataSource.setMinIdle(2); Connection con = null; try { Class.forName("oracle.jdbc.driver.OracleDriver"); con = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:xe", "hr", "hr"); System.out.println("创建连接池:" + con); } catch (Exception e) { e.printStackTrace(); System.out.println("-----------oracle get connection has exception , msg = " + e.getMessage()); } return con; }}