From 88de0b5169112f8ea908cc2c048bc34286f8851d Mon Sep 17 00:00:00 2001
From: lin <sbla5888@163.com>
Date: Mon, 26 Feb 2024 13:12:00 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/master'

---
 src/main/java/org/springblade/binlog/listener/TourBinLogListener.java |  132 +++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 129 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/springblade/binlog/listener/TourBinLogListener.java b/src/main/java/org/springblade/binlog/listener/TourBinLogListener.java
index 97c2a37..28139c3 100644
--- a/src/main/java/org/springblade/binlog/listener/TourBinLogListener.java
+++ b/src/main/java/org/springblade/binlog/listener/TourBinLogListener.java
@@ -1,19 +1,24 @@
 package org.springblade.binlog.listener;
 
-
 import cn.hutool.core.collection.CollectionUtil;
 import lombok.extern.slf4j.Slf4j;
+import org.springblade.binlog.client.MysqlClient;
 import org.springblade.binlog.config.DataSourceConfig;
 import org.springblade.binlog.constant.BinLogConstants;
 import org.springblade.binlog.util.BinLogUtils;
+import org.springblade.binlog.vo.BinLogItem;
+import org.springblade.binlog.vo.DataProperty;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
-
 import javax.annotation.Resource;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 /**
  * 乐游监听器
@@ -58,7 +63,18 @@
 			log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table);
 			try {
 				mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> {
-					log.info("监听逻辑处理");
+					if (item.getEventType().name().equals("EXT_WRITE_ROWS")){
+						// 新增处理逻辑
+						saveHandle(item);
+					}
+					if (item.getEventType().name().equals("EXT_UPDATE_ROWS")){
+						// 更新处理逻辑
+						updateHandle(item);
+					}
+					if (item.getEventType().name().equals("EXT_DELETE_ROWS")){
+						// 删除处理逻辑
+						deletedHandle(item);
+					}
 				});
 			} catch (Exception e) {
 				log.error("BinLog监听异常:" + e);
@@ -67,6 +83,116 @@
 		// 多线程消费
 		mysqlBinLogListener.parse();
 	}
+
+	/**
+	 * 新增处理逻辑
+	 * @param item
+	 */
+	private void saveHandle(BinLogItem item) {
+		String tableName = item.getDbTable().split("-")[1];
+		Map<String, Serializable> data = item.getAfter();
+		Map<String, DataProperty> dataProperty = item.getDataProperty();
+		// 创建更新对象
+		List<String> keyList = new ArrayList<>();
+		List<String> valueList = new ArrayList<>();
+		// 遍历匹配数据
+		dataProperty.forEach((key,value)->{
+//			log.info("数据类型 " + value.dataType);
+			if(null!=data.get(key)) {
+				keyList.add(key);
+				if (value.dataType.equals("varchar") ||
+					value.getDataType().equals("char")) {
+					valueList.add("'" + data.get(key).toString() + "'");
+				}else if(value.dataType.equals("text") ||
+					value.dataType.equals("mediumtext")) {
+					byte[] bytes = (byte[])data.get(key);
+					valueList.add("'" + new String(bytes) + "'");
+				}else if(value.dataType.equals("date") ||
+					value.getDataType().equals("datetime")) {
+					Long time = Long.parseLong(data.get(key).toString());
+					valueList.add("'" + BinLogUtils.getDateFormatStr(new Date(time)) + "'");
+				}else {
+					valueList.add(data.get(key).toString());
+				}
+			}
+		});
+		// 拼接sql
+		StringBuilder sqlBuilder = new StringBuilder();
+		sqlBuilder.append("insert into ")
+			.append(tableName)
+			.append("(")
+			.append(String.join(",",keyList))
+			.append(") ")
+			.append(" values (")
+			.append(String.join(",",valueList))
+			.append(");");
+		log.info("sql: " + sqlBuilder.toString());
+		// 同步到其他数据库
+		MysqlClient.insert(sqlBuilder.toString());
+	}
+
+	/**
+	 * 修改处理逻辑
+	 * @param item
+	 */
+	private void updateHandle(BinLogItem item) {
+		String tableName = item.getDbTable().split("-")[1];
+		Map<String, Serializable> data = item.getAfter();
+		Map<String, DataProperty> dataProperty = item.getDataProperty();
+		// 创建更新对象
+		List<String> updateList = new ArrayList<>();
+		// 遍历匹配数据
+		dataProperty.forEach((key,value)->{
+//			log.info("数据类型 " + value.dataType);
+			if(null!=data.get(key) && !key.equals("id")) {
+				if (value.dataType.equals("varchar") ||
+					value.getDataType().equals("char")) {
+					updateList.add(key + " = '" + data.get(key).toString() + "'");
+				}else if(value.dataType.equals("text") ||
+					value.dataType.equals("mediumtext")) {
+					byte[] bytes = (byte[])data.get(key);
+					updateList.add(key + " = '" + new String(bytes) + "'");
+				}else if(value.dataType.equals("date") ||
+					value.getDataType().equals("datetime")) {
+					Long time = Long.parseLong(data.get(key).toString());
+					updateList.add(key + " = '" + BinLogUtils.getDateFormatStr(new Date(time)) + "'");
+				}else {
+					updateList.add(key + " = " + data.get(key).toString());
+				}
+			}
+		});
+		// 拼接sql
+		StringBuilder sqlBuilder = new StringBuilder();
+		sqlBuilder.append("update ")
+			.append(tableName)
+			.append(" set ")
+			.append(String.join(",",updateList))
+			.append(" where id = ")
+			.append(data.get("id"))
+			.append(";");
+		log.info("sql: " + sqlBuilder.toString());
+		// 同步到其他数据库
+		MysqlClient.update(sqlBuilder.toString());
+	}
+
+	/**
+	 * 删除处理逻辑
+	 * @param item 数据
+	 */
+	private void deletedHandle(BinLogItem item) {
+		String tableName = item.getDbTable().split("-")[1];
+		Map<String, Serializable> data = item.getBefore();
+		// 拼接sql
+		StringBuilder sqlBuilder = new StringBuilder();
+		sqlBuilder.append("delete from ")
+			.append(tableName)
+			.append(" where id = ")
+			.append(data.get("id"))
+			.append(";");
+		log.info("sql: " + sqlBuilder.toString());
+		// 同步到其他数据库
+		MysqlClient.delete(sqlBuilder.toString());
+	}
 }
 
 

--
Gitblit v1.9.3