package com.dji.sample.component.mqtt.config;
|
|
import com.dji.sample.component.mqtt.model.ChannelName;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.annotation.IntegrationComponentScan;
|
import org.springframework.integration.annotation.ServiceActivator;
|
import org.springframework.integration.endpoint.MessageProducerSupport;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHandler;
|
|
import javax.annotation.Resource;
|
|
/**
|
* Client configuration for inbound messages.
|
* @author sean.zhou
|
* @date 2021/11/10
|
* @version 0.1
|
*/
|
@Slf4j
|
@Configuration
|
@IntegrationComponentScan
|
public class MqttInboundConfiguration {
|
|
@Autowired
|
private MqttConfiguration mqttConfiguration;
|
|
@Autowired
|
private MqttPahoClientFactory mqttClientFactory;
|
|
@Resource(name = ChannelName.INBOUND)
|
private MessageChannel inboundChannel;
|
|
/**
|
* Clients of inbound message channels.
|
* @return
|
*/
|
@Bean(name = "adapter")
|
public MessageProducerSupport mqttInbound() {
|
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
|
mqttConfiguration.getClientId() + "_consumer_" + System.currentTimeMillis(),
|
mqttClientFactory, mqttConfiguration.getInboundTopic().split(","));
|
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
|
// use byte types uniformly
|
converter.setPayloadAsBytes(true);
|
adapter.setConverter(converter);
|
adapter.setQos(1);
|
adapter.setOutputChannel(inboundChannel);
|
return adapter;
|
}
|
|
/**
|
* Define a default channel to handle messages that have no effect.
|
* @return
|
*/
|
@Bean
|
@ServiceActivator(inputChannel = ChannelName.DEFAULT)
|
public MessageHandler defaultInboundHandler() {
|
return message -> {
|
log.info("The default channel does not handle messages." +
|
"\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
|
"\nPayload: " + message.getPayload());
|
};
|
}
|
|
}
|