| | |
| | | ); |
| | | client.setEventDeserializer(eventDeserializer); |
| | | this.parseClient = client; |
| | | this.queue = new ArrayBlockingQueue<>(1024); |
| | | this.queue = new LinkedBlockingQueue<>(); |
| | | this.conf = conf; |
| | | this.listeners = ArrayListMultimap.create(); |
| | | this.dbTableCols = new ConcurrentHashMap<>(); |
| | |
| | | consumer.submit(() -> { |
| | | while (true) { |
| | | if (queue.size() > 0) { |
| | | log.info("队列个数1:" + queue.size()); |
| | | try { |
| | | BinLogItem item = queue.take(); |
| | | log.info("队列个数2:" + queue.size()); |
| | | String dbtable = item.getDbTable(); |
| | | listeners.get(dbtable).forEach(binLogListener -> binLogListener.onEvent(item)); |
| | | } catch (InterruptedException e) { |