package org.springblade.binlog.listener; import cn.hutool.core.collection.CollectionUtil; import lombok.extern.slf4j.Slf4j; import org.springblade.binlog.client.MysqlClient; import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.constant.BinLogConstants; import org.springblade.binlog.util.BinLogUtils; import org.springblade.binlog.vo.BinLogItem; import org.springblade.binlog.vo.DataProperty; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; /** * 乐游监听器 * SpringBoot启动成功后的执行业务线程操作 * CommandLineRunner去实现此操作 * 在有多个可被执行的业务时,通过使用 @Order 注解,设置各个线程的启动顺序(value值由小到大表示启动顺序)。 * 多个实现CommandLineRunner接口的类必须要设置启动顺序,不让程序启动会报错! * * @author zrj * @since 2021/7/27 **/ @Slf4j @Component @Order(value = 1) @Configuration(proxyBeanMethods = false) @ConditionalOnProperty(value = "binlog.enabled") public class TourBinLogListener implements CommandLineRunner { @Resource private BinLogConstants binLogConstants; @Override public void run(String... args) throws Exception { log.info("初始化配置信息:" + binLogConstants.toString()); // 初始化配置信息 DataSourceConfig conf = new DataSourceConfig(binLogConstants.getHost(), binLogConstants.getPort(), binLogConstants.getUsername(), binLogConstants.getPassword()); // 初始化监听器 MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf); // 获取table集合 List tableList = BinLogUtils.getListByStr(binLogConstants.getTable()); if (CollectionUtil.isEmpty(tableList)) { return; } // 注册监听 tableList.forEach(table -> { log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table); try { mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> { if (item.getEventType().name().equals("EXT_WRITE_ROWS")){ // 新增处理逻辑 saveHandle(item); } if (item.getEventType().name().equals("EXT_UPDATE_ROWS")){ // 更新处理逻辑 updateHandle(item); } if (item.getEventType().name().equals("EXT_DELETE_ROWS")){ // 删除处理逻辑 deletedHandle(item); } }); } catch (Exception e) { log.error("BinLog监听异常:" + e); } }); // 多线程消费 mysqlBinLogListener.parse(); } /** * 新增处理逻辑 * @param item */ private void saveHandle(BinLogItem item) { String tableName = item.getDbTable().split("-")[1]; Map data = item.getAfter(); Map dataProperty = item.getDataProperty(); // 创建更新对象 List keyList = new ArrayList<>(); List valueList = new ArrayList<>(); // 遍历匹配数据 dataProperty.forEach((key,value)->{ // log.info("数据类型 " + value.dataType); if(null!=data.get(key)) { keyList.add(key); if (value.dataType.equals("varchar") || value.getDataType().equals("char")) { valueList.add("'" + data.get(key).toString() + "'"); }else if(value.dataType.equals("text") || value.dataType.equals("mediumtext") || value.dataType.equals("longblob")) { byte[] bytes = (byte[])data.get(key); valueList.add("'" + new String(bytes) + "'"); }else if (value.dataType.equals("geometry")) { // 该类型设置空,解析异常-尚未解决 valueList.add("'" + null + "'"); }else if(value.dataType.equals("date") || value.getDataType().equals("datetime")) { Long time = Long.parseLong(data.get(key).toString()); valueList.add("'" + BinLogUtils.getDateFormatStr(new Date(time)) + "'"); }else { valueList.add(data.get(key).toString()); } } }); // 拼接sql StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("insert into ") .append(tableName) .append("(") .append(String.join(",",keyList)) .append(") ") .append(" values (") .append(String.join(",",valueList)) .append(");"); log.info("sql: " + sqlBuilder.toString()); // 同步到其他数据库 MysqlClient.insert(sqlBuilder.toString()); } /** * 修改处理逻辑 * @param item */ private void updateHandle(BinLogItem item) { String tableName = item.getDbTable().split("-")[1]; Map data = item.getAfter(); Map dataProperty = item.getDataProperty(); // 创建更新对象 List updateList = new ArrayList<>(); // 遍历匹配数据 dataProperty.forEach((key,value)->{ // log.info("数据类型 " + value.dataType); if(null!=data.get(key) && !key.equals("id")) { if (value.dataType.equals("varchar") || value.getDataType().equals("char")) { updateList.add(key + " = '" + data.get(key).toString() + "'"); }else if(value.dataType.equals("text") || value.dataType.equals("mediumtext")) { byte[] bytes = (byte[])data.get(key); updateList.add(key + " = '" + new String(bytes) + "'"); }else if(value.dataType.equals("geometry")) { // 该类型不操作 }else if(value.dataType.equals("date") || value.getDataType().equals("datetime")) { Long time = Long.parseLong(data.get(key).toString()); updateList.add(key + " = '" + BinLogUtils.getDateFormatStr(new Date(time)) + "'"); }else { updateList.add(key + " = " + data.get(key).toString()); } } }); // 拼接sql StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("update ") .append(tableName) .append(" set ") .append(String.join(",",updateList)) .append(" where id = ") .append(data.get("id")) .append(";"); log.info("sql: " + sqlBuilder.toString()); // 同步到其他数据库 MysqlClient.update(sqlBuilder.toString()); } /** * 删除处理逻辑 * @param item 数据 */ private void deletedHandle(BinLogItem item) { String tableName = item.getDbTable().split("-")[1]; Map data = item.getBefore(); // 拼接sql StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("delete from ") .append(tableName) .append(" where id = ") .append(data.get("id")) .append(";"); log.info("sql: " + sqlBuilder.toString()); // 同步到其他数据库 MysqlClient.delete(sqlBuilder.toString()); } }