From a7aaeabc7873a0eafb4a7ecad7f65b018b7a9bc9 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Fri, 24 Feb 2023 19:31:23 +0800
Subject: [PATCH] What's new? 1. Add license for dock. 2. Modify the logic corresponding to the firmware file and device type. 3. Add multiple mqtt clients options. 4. Modify the structure of the interface for obtaining the device list. 5. Fixed some issues.

---
 src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java |   20 ++++++++++++--------
 1 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
index ae24b37..2a76cf5 100644
--- a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
+++ b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
@@ -12,7 +12,6 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -32,7 +31,7 @@
 
     public void publish(String topic, CommonTopicResponse response) {
         try {
-
+            log.info("send topic: {}, payload: {}", topic, response.toString());
             messageGateway.publish(topic, mapper.writeValueAsBytes(response));
         } catch (JsonProcessingException e) {
             log.info("Failed to publish the message. {}", response.toString());
@@ -49,24 +48,29 @@
         }
     }
 
-    public Optional<ServiceReply> publishWithReply(String topic, CommonTopicResponse response) {
+    public ServiceReply publishWithReply(String topic, CommonTopicResponse response) {
+        return this.publishWithReply(ServiceReply.class, topic, response, 2);
+    }
+
+    public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime) {
+        log.info("send topic: {}, payload: {}", topic, response.toString());
         AtomicInteger time = new AtomicInteger(0);
         // Retry three times
-        while (time.getAndIncrement() < 3) {
+        while (time.getAndIncrement() <= retryTime) {
             this.publish(topic, response);
 
-            Chan<CommonTopicReceiver<ServiceReply>> chan = Chan.getInstance();
+            Chan<CommonTopicReceiver<T>> chan = Chan.getInstance();
             // If the message is not received in 0.5 seconds then resend it again.
-            CommonTopicReceiver<ServiceReply> receiver = chan.get(response.getMethod());
+            CommonTopicReceiver<T> receiver = chan.get(response.getTid());
             if (receiver == null) {
                 continue;
             }
             // Need to match tid and bid.
             if (receiver.getTid().equals(response.getTid()) &&
                     receiver.getBid().equals(response.getBid())) {
-                return Optional.ofNullable(receiver.getData());
+                return receiver.getData();
             }
         }
-        return Optional.empty();
+        throw new RuntimeException("No message reply received.");
     }
 }
\ No newline at end of file

--
Gitblit v1.9.3