zhongrj
2024-04-02 bd7035535abfda6be9ce792fb725d0d754ed42e3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package org.springblade.binlog.listener;
 
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import lombok.extern.slf4j.Slf4j;
import org.kohsuke.args4j.Option;
import org.springblade.binlog.config.DataSourceConfig;
import org.springblade.binlog.constant.BinLogConstants;
import org.springblade.binlog.vo.BinLogItem;
import org.springblade.binlog.vo.DataProperty;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.*;
import static com.github.shyiko.mysql.binlog.event.EventType.*;
import static org.springblade.binlog.util.BinLogUtils.getColMap;
import static org.springblade.binlog.util.BinLogUtils.getdbTable;
 
/**
 * 数据库监听器
 *
 * @author zrj
 * @since 2021/7/26
 **/
@Slf4j
public class MysqlBinLogListener implements BinaryLogClient.EventListener {
 
    @Option(name = "-binlog-consume_threads", usage = "the thread num of consumer")
    private int consumerThreads = BinLogConstants.consumerThreads;
 
    private BinaryLogClient parseClient;
 
    private BlockingQueue<BinLogItem> queue;
    private final ExecutorService consumer;
 
    // 存放每张数据表对应的listener
    private Multimap<String, BinLogListener> listeners;
 
    private DataSourceConfig conf;
    private Map<String, Map<String, DataProperty>> dbTableCols;
    private String dbTable;
 
    /**
     * 监听器初始化
     *
     * @param conf
     */
    public MysqlBinLogListener(DataSourceConfig conf) {
        BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword());
        // 序列化 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY 该设置会将varchar 转为 byte[]
        EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        );
        client.setEventDeserializer(eventDeserializer);
        this.parseClient = client;
        this.queue = new LinkedBlockingQueue<>();
        this.conf = conf;
        this.listeners = ArrayListMultimap.create();
        this.dbTableCols = new ConcurrentHashMap<>();
        this.consumer = Executors.newFixedThreadPool(consumerThreads);
    }
 
    /**
     * 监听处理
     *
     * @param event
     */
    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();
 
        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            String db = tableData.getDatabase();
            String table = tableData.getTable();
            dbTable = getdbTable(db, table);
        }
 
        // 只处理添加删除更新三种操作
        if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
            if (isWrite(eventType)) {
                WriteRowsEventData data = event.getData();
                for (Serializable[] row : data.getRows()) {
                    if (dbTableCols.containsKey(dbTable)) {
                        BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable), eventType);
                        item.setDbTable(dbTable);
                        queue.add(item);
                    }
                }
            }
            if (isUpdate(eventType)) {
                UpdateRowsEventData data = event.getData();
                for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
                    if (dbTableCols.containsKey(dbTable)) {
                        BinLogItem item = BinLogItem.itemFromUpdate(row, dbTableCols.get(dbTable), eventType);
                        item.setDbTable(dbTable);
                        queue.add(item);
                    }
                }
 
            }
            if (isDelete(eventType)) {
                DeleteRowsEventData data = event.getData();
                for (Serializable[] row : data.getRows()) {
                    if (dbTableCols.containsKey(dbTable)) {
                        BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable), eventType);
                        item.setDbTable(dbTable);
                        queue.add(item);
                    }
                }
            }
        }
    }
 
    /**
     * 注册监听
     *
     * @param db       数据库
     * @param table    操作表
     * @param listener 监听器
     * @throws Exception
     */
    public void regListener(String db, String table, BinLogListener listener) throws Exception {
        String dbTable = getdbTable(db, table);
        // 获取字段集合
        Map<String, DataProperty> cols = getColMap(conf, db, table);
        // 保存字段信息
        dbTableCols.put(dbTable, cols);
        // 保存当前注册的listener
        listeners.put(dbTable, listener);
    }
 
    /**
     * 开启多线程消费
     *
     * @throws IOException
     */
    public void parse() throws IOException {
        parseClient.registerEventListener(this);
 
        for (int i = 0; i < consumerThreads; i++) {
            consumer.submit(() -> {
                while (true) {
                    if (queue.size() > 0) {
                        log.info("队列个数1:" + queue.size());
                        try {
                            BinLogItem item = queue.take();
                            log.info("队列个数2:" + queue.size());
                            String dbtable = item.getDbTable();
                            listeners.get(dbtable).forEach(binLogListener -> binLogListener.onEvent(item));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Thread.sleep(BinLogConstants.queueSleep);
                }
            });
        }
        parseClient.connect();
    }
 
}