<span>Kettle 分布式集群环境实践</span>
1. 环境搭建
1.1 资源规划
本次搭建一主两从分布式Kettle环境。资源规划情况如下表
ip | 角色 |
---|---|
192.168.21.107 | Master |
192.168.21.108 | Slave1 |
192.168.21.109 | Slave2 |
1.2 软件环境
- JDK 1.8
- Cent os 7
- PDI 8.2.0.0-342(Kettle版本) PDI 8.2.0.0-342 https://pan.baidu.com/s/1krPu8myZDL4kdzCfk6_U_Q 提取码 pcqf
1.3 解压Kettle
unzip pdi-ce-8.2.0.0-342.zip -d /usr/local/
1.4 集群配置
修改配置文件及所在的目录
carte-config-master-8080.xml是主节点需要修改的配置文件. carte-config-808x.xml是从节点需要修改的配置文件,需要多少个从节点就修改多少个。 Master节点上 修改carte-config-master-8080.xml的内容为
<slaveserver> <name>master</name> <hostname>192.168.21.107</hostname> <port>8080</port> <master>Y</master> </slaveserver>
参数名称:
- name:节点名称
- hostname:本机ip或主机名
- port:端口号
- master:Y为本节点是主节点,N为本节点是从节点
salve1节点
修改carte-config-8081.xml
<masters> <slaveserver> <name>master</name> <hostname>192.168.21.107</hostname> <port>8080</port> <username>cluster</username> <password>cluster</password> <master>Y</master> </slaveserver> </masters> <report_to_masters>Y</report_to_masters> <slaveserver> <name>slave1</name> <hostname>192.168.21.108</hostname> <port>8081</port> <username>cluster</username> <password>cluster</password> <master>N</master> </slaveserver>
slave2节点
修改carte-config-8082.xml
<masters> <slaveserver> <name>master</name> <hostname>192.168.21.107</hostname> <port>8080</port> <username>cluster</username> <password>cluster</password> <master>Y</master> </slaveserver> </masters> <report_to_masters>Y</report_to_masters> <slaveserver> <name>slave2</name> <hostname>192.168.21.109</hostname> <port>8082</port> <username>cluster</username> <password>cluster</password> <master>N</master> </slaveserver>
1.5 分发
将master上配置的信息分发到各集群中
scp -r /usr/local/data-integration root@192.168.21.107:/usr/local
1.6 下载MySQL驱动包
将mysql驱动包放到data-integration/lib 目录下,后续如果需要测试其他数据库,也应存放到该目录下,主从节点都需要。
mv mysql-connector-java-5.1.47.jar /usr/local/data-integration/lib/
1.7 启动各个节点
-
启动master
cd /usr/local/data-integration/ ./carte.sh 192.168.21.107 8080
-
启动slave1
cd /usr/local/data-integration/ ./carte.sh 192.168.21.108 8081
- 启动slave2
cd /usr/local/data-integration/
./carte.sh 192.168.21.109 8082
2. Kettle 图形化方式使用集群模式
2.1 启动Kettle 图形化工具
Kettle 图形化工具安装在192.168.21.105节点上,与主从环境进行区分。 启动kettle图形化
cd /data/software/data-integration/
./spoon.sh
2.2 在spoon中新建一个转换
在新建的转换中选择主对象树--->子服务器--->新建子服务器。 主节点
从节点1 从节点2
注:填写的信息要跟服务器上配置的XML文件要一致,master子服务器填写master中配置文件的信息,slave则填写相应的slave信息。
2.2 配置schema
在主对象树中选择Kettle集群schemas,新建一个schema,在弹出的对话框中,选择子服务器。
2.3 测试
选择核心对象--->输入--->表输入,配置相应的数据库连接信息及抽取字段信息。
同理,在核心对象,输出中配置相应的输出。此处测试使用表输出。 在表输出处,右键,选择集群,选择刚才配置的集群名称 选择完成后,可以看到C*2,表示有两个slave进行任务处理。(主节点只负责接受客户端请求及收集处理结果)
点击启动 在Run configuration中选择集群,如图,c1,点击启动即可。
3. java提交任务到集群运行
3.1 创建maven工程
POM依赖
<properties> <kettle.version>8.2.0.0-342</kettle.version> <jersey.version>2.25.1</jersey.version> </properties> <dependencies> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>${kettle.version}</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-engine</artifactId> <version>${kettle.version}</version> </dependency> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>metastore</artifactId> <version>${kettle.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-vfs2</artifactId> <version>2.1</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>20.0</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.2</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.13</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> <version>4.4.13</version> </dependency> <!--MySQL--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.46</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>4.5.6</version> </dependency> <!--encoders--> <dependency> <groupId>org.owasp.encoder</groupId> <artifactId>encoder</artifactId> <version>1.2.1</version> </dependency> <!--apache cli--> <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> <!--lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.16</version> </dependency> </dependencies>
对于kettle相关的包,需要本地maven install 安装。
将图形化生成的ktr文件存放至resource文件目录下。
log4j.properties配置文件
log4j.rootLogger=DEBUG,stdout
#log4j.rootLogger=DEBUG,stdout,file
log4j.additivity.org.apache=true
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.threshold=INFO
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-5p %c{1}:%L - %m%n
# 将日志写入到指定文件中
#log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.DatePattern='.'yyyy-MM-dd-HH-mm
#log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
#log4j.appender.file.Threshold=INFO
#log4j.appender.file.append=true
Kettle集群模式调用代码
KettleEnvironment.init();
//加载ktr文件
String fileName = "diner_test.ktr";
TransMeta transMeta = new TransMeta(fileName);
//设置执行模式
TransExecutionConfiguration config = new TransExecutionConfiguration();
//设置执行模式
config.setExecutingClustered(true);
config.setExecutingLocally(false);
config.setExecutingRemotely(false);
config.setClusterPosting(true);
config.setClusterPreparing(true);
config.setClusterStarting(true);
config.setLogLevel(LogLevel.DEBUG);
TransSplitter transSplitter = Trans.executeClustered(transMeta, config);
// 配置clusterMonitor
Object clusterMonitor = new Object();
long nrErrors = Trans.monitorClusteredTransformation(new LogChannel(clusterMonitor),transSplitter, null, 1);
System.out.println(nrErrors);
System.out.println(transSplitter.getCarteObjectMap());
System.out.println(transSplitter.getMaster());
System.out.println(transSplitter.getSlaves()[0]);
System.out.println(transSplitter.getSlaves()[1].getStepNames()[0]);