| src/main/java/org/springblade/common/config/ThreadPoolConfig.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/common/utils/SpinLockUtil.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/common/utils/ThreadPoolUtil.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/house/service/impl/HouseholdServiceImpl.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/system/service/impl/UserServiceImpl.java | ●●●●● patch | view | raw | blame | history |
src/main/java/org/springblade/common/config/ThreadPoolConfig.java
// New file @@ -0,0 +1,33 @@
package org.springblade.common.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Declares the shared application thread pool, exposed as the
 * {@code customThreadPool} bean.
 */
@Configuration
public class ThreadPoolConfig {

    /** Preferred upper bound on the number of pool threads. */
    private static final int MAX_POOL_SIZE = 40;

    /** Number of tasks that may wait in the queue before the pool grows past its core size. */
    private static final int QUEUE_CAPACITY = 600;

    /** Idle time, in seconds, after which threads above the core size are reclaimed. */
    private static final int KEEP_ALIVE_SECONDS = 60;

    /** Prefix of the names given to the pool's worker threads. */
    private static final String THREAD_NAME_PREFIX = "CustomThreadPool-";

    /**
     * Builds the shared task executor.
     *
     * <p>The core size follows the number of available processors. The maximum size is
     * {@link #MAX_POOL_SIZE}, raised to the core size when the host has more processors
     * than that: the underlying {@link ThreadPoolExecutor} rejects a maximum smaller than
     * the core size with an {@link IllegalArgumentException}, which would otherwise break
     * start-up on large machines.
     *
     * @return the initialized executor
     */
    @Bean(name = "customThreadPool")
    public ThreadPoolTaskExecutor customThreadPool() {
        int corePoolSize = Runtime.getRuntime().availableProcessors();

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        // Never let the maximum drop below the core size (see method documentation).
        executor.setMaxPoolSize(Math.max(corePoolSize, MAX_POOL_SIZE));
        executor.setQueueCapacity(QUEUE_CAPACITY);
        executor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        // When both the queue and the pool are saturated, the submitting thread runs the
        // task itself instead of the task being dropped.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
// src/main/java/org/springblade/common/utils/SpinLockUtil.java
// New file @@ -0,0 +1,55 @@
package org.springblade.common.utils;

import org.springblade.core.redis.cache.BladeRedis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * Redis-backed lock that retries acquisition at a fixed interval until a timeout elapses.
 *
 * <p>The lock is not reentrant: a second {@link #tryLock(String)} for the same key from the
 * owning thread waits like any other contender.
 */
@Component
public class SpinLockUtil {

    private static final String LOCK_PREFIX = "thread-safe-lock:";

    /** Both the maximum time spent waiting for the lock and the lock's expiry, in seconds. */
    private static final long DEFAULT_LOCK_TIMEOUT = 6L;

    /** Pause between two acquisition attempts (100 ms). */
    private static final long RETRY_INTERVAL_NANOS = 100_000_000L;

    /**
     * Random identifier of this class instance's JVM load. Thread names and ids repeat
     * across application instances that share the same Redis, so they cannot identify a
     * lock owner on their own.
     */
    private static final String INSTANCE_ID = UUID.randomUUID().toString();

    @Autowired
    private BladeRedis bladeRedis;

    /**
     * Tries to acquire the lock, retrying until it is obtained or the timeout elapses.
     *
     * <p>If the calling thread is interrupted while waiting, the method gives up and returns
     * {@code false}, leaving the interrupt flag set for the caller.
     *
     * @param lockKey key of the lock
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean tryLock(String lockKey) {
        String redisKey = LOCK_PREFIX + lockKey;
        String owner = ownerToken();
        // nanoTime is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(DEFAULT_LOCK_TIMEOUT);

        while (System.nanoTime() - deadline < 0) {
            Boolean acquired = bladeRedis.getRedisTemplate().opsForValue()
                    .setIfAbsent(redisKey, owner, DEFAULT_LOCK_TIMEOUT, TimeUnit.SECONDS);
            // The result may be null (pipeline / transaction); treat that as "not acquired"
            // instead of failing on auto-unboxing.
            if (Boolean.TRUE.equals(acquired)) {
                return true;
            }
            // parkNanos returns immediately once the thread is interrupted, which would turn
            // this loop into a busy spin against Redis; stop waiting instead.
            if (Thread.currentThread().isInterrupted()) {
                return false;
            }
            LockSupport.parkNanos(RETRY_INTERVAL_NANOS);
        }
        return false;
    }

    /**
     * Releases the lock if, and only if, it is held by the calling thread.
     *
     * @param lockKey key of the lock
     * @return {@code true} if the lock was held by the caller and has been deleted,
     *         {@code false} if it is held by someone else or not held at all
     */
    public boolean unlock(String lockKey) {
        String redisKey = LOCK_PREFIX + lockKey;
        // Compared as Object: a stored value of an unexpected type simply does not match.
        Object currentOwner = bladeRedis.getRedisTemplate().opsForValue().get(redisKey);
        if (!ownerToken().equals(currentOwner)) {
            return false;
        }
        // NOTE(review): the read above and this delete are two separate Redis commands. If the
        // lock expires in between and another owner acquires it, that owner's lock is removed.
        // A compare-and-delete executed as a single Redis script would close this gap.
        bladeRedis.getRedisTemplate().delete(redisKey);
        return true;
    }

    /** Builds the value stored in Redis to identify the calling thread as the lock owner. */
    private static String ownerToken() {
        Thread current = Thread.currentThread();
        return INSTANCE_ID + ":" + current.getName() + current.getId();
    }
}
// src/main/java/org/springblade/common/utils/ThreadPoolUtil.java
// New file @@ -0,0 +1,40 @@
package org.springblade.common.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 * Small facade over the {@code customThreadPool} executor bean, so callers can hand off
 * work without depending on the executor type directly.
 */
@Component
public class ThreadPoolUtil {

    /** Executor injected by bean name. */
    @Autowired
    @Qualifier("customThreadPool")
    private ThreadPoolTaskExecutor executor;

    /**
     * Hands a fire-and-forget task to the thread pool.
     *
     * @param task the task to run
     */
    public void execute(Runnable task) {
        this.executor.execute(task);
    }

    /**
     * Hands a value-returning task to the thread pool.
     *
     * @param task the task to run
     * @param <T>  type of the task's result
     * @return a {@link Future} through which the result can be retrieved
     */
    public <T> Future<T> submit(Callable<T> task) {
        Future<T> pending = this.executor.submit(task);
        return pending;
    }
}
// src/main/java/org/springblade/modules/house/service/impl/HouseholdServiceImpl.java
@@ -31,6 +31,7 @@ import org.springblade.common.param.CommonParamSet; import org.springblade.common.utils.NodeTreeUtil; import org.springblade.common.utils.SpringUtils; import org.springblade.common.utils.ThreadPoolUtil; import org.springblade.core.secure.utils.AuthUtil; import org.springblade.core.tool.utils.BeanUtil; import org.springblade.core.tool.utils.Func; @@ -58,13 +59,14 @@ import org.springblade.modules.system.service.IUserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StopWatch; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -93,6 +95,10 @@ @Autowired private IHouseRentalService iHouseRentalService; @Autowired private ThreadPoolUtil threadPoolUtil; @Override @@ -361,17 +367,15 @@ int importNum = 0; int updateNum = 0; int errorNum = 0; // 创建一个固定大小的线程池 int numberOfThreads = Runtime.getRuntime().availableProcessors(); // 使用可用处理器数量作为线程数 ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); int runErrorNum = 0; // 创建Future列表来保存每个任务的结果 List<Future<Map<String, String>>> futures = new ArrayList<>(); for (ImportHouseholdExcel houseHoldExcel : data) { Callable<Map<String, String>> task = () -> importHouseHold(houseHoldExcel, houseService, isCovered, isTenant); futures.add(executorService.submit(task)); // 提交任务 futures.add(threadPoolUtil.submit(() -> { // 这里执行你的异步任务 return importHouseHold(houseHoldExcel, houseService, isCovered, isTenant); })); } // 收集并打印结果 for (Future<Map<String, String>> future : futures) { @@ -399,18 +403,9 @@ } // 获取并打印每个任务的结果 } catch (Exception e) { errorNum++; runErrorNum++; 
logger.error("获取异常-----》", e); } } // 关闭线程池 executorService.shutdown(); try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); } StringBuilder builder = new StringBuilder("导入完成!"); builder.append("其中本次表格共有 ").append(totalNum).append(" 条数据,") @@ -421,10 +416,13 @@ if (errorNum > 0) { builder.append("共有 ").append(errorNum).append(" 条数据由于无姓名或门牌地址编码信息未导入!"); } if (runErrorNum > 0) { builder.append("共有 ").append(runErrorNum).append(" 条数据由于数据其他信息异常未导入!"); } return builder.toString(); } @Transactional(rollbackFor = Exception.class) @Transactional(propagation = Propagation.REQUIRES_NEW) public Map<String, String> importHouseHold(ImportHouseholdExcel houseHoldExcel, IHouseService houseService, Boolean isCovered, String isTenant) { Map<String, String> objectObjectHashMap = new HashMap<>(); HouseholdEntity householdEntity = Objects.requireNonNull(BeanUtil.copy(houseHoldExcel, HouseholdEntity.class)); src/main/java/org/springblade/modules/system/service/impl/UserServiceImpl.java
@@ -17,7 +17,6 @@ package org.springblade.modules.system.service.impl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.Wrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -25,9 +24,10 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.AllArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.annotations.Param; import org.apache.logging.log4j.util.Strings; import org.flowable.idm.engine.impl.persistence.entity.UserEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springblade.common.cache.DictCache; import org.springblade.common.cache.ParamCache; import org.springblade.common.cache.SysCache; @@ -35,11 +35,11 @@ import org.springblade.common.constant.CommonConstant; import org.springblade.common.constant.TenantConstant; import org.springblade.common.enums.DictEnum; import org.springblade.common.utils.SpinLockUtil; import org.springblade.core.log.exception.ServiceException; import org.springblade.core.mp.base.BaseServiceImpl; import org.springblade.core.mp.support.Condition; import org.springblade.core.mp.support.Query; import org.springblade.core.redis.cache.BladeRedis; import org.springblade.core.secure.utils.AuthUtil; import org.springblade.core.tenant.BladeTenantProperties; import org.springblade.core.tool.constant.BladeConstant; @@ -47,12 +47,9 @@ import org.springblade.core.tool.support.Kv; import org.springblade.core.tool.utils.*; import org.springblade.modules.auth.enums.UserEnum; import org.springblade.modules.community.entity.CommunityEntity; import org.springblade.modules.community.service.ICommunityService; import org.springblade.modules.grid.service.IGridmanService; import org.springblade.modules.house.entity.HouseholdEntity; import org.springblade.modules.house.service.IHouseholdService; import org.springblade.modules.house.vo.HouseholdVO; 
import org.springblade.modules.police.entity.PoliceAffairsGridEntity; import org.springblade.modules.police.service.IPoliceAffairsGridService; import org.springblade.modules.property.entity.PropertyCompanyEntity; @@ -67,14 +64,10 @@ import org.springblade.modules.system.vo.UserVO; import org.springblade.modules.system.wrapper.UserWrapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.ReturnType; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.concurrent.TimeUnit; import static org.springblade.common.constant.CommonConstant.DEFAULT_PARAM_PASSWORD; @@ -86,6 +79,9 @@ @Service @AllArgsConstructor public class UserServiceImpl extends BaseServiceImpl<UserMapper, User> implements IUserService { private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class); private static final String GUEST_NAME = "guest"; private final IUserDeptService userDeptService; @@ -96,7 +92,7 @@ private final IPoliceAffairsGridService policeAffairsGridService; @Autowired private BladeRedis redisTemplate; private SpinLockUtil spinLockUtil; @Override @@ -212,53 +208,33 @@ } private boolean submitUserDept(User user) { List<Long> deptIdList = Func.toLongList(user.getDeptId()); List<UserDept> userDeptList = new ArrayList<>(); deptIdList.forEach(deptId -> { UserDept userDept = new UserDept(); userDept.setUserId(user.getId()); userDept.setDeptId(deptId); userDeptList.add(userDept); }); userDeptService.remove(Wrappers.<UserDept>update().lambda().eq(UserDept::getUserId, user.getId())); boolean b = userDeptService.saveBatch(userDeptList); return b; } /** * 尝试获取锁 * @param timeout 超时时间(毫秒) * @return 是否成功获取锁 */ public boolean lock(long timeout,String lockKey) { long endTime = System.currentTimeMillis() + 
timeout; ValueOperations<String, Object> ops = redisTemplate.getValueOps(); while (System.currentTimeMillis() < endTime) { // 使用setIfAbsent命令尝试设置值,仅当key不存在时设置,类似于SETNX Boolean result = ops.setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis() + 5000), 5, TimeUnit.SECONDS); if (result != null && result) { return true; try { // 加锁 boolean isLock = spinLockUtil.tryLock(user.getId().toString()); boolean result = false; List<Long> deptIdList = Func.toLongList(user.getDeptId()); List<UserDept> userDeptList = new ArrayList<>(); deptIdList.forEach(deptId -> { UserDept userDept = new UserDept(); userDept.setUserId(user.getId()); userDept.setDeptId(deptId); userDeptList.add(userDept); }); // 是否加锁成功 logger.info("是否加锁成功:" + isLock); if (isLock) { userDeptService.remove(Wrappers.<UserDept>update().lambda().eq(UserDept::getUserId, user.getId())); result = userDeptService.saveBatch(userDeptList); } try { // 短暂休眠,防止CPU过度占用 Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } // 释放锁 spinLockUtil.unlock(user.getId().toString()); return result; } catch (Exception e) { // 释放锁 spinLockUtil.unlock(user.getId().toString()); throw new RuntimeException(e); } return false; } /** * 释放锁 */ public void unlock(String lockKey) { // 使用lua脚本保证操作的原子性,避免删除非本客户端创建的锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; redisTemplate.getRedisTemplate().execute((RedisCallback<Object>) (connection) -> connection.eval(script.getBytes(), ReturnType.INTEGER, 1, lockKey.getBytes(), String.valueOf(redisTemplate.getValueOps().get(lockKey)).getBytes())); } @Override