pom.xml
@@ -219,6 +219,26 @@ <artifactId>spring-boot-starter-mail</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java --> <dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.17.0</version> </dependency> <!-- https://mvnrepository.com/artifact/args4j/args4j --> <dependency> <groupId>args4j</groupId> <artifactId>args4j</artifactId> <version>2.33</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.4</version> <scope>compile</scope> </dependency> </dependencies> <build> src/main/java/org/springblade/binlog/config/DataSourceConfig.java
New file @@ -0,0 +1,20 @@ package org.springblade.binlog.config; import lombok.AllArgsConstructor; import lombok.Data; /** * 数据库配置 * * @author zrj * @since 2024/02/19 **/ @Data @AllArgsConstructor public class DataSourceConfig { private String host; private int port; private String username; private String password; } src/main/java/org/springblade/binlog/constant/BinLogConstants.java
New file @@ -0,0 +1,40 @@ package org.springblade.binlog.constant; import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * 监听配置信息 * * @author zrj * @since 2024/02/19 **/ @Data @Component public class BinLogConstants { @Value("${binlog.datasource.host}") private String host; @Value("${binlog.datasource.port}") private int port; @Value("${binlog.datasource.username}") private String username; @Value("${binlog.datasource.password}") private String password; @Value("${binlog.db}") private String db; @Value("${binlog.table}") private String table; public static final int consumerThreads = 5; public static final long queueSleep = 1000; } src/main/java/org/springblade/binlog/listener/BinLogListener.java
New file @@ -0,0 +1,17 @@ package org.springblade.binlog.listener; import org.springblade.binlog.vo.BinLogItem; /** * BinLogListener监听器 * * @author zrj * @since 2024/02/19 **/ @FunctionalInterface public interface BinLogListener { void onEvent(BinLogItem item); } src/main/java/org/springblade/binlog/listener/MysqlBinLogListener.java
New file @@ -0,0 +1,165 @@ package org.springblade.binlog.listener; import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.*; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import lombok.extern.slf4j.Slf4j; import org.kohsuke.args4j.Option; import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.constant.BinLogConstants; import org.springblade.binlog.vo.BinLogItem; import org.springblade.binlog.vo.DataProperty; import java.io.IOException; import java.io.Serializable; import java.util.Map; import java.util.concurrent.*; import static com.github.shyiko.mysql.binlog.event.EventType.*; import static org.springblade.binlog.util.BinLogUtils.getColMap; import static org.springblade.binlog.util.BinLogUtils.getdbTable; /** * 数据库监听器 * * @author zrj * @since 2021/7/26 **/ @Slf4j public class MysqlBinLogListener implements BinaryLogClient.EventListener { @Option(name = "-binlog-consume_threads", usage = "the thread num of consumer") private int consumerThreads = BinLogConstants.consumerThreads; private BinaryLogClient parseClient; private BlockingQueue<BinLogItem> queue; private final ExecutorService consumer; // 存放每张数据表对应的listener private Multimap<String, BinLogListener> listeners; private DataSourceConfig conf; private Map<String, Map<String, DataProperty>> dbTableCols; private String dbTable; /** * 监听器初始化 * * @param conf */ public MysqlBinLogListener(DataSourceConfig conf) { BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword()); EventDeserializer eventDeserializer = new EventDeserializer(); //eventDeserializer.setCompatibilityMode(//序列化 // EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, // EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY //); client.setEventDeserializer(eventDeserializer); this.parseClient = client; this.queue = new ArrayBlockingQueue<>(1024); this.conf = conf; this.listeners = ArrayListMultimap.create(); this.dbTableCols = new ConcurrentHashMap<>(); this.consumer = Executors.newFixedThreadPool(consumerThreads); } /** * 监听处理 * * @param event */ @Override public void onEvent(Event event) { EventType eventType = event.getHeader().getEventType(); if (eventType == EventType.TABLE_MAP) { TableMapEventData tableData = event.getData(); String db = tableData.getDatabase(); String table = tableData.getTable(); dbTable = getdbTable(db, table); } // 只处理添加删除更新三种操作 if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { if (isWrite(eventType)) { WriteRowsEventData data = event.getData(); for (Serializable[] row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable), eventType); item.setDbTable(dbTable); queue.add(item); } } } if (isUpdate(eventType)) { UpdateRowsEventData data = event.getData(); for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { BinLogItem item = BinLogItem.itemFromUpdate(row, dbTableCols.get(dbTable), eventType); item.setDbTable(dbTable); queue.add(item); } } } if (isDelete(eventType)) { DeleteRowsEventData data = event.getData(); for (Serializable[] row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable), eventType); item.setDbTable(dbTable); queue.add(item); } } } } } /** * 注册监听 * * @param db 数据库 * @param table 操作表 * @param listener 监听器 * @throws Exception */ public void regListener(String db, String table, BinLogListener listener) throws Exception { String dbTable = getdbTable(db, table); // 获取字段集合 Map<String, DataProperty> cols = getColMap(conf, db, table); // 保存字段信息 dbTableCols.put(dbTable, cols); // 保存当前注册的listener listeners.put(dbTable, listener); } /** * 开启多线程消费 * * @throws IOException */ public void parse() throws IOException { parseClient.registerEventListener(this); for (int i = 0; i < consumerThreads; i++) { consumer.submit(() -> { while (true) { if (queue.size() > 0) { try { BinLogItem item = queue.take(); String dbtable = item.getDbTable(); listeners.get(dbtable).forEach(binLogListener -> binLogListener.onEvent(item)); } catch (InterruptedException e) { e.printStackTrace(); } } Thread.sleep(BinLogConstants.queueSleep); } }); } parseClient.connect(); } } src/main/java/org/springblade/binlog/listener/TourBinLogListener.java
New file @@ -0,0 +1,72 @@ package org.springblade.binlog.listener; import cn.hutool.core.collection.CollectionUtil; import lombok.extern.slf4j.Slf4j; import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.constant.BinLogConstants; import org.springblade.binlog.util.BinLogUtils; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; /** * 乐游监听器 * SpringBoot启动成功后的执行业务线程操作 * CommandLineRunner去实现此操作 * 在有多个可被执行的业务时,通过使用 @Order 注解,设置各个线程的启动顺序(value值由小到大表示启动顺序)。 * 多个实现CommandLineRunner接口的类必须要设置启动顺序,不让程序启动会报错! * * @author zrj * @since 2021/7/27 **/ @Slf4j @Component @Order(value = 1) @Configuration(proxyBeanMethods = false) @ConditionalOnProperty(value = "binlog.enabled") public class TourBinLogListener implements CommandLineRunner { @Resource private BinLogConstants binLogConstants; @Override public void run(String... args) throws Exception { log.info("初始化配置信息:" + binLogConstants.toString()); // 初始化配置信息 DataSourceConfig conf = new DataSourceConfig(binLogConstants.getHost(), binLogConstants.getPort(), binLogConstants.getUsername(), binLogConstants.getPassword()); // 初始化监听器 MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf); // 获取table集合 List<String> tableList = BinLogUtils.getListByStr(binLogConstants.getTable()); if (CollectionUtil.isEmpty(tableList)) { return; } // 注册监听 tableList.forEach(table -> { log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table); try { mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> { log.info("监听逻辑处理"); }); } catch (Exception e) { log.error("BinLog监听异常:" + e); } }); // 多线程消费 mysqlBinLogListener.parse(); } } src/main/java/org/springblade/binlog/util/BinLogUtils.java
New file @@ -0,0 +1,227 @@ package org.springblade.binlog.util; import com.github.shyiko.mysql.binlog.event.EventType; import com.google.common.collect.Lists; import liquibase.repackaged.org.apache.commons.lang3.StringUtils; import lombok.extern.slf4j.Slf4j; import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.vo.BinLogItem; import org.springblade.binlog.vo.DataProperty; import org.springblade.core.tool.utils.CollectionUtil; import org.springblade.core.tool.utils.DateUtil; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.io.Serializable; import java.sql.*; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import static com.github.shyiko.mysql.binlog.event.EventType.isDelete; import static com.github.shyiko.mysql.binlog.event.EventType.isUpdate; import static com.github.shyiko.mysql.binlog.event.EventType.isWrite; /** * 监听工具 * * @author zrj * @since 2021/7/27 **/ @Slf4j @Component public class BinLogUtils { private static BinLogUtils binLogUtils; // @Resource // private SearchStoreLogoExtMapper searchStoreLogoExtMapper; // @PostConstruct // public void init() { // binLogUtils = this; // binLogUtils.searchStoreLogoExtMapper = this.searchStoreLogoExtMapper; // } /** * 拼接dbTable */ public static String getdbTable(String db, String table) { return db + "-" + table; } /** * 获取columns集合 */ public static Map<String, DataProperty> getColMap(DataSourceConfig conf, String db, String table) throws ClassNotFoundException { try { Class.forName("com.mysql.cj.jdbc.Driver"); // 保存当前注册的表的colum信息 Connection connection = DriverManager.getConnection( "jdbc:mysql://" + conf.getHost() + ":" + conf.getPort() + "?serverTimezone=GMT%2B8", conf.getUsername(), conf.getPassword()); // 执行sql String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?"; PreparedStatement ps = connection.prepareStatement(preSql); ps.setString(1, db); ps.setString(2, table); ResultSet rs = ps.executeQuery(); Map<String, DataProperty> map = new HashMap<>(rs.getRow()); while (rs.next()) { String schema = rs.getString("TABLE_SCHEMA"); String tableName = rs.getString("TABLE_NAME"); String column = rs.getString("COLUMN_NAME"); int idx = rs.getInt("ORDINAL_POSITION"); String dataType = rs.getString("DATA_TYPE"); if (column != null && idx >= 1) { // sql的位置从1开始 map.put(column, new DataProperty(schema, tableName, idx - 1, column, dataType)); } } ps.close(); rs.close(); return map; } catch (SQLException e) { log.error("load db conf error, db_table={}:{} ", db, table, e); } return null; } // /** // * 根据table获取code // * // * @param table // * @return java.lang.Integer // */ // public static Integer getCodeByTable(String table) { // if (StringUtils.isEmpty(table)) { // return null; // } // return CategoryEnum.getCodeByTab(table); // } // public static String getMsgByTab(String table) { // if (StringUtils.isEmpty(table)) { // return null; // } // return CategoryEnum.getMsgByTab(table); // } /** * 根据DBTable获取table * * @param dbTable * @return java.lang.String */ public static String getTable(String dbTable) { if (StringUtils.isEmpty(dbTable)) { return ""; } String[] split = dbTable.split("-"); if (split.length == 2) { return split[1]; } return ""; } /** * 将逗号拼接字符串转List * * @param str * @return */ public static List<String> getListByStr(String str) { if (StringUtils.isEmpty(str)) { return Lists.newArrayList(); } return Arrays.asList(str.split(",")); } /** * 根据操作类型获取对应集合 * * @param binLogItem * @return */ public static Map<String, Serializable> getOptMap(BinLogItem binLogItem) { // 获取操作类型 EventType eventType = binLogItem.getEventType(); if (isWrite(eventType) || isUpdate(eventType)) { return binLogItem.getAfter(); } if (isDelete(eventType)) { return binLogItem.getBefore(); } return null; } /** * 获取操作类型 * * @param binLogItem * @return */ public static Integer getOptType(BinLogItem binLogItem) { // 获取操作类型 EventType eventType = binLogItem.getEventType(); if (isWrite(eventType)) { return 1; } if (isUpdate(eventType)) { return 2; } if (isDelete(eventType)) { return 3; } return null; } // /** // * 根据storeId获取imgUrl // */ // public static String getImgUrl(Long storeId) { // // if (storeId == null) { // return ""; // } // //获取url // SearchStoreLogo searchStoreLogo = new SearchStoreLogo(); // searchStoreLogo.setStoreId(storeId); // List<SearchStoreLogo> searchStoreLogos = binLogUtils.searchStoreLogoExtMapper.selectList(searchStoreLogo); // if (CollectionUtil.isNotEmpty(searchStoreLogos)) { // SearchStoreLogo storeLogo = searchStoreLogos.get(0); // if (storeLogo != null) { // return storeLogo.getStoreLogo(); // } // } // return ""; // } /** * 格式化date * * @param date * @return java.util.Date */ public static Date getDateFormat(Date date) { if (date == null) { return null; } String dateFormat = "yyyy-MM-dd HH:mm:ss"; String strDate = DateUtil.format(date, dateFormat); if (StringUtils.isEmpty(strDate)) { return null; } Date formatDate = DateUtil.parse(strDate, dateFormat); return formatDate; } } src/main/java/org/springblade/binlog/vo/BinLogItem.java
New file @@ -0,0 +1,102 @@ package org.springblade.binlog.vo; import com.github.shyiko.mysql.binlog.event.EventType; import com.google.common.collect.Maps; import lombok.Data; import java.io.Serializable; import java.util.Map; import static com.github.shyiko.mysql.binlog.event.EventType.isDelete; import static com.github.shyiko.mysql.binlog.event.EventType.isWrite; /** * binlog对象 * * @author zrj * @since 2024/02/19 **/ @Data public class BinLogItem implements Serializable { private static final long serialVersionUID = 5503152746318421290L; private String dbTable; private EventType eventType; private Long timestamp = null; private Long serverId = null; // 存储字段-之前的值之后的值 private Map<String, Serializable> before = null; private Map<String, Serializable> after = null; // 存储字段--类型 private Map<String, DataProperty> dataProperty = null; /** * 新增或者删除操作数据格式化 */ public static BinLogItem itemFromInsertOrDeleted(Serializable[] row, Map<String, DataProperty> dataPropertyMap, EventType eventType) { if (null == row || null == dataPropertyMap) { return null; } if (row.length != dataPropertyMap.size()) { return null; } // 初始化Item BinLogItem item = new BinLogItem(); item.eventType = eventType; item.dataProperty = dataPropertyMap; item.before = Maps.newHashMap(); item.after = Maps.newHashMap(); Map<String, Serializable> beOrAf = Maps.newHashMap(); dataPropertyMap.entrySet().forEach(entry -> { String key = entry.getKey(); DataProperty dataProperty = entry.getValue(); beOrAf.put(key, row[dataProperty.inx]); }); // 写操作放after,删操作放before if (isWrite(eventType)) { item.after = beOrAf; } if (isDelete(eventType)) { item.before = beOrAf; } return item; } /** * 更新操作数据格式化 */ public static BinLogItem itemFromUpdate(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, DataProperty> dataPropertyMap, EventType eventType) { if (null == mapEntry || null == dataPropertyMap) { return null; } // 初始化Item BinLogItem item = new BinLogItem(); item.eventType = eventType; item.dataProperty = dataPropertyMap; item.before = Maps.newHashMap(); item.after = Maps.newHashMap(); Map<String, Serializable> be = Maps.newHashMap(); Map<String, Serializable> af = Maps.newHashMap(); dataPropertyMap.entrySet().forEach(entry -> { String key = entry.getKey(); DataProperty dataProperty = entry.getValue(); be.put(key, mapEntry.getKey()[dataProperty.inx]); af.put(key, mapEntry.getValue()[dataProperty.inx]); }); item.before = be; item.after = af; return item; } } src/main/java/org/springblade/binlog/vo/DataProperty.java
New file @@ -0,0 +1,39 @@ package org.springblade.binlog.vo; import lombok.Data; /** * 字段属性对象 * * @author zrj * @since 2024/02/19 **/ @Data public class DataProperty { public int inx; /** * 列名 */ public String colName; /** * 类型 */ public String dataType; /** * 数据库 */ public String schema; /** * 表 */ public String table; public DataProperty(String schema, String table, int idx, String colName, String dataType) { this.schema = schema; this.table = table; this.colName = colName; this.dataType = dataType; this.inx = idx; } } src/main/java/org/springblade/modules/ownersCommittee/controller/OwnersCommitteeController.java
@@ -99,6 +99,7 @@ @ApiOperationSupport(order = 5) @ApiOperation(value = "修改", notes = "传入ownersCommittee") public R update(@Valid @RequestBody OwnersCommitteeEntity ownersCommittee) { // 负责人修改了需要去更新负责人 return R.status(ownersCommitteeService.updateById(ownersCommittee)); } src/main/java/org/springblade/modules/ownersCommittee/entity/OwnersCommitteeEntity.java
@@ -58,7 +58,7 @@ /** 建立时间 */ @ApiModelProperty(value = "建立时间", example = "") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd", timezone = "GMT+8") @TableField("establish_time") private Date establishTime; @@ -119,13 +119,13 @@ /** 开始时间 */ @ApiModelProperty(value = "开始时间", example = "") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd", timezone = "GMT+8") @TableField("start_time") private Date startTime; /** 截止时间 */ @ApiModelProperty(value = "截止时间", example = "") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") @JsonFormat(pattern = "yyyy-MM-dd", timezone = "GMT+8") @TableField("end_time") private Date endTime; @@ -169,6 +169,5 @@ /** 0否 1是 */ @ApiModelProperty(value = "0否 1是", example = "") @TableField("delete_flag") @TableLogic private Integer deleteFlag; } src/main/java/org/springblade/modules/system/mapper/UserMapper.xml
@@ -98,7 +98,7 @@ jd.id = #{districtId} and bu.is_deleted = '0' and jh.is_deleted = '0' and jh.role_type = '1' </select> <!--根据手机号查询对应账号和手机号的用户信息--> src/main/resources/application-dev.yml
@@ -66,3 +66,14 @@ port: 7018 address: enabled: false # binlog listener binlog: datasource: # 订阅binlog数据库连接信息,ip,端口,用户密码(用户必须要有权限) host: 127.0.0.1 port: 3306 username: root password: 1qaz@WSX3edc db: jczz # 监听数据库 table: jczz_house,jczz_household,jczz_place enabled: false src/main/resources/application-prod.yml
@@ -53,3 +53,14 @@ port: 7018 address: enabled: true # binlog listener binlog: datasource: # 订阅binlog数据库连接信息,ip,端口,用户密码(用户必须要有权限) host: 127.0.0.1 port: 3306 username: root password: 1qaz@WSX3edc db: jczz # 监听数据库 table: jczz_house,jczz_household,jczz_place enabled: false src/main/resources/application-test.yml
@@ -14,7 +14,7 @@ # nodes: 127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003 # commandTimeout: 5000 datasource: url: jdbc:mysql://127.0.0.1:3308/jczz?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true url: jdbc:mysql://127.0.0.1:3308/jczz_test?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true username: root password: root @@ -53,3 +53,14 @@ port: 7018 address: enabled: false # binlog listener binlog: datasource: host: 127.0.0.1 port: 3308 username: root password: root db: jczz_test table: jczz_house,jczz_household,jczz_place enabled: false