Commit f87865b2 LN

1

1 个父辈 8420f39e
package com.neotel.smfcore.common.mqtt;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.io.Serializable;
@Component
@ConfigurationProperties(prefix = "com.mqtt") //对应yml文件中的com下的mqtt文件配置
@Data
public class MqttConfiguration implements Serializable {
private String url;
private String clientId;
private String topics;
private String username;
private String password;
private String timeout;
private String keepalive;
}
//package com.neotel.smfcore.common.mqtt;
//
//import lombok.Data;
//import org.springframework.boot.context.properties.ConfigurationProperties;
//import org.springframework.stereotype.Component;
//
//import java.io.Serializable;
//
//@Component
//@ConfigurationProperties(prefix = "com.mqtt") //对应yml文件中的com下的mqtt文件配置
//@Data
//public class MqttConfiguration implements Serializable {
// private String url;
// private String clientId;
// private String topics;
// private String username;
// private String password;
// private String timeout;
// private String keepalive;
//}
package com.neotel.smfcore.common.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
// 定义重载方法,用于消息发送
void sendToMqtt(String payload);
// 指定topic进行消息发送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
//package com.neotel.smfcore.common.mqtt;
//
//import org.springframework.integration.annotation.MessagingGateway;
//import org.springframework.integration.mqtt.support.MqttHeaders;
//import org.springframework.messaging.handler.annotation.Header;
//
//@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
//public interface MqttGateway {
// // 定义重载方法,用于消息发送
// void sendToMqtt(String payload);
//
// // 指定topic进行消息发送
// void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
//
// void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
//
// void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
//}
......@@ -23,51 +23,51 @@ import java.util.Map;
@RequestMapping("api/mqtt")
public class MqttPubController {
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
private MqttGateway mqttGateway;
@AnonymousAccess
@GetMapping("/hello")
public ResultBean hello() {
log.info("hello");
return ResultBean.newOkResult("hello!");
}
@AnonymousAccess
@GetMapping("/sendMqtt")
public ResultBean sendMqtt( String sendData) {
log.info("sendMqtt:" + sendData);
mqttGateway.sendToMqtt("topic-smf-01", (String) sendData);
log.info("sendMqtt topic[topic-smf-01] OK"+ sendData);
return ResultBean.newOkResult("sendMqtt topic[topic-smf-01] OK:" + sendData);
}
@AnonymousAccess
@GetMapping("/sendMqttTopic")
public ResultBean sendMqttTopic(String sendData, String topic) {
log.info("sendMqtt: topic=" + topic + ",sendData=" + sendData);
mqttGateway.sendToMqtt(topic, (String) sendData);
log.info("sendMqtt OK,topic=[" + topic + "],sendData=" + sendData);
return ResultBean.newOkResult("sendMqtt OK,topic=[" + topic + "],sendData=" + sendData);
}
@AnonymousAccess
@GetMapping("/sendMqttTopicMsg")
public ResultBean sendMqttTopicMsg(String topic ) {
Map<String,String> params=new HashMap<>();
params.put("reelId","reelId00999");
params.put("slot","A3_23");
params.put("deviceId","1");
GreeRequestMsg msgB=GreeRequestMsg.newMsg("pull_request",params );
String msg= JsonUtil.toJsonStr(msgB);
mqttGateway.sendToMqtt(topic, (String) msg);
log.info("sendMqttTopicMsg: topic=" + topic + ",sendData=" + msg);
return ResultBean.newOkResult("sendMqttTopicMsg OK,topic=[" + topic + "]");
}
// @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
// @Autowired
// private MqttGateway mqttGateway;
//
//
// @AnonymousAccess
// @GetMapping("/hello")
// public ResultBean hello() {
// log.info("hello");
// return ResultBean.newOkResult("hello!");
// }
//
// @AnonymousAccess
// @GetMapping("/sendMqtt")
// public ResultBean sendMqtt( String sendData) {
// log.info("sendMqtt:" + sendData);
// mqttGateway.sendToMqtt("topic-smf-01", (String) sendData);
// log.info("sendMqtt topic[topic-smf-01] OK"+ sendData);
//
// return ResultBean.newOkResult("sendMqtt topic[topic-smf-01] OK:" + sendData);
// }
//
//
// @AnonymousAccess
// @GetMapping("/sendMqttTopic")
// public ResultBean sendMqttTopic(String sendData, String topic) {
//
// log.info("sendMqtt: topic=" + topic + ",sendData=" + sendData);
// mqttGateway.sendToMqtt(topic, (String) sendData);
// log.info("sendMqtt OK,topic=[" + topic + "],sendData=" + sendData);
// return ResultBean.newOkResult("sendMqtt OK,topic=[" + topic + "],sendData=" + sendData);
// }
//
// @AnonymousAccess
// @GetMapping("/sendMqttTopicMsg")
// public ResultBean sendMqttTopicMsg(String topic ) {
// Map<String,String> params=new HashMap<>();
// params.put("reelId","reelId00999");
// params.put("slot","A3_23");
// params.put("deviceId","1");
// GreeRequestMsg msgB=GreeRequestMsg.newMsg("pull_request",params );
//
// String msg= JsonUtil.toJsonStr(msgB);
// mqttGateway.sendToMqtt(topic, (String) msg);
// log.info("sendMqttTopicMsg: topic=" + topic + ",sendData=" + msg);
// return ResultBean.newOkResult("sendMqttTopicMsg OK,topic=[" + topic + "]");
// }
}
\ No newline at end of file
package com.neotel.smfcore.common.mqtt;
import com.neotel.smfcore.custom.gree20242.api.GreeMsgHandlerImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* MQTT消费端
*
*/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttboundConfiguration {
@Autowired
private MqttConfiguration mqttProperties;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
log.info("mqttProperties=" + mqttProperties.toString());
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
String[] array = mqttProperties.getUrl().split(",");
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(array);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setKeepAliveInterval(2);
log.info("开始连接:[" + mqttProperties.getUrl() + "][" + mqttProperties.getUsername() + "][" + mqttProperties.getPassword() + "]");
//接受离线消息
options.setCleanSession(false);
factory.setConnectionOptions(options);
return factory;
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
String[] inboundTopics = mqttProperties.getTopics().split(",");
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttProperties.getClientId()+"_inbound",mqttClientFactory(), inboundTopics); //对inboundTopics主题进行监听
adapter.setCompletionTimeout(5000);
adapter.setQos(1);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
log.info("inbound:监听topic:"+mqttProperties.getTopics());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler mqttHandler(GreeMsgHandlerImpl messageHandler) {
return message -> messageHandler.handleMessage(message);
}
// //通过通道获取数据
//package com.neotel.smfcore.common.mqtt;
//
//
//import cn.hutool.core.util.ObjectUtil;
//import com.neotel.smfcore.custom.gree20242.api.GreeMsgHandlerImpl;
//import lombok.extern.slf4j.Slf4j;
//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.integration.annotation.IntegrationComponentScan;
//import org.springframework.integration.annotation.ServiceActivator;
//import org.springframework.integration.channel.DirectChannel;
//import org.springframework.integration.core.MessageProducer;
//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
//import org.springframework.messaging.MessageChannel;
//import org.springframework.messaging.MessageHandler;
//
//
///**
// * MQTT消费端
// *
// */
//@Configuration
//@IntegrationComponentScan
//@Slf4j
//public class MqttboundConfiguration {
//
// @Autowired
// private MqttConfiguration mqttProperties;
//
//
// @Bean
// public MessageChannel mqttInputChannel() {
// return new DirectChannel();
// }
//
// @Bean
// public MqttPahoClientFactory mqttClientFactory() {
// if(ObjectUtil.isEmpty(mqttProperties.getUrl())){
// return null;
// }
// log.info("mqttProperties=" + mqttProperties.toString());
// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// String[] array = mqttProperties.getUrl().split(",");
// MqttConnectOptions options = new MqttConnectOptions();
//
// options.setServerURIs(array);
// options.setUserName(mqttProperties.getUsername());
// options.setPassword(mqttProperties.getPassword().toCharArray());
// options.setKeepAliveInterval(2);
// log.info("开始连接:[" + mqttProperties.getUrl() + "][" + mqttProperties.getUsername() + "][" + mqttProperties.getPassword() + "]");
// //接受离线消息
// options.setCleanSession(false);
//
// factory.setConnectionOptions(options);
//
// return factory;
// }
//
// //配置client,监听的topic
// @Bean
// @ServiceActivator(inputChannel = "mqttInputChannel") //异步处理
// public MessageHandler handler() {
// return new MessageHandler() {
// @Override
// public void handleMessage(Message<?> message) throws MessagingException {
//// System.out.println("message:"+message);
// log.info("---------handleMessage-------------");
// log.info("收到消息:message:"+message.getPayload());
// log.info("收到消息:PacketId:"+message.getHeaders().getId());
// log.info("收到消息:Qos:"+message.getHeaders().get(MqttHeaders.QOS));
//
// String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
// log.info("收到消息:topic:"+topic);
//
// }
// };
// public MessageProducer inbound() {
// if(ObjectUtil.isEmpty(mqttProperties.getUrl())){
// return null;
// }
// String[] inboundTopics = mqttProperties.getTopics().split(",");
// MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
// mqttProperties.getClientId()+"_inbound",mqttClientFactory(), inboundTopics); //对inboundTopics主题进行监听
// adapter.setCompletionTimeout(5000);
// adapter.setQos(1);
// adapter.setConverter(new DefaultPahoMessageConverter());
// adapter.setOutputChannel(mqttInputChannel());
// log.info("inbound:监听topic:"+mqttProperties.getTopics());
// return adapter;
// }
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttProperties.getClientId()+"outbound", mqttClientFactory());
messageHandler.setAsync(true);
return messageHandler;
}
}
\ No newline at end of file
// @Bean
// @ServiceActivator(inputChannel = "mqttInputChannel")
// public MessageHandler mqttHandler(GreeMsgHandlerImpl messageHandler) {
// return message -> messageHandler.handleMessage(message);
// }
//
//
//
//// //通过通道获取数据
//// @Bean
//// @ServiceActivator(inputChannel = "mqttInputChannel") //异步处理
//// public MessageHandler handler() {
//// return new MessageHandler() {
//// @Override
//// public void handleMessage(Message<?> message) throws MessagingException {
////// System.out.println("message:"+message);
//// log.info("---------handleMessage-------------");
//// log.info("收到消息:message:"+message.getPayload());
//// log.info("收到消息:PacketId:"+message.getHeaders().getId());
//// log.info("收到消息:Qos:"+message.getHeaders().get(MqttHeaders.QOS));
////
//// String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
//// log.info("收到消息:topic:"+topic);
////
//// }
//// };
//// }
//
// @Bean
// @ServiceActivator(inputChannel = "mqttOutboundChannel")
// public MessageHandler mqttOutbound() {
// if(ObjectUtil.isEmpty(mqttProperties.getUrl())){
// return null;
// }
// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
// mqttProperties.getClientId()+"outbound", mqttClientFactory());
// messageHandler.setAsync(true);
//
// return messageHandler;
// }
//
//}
\ No newline at end of file
......@@ -139,7 +139,7 @@ public class GreeDeviceController {
return ResultBean.newOkResult("");
}
@ApiOperation("获取出料机构入口的料")
@PostMapping(value = "/reelArriveOutBound")
@PostMapping(value = "/getOutBoundReel")
@ResponseBody
@AnonymousAccess
public ResultBean getOutBoundReel(HttpServletRequest request) {
......
package com.neotel.smfcore.custom.gree20242.api;
import com.neotel.smfcore.common.mqtt.MqttGateway;
import com.neotel.smfcore.common.utils.JsonUtil;
import com.neotel.smfcore.core.api.listener.BaseSmfApiListener;
import com.neotel.smfcore.core.order.service.po.LiteOrder;
......@@ -20,7 +19,7 @@ public class GreeApi extends BaseSmfApiListener {
}
@Autowired
private MqttGateway mqttGateway;
// private MqttGateway mqttGateway;
@Override
public void inTaskStatusChange(String inNotifyUrl, DataLog task) {
......@@ -65,12 +64,12 @@ public class GreeApi extends BaseSmfApiListener {
public boolean SendMsg(String url,GreeRequestMsg msgBean ) {
//url地址配置为发送主题
String msg = JsonUtil.toJsonStr(msgBean);
mqttGateway.sendToMqtt(url, msg);
log.info("GreeApi SendMsg: topic=[" + url + "],sendData=" + msg);
GreeMsgCache.addSendMsg(msgBean);
return true;
// //url地址配置为发送主题
// String msg = JsonUtil.toJsonStr(msgBean);
// mqttGateway.sendToMqtt(url, msg);
// log.info("GreeApi SendMsg: topic=[" + url + "],sendData=" + msg);
// GreeMsgCache.addSendMsg(msgBean);
return true;
}
}
......@@ -4,7 +4,6 @@ package com.neotel.smfcore.custom.gree20242.api;
import cn.hutool.core.util.ObjectUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.neotel.smfcore.common.mqtt.MqttGateway;
import com.neotel.smfcore.common.utils.JsonUtil;
import com.neotel.smfcore.common.utils.StorageConstants;
import com.neotel.smfcore.core.device.bean.BoxStatusBean;
......@@ -41,8 +40,8 @@ public class GreeMsgHandlerImpl implements MessageHandler {
@Autowired
private DataCache dataCache;
@Autowired
private MqttGateway mqttGateway;
// @Autowired
// private MqttGateway mqttGateway;
......@@ -83,7 +82,7 @@ public class GreeMsgHandlerImpl implements MessageHandler {
String responseStr=JsonUtil.toJsonStr(responseMsg);
log.info("返回消息:topic:["+topic+"],packetId["+packetId+"],qos:["+qos+"],message:[" + responseStr+"]");
mqttGateway.sendToMqtt(topic,qos,responseStr);
// mqttGateway.sendToMqtt(topic,qos,responseStr);
}else{
log.info("收到消息: message:[" + message.getPayload()+"] 主题或ID为空,暂不处理" );
......
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!