| src/main/java/org/springblade/modules/sse/controller/SSEController.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/sse/server/SSEServer.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/sse/vo/SseVO.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/task/service/impl/TaskReportForRepairsServiceImpl.java | ●●●●● patch | view | raw | blame | history | |
| src/main/resources/application.yml | ●●●●● patch | view | raw | blame | history |
src/main/java/org/springblade/modules/sse/controller/SSEController.java
New file @@ -0,0 +1,41 @@ package org.springblade.modules.sse.controller; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springblade.modules.sse.server.SSEServer; import org.springblade.modules.sse.vo.SseVO; import org.springframework.web.bind.annotation.CrossOrigin; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @Slf4j @RestController @CrossOrigin @RequestMapping("/sse/sse") @AllArgsConstructor public class SSEController { /** * 建立连接 * @param sse * @return */ @GetMapping("/connect") public SseEmitter connect(SseVO sse){ String userId = sse.getType() + ":" + sse.getUserId(); return SSEServer.connect(userId); } /** * 断开连接 * @param sse * @return */ @GetMapping("/disconnect") public void disconnect(SseVO sse){ String userId = sse.getType() + ":" + sse.getUserId(); SSEServer.removeUser(userId); } } src/main/java/org/springblade/modules/sse/server/SSEServer.java
New file @@ -0,0 +1,149 @@ package org.springblade.modules.sse.server; import lombok.extern.slf4j.Slf4j; import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @Slf4j public class SSEServer { /** * 当前连接数 */ private static AtomicInteger count = new AtomicInteger(0); private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); public static SseEmitter connect(String userId){ //设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常 SseEmitter sseEmitter = new SseEmitter(0L); //注册回调 sseEmitter.onCompletion(completionCallBack(userId)); sseEmitter.onError(errorCallBack(userId)); sseEmitter.onTimeout(timeOutCallBack(userId)); sseEmitterMap.put(userId,sseEmitter); //数量+1 count.getAndIncrement(); log.info("create new sse connect ,current user:{}",userId); log.info("count",count.getAndIncrement()); return sseEmitter; } /** * 给指定用户发消息 */ public static void sendMessage(String userId, String message){ if(sseEmitterMap.containsKey(userId)){ try{ sseEmitterMap.get(userId).send(message); }catch (IOException e){ log.error("user id:{}, send message error:{}",userId,e.getMessage()); e.printStackTrace(); } } } /** * 想多人发送消息,组播 */ public static void groupSendMessage(String groupId, String message){ if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){ sseEmitterMap.forEach((k,v) -> { try{ if(k.startsWith(groupId)){ v.send(message, MediaType.APPLICATION_JSON); } }catch (IOException e){ log.error("user id:{}, send message error:{}",groupId,message); removeUser(k); } }); } } /** * 批量发送消息 * @param message */ public static void batchSendMessage(String message) { sseEmitterMap.forEach((k,v)->{ try{ v.send(message,MediaType.APPLICATION_JSON); }catch (IOException e){ log.error("user id:{}, send message error:{}",k,e.getMessage()); removeUser(k); } }); } /** * 群发消息 */ public static void batchSendMessage(String message, Set<String> userIds){ userIds.forEach(userId->sendMessage(userId,message)); } /** * 用户离线删除用户 * @param userId */ public static void removeUser(String userId){ sseEmitterMap.remove(userId); //数量-1 count.getAndDecrement(); log.info("remove user id:{}",userId); } public static List<String> getIds(){ return new ArrayList<>(sseEmitterMap.keySet()); } public static int getUserCount(){ return count.intValue(); } /** * 结束回调 * @param userId * @return */ private static Runnable completionCallBack(String userId) { return () -> { log.info("结束连接,{}",userId); removeUser(userId); }; } /** * 超时回调 * @param userId * @return */ private static Runnable timeOutCallBack(String userId){ return ()->{ log.info("连接超时,{}",userId); removeUser(userId); }; } /** * 错误回调 * @param userId * @return */ private static Consumer<Throwable> errorCallBack(String userId){ return throwable -> { log.error("连接异常,{}",userId); removeUser(userId); }; } } src/main/java/org/springblade/modules/sse/vo/SseVO.java
New file @@ -0,0 +1,17 @@ package org.springblade.modules.sse.vo; import lombok.Data; @Data public class SseVO { /** * 类型 web,app,小程序 */ private String type; /** * 用户唯一值 */ private String userId; } src/main/java/org/springblade/modules/task/service/impl/TaskReportForRepairsServiceImpl.java
@@ -16,11 +16,13 @@ */ package org.springblade.modules.task.service.impl; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.metadata.IPage; import org.springblade.core.mp.base.BaseServiceImpl; import org.springblade.core.secure.utils.AuthUtil; import org.springblade.modules.grid.entity.GridmanEntity; import org.springblade.modules.grid.service.IGridService; import org.springblade.modules.sse.server.SSEServer; import org.springblade.modules.task.entity.TaskEntity; import org.springblade.modules.task.entity.TaskReportForRepairsEntity; import org.springblade.modules.task.mapper.TaskReportForRepairsMapper; @@ -102,6 +104,8 @@ taskReportForRepairs.setTaskId(taskEntity.getId()); taskReportForRepairs.setConfirmFlag(1); flag = save(taskReportForRepairs); // 同时向web 端推送消息 SSEServer.sendMessage("web:1","1"); } return flag; } src/main/resources/application.yml
@@ -207,14 +207,17 @@ #接口放行 skip-url: - /blade-test/** - /sse/** # - /blade-doorplateAddress/doorplateAddress/** - /blade-house/house/** - /blade-label/label/** - /blade-houseRental/houseRental/** - /blade-resource/oss/** - /blade-place/** - /blade-taskReportForRepairs/** - /blade-placeExt/** # - /blade-house/house/** # - /blade-label/label/** # - /blade-houseRental/houseRental/** # - /blade-resource/oss/** # - /blade-place/** # - /blade-taskReportForRepairs/** # - /blade-placeExt/** # - /blade-grid/** # - /blade-gridman/** #授权认证配置 auth: - method: ALL