From 56df98ce4952239fbf7d0e99dbeb0e5c71531d6f Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Fri, 18 Nov 2022 18:29:06 +0800
Subject: [PATCH] initial v1.3.0
---
src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java | 41 +++++++++++++++++++++++++++++++++--------
1 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
index 2510ead..89e5093 100644
--- a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
+++ b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
@@ -1,14 +1,18 @@
package com.dji.sample.component.mqtt.service.impl;
+import com.dji.sample.component.mqtt.model.Chan;
+import com.dji.sample.component.mqtt.model.CommonTopicReceiver;
import com.dji.sample.component.mqtt.model.CommonTopicResponse;
+import com.dji.sample.component.mqtt.model.ServiceReply;
import com.dji.sample.component.mqtt.service.IMessageSenderService;
import com.dji.sample.component.mqtt.service.IMqttMessageGateway;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author sean.zhou
@@ -22,11 +26,11 @@
@Autowired
private IMqttMessageGateway messageGateway;
+ @Autowired
+ private ObjectMapper mapper;
+
public void publish(String topic, CommonTopicResponse response) {
try {
- ObjectMapper mapper = new ObjectMapper();
- // Only parameters whose value is not null will be serialised.
- mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
messageGateway.publish(topic, mapper.writeValueAsBytes(response));
} catch (JsonProcessingException e) {
@@ -37,14 +41,35 @@
public void publish(String topic, int qos, CommonTopicResponse response) {
try {
- ObjectMapper mapper = new ObjectMapper();
- // Only parameters whose value is not null will be serialised.
- mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-
messageGateway.publish(topic, mapper.writeValueAsBytes(response), qos);
} catch (JsonProcessingException e) {
log.info("Failed to publish the message. {}", response.toString());
e.printStackTrace();
}
}
+
+ public ServiceReply publishWithReply(String topic, CommonTopicResponse response) {
+ return this.publishWithReply(ServiceReply.class, topic, response, 2);
+ }
+
+ public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime) {
+ AtomicInteger time = new AtomicInteger(0);
+ // Retry three times
+ while (time.getAndIncrement() <= retryTime) {
+ this.publish(topic, response);
+
+ Chan<CommonTopicReceiver<T>> chan = Chan.getInstance();
+ // If the message is not received in 0.5 seconds then resend it again.
+ CommonTopicReceiver<T> receiver = chan.get(response.getTid());
+ if (receiver == null) {
+ continue;
+ }
+ // Need to match tid and bid.
+ if (receiver.getTid().equals(response.getTid()) &&
+ receiver.getBid().equals(response.getBid())) {
+ return receiver.getData();
+ }
+ }
+ throw new RuntimeException("No message reply received.");
+ }
}
\ No newline at end of file
--
Gitblit v1.9.3