package org.springblade.binlog.listener; import cn.hutool.core.collection.CollectionUtil; import lombok.extern.slf4j.Slf4j; import org.springblade.binlog.config.DataSourceConfig; import org.springblade.binlog.constant.BinLogConstants; import org.springblade.binlog.util.BinLogUtils; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; /** * 乐游监听器 * SpringBoot启动成功后的执行业务线程操作 * CommandLineRunner去实现此操作 * 在有多个可被执行的业务时,通过使用 @Order 注解,设置各个线程的启动顺序(value值由小到大表示启动顺序)。 * 多个实现CommandLineRunner接口的类必须要设置启动顺序,不让程序启动会报错! * * @author zrj * @since 2021/7/27 **/ @Slf4j @Component @Order(value = 1) @Configuration(proxyBeanMethods = false) @ConditionalOnProperty(value = "binlog.enabled") public class TourBinLogListener implements CommandLineRunner { @Resource private BinLogConstants binLogConstants; @Override public void run(String... args) throws Exception { log.info("初始化配置信息:" + binLogConstants.toString()); // 初始化配置信息 DataSourceConfig conf = new DataSourceConfig(binLogConstants.getHost(), binLogConstants.getPort(), binLogConstants.getUsername(), binLogConstants.getPassword()); // 初始化监听器 MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf); // 获取table集合 List tableList = BinLogUtils.getListByStr(binLogConstants.getTable()); if (CollectionUtil.isEmpty(tableList)) { return; } // 注册监听 tableList.forEach(table -> { log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table); try { mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> { log.info("监听逻辑处理"); }); } catch (Exception e) { log.error("BinLog监听异常:" + e); } }); // 多线程消费 mysqlBinLogListener.parse(); } }