package org.springblade.binlog.listener; import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.*; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import lombok.extern.slf4j.Slf4j; import org.kohsuke.args4j.Option; import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.constant.BinLogConstants; import org.springblade.binlog.vo.BinLogItem; import org.springblade.binlog.vo.DataProperty; import java.io.IOException; import java.io.Serializable; import java.util.Map; import java.util.concurrent.*; import static com.github.shyiko.mysql.binlog.event.EventType.*; import static org.springblade.binlog.util.BinLogUtils.getColMap; import static org.springblade.binlog.util.BinLogUtils.getdbTable; /** * 数据库监听器 * * @author zrj * @since 2021/7/26 **/ @Slf4j public class MysqlBinLogListener implements BinaryLogClient.EventListener { @Option(name = "-binlog-consume_threads", usage = "the thread num of consumer") private int consumerThreads = BinLogConstants.consumerThreads; private BinaryLogClient parseClient; private BlockingQueue queue; private final ExecutorService consumer; // 存放每张数据表对应的listener private Multimap listeners; private DataSourceConfig conf; private Map> dbTableCols; private String dbTable; /** * 监听器初始化 * * @param conf */ public MysqlBinLogListener(DataSourceConfig conf) { BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword()); // 序列化 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY 该设置会将varchar 转为 byte[] EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG ); client.setEventDeserializer(eventDeserializer); this.parseClient = client; this.queue = new ArrayBlockingQueue<>(1024); this.conf = conf; this.listeners = ArrayListMultimap.create(); this.dbTableCols = new ConcurrentHashMap<>(); this.consumer = Executors.newFixedThreadPool(consumerThreads); } /** * 监听处理 * * @param event */ @Override public void onEvent(Event event) { EventType eventType = event.getHeader().getEventType(); if (eventType == EventType.TABLE_MAP) { TableMapEventData tableData = event.getData(); String db = tableData.getDatabase(); String table = tableData.getTable(); dbTable = getdbTable(db, table); } // 只处理添加删除更新三种操作 if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { if (isWrite(eventType)) { WriteRowsEventData data = event.getData(); for (Serializable[] row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable), eventType); item.setDbTable(dbTable); queue.add(item); } } } if (isUpdate(eventType)) { UpdateRowsEventData data = event.getData(); for (Map.Entry row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { BinLogItem item = BinLogItem.itemFromUpdate(row, dbTableCols.get(dbTable), eventType); item.setDbTable(dbTable); queue.add(item); } } } if (isDelete(eventType)) { DeleteRowsEventData data = event.getData(); for (Serializable[] row : data.getRows()) { if (dbTableCols.containsKey(dbTable)) { BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable), eventType); item.setDbTable(dbTable); queue.add(item); } } } } } /** * 注册监听 * * @param db 数据库 * @param table 操作表 * @param listener 监听器 * @throws Exception */ public void regListener(String db, String table, BinLogListener listener) throws Exception { String dbTable = getdbTable(db, table); // 获取字段集合 Map cols = getColMap(conf, db, table); // 保存字段信息 dbTableCols.put(dbTable, cols); // 保存当前注册的listener listeners.put(dbTable, listener); } /** * 开启多线程消费 * * @throws IOException */ public void parse() throws IOException { parseClient.registerEventListener(this); for (int i = 0; i < consumerThreads; i++) { consumer.submit(() -> { while (true) { if (queue.size() > 0) { try { BinLogItem item = queue.take(); String dbtable = item.getDbTable(); listeners.get(dbtable).forEach(binLogListener -> binLogListener.onEvent(item)); } catch (InterruptedException e) { e.printStackTrace(); } } Thread.sleep(BinLogConstants.queueSleep); } }); } parseClient.connect(); } }