From 9b2eedb85d53ca32610c32c6e50b5230ab3b16cf Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Fri, 22 Jul 2022 20:16:03 +0800
Subject: [PATCH] V1.1.0 for dock
---
src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java | 38 ++++++++++++++++++++++++++++++--------
1 files changed, 30 insertions(+), 8 deletions(-)
diff --git a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
index 2510ead..ae24b37 100644
--- a/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
+++ b/src/main/java/com/dji/sample/component/mqtt/service/impl/MessageSenderServiceImpl.java
@@ -1,14 +1,19 @@
package com.dji.sample.component.mqtt.service.impl;
+import com.dji.sample.component.mqtt.model.Chan;
+import com.dji.sample.component.mqtt.model.CommonTopicReceiver;
import com.dji.sample.component.mqtt.model.CommonTopicResponse;
+import com.dji.sample.component.mqtt.model.ServiceReply;
import com.dji.sample.component.mqtt.service.IMessageSenderService;
import com.dji.sample.component.mqtt.service.IMqttMessageGateway;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author sean.zhou
@@ -22,11 +27,11 @@
@Autowired
private IMqttMessageGateway messageGateway;
+ @Autowired
+ private ObjectMapper mapper;
+
public void publish(String topic, CommonTopicResponse response) {
try {
- ObjectMapper mapper = new ObjectMapper();
- // Only parameters whose value is not null will be serialised.
- mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
messageGateway.publish(topic, mapper.writeValueAsBytes(response));
} catch (JsonProcessingException e) {
@@ -37,14 +42,31 @@
public void publish(String topic, int qos, CommonTopicResponse response) {
try {
- ObjectMapper mapper = new ObjectMapper();
- // Only parameters whose value is not null will be serialised.
- mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-
messageGateway.publish(topic, mapper.writeValueAsBytes(response), qos);
} catch (JsonProcessingException e) {
log.info("Failed to publish the message. {}", response.toString());
e.printStackTrace();
}
}
+
+ public Optional<ServiceReply> publishWithReply(String topic, CommonTopicResponse response) {
+ AtomicInteger time = new AtomicInteger(0);
+ // Retry three times
+ while (time.getAndIncrement() < 3) {
+ this.publish(topic, response);
+
+ Chan<CommonTopicReceiver<ServiceReply>> chan = Chan.getInstance();
+ // If the message is not received in 0.5 seconds then resend it again.
+ CommonTopicReceiver<ServiceReply> receiver = chan.get(response.getMethod());
+ if (receiver == null) {
+ continue;
+ }
+ // Need to match tid and bid.
+ if (receiver.getTid().equals(response.getTid()) &&
+ receiver.getBid().equals(response.getBid())) {
+ return Optional.ofNullable(receiver.getData());
+ }
+ }
+ return Optional.empty();
+ }
}
\ No newline at end of file
--
Gitblit v1.9.3