使用Kettle抽取Kafka消息并插入数据库
使用Kettle抽取Kafka消息并插入数据库
1、前提条件
需部署kafka、安装Kettle、准备一个json文件
2、通过Kafka Producer上传数据
首先新建一个转换
创建JSON input 点击浏览文件 再点击增加到选中的文件
选择字段页签,点击select fields获取需要转换的字段
在Streaming中找到的Kafkaproducer
在Bootstrap server中输入Kafka的ip+端口
Topic必填(已经创建好了)
Message field必填(和jsoninput字段对应)
创建好连接json input 和kafkaproducer
3、从Kafka消息队列拉取并转换
首先单独新建一个转换,这个转换要做的是从流中读取数据
在Streaming中找到get recordsfrom stream
输入下列字段,这些字段是Kafka consumer拉取下来中流里的字段
接下来新建Kafka到数据库的转换
创建一个kafka consumer
Transformation中选择前面创建的流读取的转换
Topics输入与producer一致的topic
Consumer group必填 值任意
创建json input
字段选择输入名称(数据库字段名称)
路径为在json中的路径
Ps:数组需带上[*]
最后创建表输出
新建数据库连接 根据需要插入的数据库类型创建 不再详细描述
选择目标表
输入记录提交数量(如果拉取数量小于提交数量则需拉取数据至提交数量才会提交,所以一开始数据库会查不到数据)
选中指定数据库字段
数据库字段 不再详细描述
最后结果图
4、运行转换
首选运行的是有kafka consumer的转换,点击运行,会等待消息上传
再运行有kafka producer的转换,当显示转换完成则数据上传成功
此时回到有kafka consumer的转换会看到日志输出已拉取的数据 但是数据库并没有数据
因为设置的提交数量是10条 需要提交的话就停止转换 日志会显示表输出已插入两条数据