package org.springblade.binlog.listener;
|
|
|
import cn.hutool.core.collection.CollectionUtil;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springblade.binlog.config.DataSourceConfig;
|
import org.springblade.binlog.constant.BinLogConstants;
|
import org.springblade.binlog.util.BinLogUtils;
|
import org.springblade.binlog.vo.BinLogItem;
|
import org.springblade.binlog.vo.DataProperty;
|
import org.springframework.boot.CommandLineRunner;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.core.annotation.Order;
|
import org.springframework.stereotype.Component;
|
import javax.annotation.Resource;
|
import java.io.Serializable;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
|
/**
|
* 乐游监听器
|
* SpringBoot启动成功后的执行业务线程操作
|
* CommandLineRunner去实现此操作
|
* 在有多个可被执行的业务时,通过使用 @Order 注解,设置各个线程的启动顺序(value值由小到大表示启动顺序)。
|
* 多个实现CommandLineRunner接口的类必须要设置启动顺序,不让程序启动会报错!
|
*
|
* @author zrj
|
* @since 2021/7/27
|
**/
|
@Slf4j
|
@Component
|
@Order(value = 1)
|
@Configuration(proxyBeanMethods = false)
|
@ConditionalOnProperty(value = "binlog.enabled")
|
public class TourBinLogListener implements CommandLineRunner {
|
|
@Resource
|
private BinLogConstants binLogConstants;
|
|
@Override
|
public void run(String... args) throws Exception {
|
log.info("初始化配置信息:" + binLogConstants.toString());
|
|
// 初始化配置信息
|
DataSourceConfig conf = new DataSourceConfig(binLogConstants.getHost(),
|
binLogConstants.getPort(),
|
binLogConstants.getUsername(),
|
binLogConstants.getPassword());
|
|
// 初始化监听器
|
MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
|
|
// 获取table集合
|
List<String> tableList = BinLogUtils.getListByStr(binLogConstants.getTable());
|
if (CollectionUtil.isEmpty(tableList)) {
|
return;
|
}
|
// 注册监听
|
tableList.forEach(table -> {
|
log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table);
|
try {
|
mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> {
|
if (item.getEventType().name().equals("EXT_WRITE_ROWS")){
|
// 新增处理逻辑
|
saveHandle(item);
|
}
|
if (item.getEventType().name().equals("EXT_UPDATE_ROWS")){
|
// 更新处理逻辑
|
updateHandle(item);
|
}
|
if (item.getEventType().name().equals("EXT_DELETE_ROWS")){
|
// 删除处理逻辑
|
deletedHandle(item);
|
}
|
});
|
} catch (Exception e) {
|
log.error("BinLog监听异常:" + e);
|
}
|
});
|
// 多线程消费
|
mysqlBinLogListener.parse();
|
}
|
|
/**
|
* 新增处理逻辑
|
* @param item
|
*/
|
private void saveHandle(BinLogItem item) {
|
String tableName = item.getDbTable().split("-")[1];
|
Map<String, Serializable> data = item.getAfter();
|
log.info(tableName);
|
Map<String, DataProperty> dataProperty = item.getDataProperty();
|
|
List<String> keyList = new ArrayList<>();
|
List<String> valueList = new ArrayList<>();
|
dataProperty.forEach((key,value)->{
|
log.info(key);
|
log.info(value.dataType);
|
if(null!=data.get(key)) {
|
log.info(data.get(key).toString());
|
valueList.add(data.get(key).toString());
|
}else {
|
if (value.dataType.equals("int") ||
|
value.dataType.equals("bigint") ||
|
value.dataType.equals("datetime") ||
|
value.dataType.equals("date")){
|
valueList.add(null);
|
}else{
|
valueList.add("");
|
}
|
}
|
keyList.add(key);
|
});
|
// 拼接sql
|
StringBuilder sqlBuilder = new StringBuilder();
|
sqlBuilder.append("insert into ")
|
.append(tableName)
|
.append("(")
|
.append(String.join(",",keyList))
|
.append(") ")
|
.append(" values (")
|
.append(String.join(",",valueList))
|
.append(");");
|
log.info("sql: " + sqlBuilder.toString());
|
}
|
|
/**
|
* 修改处理逻辑
|
* @param item
|
*/
|
private void updateHandle(BinLogItem item) {
|
}
|
|
/**
|
* 删除处理逻辑
|
* @param item 数据
|
*/
|
private void deletedHandle(BinLogItem item) {
|
}
|
}
|