| | |
| | | package com.dji.sample.component.mqtt.config; |
| | | |
| | | import com.dji.sample.component.mqtt.model.ChannelName; |
| | | import com.dji.sample.component.mqtt.model.MqttClientOptions; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.annotation.Bean; |
| | |
| | | import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
| | | import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; |
| | | import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; |
| | | import org.springframework.integration.mqtt.support.MqttHeaders; |
| | | import org.springframework.messaging.MessageChannel; |
| | | import org.springframework.messaging.MessageHandler; |
| | | |
| | |
| | | public class MqttInboundConfiguration { |
| | | |
| | | @Autowired |
| | | private MqttConfiguration mqttConfiguration; |
| | | |
| | | @Autowired |
| | | private MqttPahoClientFactory mqttClientFactory; |
| | | |
| | | @Resource(name = ChannelName.INBOUND) |
| | |
| | | */ |
| | | @Bean(name = "adapter") |
| | | public MessageProducerSupport mqttInbound() { |
| | | MqttClientOptions options = MqttConfiguration.getBasicClientOptions(); |
| | | MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( |
| | | mqttConfiguration.getClientId() + "_consumer_" + System.currentTimeMillis(), |
| | | mqttClientFactory, mqttConfiguration.getInboundTopic().split(",")); |
| | | options.getClientId() + "_consumer_" + System.currentTimeMillis(), |
| | | mqttClientFactory, options.getInboundTopic().split(",")); |
| | | DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); |
| | | // use byte types uniformly |
| | | converter.setPayloadAsBytes(true); |
| | |
| | | |
| | | /** |
| | | * Define a default channel to handle messages that have no effect. |
| | | * 定义默认通道处理无效消息 |
| | | * @return |
| | | */ |
| | | @Bean |
| | | @ServiceActivator(inputChannel = ChannelName.DEFAULT) |
| | | public MessageHandler defaultInboundHandler() { |
| | | return message -> { |
| | | log.info("The default channel does not handle messages."); |
| | | log.info("The default channel does not handle messages." + |
| | | "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) + |
| | | "\nPayload: " + message.getPayload()); |
| | | }; |
| | | } |
| | | |