| | |
| | | import org.springblade.binlog.config.DataSourceConfig; |
| | | import org.springblade.binlog.constant.BinLogConstants; |
| | | import org.springblade.binlog.util.BinLogUtils; |
| | | import org.springblade.binlog.vo.BinLogItem; |
| | | import org.springblade.binlog.vo.DataProperty; |
| | | import org.springframework.boot.CommandLineRunner; |
| | | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; |
| | | import org.springframework.context.annotation.Configuration; |
| | | import org.springframework.core.annotation.Order; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.io.Serializable; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | | * 乐游监听器 |
| | |
| | | log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table); |
| | | try { |
| | | mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> { |
| | | log.info("监听逻辑处理"); |
| | | if (item.getEventType().name().equals("EXT_WRITE_ROWS")){ |
| | | // 新增处理逻辑 |
| | | saveHandle(item); |
| | | } |
| | | if (item.getEventType().name().equals("EXT_UPDATE_ROWS")){ |
| | | // 更新处理逻辑 |
| | | updateHandle(item); |
| | | } |
| | | if (item.getEventType().name().equals("EXT_DELETE_ROWS")){ |
| | | // 删除处理逻辑 |
| | | deletedHandle(item); |
| | | } |
| | | }); |
| | | } catch (Exception e) { |
| | | log.error("BinLog监听异常:" + e); |
| | |
| | | // 多线程消费 |
| | | mysqlBinLogListener.parse(); |
| | | } |
| | | |
| | | /** |
| | | * 新增处理逻辑 |
| | | * @param item |
| | | */ |
| | | private void saveHandle(BinLogItem item) { |
| | | String tableName = item.getDbTable().split("-")[1]; |
| | | Map<String, Serializable> data = item.getAfter(); |
| | | log.info(tableName); |
| | | Map<String, DataProperty> dataProperty = item.getDataProperty(); |
| | | |
| | | List<String> keyList = new ArrayList<>(); |
| | | List<String> valueList = new ArrayList<>(); |
| | | dataProperty.forEach((key,value)->{ |
| | | log.info(key); |
| | | log.info(value.dataType); |
| | | if(null!=data.get(key)) { |
| | | log.info(data.get(key).toString()); |
| | | valueList.add(data.get(key).toString()); |
| | | }else { |
| | | if (value.dataType.equals("int") || |
| | | value.dataType.equals("bigint") || |
| | | value.dataType.equals("datetime") || |
| | | value.dataType.equals("date")){ |
| | | valueList.add(null); |
| | | }else{ |
| | | valueList.add(""); |
| | | } |
| | | } |
| | | keyList.add(key); |
| | | }); |
| | | // 拼接sql |
| | | StringBuilder sqlBuilder = new StringBuilder(); |
| | | sqlBuilder.append("insert into ") |
| | | .append(tableName) |
| | | .append("(") |
| | | .append(String.join(",",keyList)) |
| | | .append(") ") |
| | | .append(" values (") |
| | | .append(String.join(",",valueList)) |
| | | .append(");"); |
| | | log.info("sql: " + sqlBuilder.toString()); |
| | | } |
| | | |
| | | /** |
| | | * 修改处理逻辑 |
| | | * @param item |
| | | */ |
| | | private void updateHandle(BinLogItem item) { |
| | | } |
| | | |
| | | /** |
| | | * 删除处理逻辑 |
| | | * @param item 数据 |
| | | */ |
| | | private void deletedHandle(BinLogItem item) { |
| | | } |
| | | } |
| | | |
| | | |