Commit 8ee5ccf5 zshaohui

1.黑灯工厂增加设备状态推送

2.增加errorcode信息 传给mes
1 个父辈 6e1a4a95
...@@ -15,7 +15,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; ...@@ -15,7 +15,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
//@EnableAsync //@EnableAsync
//@EnableScheduling @EnableScheduling
@EnableKafka @EnableKafka
@RestController @RestController
@SpringBootApplication @SpringBootApplication
......
...@@ -760,6 +760,7 @@ public class BaseDeviceHandler implements IDeviceHandler { ...@@ -760,6 +760,7 @@ public class BaseDeviceHandler implements IDeviceHandler {
statusBean.setOp(statusBeanToSave.getOp()); statusBean.setOp(statusBeanToSave.getOp());
statusBean.setSeq(statusBeanToSave.getSeq()); statusBean.setSeq(statusBeanToSave.getSeq());
statusBean.setClientIp(statusBeanToSave.getClientIp()); statusBean.setClientIp(statusBeanToSave.getClientIp());
statusBean.setErrorCode(statusBeanToSave.getErrorCode());
/** /**
* 已解除的报警信息存到数据库中 * 已解除的报警信息存到数据库中
......
...@@ -722,4 +722,14 @@ public class DataCache { ...@@ -722,4 +722,14 @@ public class DataCache {
} }
return 0; return 0;
} }
public List<Storage> getStorageByMachineId(String machineId) {
List<Storage> storageList = new ArrayList<>();
for (Storage storage : getAllStorage().values()) {
if (machineId.equals(storage.getMachineId())){
storageList.add(storage);
}
}
return storageList;
}
} }
package com.neotel.smfcore.custom.lizhen.kafka.bean;
import com.alibaba.fastjson.annotation.JSONField;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.List;
@Data
@ApiModel("MachineParameter")
public class MachineParameter {
@JSONField(name = "OccurrenceTime")
@ApiModelProperty("发生时间")
private String OccurrenceTime;
@JSONField(name = "MachineID")
@ApiModelProperty("机台唯一ID")
private String MachineID;
@JSONField(name = "TopicType")
@ApiModelProperty("消息类别")
private String TopicType;
@JSONField(name = "ContentType")
@ApiModelProperty("消息类别")
private String ContentType = "F";
@JSONField(name = "MachineType")
@ApiModelProperty("设备类型")
private String MachineType;
@JSONField(name = "SupplierID")
@ApiModelProperty("供应商")
private String SupplierID = "NEOTEL";
@JSONField(name = "ProgramName")
@ApiModelProperty("程序名称")
private String ProgramName;
@JSONField(name = "ProgramVersion")
@ApiModelProperty("程序版本")
private String ProgramVersion;
@JSONField(name = "SerialNumber")
@ApiModelProperty("序列号")
private String SerialNumber;
@JSONField(name = "Result")
@ApiModelProperty("结果")
private String Result;
@JSONField(name = "EmpNo")
@ApiModelProperty("人员")
private String EmpNo;
@JSONField(name = "ClientIP")
@ApiModelProperty("客户端 IP")
private String ClientIP;
@JSONField(name = "Data")
@ApiModelProperty("Data")
private List<MachineParameterData> Data;
}
package com.neotel.smfcore.custom.lizhen.kafka.bean;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class MachineParameterData {
@JSONField(name = "ParaName")
private String ParaName;
@JSONField(name = "ParaValue")
private String ParaValue;
}
...@@ -10,4 +10,10 @@ public class KafkaConfig { ...@@ -10,4 +10,10 @@ public class KafkaConfig {
* 心跳主题 * 心跳主题
*/ */
public static final String HEARTBEAT_TOPIC = "Heartbeat"; public static final String HEARTBEAT_TOPIC = "Heartbeat";
/**
* MachineParameter
*/
public static final String MACHINEPARAMETER_TOPIC = "MachineParameter";
} }
package com.neotel.smfcore.custom.lizhen.kafka.config;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class StorageExportConfig {
//private final static List<String> exportStrArr = Arrays.asList("MU1_1", "MU4_1", "MU2_1", "MU5_1", "MU1_2", "MU4_2", "MU5_2", "MU3_1");
public static Map<String, String> exportMap = new ConcurrentHashMap<>();
@PostConstruct
public void initMap() {
exportMap.put("MU1_1", "SJ202306205006");
exportMap.put("MU1_2", "SJ202306205007");
exportMap.put("MU2_1", "SJ202306205005");
exportMap.put("MU3_1", "SJ202306205008");
exportMap.put("MU4_1", "SJ202306205003");
exportMap.put("MU4_2", "SJ202306205004");
exportMap.put("MU5_1", "SJ202306205001");
exportMap.put("MU5_2", "SJ202306205002");
}
public static String getMachineId(String key) {
return exportMap.get(key);
}
}
package com.neotel.smfcore.custom.lizhen.kafka.service; package com.neotel.smfcore.custom.lizhen.kafka.service;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.neotel.smfcore.common.utils.StringUtils; import com.neotel.smfcore.common.utils.StringUtils;
import com.neotel.smfcore.core.device.bean.StatusBean; import com.neotel.smfcore.core.device.bean.StatusBean;
import com.neotel.smfcore.core.device.enums.BOX_STATUS; import com.neotel.smfcore.core.device.enums.BOX_STATUS;
import com.neotel.smfcore.core.device.enums.OP;
import com.neotel.smfcore.core.device.enums.OP_STATUS;
import com.neotel.smfcore.core.device.util.DataCache; import com.neotel.smfcore.core.device.util.DataCache;
import com.neotel.smfcore.core.storage.service.manager.IStoragePosManager;
import com.neotel.smfcore.core.storage.service.po.Storage; import com.neotel.smfcore.core.storage.service.po.Storage;
import com.neotel.smfcore.core.system.service.manager.IDataLogManager;
import com.neotel.smfcore.core.system.util.DevicesStatusUtil; import com.neotel.smfcore.core.system.util.DevicesStatusUtil;
import com.neotel.smfcore.custom.lizhen.innerBox.bean.StorageExport;
import com.neotel.smfcore.custom.lizhen.innerBox.util.StorageExportUtil;
import com.neotel.smfcore.custom.lizhen.kafka.bean.Heartbeat; import com.neotel.smfcore.custom.lizhen.kafka.bean.Heartbeat;
import com.neotel.smfcore.custom.lizhen.kafka.bean.MachineParameter;
import com.neotel.smfcore.custom.lizhen.kafka.bean.MachineParameterData;
import com.neotel.smfcore.custom.lizhen.kafka.bean.MachineStatus; import com.neotel.smfcore.custom.lizhen.kafka.bean.MachineStatus;
import com.neotel.smfcore.custom.lizhen.kafka.config.KafkaConfig; import com.neotel.smfcore.custom.lizhen.kafka.config.KafkaConfig;
import com.neotel.smfcore.custom.lizhen.kafka.config.StorageExportConfig;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import javax.annotation.PostConstruct;
import java.util.*; import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
//@Async //@Async
...@@ -37,25 +45,17 @@ public class KafkaService { ...@@ -37,25 +45,17 @@ public class KafkaService {
@Autowired @Autowired
private KafkaTemplate kafkaTemplate; private KafkaTemplate kafkaTemplate;
@Autowired
private IDataLogManager dataLogManager;
Map<String,String> statusMap = Maps.newConcurrentMap();
Map<String, String> statusMap = Maps.newConcurrentMap();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
@PostConstruct
void init(){
scheduledThreadPool.scheduleAtFixedRate(
() -> {
setMachineStatus();
setHeartbeat();
},5,60, TimeUnit.SECONDS
);
}
/** /**
* 设备状态发送 * 设备状态发送
*/ */
//@Scheduled(fixedRate = 1000 * 60 * 2) @Scheduled(fixedRate = 1000 * 60 * 1)
public void setMachineStatus() { public void setMachineStatus() {
log.info("发送设备状态开始"); log.info("发送设备状态开始");
Collection<Storage> storages = dataCache.getAllStorage().values(); Collection<Storage> storages = dataCache.getAllStorage().values();
...@@ -68,6 +68,7 @@ public class KafkaService { ...@@ -68,6 +68,7 @@ public class KafkaService {
//故障 //故障
boolean fault = false; boolean fault = false;
List<Map<String, String>> msgList = new ArrayList<>();
for (Storage storage : storages) { for (Storage storage : storages) {
if (machineId.equals(storage.getMachineId())) { if (machineId.equals(storage.getMachineId())) {
StatusBean statusBean = DevicesStatusUtil.getStatusBean(storage.getCid()); StatusBean statusBean = DevicesStatusUtil.getStatusBean(storage.getCid());
...@@ -77,11 +78,21 @@ public class KafkaService { ...@@ -77,11 +78,21 @@ public class KafkaService {
if (statusBean.timeOut()) { if (statusBean.timeOut()) {
offline = true; offline = true;
} else { } else {
int status = statusBean.getStatus(); if (!statusBean.isAvailable()) {
if (status == BOX_STATUS.EMERGENCY || status == BOX_STATUS.PROBLEM) {
fault = true; fault = true;
} }
} }
String msg = statusBean.getMsg();
String errorCode = statusBean.getErrorCode();
String[] msgStr = msg.split("\r\n", -1);
String[] errorCodeStr = errorCode.split("\r\n", -1);
for (int i = 0; i < errorCodeStr.length; i++) {
Map<String, String> map = new HashMap<>();
map.put("errorCode", errorCodeStr[i]);
map.put("msg", msgStr[i]);
msgList.add(map);
}
} }
} }
} }
...@@ -95,27 +106,39 @@ public class KafkaService { ...@@ -95,27 +106,39 @@ public class KafkaService {
} }
//判断状态与上一次是否一致 //判断状态与上一次是否一致
String lastStatus = statusMap.get(machineId); /*String lastStatus = statusMap.get(machineId);
if (StringUtils.isNotBlank(lastStatus) && lastStatus.equals(currentStatus)) { if (StringUtils.isNotBlank(lastStatus) && lastStatus.equals(currentStatus)) {
log.info(machineId + "与上一次通知状态相同,跳过" + currentStatus); log.info(machineId + "与上一次通知状态相同,跳过" + currentStatus);
continue; continue;
} }
*/
if("1".equals(currentStatus) || "3".equals(currentStatus)) { if ("1".equals(currentStatus) || "3".equals(currentStatus)) {
MachineStatus status = new MachineStatus(); MachineStatus status = new MachineStatus();
String dateStr = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"); String dateStr = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
status.setOccurrenceTime(dateStr); status.setOccurrenceTime(dateStr);
status.setMachineID(machineId); status.setMachineID(machineId);
status.setTopicType(KafkaConfig.MACHINESTATUS_TOPIC); status.setTopicType(KafkaConfig.MACHINESTATUS_TOPIC);
status.setCurrentStatus(currentStatus); status.setCurrentStatus(currentStatus);
status.setClientIP("");
if (msgList != null && !msgList.isEmpty()) {
for (Map<String, String> map : msgList) {
status.setErrorCode(map.get("errorCode"));
status.setErrorMsg(map.get("msg"));
String statusStr = JSON.toJSONString(status);
log.info("主题为:" + KafkaConfig.MACHINESTATUS_TOPIC + "内容为:" + statusStr);
ListenableFuture future = kafkaTemplate.send(KafkaConfig.MACHINESTATUS_TOPIC, statusStr);
log.info("返回结果为:" + JSON.toJSONString(future));
}
} else {
status.setErrorCode(""); status.setErrorCode("");
status.setErrorMsg(""); status.setErrorMsg("");
status.setClientIP("");
String statusStr = JSON.toJSONString(status); String statusStr = JSON.toJSONString(status);
log.info("主题为:" + KafkaConfig.MACHINESTATUS_TOPIC + "内容为:" + statusStr); log.info("主题为:" + KafkaConfig.MACHINESTATUS_TOPIC + "内容为:" + statusStr);
ListenableFuture future = kafkaTemplate.send(KafkaConfig.MACHINESTATUS_TOPIC, statusStr); ListenableFuture future = kafkaTemplate.send(KafkaConfig.MACHINESTATUS_TOPIC, statusStr);
log.info("返回结果为:" + JSON.toJSONString(future)); log.info("返回结果为:" + JSON.toJSONString(future));
}
statusMap.put(machineId, currentStatus); statusMap.put(machineId, currentStatus);
} }
} }
...@@ -126,7 +149,7 @@ public class KafkaService { ...@@ -126,7 +149,7 @@ public class KafkaService {
/** /**
* 心跳数据发送 * 心跳数据发送
*/ */
//@Scheduled(fixedRate = 1000 * 60 * 5) @Scheduled(fixedRate = 1000 * 60 * 5)
public void setHeartbeat() { public void setHeartbeat() {
log.info("发送心跳开始"); log.info("发送心跳开始");
//根据machineId,找到设备状态,是否正常 //根据machineId,找到设备状态,是否正常
...@@ -169,6 +192,113 @@ public class KafkaService { ...@@ -169,6 +192,113 @@ public class KafkaService {
} }
/**
* 设备状态发送
*/
@Scheduled(fixedRate = 1000 * 60 * 1)
public void setStorageExportStatus() {
log.info("发送出料口信息开始");
for (Map.Entry<String, StorageExport> exportEntry : StorageExportUtil.exportMap.entrySet()) {
if (exportEntry.getKey().contains(StorageExportUtil.OUT_STATION)) {
continue;
}
//默认是等待中2,禁用是故障3,有任务是运行1
String currentStatus = "2";
StorageExport export = exportEntry.getValue();
if (export.isDisable()) {
currentStatus = "3";
} else {
if (StringUtils.isNotBlank(export.getHSerial())) {
currentStatus = "1";
}
}
MachineStatus status = new MachineStatus();
String dateStr = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
status.setOccurrenceTime(dateStr);
status.setMachineID(StorageExportConfig.getMachineId(exportEntry.getKey()));
status.setTopicType(KafkaConfig.MACHINESTATUS_TOPIC);
status.setCurrentStatus(currentStatus);
status.setErrorCode("");
status.setErrorMsg("");
status.setClientIP("");
String statusStr = JSON.toJSONString(status);
log.info("出料口主题为:" + KafkaConfig.MACHINESTATUS_TOPIC + "内容为:" + statusStr);
ListenableFuture future = kafkaTemplate.send(KafkaConfig.MACHINESTATUS_TOPIC, statusStr);
log.info("出料口返回结果为:" + JSON.toJSONString(future));
}
log.info("发送出料口信息结束");
}
/**
* MachineParameter发送
*/
@Scheduled(fixedRate = 1000 * 60 * 5)
public void setMachineParameter() {
log.info("MachineParameter开始发送");
List<String> machineIdList = getMachineIdList();
for (String machineId : machineIdList) {
List<String> storageIdList = new ArrayList<>();
int totalCount = 0;
int usedCount = 0;
int emptyCount = 0;
List<Storage> storageList = dataCache.getStorageByMachineId(machineId);
for (Storage storage : storageList) {
totalCount = totalCount + storage.getTotalSlots();
emptyCount = emptyCount + storage.getEmptySlots();
usedCount = usedCount + (storage.getTotalSlots() - storage.getEmptySlots());
storageIdList.add(storage.getId());
}
int todayInCount = getTodayInOutCount(storageIdList, OP.PUT_IN);
int todayOutCount = getTodayInOutCount(storageIdList, OP.CHECKOUT);
MachineParameter machineParameter = new MachineParameter();
String dateStr = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
machineParameter.setOccurrenceTime(dateStr);
machineParameter.setMachineID(machineId);
machineParameter.setContentType("F");
machineParameter.setMachineType("智能仓储位");
machineParameter.setSupplierID("NEOTEL");
machineParameter.setProgramName("");
machineParameter.setProgramVersion("");
machineParameter.setSerialNumber("");
machineParameter.setResult("");
machineParameter.setEmpNo("");
machineParameter.setClientIP("");
List<MachineParameterData> dataList = new ArrayList<>();
dataList.add(new MachineParameterData("totalCount", totalCount + ""));
dataList.add(new MachineParameterData("emptyCount", emptyCount + ""));
dataList.add(new MachineParameterData("usedCount", usedCount + ""));
dataList.add(new MachineParameterData("todayInCount", todayInCount + ""));
dataList.add(new MachineParameterData("todayOutCount", todayOutCount + ""));
machineParameter.setData(dataList);
String machineParameterStr = JSON.toJSONString(machineParameter);
log.info("MachineParameter主题为:" + KafkaConfig.MACHINEPARAMETER_TOPIC + "内容为:" + machineParameterStr);
ListenableFuture future = kafkaTemplate.send(KafkaConfig.MACHINEPARAMETER_TOPIC, machineParameterStr);
log.info("MachineParameter返回结果为:" + JSON.toJSONString(future));
}
log.info("MachineParameter结束发送");
}
public int getTodayInOutCount(List<String> storageIdList, int type) {
String todayStr = DateUtil.today();
DateTime today = DateUtil.parse(todayStr, "yyyy-MM-dd");
Query q = new Query();
Criteria c = new Criteria();
c.and("updateDate").gte(today); //时间大于今天
c.and("status").in(OP_STATUS.END.name(), OP_STATUS.FINISHED.name()); // 任务是完成的
c.and("storageId").in(storageIdList);
c.and("type").is(type);
return dataLogManager.countByQuery(q.addCriteria(c));
}
public List<String> getMachineIdList() { public List<String> getMachineIdList() {
//找出所有的machineId //找出所有的machineId
List<String> machineIdList = new ArrayList<>(); List<String> machineIdList = new ArrayList<>();
......
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!