package org.springblade.binlog.listener;
|
|
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
|
public class BinlogListenerMixed {
|
|
private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);
|
|
private static final String MYSQL_HOST = "127.0.0.1";
|
private static final int MYSQL_PORT = 3308;
|
private static final String MYSQL_USERNAME = "root";
|
private static final String MYSQL_PASSWORD = "root";
|
|
public static void main(String[] args) {
|
try {
|
BinaryLogClient client = new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
|
// client.setBinlogFilename(null);
|
// client.setBinlogPosition(-1); // 或者设置为其他适当的初始位置
|
// client.setServerId(1);
|
// client.setBinlogFilename("mysql-bin.000005");
|
// client.setBinlogPosition(154);
|
EventDeserializer eventDeserializer = new EventDeserializer();
|
eventDeserializer.setCompatibilityMode(
|
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
|
// EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY 该设置会将varchar 转为 byte[]
|
);
|
logger.info("使用主机={}, 端口={}, 用户名={}, 密码={} 连接到 MySQL", MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
|
client.setEventDeserializer(eventDeserializer);
|
client.registerEventListener(BinlogListenerMixed::handleEvent);
|
client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {
|
@Override
|
public void onConnect(BinaryLogClient client) {
|
logger.info("Connected to MySQL server");
|
}
|
|
@Override
|
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
|
logger.error("Communication failure with MySQL server", ex);
|
}
|
|
@Override
|
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
|
logger.error("Event deserialization failure", ex);
|
}
|
|
@Override
|
public void onDisconnect(BinaryLogClient client) {
|
logger.warn("Disconnected from MySQL server");
|
// 在这里添加重新连接或其他处理逻辑
|
}
|
});
|
|
client.connect();
|
} catch (IOException e) {
|
logger.error("@@ 连接到 MySQL 时发生错误", e);
|
logger.error("@@ Error connecting to MySQL", e);
|
}
|
}
|
|
private static void handleEvent(Event event) {
|
logger.info("@@ 打印 event: {}", event);
|
logger.info("@@ Received event type: {}", event.getHeader().getEventType());
|
|
switch (event.getHeader().getEventType()) {
|
case WRITE_ROWS:
|
case EXT_WRITE_ROWS:
|
handleWriteRowsEvent((WriteRowsEventData) event.getData());
|
break;
|
case QUERY:
|
handleQueryEvent((QueryEventData) event.getData());
|
break;
|
case TABLE_MAP:
|
handleTableMapEvent((TableMapEventData) event.getData());
|
break;
|
// 其他事件处理...
|
}
|
}
|
|
private static void handleWriteRowsEvent(WriteRowsEventData eventData) {
|
List<Serializable[]> rows = eventData.getRows();
|
|
// 获取表名
|
String tableName = getTableName(eventData);
|
|
// 处理每一行数据
|
for (Serializable[] row : rows) {
|
// 根据需要调整以下代码以获取具体的列值
|
String column1Value = row[0].toString();
|
String column2Value = row[1].toString();
|
String url = row[2].toString();
|
logger.info(url);
|
|
// 将数据备份到另一个数据库
|
backupToAnotherDatabase(tableName, column1Value, column2Value);
|
}
|
}
|
|
private static void handleQueryEvent(QueryEventData eventData) {
|
String sql = eventData.getSql();
|
logger.info("@@ handleQueryEvent函数执行Query event SQL: {}", sql);
|
|
// 解析SQL语句,根据需要处理
|
// 例如,检查是否包含写入操作,然后执行相应的逻辑
|
}
|
|
private static void handleTableMapEvent(TableMapEventData eventData) {
|
// 获取表映射信息,根据需要处理
|
logger.info("@@ handleTableMapEvent函数执行TableMap event: {}", eventData);
|
}
|
|
private static String getTableName(EventData eventData) {
|
// 获取表名的逻辑,可以使用TableMapEventData等信息
|
// 根据实际情况实现
|
return "example_table";
|
}
|
|
private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {
|
// 将数据备份到另一个数据库的逻辑
|
logger.info("Backup to another database: Table={}, Column1={}, Column2={}", tableName, column1Value, column2Value);
|
}
|
}
|