package org.springblade.binlog.listener;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/**
 * Stand-alone binlog listener: connects a {@link BinaryLogClient} to a MySQL server and
 * dispatches write-rows, query and table-map events to dedicated handlers.
 *
 * <p>NOTE(review): host, port and credentials are hard-coded constants below; consider
 * moving them to external configuration.
 */
public class BinlogListenerMixed {

    private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);

    private static final String MYSQL_HOST = "127.0.0.1";
    private static final int MYSQL_PORT = 3308;
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "root";

    /**
     * Builds the binlog client from the constants above, installs the event deserializer,
     * registers the event and lifecycle listeners, and then calls {@code connect()}.
     *
     * <p>An {@link IOException} thrown while connecting is logged and not rethrown.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        try {
            BinaryLogClient client =
                    new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);

            // Optional start-position overrides, left disabled by the original author:
            // client.setBinlogFilename(null);
            // client.setBinlogPosition(-1); // or set to some other appropriate initial position
            // client.setServerId(1);
            // client.setBinlogFilename("mysql-bin.000005");
            // client.setBinlogPosition(154);

            EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setCompatibilityMode(
                    EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
                    // EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
                    // (original author's note: that setting would turn varchar into byte[])
            );

            // The password is deliberately NOT logged: credentials must never reach log output.
            logger.info("使用主机={}, 端口={}, 用户名={} 连接到 MySQL",
                    MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME);

            client.setEventDeserializer(eventDeserializer);
            client.registerEventListener(BinlogListenerMixed::handleEvent);
            client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {
                @Override
                public void onConnect(BinaryLogClient client) {
                    logger.info("Connected to MySQL server");
                }

                @Override
                public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Communication failure with MySQL server", ex);
                }

                @Override
                public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Event deserialization failure", ex);
                }

                @Override
                public void onDisconnect(BinaryLogClient client) {
                    logger.warn("Disconnected from MySQL server");
                    // Add reconnect or other handling logic here.
                }
            });

            client.connect();
        } catch (IOException e) {
            logger.error("@@ 连接到 MySQL 时发生错误", e);
            logger.error("@@ Error connecting to MySQL", e);
        }
    }

    /**
     * Logs every incoming event and routes the supported event types to their handlers.
     *
     * @param event the binlog event delivered by the client
     */
    private static void handleEvent(Event event) {
        logger.info("@@ 打印 event: {}", event);
        logger.info("@@ Received event type: {}", event.getHeader().getEventType());

        switch (event.getHeader().getEventType()) {
            case WRITE_ROWS:
            case EXT_WRITE_ROWS:
                handleWriteRowsEvent((WriteRowsEventData) event.getData());
                break;
            case QUERY:
                handleQueryEvent((QueryEventData) event.getData());
                break;
            case TABLE_MAP:
                handleTableMapEvent((TableMapEventData) event.getData());
                break;
            default:
                // Other event types are intentionally ignored for now.
                break;
        }
    }

    /**
     * Processes an insert event: for each inserted row, reads the first three columns as
     * strings, logs the third one, and hands the first two to
     * {@link #backupToAnotherDatabase(String, String, String)}.
     *
     * <p>A column that is SQL NULL, or that does not exist because the row has fewer than
     * three columns, is passed on as {@code null} instead of raising an exception.
     *
     * @param eventData the write-rows payload
     */
    private static void handleWriteRowsEvent(WriteRowsEventData eventData) {
        // Typed list: iterating a raw List as Serializable[] does not compile.
        List<Serializable[]> rows = eventData.getRows();

        // Resolve the table name once; it does not change within a single event.
        String tableName = getTableName(eventData);

        for (Serializable[] row : rows) {
            // Adjust the indexes below to pick the columns that are actually needed.
            String column1Value = columnAsString(row, 0);
            String column2Value = columnAsString(row, 1);
            String url = columnAsString(row, 2);

            // Log through a placeholder so column data is never used as the log message itself.
            logger.info("{}", url);

            // Back the data up to another database.
            backupToAnotherDatabase(tableName, column1Value, column2Value);
        }
    }

    /**
     * Returns the string form of one column of a row.
     *
     * @param row   the row values; may be {@code null}
     * @param index zero-based column index
     * @return {@code row[index].toString()}, or {@code null} when the row is {@code null},
     *         the index is out of range, or the column value is {@code null}
     */
    private static String columnAsString(Serializable[] row, int index) {
        if (row == null || index < 0 || index >= row.length || row[index] == null) {
            return null;
        }
        return row[index].toString();
    }

    /**
     * Logs the SQL text carried by a query event.
     *
     * @param eventData the query payload
     */
    private static void handleQueryEvent(QueryEventData eventData) {
        String sql = eventData.getSql();
        logger.info("@@ handleQueryEvent函数执行Query event SQL: {}", sql);

        // Parse the SQL statement and handle it as needed,
        // e.g. check whether it contains a write operation and then run the matching logic.
    }

    /**
     * Logs a table-map event.
     *
     * @param eventData the table-map payload
     */
    private static void handleTableMapEvent(TableMapEventData eventData) {
        // Obtain the table mapping information and handle it as needed.
        logger.info("@@ handleTableMapEvent函数执行TableMap event: {}", eventData);
    }

    /**
     * Placeholder table-name lookup: currently ignores its argument and always returns
     * {@code "example_table"}.
     *
     * @param eventData the event payload (currently unused)
     * @return the constant {@code "example_table"}
     */
    private static String getTableName(EventData eventData) {
        // Table-name resolution is still to be implemented (e.g. using TableMapEventData).
        return "example_table";
    }

    /**
     * Placeholder backup step: currently only logs the values it receives.
     *
     * @param tableName    name of the table the row belongs to
     * @param column1Value string form of the first column; may be {@code null}
     * @param column2Value string form of the second column; may be {@code null}
     */
    private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {
        // Logic for backing the data up to another database goes here.
        logger.info("Backup to another database: Table={}, Column1={}, Column2={}",
                tableName, column1Value, column2Value);
    }
}