主题
使用说明
自定义实现对采集数据的加工逻辑。
引入依赖
- wueasy-jdbc:jdbc操作工具包
- wueasy-etl:etl基础服务
xml
<!-- Replace the placeholder "最新版本" ("latest version") below with a concrete release number. -->
<!-- wueasy-jdbc: JDBC helper toolkit (presumably provides com.wueasy.cloud.jdbc.JdbcTemplate used in the example - confirm). -->
<dependency>
<groupId>com.wueasy.cloud</groupId>
<artifactId>wueasy-jdbc</artifactId>
<version>最新版本</version>
</dependency>
<!-- wueasy-etl: ETL base service (presumably provides com.wueasy.cloud.etl.BaseEtl - confirm). -->
<dependency>
<groupId>com.wueasy.cloud</groupId>
<artifactId>wueasy-etl</artifactId>
<version>最新版本</version>
</dependency>
<!-- HikariCP (JDBC connection pool) and mysql-connector-j (MySQL JDBC driver). -->
<!-- NOTE(review): no <version> is declared for these two; this presumably relies on a parent POM or
     imported BOM managing their versions - confirm, otherwise Maven will reject the POM. -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
编写加工任务
etl任务需要继承类 `com.wueasy.cloud.etl.BaseEtl`,并实现其中的方法。
方法说明
- `void init()`:初始化方法,在执行任务前调用,只会调用一次
- `List<DataMap> readData()`:读取数据方法,只会调用一次;如果需要分页,需要在方法内循环处理(参考示例中在循环内调用 `addQueue(dataList)` 逐页加入处理队列,方法最终返回 `null`)
- `DataMap handleData(DataMap data)`:处理单条数据方法,对查询出来的每条数据进行转换加工处理,返回处理结果对象
- `void saveData(List<DataMap> dataList)`:保存数据方法,对加工完成的数据进行保存操作
- `void before()`:执行前方法,初始化之后执行
- `void after()`:执行后方法,保存数据后执行
- `void enableMultiThread()`:开启多线程处理(参考示例中在 `init()` 内调用)
- `void setBusinessParam(Map<String, String> businessParam)`:保存自定义业务参数(需要保存业务参数时,可以通过此方法记录)
- `Map<String, String> getBusinessParam()`:获取业务参数
参考示例
java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import com.wueasy.base.util.DateHelper;
import com.wueasy.base.vo.DataMap;
import com.wueasy.cloud.etl.BaseEtl;
import com.wueasy.cloud.etl.constants.WueasyEtlConstants;
import com.wueasy.cloud.etl.enums.OperationType;
import com.wueasy.cloud.jdbc.JdbcTemplate;
import lombok.extern.slf4j.Slf4j;
/**
 * Example ETL job that reads rows from table {@code test} and writes them to table {@code test2}.
 *
 * <p>Source rows whose id already exists in {@code test2} are written as updates; all others are
 * written as inserts. Only {@code id} and {@code name} are copied, plus the audit columns
 * {@code create_date}/{@code update_date}/{@code update_time} set in {@link #saveData(List)}.
 */
@Slf4j
public class EtlTest extends BaseEtl {

    /** Number of source rows fetched per page by {@link #getList(long)}. */
    private static final int PAGE_SIZE = 10;

    /** JDBC helper; "test" is presumably the name of a configured data source - confirm. */
    private final JdbcTemplate jdbcTemplate = new JdbcTemplate("test");

    /**
     * Keyset-pagination cursor: the next page contains only source rows with {@code id > minId}.
     * Seeded in {@link #init()} and advanced in {@link #readData()}.
     */
    private Long minId;

    /**
     * Enables multi-threaded processing and seeds the pagination cursor with {@code min(id)} of
     * the target table {@code test2}.
     *
     * <p>NOTE(review): because pagination uses {@code id > minId}, source rows with
     * {@code id <= min(test2.id)} are never read. Confirm whether {@code max(id)} (incremental sync)
     * was intended instead.
     */
    @Override
    public void init() throws Exception {
        // Turn on multi-threaded processing.
        enableMultiThread();
        // Query the smallest id already present in the target table.
        Long targetMinId = jdbcTemplate.queryLong("select min(id) from test2");
        // min() over an empty table is SQL NULL (presumably returned here as null - confirm).
        // Without this fallback, getList(minId) would throw a NullPointerException while
        // unboxing the cursor. Long.MIN_VALUE makes the first page start at the very first row.
        minId = (null == targetMinId) ? Long.MIN_VALUE : targetMinId;
    }

    /**
     * Reads the source table page by page and hands each page to {@code addQueue}.
     *
     * @return always {@code null}; all rows are delivered through {@code addQueue}
     */
    @Override
    public List<DataMap> readData() throws Exception {
        List<DataMap> page = getList(minId);
        int pageNo = 1;
        while (null != page && !page.isEmpty()) {
            Long lastId = page.get(page.size() - 1).getLong("id");
            // Safety net: stop if the cursor did not advance, so the same page is not re-queued forever.
            if (minId.equals(lastId) && pageNo > 1) {
                break;
            }
            minId = lastId;
            addQueue(page);
            page = getList(minId);
            pageNo++;
        }
        // NOTE(review): null is returned as in the original example because the rows were already
        // queued via addQueue; presumably BaseEtl accepts a null result here - confirm.
        return null;
    }

    /**
     * Fetches the next page of source rows using keyset pagination.
     *
     * @param afterId exclusive lower bound on {@code id}
     * @return up to {@link #PAGE_SIZE} rows of {@code test}, ordered by ascending {@code id}
     */
    public List<DataMap> getList(long afterId) {
        return jdbcTemplate.queryList("select * from test where id > ? order by id asc limit ? ",
                new Object[] {afterId, PAGE_SIZE});
    }

    /**
     * Maps one source row to a target row and tags it with the operation to perform.
     *
     * <p>Issues one lookup query per row via {@link #check(Long)}.
     *
     * @param data source row from {@code test}
     * @return a row holding {@code id}, {@code name} and the operation-type tag
     *         ({@code EDIT} if the id already exists in {@code test2}, otherwise {@code ADD})
     */
    @Override
    public DataMap handleData(DataMap data) throws Exception {
        Long id = data.getLong("id");
        DataMap dataMap = new DataMap();
        dataMap.set("id", id);
        dataMap.set("name", data.getString("name"));
        OperationType type = check(id) ? OperationType.EDIT : OperationType.ADD;
        dataMap.set(WueasyEtlConstants.OPERATION_TYPE_COLUMN_NAME, type.name());
        return dataMap;
    }

    /**
     * Writes processed rows to {@code test2}.
     *
     * <p>Rows tagged {@code ADD} are batch-inserted, and rows tagged {@code EDIT} are batch-updated
     * keyed by {@code id}. Rows with any other tag are skipped. The tag column is removed from
     * every row before writing, and audit columns are filled in.
     *
     * @param dataList processed rows; {@code null} or empty means nothing to do
     */
    @Override
    public void saveData(List<DataMap> dataList) throws Exception {
        if (null == dataList || dataList.isEmpty()) {
            return;
        }
        List<DataMap> insertList = new ArrayList<>();
        List<DataMap> updateList = new ArrayList<>();
        for (DataMap data : dataList) {
            String type = data.getString(WueasyEtlConstants.OPERATION_TYPE_COLUMN_NAME);
            // The tag is not a column of test2, so strip it before writing.
            data.remove(WueasyEtlConstants.OPERATION_TYPE_COLUMN_NAME);
            if (OperationType.ADD.name().equals(type)) {
                insertList.add(data);
            } else if (OperationType.EDIT.name().equals(type)) {
                updateList.add(data);
            }
        }
        if (!insertList.isEmpty()) {
            for (DataMap item : insertList) {
                // One instant per row so a new row's create_date and update_date are identical
                // (two separate new Date() calls could differ by a millisecond).
                Date now = new Date();
                item.set("create_date", now);
                item.set("update_date", now);
                item.set("update_time", DateHelper.getMicrosecond());
            }
            jdbcTemplate.batchInsert("test2", insertList);
        }
        if (!updateList.isEmpty()) {
            for (DataMap item : updateList) {
                item.set("update_date", new Date());
                item.set("update_time", DateHelper.getMicrosecond());
            }
            jdbcTemplate.batchUpdate("test2", updateList, Arrays.asList("id"));
        }
        log.info("新增{}条数据,更新{}条数据", insertList.size(), updateList.size());
    }

    /**
     * Tells whether {@code test2} already contains a row with the given id.
     *
     * @param id id to look up
     * @return {@code true} if at least one matching row exists
     */
    public boolean check(Long id) {
        long count = jdbcTemplate.queryCount("select id from test2 where id = ?", new Object[] {id});
        return count > 0;
    }

    /**
     * Command-line entry point: runs this job via {@link BaseEtl#execute}.
     *
     * @param args arguments forwarded unchanged to {@code execute}
     */
    public static void main(String[] args) throws Exception {
        BaseEtl etl = new EtlTest();
        etl.execute(args);
    }
}