zhongrj
2024-02-23 1af3f0036d8a0ede8f97ecb2075bee43109a4977
场所记录查询过滤调整,binlog 解析调整
5 files modified
1 file added
326 ■■■■ changed files
src/main/java/org/springblade/binlog/listener/BinlogListenerMixed.java 131 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/binlog/listener/MysqlBinLogListener.java 7 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/binlog/listener/TourBinLogListener.java 76 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/place/mapper/PlaceMapper.java 32 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/place/mapper/PlaceMapper.xml 61 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/place/service/impl/PlaceServiceImpl.java 19 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/binlog/listener/BinlogListenerMixed.java
New file
@@ -0,0 +1,131 @@
package org.springblade.binlog.listener;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
public class BinlogListenerMixed {
    private static final Logger logger = LoggerFactory.getLogger(BinlogListenerMixed.class);
    private static final String MYSQL_HOST = "127.0.0.1";
    private static final int MYSQL_PORT = 3308;
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "root";
    public static void main(String[] args) {
        try {
            BinaryLogClient client = new BinaryLogClient(MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
//            client.setBinlogFilename(null);
//            client.setBinlogPosition(-1); // 或者设置为其他适当的初始位置
//            client.setServerId(1);
//            client.setBinlogFilename("mysql-bin.000005");
//            client.setBinlogPosition(154);
            EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
                // EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY 该设置会将varchar 转为 byte[]
            );
            logger.info("使用主机={}, 端口={}, 用户名={}, 密码={} 连接到 MySQL", MYSQL_HOST, MYSQL_PORT, MYSQL_USERNAME, MYSQL_PASSWORD);
            client.setEventDeserializer(eventDeserializer);
            client.registerEventListener(BinlogListenerMixed::handleEvent);
            client.registerLifecycleListener(new BinaryLogClient.LifecycleListener() {
                @Override
                public void onConnect(BinaryLogClient client) {
                    logger.info("Connected to MySQL server");
                }
                @Override
                public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Communication failure with MySQL server", ex);
                }
                @Override
                public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
                    logger.error("Event deserialization failure", ex);
                }
                @Override
                public void onDisconnect(BinaryLogClient client) {
                    logger.warn("Disconnected from MySQL server");
                    // 在这里添加重新连接或其他处理逻辑
                }
            });
            client.connect();
        } catch (IOException e) {
            logger.error("@@ 连接到 MySQL 时发生错误", e);
            logger.error("@@ Error connecting to MySQL", e);
        }
    }
    private static void handleEvent(Event event) {
        logger.info("@@ 打印 event: {}", event);
        logger.info("@@ Received event type: {}", event.getHeader().getEventType());
        switch (event.getHeader().getEventType()) {
            case WRITE_ROWS:
            case EXT_WRITE_ROWS:
                handleWriteRowsEvent((WriteRowsEventData) event.getData());
                break;
            case QUERY:
                handleQueryEvent((QueryEventData) event.getData());
                break;
            case TABLE_MAP:
                handleTableMapEvent((TableMapEventData) event.getData());
                break;
            // 其他事件处理...
        }
    }
    private static void handleWriteRowsEvent(WriteRowsEventData eventData) {
        List<Serializable[]> rows = eventData.getRows();
        // 获取表名
        String tableName = getTableName(eventData);
        // 处理每一行数据
        for (Serializable[] row : rows) {
            // 根据需要调整以下代码以获取具体的列值
            String column1Value = row[0].toString();
            String column2Value = row[1].toString();
            String url = row[2].toString();
            logger.info(url);
            // 将数据备份到另一个数据库
            backupToAnotherDatabase(tableName, column1Value, column2Value);
        }
    }
    private static void handleQueryEvent(QueryEventData eventData) {
        String sql = eventData.getSql();
        logger.info("@@ handleQueryEvent函数执行Query event SQL: {}", sql);
        // 解析SQL语句,根据需要处理
        // 例如,检查是否包含写入操作,然后执行相应的逻辑
    }
    private static void handleTableMapEvent(TableMapEventData eventData) {
        // 获取表映射信息,根据需要处理
        logger.info("@@ handleTableMapEvent函数执行TableMap event: {}", eventData);
    }
    private static String getTableName(EventData eventData) {
        // 获取表名的逻辑,可以使用TableMapEventData等信息
        // 根据实际情况实现
        return "example_table";
    }
    private static void backupToAnotherDatabase(String tableName, String column1Value, String column2Value) {
        // 将数据备份到另一个数据库的逻辑
        logger.info("Backup to another database: Table={}, Column1={}, Column2={}", tableName, column1Value, column2Value);
    }
}
src/main/java/org/springblade/binlog/listener/MysqlBinLogListener.java
@@ -50,11 +50,10 @@
     */
    public MysqlBinLogListener(DataSourceConfig conf) {
        BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPassword());
        // 序列化 EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY 该设置会将varchar 转为 byte[]
        EventDeserializer eventDeserializer = new EventDeserializer();
        //eventDeserializer.setCompatibilityMode(//序列化
        //        EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
        //        EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        //);
            eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        );
        client.setEventDeserializer(eventDeserializer);
        this.parseClient = client;
        this.queue = new ArrayBlockingQueue<>(1024);
src/main/java/org/springblade/binlog/listener/TourBinLogListener.java
@@ -6,14 +6,18 @@
import org.springblade.binlog.config.DataSourceConfig;
import org.springblade.binlog.constant.BinLogConstants;
import org.springblade.binlog.util.BinLogUtils;
import org.springblade.binlog.vo.BinLogItem;
import org.springblade.binlog.vo.DataProperty;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * 乐游监听器
@@ -58,7 +62,18 @@
            log.info("注册监听信息,注册DB:" + binLogConstants.getDb() + ",注册表:" + table);
            try {
                mysqlBinLogListener.regListener(binLogConstants.getDb(), table, item -> {
                    log.info("监听逻辑处理");
                    if (item.getEventType().name().equals("EXT_WRITE_ROWS")){
                        // 新增处理逻辑
                        saveHandle(item);
                    }
                    if (item.getEventType().name().equals("EXT_UPDATE_ROWS")){
                        // 更新处理逻辑
                        updateHandle(item);
                    }
                    if (item.getEventType().name().equals("EXT_DELETE_ROWS")){
                        // 删除处理逻辑
                        deletedHandle(item);
                    }
                });
            } catch (Exception e) {
                log.error("BinLog监听异常:" + e);
@@ -67,6 +82,63 @@
        // 多线程消费
        mysqlBinLogListener.parse();
    }
    /**
     * 新增处理逻辑
     * @param item
     */
    private void saveHandle(BinLogItem item) {
        String tableName = item.getDbTable().split("-")[1];
        Map<String, Serializable> data = item.getAfter();
        log.info(tableName);
        Map<String, DataProperty> dataProperty = item.getDataProperty();
        List<String> keyList = new ArrayList<>();
        List<String> valueList = new ArrayList<>();
        dataProperty.forEach((key,value)->{
            log.info(key);
            log.info(value.dataType);
            if(null!=data.get(key)) {
                log.info(data.get(key).toString());
                valueList.add(data.get(key).toString());
            }else {
                if (value.dataType.equals("int") ||
                    value.dataType.equals("bigint") ||
                    value.dataType.equals("datetime") ||
                    value.dataType.equals("date")){
                    valueList.add(null);
                }else{
                    valueList.add("");
                }
            }
            keyList.add(key);
        });
        // 拼接sql
        StringBuilder sqlBuilder = new StringBuilder();
        sqlBuilder.append("insert into ")
            .append(tableName)
            .append("(")
            .append(String.join(",",keyList))
            .append(") ")
            .append(" values (")
            .append(String.join(",",valueList))
            .append(");");
        log.info("sql: " + sqlBuilder.toString());
    }
    /**
     * Update handling logic.
     *
     * <p>Currently a placeholder: the body contains no statements.
     *
     * @param item the binlog item to process
     */
    private void updateHandle(BinLogItem item) {
    }
    /**
     * Delete handling logic.
     *
     * <p>Currently a placeholder: the body contains no statements.
     *
     * @param item the binlog item (data) to process
     */
    private void deletedHandle(BinLogItem item) {
    }
}
src/main/java/org/springblade/modules/place/mapper/PlaceMapper.java
@@ -36,6 +36,22 @@
    /**
     * Custom paged query for places.
     *
     * @param page                 pagination object
     * @param place                query object, bound to the mapper statement as {@code place}
     * @param gridCodeList         list of codes bound to the mapper statement as {@code gridCodeList}
     * @param regionChildCodesList list of codes bound to the mapper statement as {@code regionChildCodesList}
     * @param isAdministrator      flag bound to the mapper statement as {@code isAdministrator}
     *                             (presumably 1 = administrator, 2 = not; verify against the caller)
     * @return the matching places
     */
    List<PlaceVO> selectPlacePage(IPage page,
                                  @Param("place") PlaceVO place,
                                  @Param("gridCodeList") List<String> gridCodeList,
                                  @Param("regionChildCodesList") List<String> regionChildCodesList,
                                  @Param("isAdministrator") Integer isAdministrator);
    /**
     * 九小场所档案
     *
     * @param page
@@ -53,22 +69,6 @@
                                  @Param("isAdministrator") Integer isAdministrator,
                                  @Param("nineTypeList") List<String> nineTypeList);
    /**
     * Custom paged query for places.
     *
     * @param page                 pagination object
     * @param place                query object, bound to the mapper statement as {@code place}
     * @param houseCodeList        list of codes bound to the mapper statement as {@code houseCodeList}
     * @param regionChildCodesList list of codes bound to the mapper statement as {@code regionChildCodesList}
     * @param isAdministrator      flag bound to the mapper statement as {@code isAdministrator}
     *                             (presumably 1 = administrator, 2 = not; verify against the caller)
     * @return the matching places
     */
    List<PlaceVO> selectPlacePage(IPage page,
                                  @Param("place") PlaceVO place,
                                  @Param("houseCodeList") List<String> houseCodeList,
                                  @Param("regionChildCodesList") List<String> regionChildCodesList,
                                  @Param("isAdministrator") Integer isAdministrator);
    /**
     * 查询场所集合信息
src/main/java/org/springblade/modules/place/mapper/PlaceMapper.xml
@@ -73,22 +73,59 @@
        <if test="place.isPerfect==2">
            and jp.status = 2
        </if>
        <if test="houseCodeList != null and houseCodeList.size()>0">
            and jp.house_code in
            <foreach collection="houseCodeList" item="houseCode" separator ="," open="("  close=")">
                #{houseCode}
            </foreach>
        </if>
        <if test="isAdministrator==2">
            <choose>
                <when test="regionChildCodesList !=null and regionChildCodesList.size()>0">
                    and jg.grid_code in
                    <foreach collection="regionChildCodesList" item="code" open="(" close=")" separator=",">
                        #{code}
                    </foreach>
                <when test="place.roleName != null and place.roleName != ''">
                    <if test="place.roleName=='wgy'">
                        <choose>
                            <when test="gridCodeList !=null and gridCodeList.size()>0">
                                and jp.grid_code in
                                <foreach collection="gridCodeList" item="code" open="(" close=")" separator=",">
                                    #{code}
                                </foreach>
                            </when>
                            <otherwise>
                                and jp.grid_code in ('')
                            </otherwise>
                        </choose>
                    </if>
                    <if test="place.roleName=='mj'">
                        <choose>
                            <when test="regionChildCodesList !=null and regionChildCodesList.size()>0">
                                and jpag.community_code in
                                <foreach collection="regionChildCodesList" item="code" open="(" close=")" separator=",">
                                    #{code}
                                </foreach>
                            </when>
                            <otherwise>
                                and jpag.community_code in ('')
                            </otherwise>
                        </choose>
                    </if>
                </when>
                <otherwise>
                    and jg.grid_code in ('')
                    <choose>
                        <when test="regionChildCodesList !=null and regionChildCodesList.size()>0">
                            and
                            (
                            jg.grid_code in
                            <foreach collection="regionChildCodesList" item="code" open="(" close=")" separator=",">
                                #{code}
                            </foreach>
                            or
                            jpag.community_code in
                            <foreach collection="regionChildCodesList" item="code" open="(" close=")" separator=",">
                                #{code}
                            </foreach>
                            )
                        </when>
                        <otherwise>
                            and
                            (
                            jg.grid_code in ('') or jpag.community_code in ('')
                            )
                        </otherwise>
                    </choose>
                </otherwise>
            </choose>
        </if>
src/main/java/org/springblade/modules/place/service/impl/PlaceServiceImpl.java
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.util.Strings;
import org.springblade.common.cache.SysCache;
import org.springblade.common.node.TreeStringNode;
import org.springblade.common.param.CommonParamSet;
import org.springblade.common.utils.IdUtils;
import org.springblade.common.utils.SpringUtils;
import org.springblade.core.mp.support.Condition;
@@ -48,6 +49,7 @@
import org.springblade.modules.place.service.IPlaceExtService;
import org.springblade.modules.place.service.IPlacePoiLabelService;
import org.springblade.modules.place.service.IPlaceRelService;
import org.springblade.modules.place.vo.PlaceCheckVO;
import org.springblade.modules.place.vo.PlacePoiLabelVO;
import org.springblade.modules.place.vo.PlaceVO;
import org.springblade.modules.place.mapper.PlaceMapper;
@@ -119,16 +121,13 @@
     */
    @Override
    public IPage<PlaceVO> selectPlacePage(IPage<PlaceVO> page, PlaceVO place) {
        List<String> regionChildCodesList = SysCache.getRegionChildCodesByDeptId(AuthUtil.getDeptId());
        Integer isAdministrator = AuthUtil.isAdministrator() == true ? 1 : 2;
        List<String> list = new ArrayList<>();
        if (null != place.getRoleName() && !place.getRoleName().equals("")) {
            if (place.getRoleName().equals("网格员")) {
                // 查询对应的房屋地址code
                list = gridService.getAddressCodeListByUserId(AuthUtil.getUserId());
            }
        }
        List<PlaceVO> placeVOS = baseMapper.selectPlacePage(page, place, list, regionChildCodesList, isAdministrator);
        // 公共参数设置
        CommonParamSet commonParamSet = new CommonParamSet().invoke(PlaceVO.class,place);
        List<PlaceVO> placeVOS = baseMapper.selectPlacePage(page,
            place,
            commonParamSet.getGridCodeList(),
            commonParamSet.getRegionChildCodesList(),
            commonParamSet.getIsAdministrator());
        // 返回
        return page.setRecords(placeVOS);
    }