package com.dji.sample.component.mqtt.handler;
|
|
import com.dji.sample.common.util.SpringBeanUtils;
|
import com.dji.sample.component.mqtt.model.ChannelName;
|
import com.dji.sample.component.mqtt.model.DeviceTopicEnum;
|
//import com.dji.sample.component.rabbitmq.config.MqttMsgProxyProducer;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.integration.annotation.Router;
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
import org.springframework.integration.router.AbstractMessageRouter;
|
import org.springframework.messaging.Message;
|
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageHeaders;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Collection;
|
import java.util.Collections;
|
|
/**
|
*
|
* @author sean.zhou
|
* @date 2021/11/10
|
* @version 0.1
|
*/
|
@Component
|
@Slf4j
|
public class InboundMessageRouter extends AbstractMessageRouter {
|
/*@Autowired
|
private MqttMsgProxyProducer mqttMsgProxyProducer;*/
|
|
/**
|
* All mqtt broker messages will arrive here before distributing them to different channels.
|
* @param message message from mqtt broker
|
* @return channel
|
*/
|
@Override
|
@Router(inputChannel = ChannelName.INBOUND)
|
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
|
MessageHeaders headers = message.getHeaders();
|
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
|
byte[] payload = (byte[])message.getPayload();
|
|
log.debug("received topic :{} \t payload :{}", topic, new String(payload));
|
|
DeviceTopicEnum topicEnum = DeviceTopicEnum.find(topic);
|
MessageChannel bean = (MessageChannel) SpringBeanUtils.getBean(topicEnum.getBeanName());
|
String msg = new String(payload);
|
try {
|
// mqttMsgProxyProducer.publish(topic.replace("/","."), msg);
|
} catch (Exception e) {
|
log.error("消息发送失败:", e);
|
}
|
return Collections.singleton(bean);
|
}
|
}
|