zhongrj
2023-11-30 69802fcc0c075a6544ac86033287223bb3eee1c5
新增报事报修实时消息提示推送
2 files modified
3 files added
228 ■■■■■ changed files
src/main/java/org/springblade/modules/sse/controller/SSEController.java 41 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/sse/server/SSEServer.java 149 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/sse/vo/SseVO.java 17 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/task/service/impl/TaskReportForRepairsServiceImpl.java 4 ●●●● patch | view | raw | blame | history
src/main/resources/application.yml 17 ●●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/sse/controller/SSEController.java
New file
@@ -0,0 +1,41 @@
package org.springblade.modules.sse.controller;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springblade.modules.sse.server.SSEServer;
import org.springblade.modules.sse.vo.SseVO;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@Slf4j
@RestController
@CrossOrigin
@RequestMapping("/sse/sse")
@AllArgsConstructor
public class SSEController {
    /**
     * 建立连接
     * @param sse
     * @return
     */
    @GetMapping("/connect")
    public SseEmitter connect(SseVO sse){
        String userId = sse.getType() + ":" + sse.getUserId();
        return SSEServer.connect(userId);
    }
    /**
     * 断开连接
     * @param sse
     * @return
     */
    @GetMapping("/disconnect")
    public void disconnect(SseVO sse){
        String userId = sse.getType() + ":" + sse.getUserId();
        SSEServer.removeUser(userId);
    }
}
src/main/java/org/springblade/modules/sse/server/SSEServer.java
New file
@@ -0,0 +1,149 @@
package org.springblade.modules.sse.server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@Slf4j
public class SSEServer {
    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    public static SseEmitter connect(String userId){
        //设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常
        SseEmitter sseEmitter = new SseEmitter(0L);
        //注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeOutCallBack(userId));
        sseEmitterMap.put(userId,sseEmitter);
        //数量+1
        count.getAndIncrement();
        log.info("create new sse connect ,current user:{}",userId);
        log.info("count",count.getAndIncrement());
        return sseEmitter;
    }
    /**
     * 给指定用户发消息
     */
    public static void sendMessage(String userId, String message){
        if(sseEmitterMap.containsKey(userId)){
            try{
                sseEmitterMap.get(userId).send(message);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",userId,e.getMessage());
                e.printStackTrace();
            }
        }
    }
    /**
     * 想多人发送消息,组播
     */
    public static void groupSendMessage(String groupId, String message){
        if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){
            sseEmitterMap.forEach((k,v) -> {
                try{
                    if(k.startsWith(groupId)){
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                }catch (IOException e){
                    log.error("user id:{}, send message error:{}",groupId,message);
                    removeUser(k);
                }
            });
        }
    }
    /**
     * 批量发送消息
     * @param message
     */
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k,v)->{
            try{
                v.send(message,MediaType.APPLICATION_JSON);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",k,e.getMessage());
                removeUser(k);
            }
        });
    }
    /**
     * 群发消息
     */
    public static void batchSendMessage(String message, Set<String> userIds){
        userIds.forEach(userId->sendMessage(userId,message));
    }
    /**
     * 用户离线删除用户
     * @param userId
     */
    public static void removeUser(String userId){
        sseEmitterMap.remove(userId);
        //数量-1
        count.getAndDecrement();
        log.info("remove user id:{}",userId);
    }
    public static List<String> getIds(){
        return new ArrayList<>(sseEmitterMap.keySet());
    }
    public static int getUserCount(){
        return count.intValue();
    }
    /**
     * 结束回调
     * @param userId
     * @return
     */
    private static Runnable completionCallBack(String userId) {
        return () -> {
            log.info("结束连接,{}",userId);
            removeUser(userId);
        };
    }
    /**
     * 超时回调
     * @param userId
     * @return
     */
    private static Runnable timeOutCallBack(String userId){
        return ()->{
            log.info("连接超时,{}",userId);
            removeUser(userId);
        };
    }
    /**
     * 错误回调
     * @param userId
     * @return
     */
    private static Consumer<Throwable> errorCallBack(String userId){
        return throwable -> {
            log.error("连接异常,{}",userId);
            removeUser(userId);
        };
    }
}
src/main/java/org/springblade/modules/sse/vo/SseVO.java
New file
@@ -0,0 +1,17 @@
package org.springblade.modules.sse.vo;
import lombok.Data;
/**
 * Request parameters identifying an SSE client.
 * The SSE controller joins {@code type} and {@code userId} with a colon to
 * form the key under which the client's emitter is registered.
 */
@Data
public class SseVO {
    /**
     * Client type: web, app, or mini program.
     */
    private String type;
    /**
     * Unique value identifying the user.
     */
    private String userId;
}
src/main/java/org/springblade/modules/task/service/impl/TaskReportForRepairsServiceImpl.java
@@ -16,11 +16,13 @@
 */
package org.springblade.modules.task.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.springblade.core.mp.base.BaseServiceImpl;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.modules.grid.entity.GridmanEntity;
import org.springblade.modules.grid.service.IGridService;
import org.springblade.modules.sse.server.SSEServer;
import org.springblade.modules.task.entity.TaskEntity;
import org.springblade.modules.task.entity.TaskReportForRepairsEntity;
import org.springblade.modules.task.mapper.TaskReportForRepairsMapper;
@@ -102,6 +104,8 @@
            taskReportForRepairs.setTaskId(taskEntity.getId());
            taskReportForRepairs.setConfirmFlag(1);
            flag = save(taskReportForRepairs);
            // 同时向web 端推送消息
            SSEServer.sendMessage("web:1","1");
        }
        return flag;
    }
src/main/resources/application.yml
@@ -207,14 +207,17 @@
    #接口放行
    skip-url:
      - /blade-test/**
      - /sse/**
#      - /blade-doorplateAddress/doorplateAddress/**
      - /blade-house/house/**
      - /blade-label/label/**
      - /blade-houseRental/houseRental/**
      - /blade-resource/oss/**
      - /blade-place/**
      - /blade-taskReportForRepairs/**
      - /blade-placeExt/**
#      - /blade-house/house/**
#      - /blade-label/label/**
#      - /blade-houseRental/houseRental/**
#      - /blade-resource/oss/**
#      - /blade-place/**
#      - /blade-taskReportForRepairs/**
#      - /blade-placeExt/**
#      - /blade-grid/**
#      - /blade-gridman/**
    #授权认证配置
    auth:
      - method: ALL