vlambda博客
学习文章列表

使用Kettle抽取Kafka消息并插入数据库

使用Kettle抽取Kafka消息并插入数据库

1、前提条件

需部署kafka、安装Kettle、准备一个json文件

2、通过Kafka Producer上传数据

首先新建一个转换

创建JSON input 点击浏览文件 再点击增加到选中的文件

使用Kettle抽取Kafka消息并插入数据库

选择字段页签,点击select fields获取需要转换的字段

使用Kettle抽取Kafka消息并插入数据库

在Streaming中找到的Kafkaproducer

在Bootstrap server中输入Kafka的ip+端口

Topic必填(已经创建好了)

Message field必填(和jsoninput字段对应)

使用Kettle抽取Kafka消息并插入数据库

创建好连接json input 和kafkaproducer

使用Kettle抽取Kafka消息并插入数据库

3、从Kafka消息队列拉取并转换

首先单独新建一个转换,这个转换要做的是从流中读取数据

在Streaming中找到get recordsfrom stream

输入下列字段,这些字段是Kafka consumer拉取下来中流里的字段

使用Kettle抽取Kafka消息并插入数据库

接下来新建Kafka到数据库的转换

创建一个kafka consumer

Transformation中选择前面创建的流读取的转换

Topics输入与producer一致的topic

Consumer group必填 值任意

使用Kettle抽取Kafka消息并插入数据库

创建json input

字段选择输入名称(数据库字段名称)

路径为在json中的路径

Ps:数组需带上[*]

使用Kettle抽取Kafka消息并插入数据库

最后创建表输出

新建数据库连接 根据需要插入的数据库类型创建 不再详细描述

使用Kettle抽取Kafka消息并插入数据库

选择目标表

输入记录提交数量(如果拉取数量小于提交数量则需拉取数据至提交数量才会提交,所以一开始数据库会查不到数据)

选中指定数据库字段

使用Kettle抽取Kafka消息并插入数据库

数据库字段 不再详细描述

使用Kettle抽取Kafka消息并插入数据库

最后结果图

使用Kettle抽取Kafka消息并插入数据库

4、运行转换

首选运行的是有kafka consumer的转换,点击运行,会等待消息上传

使用Kettle抽取Kafka消息并插入数据库

再运行有kafka producer的转换,当显示转换完成则数据上传成功

使用Kettle抽取Kafka消息并插入数据库

此时回到有kafka consumer的转换会看到日志输出已拉取的数据 但是数据库并没有数据

因为设置的提交数量是10条 需要提交的话就停止转换 日志会显示表输出已插入两条数据