源码分析 – JAVA 通过 kettle 官方api创建Carte集群
项目中数据采集用到 kettle 进行数据的抽取工作,受限于单节点执行效率等方面问题,随即研究了一下kettle中 carte 集群。
注:在此推一下本人二次开发并完全开源的 Kettle 的在线数据采集平台
1、传统 Carte 集群创建
回到正文,先来了解一下传统的kettle carte集群创建。Carte是一个轻量级的web服务,允许远程请求HTTP进行监控、启动、停止在Carte服务上运行的job和trans。运行Carte的服务器在kettle术语里称为slave server。
Kettle的下载安装就不再赘述,carte的配置文件主要用来配置端口、安全认证等。比如配置文件pwd/carte-config-master.xml,配置项:
port:绑定的端口号
hostname: 绑定的IP
username/password:认证用户
启动命令:nohup ./carte.sh pwd/carte-config-master.xml2>&1 &
在carte上跑的任务的日志统一输出到了carte日志里,不能为每个任务设置日志。
0x03 调用
远程提交job/trans时,同样支持资料库提交任务,carte服务器上需要提前配置好资料库。执行job的api是kettle/executeJob,通过curl提交命令:
curl -u "cluster:cluster" "http://dev-bi-cdh05:9080/kettle/executeJob/?rep=rep-test01&job=/test01_job&P1=test"
-u :指定用户名和密码
rep参数:指定配置的资料库
job参数:执行的job
P1参数:kettle的参数变量P1的值
访问web url可以看每个任务的运行状态、查看任务的运行日志:
2、Java 调用官方api创建kettle carte集群
2.1、项目引入官方依赖
<!-- kettle核心依赖 -->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>8.3.0.0-371</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>8.3.0.0-371</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-dbdialog</artifactId>
<version>8.3.0.0-371</version>
</dependency>
<dependency>
<groupId>org.pentaho.di.plugins</groupId>
<artifactId>kettle-sap-plugin-core</artifactId>
<version>8.3.0.0-371</version>
</dependency>
2.2、调用创建 carte
public static void main(String[] args) throws Exception {
log.info("创建slave Server");
SlaveServer slaveServer = new SlaveServer("kettle", "127.0.0.1", "9080", "user", "password");
log.info("创建slave Config");
SlaveServerConfig slaveServerConfig = new SlaveServerConfig(slaveServer);
Carte.runCarte(slaveServerConfig);
}
源码解析:
2、调用SlaveServerConfig,将slave服务器信息创建。
3、通过Carte中的runCarte静态方法启动。
官方底层源码解析:
1、SlaveServer.class #服务器基本信息
构造方法
#1、采用默认方式创建slave配置,默认端口9080
SlaveServer();
#2、指定名称,IP,端口,登录账号及密码,默认非master服务器
SlaveServer( String name, String hostname, String port, String username, String password );
#3、在【2】的基础上增加代理配置、主从服务器申明
SlaveServer( String name, String hostname, String port, String username, String password,
String proxyHostname, String proxyPort, String nonProxyHosts, boolean master );
#4、在【3】的基础上,增加ssl方式连接
SlaveServer( String name, String hostname, String port, String username, String password,
String proxyHostname, String proxyPort, String nonProxyHosts, boolean master, boolean ssl );
#5、传递xml格式创建slave服务器,默认调用【1】方法实现
SlaveServer( Node slaveNode );
2、SlaveServerConfig.class
3、Carte.class
启动Carte服务的类,通过前面创建好的slaveServerConfig信息,启动一个jetty的服务,监听carte调度请求。
//注:此处有一个较坑的地方,org.pentaho.di.www.cache.CarteStatusCache类实现的是org.hibernate.cache.Cache接口,调用的是hibernate.core的3.6.9版本的包,Spirng4以后就不支持了,如果项目中引用了最新的spring-data-jpa的话,会导致版本冲突问题,Cache类报找不到的问题
public static void runCarte( SlaveServerConfig config ) throws Exception {
KettleLogStore.init( config.getMaxLogLines(), config.getMaxLogTimeoutMinutes() );
config.setJoining( true );
Carte carte = new Carte( config, false );
CarteSingleton.setCarte( carte );
carte.getWebServer().join();
}