From a7aaeabc7873a0eafb4a7ecad7f65b018b7a9bc9 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Fri, 24 Feb 2023 19:31:23 +0800
Subject: [PATCH] What's new? 1. Add license for dock. 2. Modify the logic corresponding to the firmware file and device type. 3. Add multiple mqtt clients options. 4. Modify the structure of the interface for obtaining the device list. 5. Fixed some issues.
---
src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java | 44 +++++++++++++++++++++++++++++++++++---------
1 files changed, 35 insertions(+), 9 deletions(-)
diff --git a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
index 2510ead..2a76cf5 100644
--- a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
+++ b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
@@ -1,14 +1,18 @@
package com.dji.sample.component.mqtt.service.impl;
+import com.dji.sample.component.mqtt.model.Chan;
+import com.dji.sample.component.mqtt.model.CommonTopicReceiver;
import com.dji.sample.component.mqtt.model.CommonTopicResponse;
+import com.dji.sample.component.mqtt.model.ServiceReply;
import com.dji.sample.component.mqtt.service.IMessageSenderService;
import com.dji.sample.component.mqtt.service.IMqttMessageGateway;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author sean.zhou
@@ -22,12 +26,12 @@
@Autowired
private IMqttMessageGateway messageGateway;
+ @Autowired
+ private ObjectMapper mapper;
+
public void publish(String topic, CommonTopicResponse response) {
try {
- ObjectMapper mapper = new ObjectMapper();
- // Only parameters whose value is not null will be serialised.
- mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-
+ log.info("send topic: {}, payload: {}", topic, response.toString());
messageGateway.publish(topic, mapper.writeValueAsBytes(response));
} catch (JsonProcessingException e) {
log.info("Failed to publish the message. {}", response.toString());
@@ -37,14 +41,36 @@
public void publish(String topic, int qos, CommonTopicResponse response) {
try {
- ObjectMapper mapper = new ObjectMapper();
- // Only parameters whose value is not null will be serialised.
- mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-
messageGateway.publish(topic, mapper.writeValueAsBytes(response), qos);
} catch (JsonProcessingException e) {
log.info("Failed to publish the message. {}", response.toString());
e.printStackTrace();
}
}
+
+ public ServiceReply publishWithReply(String topic, CommonTopicResponse response) {
+ return this.publishWithReply(ServiceReply.class, topic, response, 2);
+ }
+
+ public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime) {
+ log.info("send topic: {}, payload: {}", topic, response.toString());
+ AtomicInteger time = new AtomicInteger(0);
+ // Retry three times
+ while (time.getAndIncrement() <= retryTime) {
+ this.publish(topic, response);
+
+ Chan<CommonTopicReceiver<T>> chan = Chan.getInstance();
+ // If the message is not received in 0.5 seconds then resend it again.
+ CommonTopicReceiver<T> receiver = chan.get(response.getTid());
+ if (receiver == null) {
+ continue;
+ }
+ // Need to match tid and bid.
+ if (receiver.getTid().equals(response.getTid()) &&
+ receiver.getBid().equals(response.getBid())) {
+ return receiver.getData();
+ }
+ }
+ throw new RuntimeException("No message reply received.");
+ }
}
\ No newline at end of file
--
Gitblit v1.9.3