src/main/java/org/springblade/binlog/listener/BinlogListenerMixed.java
New file @@ -0,0 +1,131 @@
package org.springblade.binlog.listener;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/**
 * Stand-alone demo that connects a {@link BinaryLogClient} to a MySQL server and logs the
 * binlog events it receives (write-rows, query and table-map events get dedicated handlers).
 */
public class BinlogListenerMixed {

    private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);

    private static final String MYSQL_HOST = "127.0.0.1";
    private static final int MYSQL_PORT = 3308;
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "root";

    /**
     * Creates the binlog client, registers the event and lifecycle listeners and connects.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            BinaryLogClient client =
                new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
            // An explicit start position can be configured here if required, e.g.
            // client.setServerId(1), client.setBinlogFilename("mysql-bin.000005"),
            // client.setBinlogPosition(154).

            EventDeserializer eventDeserializer = new EventDeserializer();
            // CHAR_AND_BINARY_AS_BYTE_ARRAY is deliberately not enabled: that setting would
            // turn varchar values into byte[].
            eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
            );

            // The password is intentionally NOT logged: credentials must never reach log files.
            logger.info("使用主机={}, 端口={}, 用户名={} 连接到 MySQL",
                MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME);

            client.setEventDeserializer(eventDeserializer);
            client.registerEventListener(BinlogListenerMixed::handleEvent);
            client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {
                @Override
                public void onConnect(BinaryLogClient client) {
                    logger.info("Connected to MySQL server");
                }

                @Override
                public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Communication failure with MySQL server", ex);
                }

                @Override
                public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Event deserialization failure", ex);
                }

                @Override
                public void onDisconnect(BinaryLogClient client) {
                    logger.warn("Disconnected from MySQL server");
                    // Reconnect or other recovery logic can be added here.
                }
            });

            client.connect();
        } catch (IOException e) {
            logger.error("@@ 连接到 MySQL 时发生错误", e);
            logger.error("@@ Error connecting to MySQL", e);
        }
    }

    /**
     * Logs every received event and dispatches the supported event types to their handlers.
     *
     * @param event the binlog event delivered by the client
     */
    private static void handleEvent(Event event) {
        logger.info("@@ 打印 event: {}", event);
        logger.info("@@ Received event type: {}", event.getHeader().getEventType());

        switch (event.getHeader().getEventType()) {
            case WRITE_ROWS:
            case EXT_WRITE_ROWS:
                handleWriteRowsEvent((WriteRowsEventData) event.getData());
                break;
            case QUERY:
                handleQueryEvent((QueryEventData) event.getData());
                break;
            case TABLE_MAP:
                handleTableMapEvent((TableMapEventData) event.getData());
                break;
            default:
                // Other event types are not handled yet.
                break;
        }
    }

    /**
     * Processes the rows of a write-rows event: logs the third column and hands the first two
     * columns to {@link #backupToAnotherDatabase(String, String, String)}.
     *
     * <p>Columns are read defensively because the listener receives events for every table:
     * a row may have fewer than three columns and any column may be SQL NULL.
     *
     * @param eventData payload of the write-rows event
     */
    private static void handleWriteRowsEvent(WriteRowsEventData eventData) {
        List<Serializable[]> rows = eventData.getRows();

        // Resolve the table name.
        String tableName = getTableName(eventData);

        // Process every inserted row; adjust the column indexes to the real table layout.
        for (Serializable[] row : rows) {
            String column1Value = columnAsString(row, 0);
            String column2Value = columnAsString(row, 1);
            String url = columnAsString(row, 2);
            // Log through a pattern so the value is never interpreted as a format string.
            logger.info("{}", url);

            // Back the data up to another database.
            backupToAnotherDatabase(tableName, column1Value, column2Value);
        }
    }

    /**
     * Returns the string form of the column at {@code index}, or {@code null} when the row is
     * too short or the column value itself is {@code null}.
     */
    private static String columnAsString(Serializable[] row, int index) {
        if (row == null || index < 0 || index >= row.length || row[index] == null) {
            return null;
        }
        return row[index].toString();
    }

    /**
     * Logs the SQL text carried by a query event.
     *
     * @param eventData payload of the query event
     */
    private static void handleQueryEvent(QueryEventData eventData) {
        String sql = eventData.getSql();
        logger.info("@@ handleQueryEvent函数执行Query event SQL: {}", sql);
        // Parse the SQL here if needed, e.g. detect write operations and react to them.
    }

    /**
     * Logs a table-map event.
     *
     * @param eventData payload of the table-map event
     */
    private static void handleTableMapEvent(TableMapEventData eventData) {
        // Table mapping information can be captured here if needed.
        logger.info("@@ handleTableMapEvent函数执行TableMap event: {}", eventData);
    }

    /**
     * Placeholder table-name lookup; currently always returns {@code "example_table"}.
     *
     * @param eventData event payload (currently unused)
     * @return the fixed placeholder table name
     */
    private static String getTableName(EventData eventData) {
        // A real implementation could resolve the name from TableMapEventData.
        return "example_table";
    }

    /**
     * Placeholder for the backup logic; currently only logs its arguments.
     *
     * @param tableName    name of the source table
     * @param column1Value string form of the first column, may be {@code null}
     * @param column2Value string form of the second column, may be {@code null}
     */
    private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {
        logger.info("Backup to another database: Table={}, Column1={}, Column2={}",
            tableName, column1Value, column2Value);
    }
}
src/main/java/org/springblade/binlog/listener/MysqlBinLogListener.java
@@ -50,11 +50,10 @@ */ public MysqlBinLogListener(DataSourceConfig conf) { BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword()); // 序列化 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY 该设置会将varchar 转为 byte[] EventDeserializer eventDeserializer = new EventDeserializer(); //eventDeserializer.setCompatibilityMode(//序列化 // EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, // EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY //); eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG ); client.setEventDeserializer(eventDeserializer); this.parseClient = client; this.queue = new ArrayBlockingQueue<>(1024); src/main/java/org/springblade/binlog/listener/TourBinLogListener.java
@@ -6,14 +6,18 @@ import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.constant.BinLogConstants; import org.springblade.binlog.util.BinLogUtils; import org.springblade.binlog.vo.BinLogItem; import org.springblade.binlog.vo.DataProperty; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * 乐游监听器 @@ -58,7 +62,18 @@ log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table); try { mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> { log.info("监听逻辑处理"); if (item.getEventType().name().equals("EXT_WRITE_ROWS")){ // 新增处理逻辑 saveHandle(item); } if (item.getEventType().name().equals("EXT_UPDATE_ROWS")){ // 更新处理逻辑 updateHandle(item); } if (item.getEventType().name().equals("EXT_DELETE_ROWS")){ // 删除处理逻辑 deletedHandle(item); } }); } catch (Exception e) { log.error("BinLog监听异常:" + e); @@ -67,6 +82,63 @@ // 多线程消费 mysqlBinLogListener.parse(); } /** * 新增处理逻辑 * @param item */ private void saveHandle(BinLogItem item) { String tableName = item.getDbTable().split("-")[1]; Map<String, Serializable> data = item.getAfter(); log.info(tableName); Map<String, DataProperty> dataProperty = item.getDataProperty(); List<String> keyList = new ArrayList<>(); List<String> valueList = new ArrayList<>(); dataProperty.forEach((key,value)->{ log.info(key); log.info(value.dataType); if(null!=data.get(key)) { log.info(data.get(key).toString()); valueList.add(data.get(key).toString()); }else { if (value.dataType.equals("int") || value.dataType.equals("bigint") || value.dataType.equals("datetime") || value.dataType.equals("date")){ valueList.add(null); }else{ 
valueList.add(""); } } keyList.add(key); }); // 拼接sql StringBuilder sqlBuilder = new StringBuilder(); sqlBuilder.append("insert into ") .append(tableName) .append("(") .append(String.join(",",keyList)) .append(") ") .append(" values (") .append(String.join(",",valueList)) .append(");"); log.info("sql: " + sqlBuilder.toString()); } /** * 修改处理逻辑 * @param item */ private void updateHandle(BinLogItem item) { } /** * 删除处理逻辑 * @param item 数据 */ private void deletedHandle(BinLogItem item) { } } src/main/java/org/springblade/modules/place/mapper/PlaceMapper.java
@@ -36,6 +36,22 @@
	/**
	 * Custom pagination.
	 *
	 * @param page
	 * @param place
	 * @param gridCodeList         exposed to the mapper XML under the name {@code gridCodeList}
	 * @param regionChildCodesList
	 * @param isAdministrator
	 * @return
	 */
	List<PlaceVO> selectPlacePage(IPage page,
								  @Param("place") PlaceVO place,
								  @Param("gridCodeList") List<String> gridCodeList,
								  @Param("regionChildCodesList") List<String> regionChildCodesList,
								  @Param("isAdministrator") Integer isAdministrator);

	/**
	 * Records of the "nine small places" (九小场所).
	 *
	 * @param page
@@ -53,22 +69,6 @@
								  @Param("isAdministrator") Integer isAdministrator,
								  @Param("nineTypeList") List<String> nineTypeList);

	/**
	 * Custom pagination.
	 *
	 * @param page
	 * @param place
	 * @param houseCodeList
	 * @param regionChildCodesList
	 * @param isAdministrator
	 * @return
	 */
	List<PlaceVO> selectPlacePage(IPage page,
								  @Param("place") PlaceVO place,
								  @Param("houseCodeList") List<String> houseCodeList,
								  @Param("regionChildCodesList") List<String> regionChildCodesList,
								  @Param("isAdministrator") Integer isAdministrator);

	/**
	 * Query the place collection information.
src/main/java/org/springblade/modules/place/mapper/PlaceMapper.xml
@@ -73,22 +73,59 @@ <if test="place.isPerfect==2"> and jp.status = 2 </if> <if test="houseCodeList != null and houseCodeList.size()>0"> and jp.house_code in <foreach collection="houseCodeList" item="houseCode" separator ="," open="(" close=")"> #{houseCode} </foreach> </if> <if test="isAdministrator==2"> <choose> <when test="regionChildCodesList !=null and regionChildCodesList.size()>0"> and jg.grid_code in <foreach collection="regionChildCodesList" item="code" open="(" close=")" separator=","> #{code} </foreach> <when test="place.roleName != null and place.roleName != ''"> <if test="place.roleName=='wgy'"> <choose> <when test="gridCodeList !=null and gridCodeList.size()>0"> and jp.grid_code in <foreach collection="gridCodeList" item="code" open="(" close=")" separator=","> #{code} </foreach> </when> <otherwise> and jp.grid_code in ('') </otherwise> </choose> </if> <if test="place.roleName=='mj'"> <choose> <when test="regionChildCodesList !=null and regionChildCodesList.size()>0"> and jpag.community_code in <foreach collection="regionChildCodesList" item="code" open="(" close=")" separator=","> #{code} </foreach> </when> <otherwise> and jpag.community_code in ('') </otherwise> </choose> </if> </when> <otherwise> and jg.grid_code in ('') <choose> <when test="regionChildCodesList !=null and regionChildCodesList.size()>0"> and ( jg.grid_code in <foreach collection="regionChildCodesList" item="code" open="(" close=")" separator=","> #{code} </foreach> or jpag.community_code in <foreach collection="regionChildCodesList" item="code" open="(" close=")" separator=","> #{code} </foreach> ) </when> <otherwise> and ( jg.grid_code in ('') or jpag.community_code in ('') ) </otherwise> </choose> </otherwise> </choose> </if> src/main/java/org/springblade/modules/place/service/impl/PlaceServiceImpl.java
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.util.Strings;
import org.springblade.common.cache.SysCache;
import org.springblade.common.node.TreeStringNode;
import org.springblade.common.param.CommonParamSet;
import org.springblade.common.utils.IdUtils;
import org.springblade.common.utils.SpringUtils;
import org.springblade.core.mp.support.Condition;
@@ -48,6 +49,7 @@
import org.springblade.modules.place.service.IPlaceExtService;
import org.springblade.modules.place.service.IPlacePoiLabelService;
import org.springblade.modules.place.service.IPlaceRelService;
import org.springblade.modules.place.vo.PlaceCheckVO;
import org.springblade.modules.place.vo.PlacePoiLabelVO;
import org.springblade.modules.place.vo.PlaceVO;
import org.springblade.modules.place.mapper.PlaceMapper;
@@ -119,16 +121,13 @@
	 */
	@Override
	public IPage<PlaceVO> selectPlacePage(IPage<PlaceVO> page, PlaceVO place) {
		// NOTE(review): this hunk appears to show the removed and the added implementation
		// together - placeVOS is declared twice below. Presumably only the CommonParamSet-based
		// call belongs to the new version; confirm against the actual file.
		List<String> regionChildCodesList = SysCache.getRegionChildCodesByDeptId(AuthUtil.getDeptId());
		Integer isAdministrator = AuthUtil.isAdministrator() == true ? 1 : 2;
		List<String> list = new ArrayList<>();
		if (null != place.getRoleName() && !place.getRoleName().equals("")) {
			if (place.getRoleName().equals("网格员")) {
				// Look up the corresponding house address codes
				list = gridService.getAddressCodeListByUserId(AuthUtil.getUserId());
			}
		}
		List<PlaceVO> placeVOS = baseMapper.selectPlacePage(page, place, list, regionChildCodesList, isAdministrator);
		// Common parameter setup
		CommonParamSet commonParamSet = new CommonParamSet().invoke(PlaceVO.class,place);
		List<PlaceVO> placeVOS = baseMapper.selectPlacePage(page,
			place,
			commonParamSet.getGridCodeList(),
			commonParamSet.getRegionChildCodesList(),
			commonParamSet.getIsAdministrator());
		// Return the page populated with the query result
		return page.setRecords(placeVOS);
	}