Commit 033cdbaa LN

增加kafka通信

1 个父辈 f87865b2
......@@ -268,6 +268,58 @@
<artifactId>spring-boot-autoconfigure</artifactId>
<version>2.7.1</version>
</dependency>
<!-- Kafka / CloudEvents messaging dependencies -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-amqp-proton</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
......
package com.neotel.smfcore.common.kafka;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.neotel.smfcore.common.utils.JsonUtil;
import com.neotel.smfcore.custom.gree20242.bean.GreeRequestMsg;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.kafka.CloudEventSerializer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Properties;
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaMsgUtil {

    /** Spring-managed template used for plain (non-CloudEvent) sends. */
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafka_servers;

    // NOTE(review): these fields were previously "public final" with inline
    // constant initializers. @Value cannot inject into final fields, and final
    // String constants are inlined by the compiler at usage sites, so the
    // configured values were silently ignored. The defaults now live only in
    // the placeholder (and without a leading space after ':', which would
    // otherwise become part of the default value).
    @Value("${spring.kafka.topic:Topic_task_dzlc}")
    public String kafka_topic;

    @Value("${spring.kafka.msg.header.specVersion:1.0}")
    public String header_specVersion;

    @Value("${spring.kafka.msg.header.type:com.github.pull_request.opened}")
    public String header_type;

    @Value("${spring.kafka.msg.header.source:https://github.com/cloudevents/spec/pull}")
    public String header_source;

    @Value("${spring.kafka.msg.header.subject:pull_request}")
    public String header_subject;

    @Value("${spring.kafka.msg.header.dataContenType:application/json}")
    public String header_dataContenType;

    /** Falls back to the configured default topic when {@code topic} is empty. */
    private String getTopic(String topic) {
        if (ObjectUtil.isEmpty(topic)) {
            topic = this.kafka_topic;
        }
        return topic;
    }

    /**
     * Smoke-test send: publishes a fixed "sendData" string without headers.
     *
     * @param topic           target topic; empty falls back to the default
     * @param msg             unused; kept for signature compatibility with sendMsg
     * @param successCallback optional completion callback
     */
    public void test(String topic, GreeRequestMsg msg, ListenableFutureCallback successCallback) {
        topic = getTopic(topic);
        ProducerRecord<String, Object> pr = new ProducerRecord<>(topic, "sendData");
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(pr);
        if (successCallback != null) {
            future.addCallback(successCallback);
        }
    }

    /**
     * Sends the message payload as JSON with CloudEvents-like attributes
     * carried as plain Kafka record headers.
     *
     * @param topic           target topic; empty falls back to the default
     * @param msg             message whose data/id/time populate record + headers
     * @param successCallback optional completion callback
     */
    public void sendMsg(String topic, GreeRequestMsg msg, ListenableFutureCallback successCallback) {
        topic = getTopic(topic);
        String sendData = JsonUtil.toJsonStr(msg.getData());
        ProducerRecord<String, Object> pr = new ProducerRecord<>(topic, sendData);
        pr.headers().add("specVersion", header_specVersion.getBytes(StandardCharsets.UTF_8));
        pr.headers().add("type", header_type.getBytes(StandardCharsets.UTF_8));
        pr.headers().add("source", header_source.getBytes(StandardCharsets.UTF_8));
        pr.headers().add("subject", header_subject.getBytes(StandardCharsets.UTF_8));
        pr.headers().add("id", msg.getId().getBytes(StandardCharsets.UTF_8));
        pr.headers().add("time", msg.getTime().getBytes(StandardCharsets.UTF_8));
        pr.headers().add("datacontentType", header_dataContenType.getBytes(StandardCharsets.UTF_8));
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(pr);
        if (successCallback != null) {
            future.addCallback(successCallback);
        }
    }

    /**
     * Builds a CloudEvents v1 envelope around the message's data payload.
     *
     * @param msg source message; its id is reused as the event id
     * @return the assembled CloudEvent
     */
    public CloudEvent getCloudEvent(GreeRequestMsg msg) {
        String data = JSONObject.toJSONString(msg.getData());
        return CloudEventBuilder.v1()
                .withType(header_type)                       // event type
                .withSource(URI.create(header_source))       // event source context
                .withSubject(header_subject)                 // event subject
                .withId(msg.getId())                         // event id
                .withTime(OffsetDateTime.now())              // event timestamp
                .withDataContentType(header_dataContenType)  // data content type
                // explicit charset: getBytes() without one uses the platform default
                .withData(header_dataContenType, data.getBytes(StandardCharsets.UTF_8))
                .build();
    }

    /**
     * Publishes the message as a CloudEvent using the CloudEvents Kafka serializer.
     * NOTE(review): a new KafkaProducer is created per call, which is expensive;
     * consider a shared producer or a CloudEvent-typed KafkaTemplate.
     *
     * @param topic    target topic; empty falls back to the default
     * @param msg      message to wrap and send
     * @param callback Kafka completion callback (may be null)
     */
    public void sendCloudMsg(String topic, GreeRequestMsg msg, Callback callback) {
        topic = getTopic(topic);
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
        // try-with-resources closes the producer; the previous explicit close() was redundant
        try (KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(props)) {
            CloudEvent event = getCloudEvent(msg);
            producer.send(new ProducerRecord<>(topic, event), callback);
        } catch (Exception ex) {
            // keep the stack trace instead of only ex.toString()
            log.error("CloudEventProducer 出错", ex);
        }
    }
}
package com.neotel.smfcore.common.kafka;
import org.apache.kafka.clients.producer.Callback;
import com.neotel.smfcore.common.bean.ResultBean;
import com.neotel.smfcore.common.utils.JsonUtil;
import com.neotel.smfcore.custom.gree20242.bean.GreeRequestMsg;
import com.neotel.smfcore.security.annotation.AnonymousAccess;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@Slf4j
@Api(tags = "KAFKA TEST")
@RequestMapping("api/kafka")
public class KafkaTestController {
@Autowired
private KafkaMsgUtil kafkaMsgUtil;
private GreeRequestMsg getDMsg(){
Map<String, String> params = new HashMap<>();
params.put("reelId", "reelId00999");
params.put("slot", "A3_23");
params.put("deviceId", "1");
GreeRequestMsg msgB = GreeRequestMsg.newMsg("pull_request", params);
return msgB;
}
/**
* 测试卡夫卡消息
* @return 结果
*/
@GetMapping("/kafkaTest")
public ResultBean kafkaTest() {
kafkaMsgUtil.test("newTopic",getDMsg(),null);
return ResultBean.newOkResult("end");
}
@AnonymousAccess
@GetMapping("/sendTest")
public ResultBean sendTest(String topicName ) {
GreeRequestMsg msgB =getDMsg();
ListenableFutureCallback callback = new ListenableFutureCallback() {
@Override
public void onSuccess(Object result) {
log.info("onSuccess");
}
@Override
public void onFailure(Throwable ex) {
// 发送失败后的处理逻辑
log.info("onFailure");
}
};
// kafkaMsgUtil.test(topicName, msgB, callback);
String sendData = JsonUtil.toJsonStr(msgB);
log.info("kafka sendTest: 准备发送 topic=" + topicName + ",sendData=" + sendData);
kafkaMsgUtil.sendMsg(topicName, msgB, callback);
log.info("kafka sendTest: 发送完成 topic=" + topicName + ",sendData=" + sendData);
return ResultBean.newOkResult("kafka sendTest OK,topic=[" + topicName + "],sendData=[" + sendData + "]");
}
@AnonymousAccess
@GetMapping("/sendCloudMsg")
public ResultBean sendCloudMsg(String topicName ) {
GreeRequestMsg msgB =getDMsg();
String sendData = JsonUtil.toJsonStr(msgB);
log.info("kafka sendCloudMsg: 准备发送 topic=" + topicName + ",sendData=" + sendData);
Callback callBack=new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.info(e.toString());
} else {
log.info("onCompletion,topic=[" + recordMetadata.topic() + ", ofofSet=[" + recordMetadata.offset() + "]," + recordMetadata.serializedKeySize() + "," + recordMetadata.serializedValueSize() + "]");
}
}
};
kafkaMsgUtil.sendCloudMsg(topicName, msgB,callBack);
log.info("kafka sendCloudMsg: 发送完成 topic=" + topicName + ",sendData=" + sendData);
return ResultBean.newOkResult("kafka sendCloudMsg OK,topic=[" + topicName + "],sendData=[" + sendData + "]");
}
}
\ No newline at end of file
package com.neotel.smfcore.custom.gree20242.api;
import com.neotel.smfcore.common.kafka.KafkaMsgUtil;
import com.neotel.smfcore.common.utils.JsonUtil;
import com.neotel.smfcore.core.api.listener.BaseSmfApiListener;
import com.neotel.smfcore.core.order.service.po.LiteOrder;
import com.neotel.smfcore.core.system.service.po.DataLog;
import com.neotel.smfcore.custom.gree20242.bean.GreeRequestMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
......@@ -20,6 +23,7 @@ public class GreeApi extends BaseSmfApiListener {
@Autowired
// private MqttGateway mqttGateway;
private KafkaMsgUtil msgUtil;
@Override
public void inTaskStatusChange(String inNotifyUrl, DataLog task) {
......@@ -64,11 +68,20 @@ public class GreeApi extends BaseSmfApiListener {
/**
 * Publishes the request message to Kafka as a CloudEvent.
 * NOTE(review): the topic argument passed to sendCloudMsg is intentionally
 * empty, so KafkaMsgUtil falls back to its configured default topic; the
 * {@code url} parameter (legacy MQTT topic) is kept only for logging.
 *
 * @param url     legacy MQTT topic, used for logging only
 * @param msgBean message to send and cache
 * @return always true (send is asynchronous; failures surface via the callback)
 */
public boolean SendMsg(String url, GreeRequestMsg msgBean) {
    String msg = JsonUtil.toJsonStr(msgBean);
    msgUtil.sendCloudMsg("", msgBean, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                // ERROR with stack trace instead of INFO + e.toString()
                log.error("SendMsg onCompletion failed", e);
            } else {
                // fixed "ofofSet" typo in the log text
                log.info("onCompletion,topic=[" + metadata.topic() + ", offset=[" + metadata.offset() + "]," + metadata.serializedKeySize() + "," + metadata.serializedValueSize() + "]");
            }
        }
    });
    log.info("GreeApi SendMsg: topic=[" + url + "],sendData=" + msg);
    GreeMsgCache.addSendMsg(msgBean);
    return true;
}
......
......@@ -4,6 +4,7 @@ package com.neotel.smfcore.custom.gree20242.api;
import cn.hutool.core.util.ObjectUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.neotel.smfcore.common.kafka.KafkaMsgUtil;
import com.neotel.smfcore.common.utils.JsonUtil;
import com.neotel.smfcore.common.utils.StorageConstants;
import com.neotel.smfcore.core.device.bean.BoxStatusBean;
......@@ -19,8 +20,10 @@ import com.neotel.smfcore.core.system.util.DevicesStatusUtil;
import com.neotel.smfcore.custom.gree20242.bean.GreeRequestMsg;
import com.neotel.smfcore.custom.gree20242.bean.GreeResponseMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
......@@ -40,10 +43,22 @@ public class GreeMsgHandlerImpl implements MessageHandler {
@Autowired
private DataCache dataCache;
// @Autowired
@Autowired
private KafkaMsgUtil msgUtil;
// private MqttGateway mqttGateway;
/**
 * Consumes task messages and logs record metadata, headers and payload.
 * Topic is now configurable via {@code spring.kafka.topic}; the default is
 * the previously hard-coded "Topic_task_dzlc", keeping behavior identical
 * for existing deployments and consistent with KafkaMsgUtil's default.
 */
@KafkaListener(topics = {"${spring.kafka.topic:Topic_task_dzlc}"}, groupId = "${spring.kafka.consumer.group-id}")
public void kafkaListener(ConsumerRecord<?, ?> record) {
    try {
        log.info("---------------kafkaListener 收到消息-----------------");
        // parameterized logging avoids eager string concatenation
        log.info(":{}--offset={}--{}----{}", record.topic(), record.offset(), record.partition(), record.value());
        log.info(":{}--header={}", record.topic(), record.headers());
    } catch (Exception ex) {
        // keep the stack trace instead of only ex.toString()
        log.error(" kafkaListener 出错:", ex);
    }
}
// TODO(review): pasted assistant-chat text removed. Original suggestion kept as
// an actionable note: extract the Kafka-consuming logic into a dedicated
// @Component so this MessageHandler stays focused on MQTT message handling.
......
......@@ -12,18 +12,19 @@ public class GreeRequestMsg<T> implements Serializable {
/**
 * Returns a random UUID string used as the CloudEvents "id" attribute.
 * Dashes are retained (standard 36-character UUID form).
 * The leftover System.out.println debug output and the dead duplicate
 * declaration were removed.
 */
public static String getUUID() {
    return UUID.randomUUID().toString();
}
public static GreeRequestMsg newMsg(String suject,Object params){
public static GreeRequestMsg newMsg(String subject, Object params){
GreeRequestMsg msg=new GreeRequestMsg() ;
msg.setSpecVersion("1.0");
msg.setType("com.github.pull_request.opened");
msg.setSource("https://github.com/cloudevents/spec/pull");
// msg.setSpecVersion("1.0");
// msg.setType("com.github.pull_request.opened");
// msg.setSource("https://github.com/cloudevents/spec/pull");
// msg.setSubject("pull_request");
// msg.setDatacontentType("application/json");
msg.setSubject("pull_request");
msg.setDatacontentType("application/json");
msg.setSubject(suject);
msg.setData(params);
String time=DateUtil.toDateString(new Date(),"yyyy-MM-dd HH:mm:ss");
// String uid=DateUtil.toDateString(new Date(),"yyyyMMddHHmmss");
......@@ -35,35 +36,39 @@ public class GreeRequestMsg<T> implements Serializable {
/**
* specVersion CloudEvents 规范版本 String 是"1.0"
*/
private String specVersion;
/**
* type 事件类型 String 是 com.github.pull_request.opened
*/
private String type;
/**
* source 事件来源上下文 String 是 https://github.com/cloudevents/spec/pull
*/
private String source ;
/**
* subject 事件主题 String 是 pull_request
*/
// /**
// * specVersion CloudEvents 规范版本 String 是"1.0"
// */
// private String specVersion;
// /**
// * type 事件类型 String 是 com.github.pull_request.opened
// */
// private String type;
// /**
// * source 事件来源上下文 String 是 https://github.com/cloudevents/spec/pull
// */
// private String source ;
// /**
// * subject 事件主题 String 是 pull_request
// */
private String subject ;
//
// /**
// * datacontentType 数据格式类型 String 是 application/json
// */
// private String datacontentType ;
/**
* id 事件 ID String 是 UUID 标识唯一
*/
private String id;
/**
* time 事件时间 String 是 格式:yyyy-MM-dd HH:mm:ss
*/
private String time ;
/**
* datacontentType 数据格式类型 String 是 application/json
*/
private String datacontentType ;
/**
*
*/
private T data ;
......
支持 Markdown 格式
你添加了 0 到此讨论。请谨慎行事。
Finish editing this message first!