Complete workflow: reading from Kafka with Flink and writing to MySQL or Oracle
1. Required dependencies
<!-- JSON (Gson) -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.2.4</version>
</dependency>

<!-- MySQL connection pool -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-dbcp2</artifactId>
    <version>2.1.1</version>
</dependency>

<!-- Oracle JDBC driver and Druid connection pool.
     Declare the versions in the <properties> section of the POM:
     <oracle.version>11.2.0.3</oracle.version>
     <druid.version>1.1.13</druid.version> -->
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc6</artifactId>
    <version>${oracle.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>${druid.version}</version>
</dependency>
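The snippet above omits the Flink runtime, the Kafka connector, and Guava (used by the window function below). A minimal sketch of the missing entries, assuming Flink 1.11.x built against Scala 2.11; the version numbers here are assumptions and should be adjusted to match your cluster:

<properties>
    <flink.version>1.11.2</flink.version>
</properties>

<!-- Flink streaming runtime (version assumed) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Kafka connector providing FlinkKafkaConsumer -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Needed to run the job from the IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Guava, for Lists.newArrayList in the window function (version assumed) -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>28.2-jre</version>
</dependency>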
2. Writing data to Kafka
package com.kfk.flink.util;

import com.entity.Student;
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;

public class KafkaUtil {

    public static final String broker_list = "bigdata-pro01.kfk.com:9092";
    // the Kafka topic must be the same one the Flink job reads from
    public static final String topic = "protest";

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        Gson gson = new Gson();
        for (int i = 1; i <= 100000; i++) {
            String name = "zhisheng" + (i % 2 == 0 ? i - 1 : i);
            Student student = new Student(i, name, "password" + i, 18 + i, new Date().getTime());
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, gson.toJson(student));
            producer.send(record);
            System.out.println("Sending record: " + gson.toJson(student));
            Thread.sleep(500); // send one record every 500 ms
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}
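The com.entity.Student class is not shown in the original. A minimal sketch inferred from how the producer, the job, and the sink use it; the field order in the constructor and the getter names are assumptions. Setters and a no-arg constructor are included so Flink treats it as a POJO and keyBy("name") works as a field expression:

package com.entity;

public class Student {

    private int id;
    private String name;
    private String password;
    private int age;
    private long eventTime; // event-time timestamp in milliseconds, used for watermarking

    public Student() {
    }

    public Student(int id, String name, String password, int age, long eventTime) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
        this.eventTime = eventTime;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    public long getEventTime() { return eventTime; }
    public void setEventTime(long eventTime) { this.eventTime = eventTime; }
}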
3. Consuming the Kafka data and writing it to the database in batches
package com.kfk.flink.datastream.kafkaToMysql;

import com.entity.Student;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Master {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.put("bootstrap.servers", "bigdata-pro01.kfk.com:9092");
        props.put("zookeeper.connect", "bigdata-pro01.kfk.com:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        // this Kafka topic must match the one used by the producer utility above
        DataStream<String> protest = env.addSource(new FlinkKafkaConsumer<>(
                "protest",
                new SimpleStringSchema(),
                props)).setParallelism(1);

        DataStream<Student> map = protest.map(new MapFunction<String, Student>() {
            @Override
            public Student map(String json) throws Exception {
                return new Gson().fromJson(json, Student.class);
            }
        });

        /**
         * Assign event time and the allowed out-of-orderness
         */
        DataStream<Student> dataStream = map.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Student>forBoundedOutOfOrderness(Duration.ofSeconds(0L))
                        .withTimestampAssigner((event, timestamp) -> event.getEventTime()));

        /**
         * Define the window size.
         * Since Flink 1.11, event-time windows require an explicit WatermarkStrategy.
         */
        SingleOutputStreamOperator<List<Student>> apply = dataStream
                .keyBy("name")
                .window(TumblingEventTimeWindows.of(Time.seconds(20L)))
                .apply(new WindowFunction<Student, List<Student>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow,
                                      Iterable<Student> iterable, Collector<List<Student>> collector) throws Exception {
                        ArrayList<Student> students = Lists.newArrayList(iterable);
                        collector.collect(students);
                    }
                });

        apply.addSink(new SinkToMySQL());

        env.execute("flink learning connectors kafka");
    }
}
4. Sink-to-MySQL code
package com.kfk.flink.datastream.kafkaToMysql;

import com.entity.Student;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

/**
 * Desc: batch-sink data into MySQL
 */
public class SinkToMySQL extends RichSinkFunction<List<Student>> {

    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * Create the connection in open(), so it is not created and released on every invoke()
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into student(id, name, password, age) values(?, ?, ?, ?)";
        ps = connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the statement and the connection, releasing resources
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once per incoming batch (one window's worth of records)
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(List<Student> value, Context context) throws Exception {
        // iterate over the batch
        for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        }
        // execute the whole batch at once
        int[] count = ps.executeBatch();
        System.out.println("Successfully inserted " + count.length + " rows");
    }

    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        // Note: replace with your own MySQL address, username and password
        dataSource.setUrl("jdbc:mysql://bigdata-pro01.kfk.com:3306/flink");
        dataSource.setUsername("root");
        dataSource.setPassword("Root@123");
        // Pool parameters; size them according to your database.
        // An oversized pool can fail with "Error preloading the connection pool".
        dataSource.setInitialSize(5);
        dataSource.setMaxTotal(10);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>> connection pool created: " + con);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
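The sink assumes a student table already exists in the flink database. A minimal sketch matching the insert statement above; the column types and lengths are assumptions, adjust them to your schema:

-- hypothetical target table for the MySQL sink
CREATE TABLE student (
    id       INT PRIMARY KEY,
    name     VARCHAR(64),
    password VARCHAR(64),
    age      INT
);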
5. Sink-to-Oracle code
package com.kfk.flink.datastream.kafkaToMysql;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

/**
 * Batch-sink data into Oracle
 */
public class SinkToORACLE extends RichSinkFunction<List<String>> {

    private PreparedStatement ps;
    private BasicDataSource dataSource;
    private Connection connection;

    /**
     * Create the connection in open(), so it is not created and released on every invoke()
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        // With Oracle it is recommended to use upper case for table and column names,
        // otherwise you may hit errors such as "table or view does not exist" or "invalid column".
        String sql = "insert into STUDENT(ID,NAME,PASSWORD,AAAA) values(?, ?, ?, ?)";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once per incoming batch
     */
    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        // iterate over the batch
        for (String student : value) {
            ps.setInt(1, 1);
            ps.setString(2, student);
            ps.setString(3, "123456");
            ps.setInt(4, 18);
            ps.addBatch();
        }
        // execute the whole batch at once
        int[] count = ps.executeBatch();
        System.out.println("Successfully inserted " + count.length + " rows");
    }

    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
        // Note: replace with your own Oracle address, username and password
        dataSource.setUrl("jdbc:oracle:thin:@localhost:1521:xe");
        dataSource.setUsername("hr");
        dataSource.setPassword("hr");
        // Pool parameters; size them according to your database
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("connection pool created: " + con);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("-----------oracle get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
