| src/main/java/org/springblade/binlog/client/MysqlClient.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/binlog/constant/BinLogConstants.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/binlog/listener/TourBinLogListener.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/binlog/util/BinLogUtils.java | ●●●●● patch | view | raw | blame | history | |
| src/main/resources/application-dev.yml | ●●●●● patch | view | raw | blame | history | |
| src/main/resources/application-prod.yml | ●●●●● patch | view | raw | blame | history | |
| src/main/resources/application-test.yml | ●●●●● patch | view | raw | blame | history | |
| src/main/resources/application.yml | ●●●●● patch | view | raw | blame | history |
src/main/java/org/springblade/binlog/client/MysqlClient.java
// New file @@ -0,0 +1,102 @@
package org.springblade.binlog.client;

import lombok.extern.slf4j.Slf4j;
import org.springblade.binlog.constant.BinLogConstants;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * MySQL client that executes a single SQL statement against the database
 * configured through {@link BinLogConstants} (fromUrl / fromUsername / fromPassword).
 *
 * <p>The class is a Spring component, but its operations are exposed as static
 * methods. {@link #init()} stores the container-managed instance in a static
 * field so the static methods can reach the injected configuration.
 *
 * <p>Every call opens a new JDBC connection, executes the statement and closes
 * the connection again. The SQL text is executed exactly as passed in; no
 * parameters are bound, so callers are responsible for quoting/escaping values.
 */
@Slf4j
@Component
public class MysqlClient {

    /** JDBC driver class loaded before a connection is requested. */
    private static final String DRIVER = "com.mysql.cj.jdbc.Driver";

    /** {@code type} value used by {@link #update(String)} and {@link #delete(String)}. */
    private static final int TYPE_UPDATE_OR_DELETE = 1;

    /** {@code type} value used by {@link #insert(String)}. */
    private static final int TYPE_INSERT = 2;

    /** Container-managed instance, published by {@link #init()} for the static methods. */
    private static MysqlClient mysqlClient;

    @Resource
    private BinLogConstants binLogConstants;

    /**
     * Publishes this instance (and with it the injected {@link BinLogConstants})
     * to the static methods once dependency injection has completed.
     */
    @PostConstruct
    public void init() {
        mysqlClient = this;
    }

    /**
     * Opens a connection to the configured database, executes {@code sql} and
     * closes the connection.
     *
     * <p>Failures to load the driver or to execute the statement are logged and
     * not propagated, so a failed statement does not interrupt the caller.
     *
     * @param sql  the complete SQL statement to execute
     * @param type {@code 1} runs the statement with {@code executeUpdate()};
     *             any other value (including {@code null}) runs it with {@code execute()}
     * @throws IllegalStateException if called before the Spring container has
     *                               initialized this component
     */
    public static void sqlConnect(String sql, Integer type) {
        MysqlClient client = mysqlClient;
        if (client == null) {
            // Fail with a clear message instead of an anonymous NullPointerException.
            throw new IllegalStateException("MysqlClient has not been initialized by the Spring container yet");
        }

        String url = client.binLogConstants.getFromUrl();
        String user = client.binLogConstants.getFromUsername();
        String password = client.binLogConstants.getFromPassword();

        try {
            Class.forName(DRIVER);
        } catch (ClassNotFoundException e) {
            log.error("JDBC driver {} was not found on the classpath", DRIVER, e);
            return;
        }

        // try-with-resources closes the statement and the connection in reverse
        // order, even when opening or executing fails part-way through.
        try (Connection conn = DriverManager.getConnection(url, user, password)) {
            log.info("数据库连接成功!");
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                // equals() on a boxed constant avoids unboxing a null type.
                if (Integer.valueOf(TYPE_UPDATE_OR_DELETE).equals(type)) {
                    // update / delete
                    ps.executeUpdate();
                } else {
                    // insert
                    ps.execute();
                }
                log.info("数据已发送成功!");
            }
        } catch (SQLException e) {
            log.error("Failed to execute SQL statement: {}", sql, e);
        }
    }

    /**
     * Executes an INSERT statement against the configured database.
     *
     * @param sql the complete INSERT statement
     */
    public static void insert(String sql) {
        sqlConnect(sql, TYPE_INSERT);
    }

    /**
     * Executes an UPDATE statement against the configured database.
     *
     * @param sql the complete UPDATE statement
     */
    public static void update(String sql) {
        sqlConnect(sql, TYPE_UPDATE_OR_DELETE);
    }

    /**
     * Executes a DELETE statement against the configured database.
     *
     * @param sql the complete DELETE statement
     */
    public static void delete(String sql) {
        sqlConnect(sql, TYPE_UPDATE_OR_DELETE);
    }
}
// src/main/java/org/springblade/binlog/constant/BinLogConstants.java
@@ -31,6 +31,15 @@ @Value("${binlog.table}") private String table; @Value("${binlog.from.datasource.url}") private String fromUrl; @Value("${binlog.from.datasource.username}") private String fromUsername; @Value("${binlog.from.datasource.password}") private String fromPassword; public static final int consumerThreads = 5; public static final long queueSleep = 1000; src/main/java/org/springblade/binlog/listener/TourBinLogListener.java
@@ -1,8 +1,8 @@ package org.springblade.binlog.listener; import cn.hutool.core.collection.CollectionUtil; import lombok.extern.slf4j.Slf4j; import org.springblade.binlog.client.MysqlClient; import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.constant.BinLogConstants; import org.springblade.binlog.util.BinLogUtils; @@ -16,6 +16,7 @@ import javax.annotation.Resource; import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; @@ -90,28 +91,30 @@ private void saveHandle(BinLogItem item) { String tableName = item.getDbTable().split("-")[1]; Map<String, Serializable> data = item.getAfter(); log.info(tableName); Map<String, DataProperty> dataProperty = item.getDataProperty(); // 创建更新对象 List<String> keyList = new ArrayList<>(); List<String> valueList = new ArrayList<>(); // 遍历匹配数据 dataProperty.forEach((key,value)->{ log.info(key); log.info(value.dataType); // log.info("数据类型 " + value.dataType); if(null!=data.get(key)) { log.info(data.get(key).toString()); valueList.add(data.get(key).toString()); }else { if (value.dataType.equals("int") || value.dataType.equals("bigint") || value.dataType.equals("datetime") || value.dataType.equals("date")){ valueList.add(null); }else{ valueList.add(""); keyList.add(key); if (value.dataType.equals("varchar") || value.getDataType().equals("char")) { valueList.add("'" + data.get(key).toString() + "'"); }else if(value.dataType.equals("text") || value.dataType.equals("mediumtext")) { byte[] bytes = (byte[])data.get(key); valueList.add("'" + new String(bytes) + "'"); }else if(value.dataType.equals("date") || value.getDataType().equals("datetime")) { Long time = Long.parseLong(data.get(key).toString()); valueList.add("'" + BinLogUtils.getDateFormatStr(new Date(time)) + "'"); }else { valueList.add(data.get(key).toString()); } } keyList.add(key); }); // 拼接sql StringBuilder sqlBuilder = new StringBuilder(); @@ -124,6 +127,8 @@ 
.append(String.join(",",valueList)) .append(");"); log.info("sql: " + sqlBuilder.toString()); // 同步到其他数据库 MysqlClient.insert(sqlBuilder.toString()); } /** @@ -131,6 +136,43 @@ * @param item */ private void updateHandle(BinLogItem item) { String tableName = item.getDbTable().split("-")[1]; Map<String, Serializable> data = item.getAfter(); Map<String, DataProperty> dataProperty = item.getDataProperty(); // 创建更新对象 List<String> updateList = new ArrayList<>(); // 遍历匹配数据 dataProperty.forEach((key,value)->{ // log.info("数据类型 " + value.dataType); if(null!=data.get(key) && !key.equals("id")) { if (value.dataType.equals("varchar") || value.getDataType().equals("char")) { updateList.add(key + " = '" + data.get(key).toString() + "'"); }else if(value.dataType.equals("text") || value.dataType.equals("mediumtext")) { byte[] bytes = (byte[])data.get(key); updateList.add(key + " = '" + new String(bytes) + "'"); }else if(value.dataType.equals("date") || value.getDataType().equals("datetime")) { Long time = Long.parseLong(data.get(key).toString()); updateList.add(key + " = '" + BinLogUtils.getDateFormatStr(new Date(time)) + "'"); }else { updateList.add(key + " = " + data.get(key).toString()); } } }); // 拼接sql StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("update ") .append(tableName) .append(" set ") .append(String.join(",",updateList)) .append(" where id = ") .append(data.get("id")) .append(";"); log.info("sql: " + sqlBuilder.toString()); // 同步到其他数据库 MysqlClient.update(sqlBuilder.toString()); } /** @@ -138,6 +180,18 @@ * @param item 数据 */ private void deletedHandle(BinLogItem item) { String tableName = item.getDbTable().split("-")[1]; Map<String, Serializable> data = item.getBefore(); // 拼接sql StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("delete from ") .append(tableName) .append(" where id = ") .append(data.get("id")) .append(";"); log.info("sql: " + sqlBuilder.toString()); // 同步到其他数据库 MysqlClient.delete(sqlBuilder.toString()); } } 
src/main/java/org/springblade/binlog/util/BinLogUtils.java
@@ -8,11 +8,8 @@ import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.vo.BinLogItem; import org.springblade.binlog.vo.DataProperty; import org.springblade.core.tool.utils.CollectionUtil; import org.springblade.core.tool.utils.DateUtil; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.io.Serializable; import java.sql.*; import java.util.Arrays; @@ -36,15 +33,6 @@ public class BinLogUtils { private static BinLogUtils binLogUtils; // @Resource // private SearchStoreLogoExtMapper searchStoreLogoExtMapper; // @PostConstruct // public void init() { // binLogUtils = this; // binLogUtils.searchStoreLogoExtMapper = this.searchStoreLogoExtMapper; // } /** * 拼接dbTable @@ -90,26 +78,6 @@ } return null; } // /** // * 根据table获取code // * // * @param table // * @return java.lang.Integer // */ // public static Integer getCodeByTable(String table) { // if (StringUtils.isEmpty(table)) { // return null; // } // return CategoryEnum.getCodeByTab(table); // } // public static String getMsgByTab(String table) { // if (StringUtils.isEmpty(table)) { // return null; // } // return CategoryEnum.getMsgByTab(table); // } /** * 根据DBTable获取table @@ -181,28 +149,6 @@ return null; } // /** // * 根据storeId获取imgUrl // */ // public static String getImgUrl(Long storeId) { // // if (storeId == null) { // return ""; // } // //获取url // SearchStoreLogo searchStoreLogo = new SearchStoreLogo(); // searchStoreLogo.setStoreId(storeId); // List<SearchStoreLogo> searchStoreLogos = binLogUtils.searchStoreLogoExtMapper.selectList(searchStoreLogo); // if (CollectionUtil.isNotEmpty(searchStoreLogos)) { // SearchStoreLogo storeLogo = searchStoreLogos.get(0); // if (storeLogo != null) { // return storeLogo.getStoreLogo(); // } // } // return ""; // } /** * 格式化date * @@ -222,6 +168,20 @@ Date formatDate = DateUtil.parse(strDate, dateFormat); return formatDate; } /** * 格式化date * * @param date * @return 
java.util.Date */ public static String getDateFormatStr(Date date) { if (date == null) { return null; } String dateFormat = "yyyy-MM-dd HH:mm:ss"; return DateUtil.format(date, dateFormat); } } src/main/resources/application-dev.yml
@@ -77,3 +77,9 @@ db: jczz # 监听数据库 table: jczz_house,jczz_household,jczz_place enabled: false # 目标数据库 from: datasource: url: jdbc:mysql://106.225.193.35:3306/srjw?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true username: root password: HCyj@2022 src/main/resources/application-prod.yml
@@ -64,3 +64,9 @@ db: jczz # 监听数据库 table: jczz_house,jczz_household,jczz_place enabled: false # 目标数据库 from: datasource: url: jdbc:mysql://106.225.193.35:3306/srjw?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true username: root password: HCyj@2022 src/main/resources/application-test.yml
@@ -56,11 +56,18 @@ # binlog listener binlog: # 源数据库 datasource: host: 127.0.0.1 port: 3308 username: root password: root db: jczz_test table: jczz_house,jczz_household,jczz_place table: jczz_house,jczz_household,jczz_place,jczz_place_ext,blade_attach_data enabled: false # 目标数据库 from: datasource: url: jdbc:mysql://106.225.193.35:3306/srjw?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true username: root password: HCyj@2022 src/main/resources/application.yml
@@ -250,11 +250,14 @@ # - /blade-taskReportForRepairs/** # - /blade-placeExt/** # - /blade-grid/** # - /blade-community/** # - /blade-gridman/** # - /blade-propertyCompany/** # - /blade-eCallEvent/** # - /blade-system/** # - /blade-propertyCompanyComment/** # - /blade-policeStation/** # - /blade-policeAffairsGrid/** #授权认证配置 auth: - method: ALL