| | |
| | | package com.dji.sample.component.mqtt.service.impl; |
| | | |
| | | import com.dji.sample.component.mqtt.model.Chan; |
| | | import com.dji.sample.component.mqtt.model.CommonTopicReceiver; |
| | | import com.dji.sample.component.mqtt.model.CommonTopicResponse; |
| | | import com.dji.sample.component.mqtt.model.ServiceReply; |
| | | import com.dji.sample.component.mqtt.service.IMessageSenderService; |
| | | import com.dji.sample.component.mqtt.service.IMqttMessageGateway; |
| | | import com.fasterxml.jackson.annotation.JsonInclude; |
| | | import com.fasterxml.jackson.core.JsonProcessingException; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.util.Optional; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | /** |
| | | * @author sean.zhou |
| | |
| | | @Autowired |
| | | private IMqttMessageGateway messageGateway; |
| | | |
| | | @Autowired |
| | | private ObjectMapper mapper; |
| | | |
| | | public void publish(String topic, CommonTopicResponse response) { |
| | | try { |
| | | ObjectMapper mapper = new ObjectMapper(); |
| | | // Only parameters whose value is not null will be serialised. |
| | | mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); |
| | | |
| | | messageGateway.publish(topic, mapper.writeValueAsBytes(response)); |
| | | } catch (JsonProcessingException e) { |
| | |
| | | |
| | | public void publish(String topic, int qos, CommonTopicResponse response) { |
| | | try { |
| | | ObjectMapper mapper = new ObjectMapper(); |
| | | // Only parameters whose value is not null will be serialised. |
| | | mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); |
| | | |
| | | messageGateway.publish(topic, mapper.writeValueAsBytes(response), qos); |
| | | } catch (JsonProcessingException e) { |
| | | log.info("Failed to publish the message. {}", response.toString()); |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | public Optional<ServiceReply> publishWithReply(String topic, CommonTopicResponse response) { |
| | | AtomicInteger time = new AtomicInteger(0); |
| | | // Retry three times |
| | | while (time.getAndIncrement() < 3) { |
| | | this.publish(topic, response); |
| | | |
| | | Chan<CommonTopicReceiver<ServiceReply>> chan = Chan.getInstance(); |
| | | // If the message is not received in 0.5 seconds then resend it again. |
| | | CommonTopicReceiver<ServiceReply> receiver = chan.get(response.getMethod()); |
| | | if (receiver == null) { |
| | | continue; |
| | | } |
| | | // Need to match tid and bid. |
| | | if (receiver.getTid().equals(response.getTid()) && |
| | | receiver.getBid().equals(response.getBid())) { |
| | | return Optional.ofNullable(receiver.getData()); |
| | | } |
| | | } |
| | | return Optional.empty(); |
| | | } |
| | | } |