linwe
2024-06-19 6b113a88e7b7b625405cc0b1d46efa22d46c71a2
住户导入创建共用线程池和用户机构插入加锁
2 files modified
3 files added
252 ■■■■ changed files
src/main/java/org/springblade/common/config/ThreadPoolConfig.java 33 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/common/utils/SpinLockUtil.java 55 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/common/utils/ThreadPoolUtil.java 40 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/house/service/impl/HouseholdServiceImpl.java 38 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/system/service/impl/UserServiceImpl.java 86 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/common/config/ThreadPoolConfig.java
New file
@@ -0,0 +1,33 @@
package org.springblade.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ThreadPoolConfig {
    @Bean(name = "customThreadPool")
    public ThreadPoolTaskExecutor customThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        // executor.setCorePoolSize(10);
        // 最大线程数
        executor.setMaxPoolSize(40);
        // 队列大小
        executor.setQueueCapacity(600);
        // 线程空闲时间(秒)
        executor.setKeepAliveSeconds(60);
        // 线程名称前缀
        executor.setThreadNamePrefix("CustomThreadPool-");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化线程池
        executor.initialize();
        return executor;
    }
}
src/main/java/org/springblade/common/utils/SpinLockUtil.java
New file
@@ -0,0 +1,55 @@
package org.springblade.common.utils;
import org.springblade.core.redis.cache.BladeRedis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
@Component
public class SpinLockUtil {
    @Autowired
    private BladeRedis bladeRedis;
    private static final String LOCK_PREFIX = "thread-safe-lock:";
    private static final Long DEFAULT_LOCK_TIMEOUT = 6L; // 默认锁超时时间,单位:秒
    /**
     * 尝试获取锁,如果获取失败则自旋等待。
     *
     * @param lockKey 锁的键
     * @return 是否成功获取锁
     */
    public boolean tryLock(String lockKey) {
        String threadId = Thread.currentThread().getName() + Thread.currentThread().getId(); // 使用线程名和ID作为标识
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < DEFAULT_LOCK_TIMEOUT * 1000) {
            if (bladeRedis.getRedisTemplate().opsForValue().setIfAbsent(LOCK_PREFIX + lockKey, threadId,
                DEFAULT_LOCK_TIMEOUT, TimeUnit.SECONDS)) {
                return true;
            }
            // 短暂休眠,避免高并发下的CPU过度消耗
            LockSupport.parkNanos(100_000_000L); // 使用LockSupport进行更细粒度的暂停,单位为纳秒
        }
        return false;
    }
    /**
     * 释放锁,只有加锁的线程才能解锁
     *
     * @param lockKey 锁的键
     * @return 是否成功释放锁
     */
    public boolean unlock(String lockKey) {
        String threadId = Thread.currentThread().getName() + Thread.currentThread().getId();
        String currentValue = (String) bladeRedis.getRedisTemplate().opsForValue().get(LOCK_PREFIX + lockKey);
        if (threadId.equals(currentValue)) {
            bladeRedis.getRedisTemplate().delete(LOCK_PREFIX + lockKey);
            return true;
        }
        return false; // 如果不是当前线程持有的锁,则返回false,不进行解锁操作
    }
}
src/main/java/org/springblade/common/utils/ThreadPoolUtil.java
New file
@@ -0,0 +1,40 @@
package org.springblade.common.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
@Component
public class ThreadPoolUtil {
    // 指定Ben名称注入
    @Qualifier("customThreadPool")
    @Autowired
    private ThreadPoolTaskExecutor executor;
    // @Autowired
    // public ThreadPoolUtil(ThreadPoolTaskExecutor executor) {
    //     this.executor = executor;
    // }
    /**
     * 提交一个Runnable任务到线程池执行
     * @param task 要执行的任务
     */
    public void execute(Runnable task) {
        executor.execute(task);
    }
    /**
     * 提交一个Callable任务到线程池执行,并返回Future对象用于获取结果
     * @param task 要执行的任务
     * @return Future对象
     */
    public <T> Future<T> submit(Callable<T> task) {
        return executor.submit(task);
    }
}
src/main/java/org/springblade/modules/house/service/impl/HouseholdServiceImpl.java
@@ -31,6 +31,7 @@
import org.springblade.common.param.CommonParamSet;
import org.springblade.common.utils.NodeTreeUtil;
import org.springblade.common.utils.SpringUtils;
import org.springblade.common.utils.ThreadPoolUtil;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.BeanUtil;
import org.springblade.core.tool.utils.Func;
@@ -58,13 +59,14 @@
import org.springblade.modules.system.service.IUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StopWatch;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -93,6 +95,10 @@
    @Autowired
    private IHouseRentalService iHouseRentalService;
    @Autowired
    private ThreadPoolUtil threadPoolUtil;
    @Override
@@ -361,17 +367,15 @@
        int importNum = 0;
        int updateNum = 0;
        int errorNum = 0;
        // 创建一个固定大小的线程池
        int numberOfThreads = Runtime.getRuntime().availableProcessors(); // 使用可用处理器数量作为线程数
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        int runErrorNum = 0;
        // 创建Future列表来保存每个任务的结果
        List<Future<Map<String, String>>> futures = new ArrayList<>();
        for (ImportHouseholdExcel houseHoldExcel : data) {
            Callable<Map<String, String>> task = () -> importHouseHold(houseHoldExcel, houseService, isCovered, isTenant);
            futures.add(executorService.submit(task));
            // 提交任务
            futures.add(threadPoolUtil.submit(() -> {
                // 这里执行你的异步任务
                return importHouseHold(houseHoldExcel, houseService, isCovered, isTenant);
            }));
        }
        // 收集并打印结果
        for (Future<Map<String, String>> future : futures) {
@@ -399,18 +403,9 @@
                }
                // 获取并打印每个任务的结果
            } catch (Exception e) {
                errorNum++;
                runErrorNum++;
                logger.error("获取异常-----》", e);
            }
        }
        // 关闭线程池
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
        }
        StringBuilder builder = new StringBuilder("导入完成!");
        builder.append("其中本次表格共有 ").append(totalNum).append(" 条数据,")
@@ -421,10 +416,13 @@
        if (errorNum > 0) {
            builder.append("共有 ").append(errorNum).append(" 条数据由于无姓名或门牌地址编码信息未导入!");
        }
        if (runErrorNum > 0) {
            builder.append("共有 ").append(runErrorNum).append(" 条数据由于数据其他信息异常未导入!");
        }
        return builder.toString();
    }
    @Transactional(rollbackFor = Exception.class)
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Map<String, String> importHouseHold(ImportHouseholdExcel houseHoldExcel, IHouseService houseService, Boolean isCovered, String isTenant) {
        Map<String, String> objectObjectHashMap = new HashMap<>();
        HouseholdEntity householdEntity = Objects.requireNonNull(BeanUtil.copy(houseHoldExcel, HouseholdEntity.class));
src/main/java/org/springblade/modules/system/service/impl/UserServiceImpl.java
@@ -17,7 +17,6 @@
package org.springblade.modules.system.service.impl;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@@ -25,9 +24,10 @@
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.annotations.Param;
import org.apache.logging.log4j.util.Strings;
import org.flowable.idm.engine.impl.persistence.entity.UserEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.common.cache.DictCache;
import org.springblade.common.cache.ParamCache;
import org.springblade.common.cache.SysCache;
@@ -35,11 +35,11 @@
import org.springblade.common.constant.CommonConstant;
import org.springblade.common.constant.TenantConstant;
import org.springblade.common.enums.DictEnum;
import org.springblade.common.utils.SpinLockUtil;
import org.springblade.core.log.exception.ServiceException;
import org.springblade.core.mp.base.BaseServiceImpl;
import org.springblade.core.mp.support.Condition;
import org.springblade.core.mp.support.Query;
import org.springblade.core.redis.cache.BladeRedis;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tenant.BladeTenantProperties;
import org.springblade.core.tool.constant.BladeConstant;
@@ -47,12 +47,9 @@
import org.springblade.core.tool.support.Kv;
import org.springblade.core.tool.utils.*;
import org.springblade.modules.auth.enums.UserEnum;
import org.springblade.modules.community.entity.CommunityEntity;
import org.springblade.modules.community.service.ICommunityService;
import org.springblade.modules.grid.service.IGridmanService;
import org.springblade.modules.house.entity.HouseholdEntity;
import org.springblade.modules.house.service.IHouseholdService;
import org.springblade.modules.house.vo.HouseholdVO;
import org.springblade.modules.police.entity.PoliceAffairsGridEntity;
import org.springblade.modules.police.service.IPoliceAffairsGridService;
import org.springblade.modules.property.entity.PropertyCompanyEntity;
@@ -67,14 +64,10 @@
import org.springblade.modules.system.vo.UserVO;
import org.springblade.modules.system.wrapper.UserWrapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.concurrent.TimeUnit;
import static org.springblade.common.constant.CommonConstant.DEFAULT_PARAM_PASSWORD;
@@ -86,6 +79,9 @@
@Service
@AllArgsConstructor
public class UserServiceImpl extends BaseServiceImpl<UserMapper, User> implements IUserService {
    private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);
    private static final String GUEST_NAME = "guest";
    private final IUserDeptService userDeptService;
@@ -96,7 +92,7 @@
    private final IPoliceAffairsGridService policeAffairsGridService;
    @Autowired
    private BladeRedis redisTemplate;
    private SpinLockUtil spinLockUtil;
    @Override
@@ -212,53 +208,33 @@
    }
    private boolean submitUserDept(User user) {
        List<Long> deptIdList = Func.toLongList(user.getDeptId());
        List<UserDept> userDeptList = new ArrayList<>();
        deptIdList.forEach(deptId -> {
            UserDept userDept = new UserDept();
            userDept.setUserId(user.getId());
            userDept.setDeptId(deptId);
            userDeptList.add(userDept);
        });
        userDeptService.remove(Wrappers.<UserDept>update().lambda().eq(UserDept::getUserId, user.getId()));
        boolean b = userDeptService.saveBatch(userDeptList);
        return b;
    }
    /**
     * 尝试获取锁
     * @param timeout 超时时间(毫秒)
     * @return 是否成功获取锁
     */
    public boolean lock(long timeout,String lockKey) {
        long endTime = System.currentTimeMillis() + timeout;
        ValueOperations<String, Object> ops = redisTemplate.getValueOps();
        while (System.currentTimeMillis() < endTime) {
            // 使用setIfAbsent命令尝试设置值,仅当key不存在时设置,类似于SETNX
            Boolean result = ops.setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis() + 5000), 5, TimeUnit.SECONDS);
            if (result != null && result) {
                return true;
        try {
            // 加锁
            boolean isLock = spinLockUtil.tryLock(user.getId().toString());
            boolean result = false;
            List<Long> deptIdList = Func.toLongList(user.getDeptId());
            List<UserDept> userDeptList = new ArrayList<>();
            deptIdList.forEach(deptId -> {
                UserDept userDept = new UserDept();
                userDept.setUserId(user.getId());
                userDept.setDeptId(deptId);
                userDeptList.add(userDept);
            });
            // 是否加锁成功
            logger.info("是否加锁成功:" + isLock);
            if (isLock) {
                userDeptService.remove(Wrappers.<UserDept>update().lambda().eq(UserDept::getUserId, user.getId()));
                result = userDeptService.saveBatch(userDeptList);
            }
            try {
                // 短暂休眠,防止CPU过度占用
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            // 释放锁
            spinLockUtil.unlock(user.getId().toString());
            return result;
        } catch (Exception e) {
            // 释放锁
            spinLockUtil.unlock(user.getId().toString());
            throw new RuntimeException(e);
        }
        return false;
    }
    /**
     * 释放锁
     */
    public void unlock(String lockKey) {
        // 使用lua脚本保证操作的原子性,避免删除非本客户端创建的锁
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.getRedisTemplate().execute((RedisCallback<Object>) (connection) -> connection.eval(script.getBytes(), ReturnType.INTEGER, 1, lockKey.getBytes(), String.valueOf(redisTemplate.getValueOps().get(lockKey)).getBytes()));
    }
    @Override