| | |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.util.Optional; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | /** |
| | |
| | | } |
| | | } |
| | | |
| | | public Optional<ServiceReply> publishWithReply(String topic, CommonTopicResponse response) { |
| | | public ServiceReply publishWithReply(String topic, CommonTopicResponse response) { |
| | | return this.publishWithReply(ServiceReply.class, topic, response, 2); |
| | | } |
| | | |
| | | public <T> T publishWithReply(Class<T> clazz, String topic, CommonTopicResponse response, int retryTime) { |
| | | AtomicInteger time = new AtomicInteger(0); |
| | | // Retry three times |
| | | while (time.getAndIncrement() < 3) { |
| | | while (time.getAndIncrement() <= retryTime) { |
| | | this.publish(topic, response); |
| | | |
| | | Chan<CommonTopicReceiver<ServiceReply>> chan = Chan.getInstance(); |
| | | Chan<CommonTopicReceiver<T>> chan = Chan.getInstance(); |
| | | // If the message is not received in 0.5 seconds then resend it again. |
| | | CommonTopicReceiver<ServiceReply> receiver = chan.get(response.getMethod()); |
| | | CommonTopicReceiver<T> receiver = chan.get(response.getTid()); |
| | | if (receiver == null) { |
| | | continue; |
| | | } |
| | | // Need to match tid and bid. |
| | | if (receiver.getTid().equals(response.getTid()) && |
| | | receiver.getBid().equals(response.getBid())) { |
| | | return Optional.ofNullable(receiver.getData()); |
| | | return receiver.getData(); |
| | | } |
| | | } |
| | | return Optional.empty(); |
| | | throw new RuntimeException("No message reply received."); |
| | | } |
| | | } |