vlambda博客
学习文章列表

源码分析 – JAVA 通过 kettle 官方api创建Carte集群

    项目中数据采集用到 kettle 进行数据的抽取工作,受限于单节点执行效率等方面问题,随即研究了一下kettle中 carte 集群。

注:在此推一下本人二次开发并完全开源的 Kettle 的在线数据采集平台

1、传统 Carte 集群创建

回到正文,先来了解一下传统的kettle carte集群创建。Carte是一个轻量级的web服务,允许远程请求HTTP进行监控、启动、停止在Carte服务上运行的job和trans。运行Carte的服务器在kettle术语里称为slave server。

Kettle的下载安装就不再赘述,carte的配置文件主要用来配置端口、安全认证等。比如配置文件pwd/carte-config-master.xml,配置项:

port:绑定的端口号

hostname: 绑定的IP

username/password:认证用户

启动命令:nohup ./carte.sh pwd/carte-config-master.xml2>&1 &

在carte上跑的任务的日志统一输出到了carte日志里,不能为每个任务设置日志。

0x03 调用

远程提交job/trans时,同样支持资料库提交任务,carte服务器上需要提前配置好资料库。执行job的api是kettle/executeJob,通过curl提交命令:

curl -u "cluster:cluster" "http://dev-bi-cdh05:9080/kettle/executeJob/?rep=rep-test01&job=/test01_job&P1=test"

-u :指定用户名和密码

rep参数:指定配置的资料库

job参数:执行的job

P1参数:kettle的参数变量P1的值

访问web url可以看每个任务的运行状态、查看任务的运行日志:

2、Java 调用官方api创建kettle carte集群

2.1、项目引入官方依赖

 <!-- kettle核心依赖 -->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>8.3.0.0-371</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>8.3.0.0-371</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-dbdialog</artifactId>
<version>8.3.0.0-371</version>
</dependency>
<dependency>
<groupId>org.pentaho.di.plugins</groupId>
<artifactId>kettle-sap-plugin-core</artifactId>
<version>8.3.0.0-371</version>
</dependency>

2.2、调用创建 carte

public static void main(String[] args) throws Exception {
log.info("创建slave Server");
SlaveServer slaveServer = new SlaveServer("kettle", "127.0.0.1", "9080", "user", "password");
log.info("创建slave Config");
SlaveServerConfig slaveServerConfig = new SlaveServerConfig(slaveServer);
Carte.runCarte(slaveServerConfig);
}

源码解析:

2、调用SlaveServerConfig,将slave服务器信息创建。

3、通过Carte中的runCarte静态方法启动。

官方底层源码解析:

1、SlaveServer.class #服务器基本信息

构造方法

#1、采用默认方式创建slave配置,默认端口9080
SlaveServer();

#2、指定名称,IP,端口,登录账号及密码,默认非master服务器
SlaveServer( String name, String hostname, String port, String username, String password );

#3、在【2】的基础上增加代理配置、主从服务器申明
SlaveServer( String name, String hostname, String port, String username, String password,
String proxyHostname, String proxyPort, String nonProxyHosts, boolean master );

#4、在【3】的基础上,增加ssl方式连接
SlaveServer( String name, String hostname, String port, String username, String password,
String proxyHostname, String proxyPort, String nonProxyHosts, boolean master, boolean ssl );

#5、传递xml格式创建slave服务器,默认调用【1】方法实现
SlaveServer( Node slaveNode );

2、SlaveServerConfig.class


3、Carte.class

启动Carte服务的类,通过前面创建好的slaveServerConfig信息,启动一个jetty的服务,监听carte调度请求。

//注:此处有一个较坑的地方,org.pentaho.di.www.cache.CarteStatusCache类实现的是org.hibernate.cache.Cache接口,调用的是hibernate.core的3.6.9版本的包,Spirng4以后就不支持了,如果项目中引用了最新的spring-data-jpa的话,会导致版本冲突问题,Cache类报找不到的问题
public static void runCarte( SlaveServerConfig config ) throws Exception {
KettleLogStore.init( config.getMaxLogLines(), config.getMaxLogTimeoutMinutes() );

config.setJoining( true );

Carte carte = new Carte( config, false );
CarteSingleton.setCarte( carte );

carte.getWebServer().join();
}