Commit 23ed044d LN

增加mqtt通信

1 个父辈 bd32adc0
package com.neotel.smfcore.common.mqtt;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.io.Serializable;
@Component
@ConfigurationProperties(prefix = "com.mqtt") //对应yml文件中的com下的mqtt文件配置
@Data
public class MqttConfiguration implements Serializable {
private String url;
private String clientId;
private String topics;
private String username;
private String password;
private String timeout;
private String keepalive;
}
package com.neotel.smfcore.common.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
// 定义重载方法,用于消息发送
void sendToMqtt(String payload);
// 指定topic进行消息发送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
package com.neotel.smfcore.common.mqtt;
import com.neotel.smfcore.common.bean.ResultBean;
import com.neotel.smfcore.common.utils.JsonUtil;
import com.neotel.smfcore.custom.gree20242.bean.GreeRequestMsg;
import com.neotel.smfcore.security.annotation.AnonymousAccess;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@Slf4j
@Api(tags = "MQTT TEST")
@RequestMapping("api/mqtt")
public class MqttPubController {
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
private MqttGateway mqttGateway;
@AnonymousAccess
@GetMapping("/hello")
public ResultBean hello() {
log.info("hello");
return ResultBean.newOkResult("hello!");
}
@AnonymousAccess
@GetMapping("/sendMqtt")
public ResultBean sendMqtt( String sendData) {
log.info("sendMqtt:" + sendData);
mqttGateway.sendToMqtt("topic-smf-01", (String) sendData);
log.info("sendMqtt topic[topic-smf-01] OK"+ sendData);
return ResultBean.newOkResult("sendMqtt topic[topic-smf-01] OK:" + sendData);
}
@AnonymousAccess
@GetMapping("/sendMqttTopic")
public ResultBean sendMqttTopic(String sendData, String topic) {
log.info("sendMqtt: topic=" + topic + ",sendData=" + sendData);
mqttGateway.sendToMqtt(topic, (String) sendData);
log.info("sendMqtt OK,topic=[" + topic + "],sendData=" + sendData);
return ResultBean.newOkResult("sendMqtt OK,topic=[" + topic + "],sendData=" + sendData);
}
@AnonymousAccess
@GetMapping("/sendMqttTopicMsg")
public ResultBean sendMqttTopicMsg(String topic ) {
Map<String,String> params=new HashMap<>();
params.put("reelId","reelId00999");
params.put("slot","A3_23");
params.put("deviceId","1");
GreeRequestMsg msgB=GreeRequestMsg.newMsg("pull_request",params );
String msg= JsonUtil.toJsonStr(msgB);
mqttGateway.sendToMqtt(topic, (String) msg);
log.info("sendMqttTopicMsg: topic=" + topic + ",sendData=" + msg);
return ResultBean.newOkResult("sendMqttTopicMsg OK,topic=[" + topic + "]");
}
}
\ No newline at end of file
package com.neotel.smfcore.common.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* MQTT消费端
*
*/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttboundConfiguration {
@Autowired
private MqttConfiguration mqttProperties;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
log.info("mqttProperties=" + mqttProperties.toString());
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
String[] array = mqttProperties.getUrl().split(",");
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(array);
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setKeepAliveInterval(2);
log.info("开始连接:[" + mqttProperties.getUrl() + "][" + mqttProperties.getUsername() + "][" + mqttProperties.getPassword() + "]");
//接受离线消息
options.setCleanSession(false);
factory.setConnectionOptions(options);
return factory;
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
String[] inboundTopics = mqttProperties.getTopics().split(",");
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttProperties.getClientId()+"_inbound",mqttClientFactory(), inboundTopics); //对inboundTopics主题进行监听
adapter.setCompletionTimeout(5000);
adapter.setQos(1);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
log.info("inbound:监听topic:"+mqttProperties.getTopics());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel") //异步处理
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
// System.out.println("message:"+message);
log.info("---------handleMessage-------------");
log.info("收到消息:message:"+message.getPayload());
log.info("收到消息:PacketId:"+message.getHeaders().getId());
log.info("收到消息:Qos:"+message.getHeaders().get(MqttHeaders.QOS));
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
log.info("收到消息:topic:"+topic);
}
};
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttProperties.getClientId()+"outbound", mqttClientFactory());
messageHandler.setAsync(true);
return messageHandler;
}
}
\ No newline at end of file
package com.neotel.smfcore.custom.gree20242;
import com.neotel.smfcore.core.api.listener.BaseSmfApiListener;
import com.neotel.smfcore.core.order.service.po.LiteOrder;
import com.neotel.smfcore.core.system.service.po.DataLog;
import com.neotel.smfcore.custom.gree20242.bean.GreeRequestMsg;
import java.util.HashMap;
import java.util.Map;
public class GreeApi extends BaseSmfApiListener {
@Override
public boolean isForThisApi(String apiName) {
return apiName != null && apiName.equalsIgnoreCase("20242");
}
@Override
public void inTaskStatusChange(String inNotifyUrl, DataLog task) {
if (task.isPutInTask() && task.isFinished()) {
Map<String, String> params = new HashMap<>();
params.put("reelId", task.getBarcode());
params.put("slot", task.getPosName());
params.put("deviceId", task.getStorageId());
GreeRequestMsg msg = GreeRequestMsg.newMsg(inNotifyUrl, params);
boolean result= SendMsg(inNotifyUrl,msg);
}
}
@Override
public void onOrderStatusChange(String orderNotifyUrl, LiteOrder liteOrder) {
Map<String, String> params = new HashMap<>();
params.put("shelfId", liteOrder.getOrderNo());
params.put("hSerial", liteOrder.getOrderNo());
params.put("so", liteOrder.getSo());
params.put("line", liteOrder.getLine());
GreeRequestMsg msg = GreeRequestMsg.newMsg(orderNotifyUrl, params);
boolean result= SendMsg(orderNotifyUrl,msg);
}
public void ShelfArriveLine(String arriveLineUrl, LiteOrder liteOrder) {
Map<String, String> params = new HashMap<>();
params.put("shelfId", liteOrder.getOrderNo());
params.put("hSerial", liteOrder.getOrderNo());
params.put("so", liteOrder.getSo());
params.put("line", liteOrder.getLine());
GreeRequestMsg msg = GreeRequestMsg.newMsg(arriveLineUrl, params);
boolean result= SendMsg(arriveLineUrl,msg);
}
public boolean SendMsg(String url,GreeRequestMsg msg){
return true;
}
}
package com.neotel.smfcore.custom.gree20242;
import org.springframework.stereotype.Service;
@Service
public class GreeApiHandler {
}
package com.neotel.smfcore.custom.gree20242.bean;
import com.neotel.smfcore.common.utils.DateUtil;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@Data
public class GreeRequestMsg implements Serializable {
public static GreeRequestMsg newMsg(String suject,Map<String,String> params){
GreeRequestMsg msg=new GreeRequestMsg() ;
msg.setSubject(suject);
msg.setData(params);
String time=DateUtil.toDateString(new Date(),"yyyy-MM-dd HH:mm:ss");
String uid=DateUtil.toDateString(new Date(),"yyyyMMddHHmmss");
msg.setTime(time);
msg.setId(uid);
return msg;
}
/**
* specVersion CloudEvents 规范版本 String 是"1.0"
*/
private String specVersion = "1.0";
/**
* type 事件类型 String 是 com.github.pull_request.opened
*/
private String type = "com.github.pull_request.opened";
/**
* source 事件来源上下文 String 是 https://github.com/cloudevents/spec/pull
*/
private String source = "https://github.com/cloudevents/spec/pull";
/**
* subject 事件主题 String 是 pull_request
*/
private String subject = "pull_request";
/**
* id 事件 ID String 是 UUID 标识唯一
*/
private String id = "UUID 标识唯一";
/**
* time 事件时间 String 是 格式:yyyy-MM-dd HH:mm:ss
*/
private String time = "格式:yyyy-MM-dd HH:mm:ss";
/**
* datacontentType 数据格式类型 String 是 application/json
*/
private String datacontentType = "application/json";
/**
*
*/
private Map<String, String> data = new HashMap<>();
}
package com.neotel.smfcore.custom.gree20242.bean;
import lombok.Data;
import java.io.Serializable;
@Data
public class GreeResponseMsg implements Serializable {
/**
* id 事件 ID String 是 与请求的事件 ID 一一对应
*/
private String id;
/**
* feedbackTime 反馈时间 String 是 格式:yyyy-MM-dd HH:mm:ss
*/
private String feedbackTime;
/**
* code 结果码 Integer 是 0=成功, 其他为失败(如1001为参数错误)
*/
private String code;
/**
* message 消息内容 String 是 消息内容
*/
private String message;
/**
* data 返回的数据 Object 是 JSON 格式
*/
private String data;
}
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!