Commit d13cfc7c by 韩斌

[20240315007-ZY20240415002] 完成控制端与采集端的指令下达和反馈通信

parent 5d153182
package org.gidea.scada.monitor.rabbit.listener; package org.gidea.scada.monitor.rabbit.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.rabbit.handler.RabbitMessageHandler; import org.gidea.scada.monitor.rabbit.handler.RabbitMessageHandler;
import org.gidea.scada.monitor.utils.EmptyUtil;
import org.gidea.scada.monitor.utils.MonitorUtils; import org.gidea.scada.monitor.utils.MonitorUtils;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
...@@ -46,13 +43,6 @@ public class MonitorRabbitListener { ...@@ -46,13 +43,6 @@ public class MonitorRabbitListener {
@Override @Override
public void onMessage(Message message, Channel channel) { public void onMessage(Message message, Channel channel) {
String messageStr = new String(message.getBody()); String messageStr = new String(message.getBody());
if (messageStr.startsWith("{") && messageStr.endsWith("}")) {
MonitorMessage data = JSONObject.parseObject(messageStr, MonitorMessage.class);
if (EmptyUtil.isEmpty(data.getClientId()) || monitorName.equals(data.getClientId())) {
messageHandler.handleMessage(deviceId, messageStr);
}
return;
}
messageHandler.handleMessage(deviceId, messageStr); messageHandler.handleMessage(deviceId, messageStr);
} }
}); });
......
package org.gidea.scada.monitor.config; package org.gidea.scada.monitor.config;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.utils.Constants; import org.springframework.amqp.core.AcknowledgeMode;
import org.gidea.scada.monitor.utils.EmptyUtil;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException;
@Log4j2 @Log4j2
@EnableRabbit @EnableRabbit
...@@ -75,4 +72,13 @@ public class RabbitMQConfig { ...@@ -75,4 +72,13 @@ public class RabbitMQConfig {
RabbitAdmin admin = new RabbitAdmin(connectionFactory); RabbitAdmin admin = new RabbitAdmin(connectionFactory);
return admin; return admin;
} }
public JSONObject getRabbitInfo() {
JSONObject rabbitInfo = new JSONObject();
rabbitInfo.put("host", host);
rabbitInfo.put("port", port);
rabbitInfo.put("username", username);
rabbitInfo.put("password", password);
return rabbitInfo;
}
} }
...@@ -4,54 +4,21 @@ import com.alibaba.fastjson.JSONObject; ...@@ -4,54 +4,21 @@ import com.alibaba.fastjson.JSONObject;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data @Data
@NoArgsConstructor @NoArgsConstructor
public class MonitorMessage { public class MonitorMessage {
/** /**
* 客户端ID
*/
private String clientId;
/**
* 采集端ID * 采集端ID
*/ */
private String collectId; private String bscode;
/**
* 设备标识
*/
private String deviceId;
/**
* 下发命令
*/
private String command;
/** /**
* 下发命令时间 * 下发命令
*/ */
private LocalDateTime issueTime; private int cmd;
/** /**
* 命令响应结果 * 数据
*/ */
private String response; private JSONObject data;
/**
* 响应时间
*/
private LocalDateTime responseTime;
public MonitorMessage(String clientId, String collectId, String deviceId, String command) {
this.clientId = clientId;
this.collectId = collectId;
this.deviceId = deviceId;
this.command = command;
this.issueTime = LocalDateTime.now();
}
public MonitorMessage(String clientId, String deviceId, String command) {
this.clientId = clientId;
this.deviceId = deviceId;
this.command = command;
this.issueTime = LocalDateTime.now();
}
@Override @Override
public String toString() { public String toString() {
......
...@@ -13,6 +13,8 @@ public class Constants { ...@@ -13,6 +13,8 @@ public class Constants {
public static final String MonitorClientExchangeName = "monitor.client"; public static final String MonitorClientExchangeName = "monitor.client";
public static final String MonitorCjdExchangeName = "monitor.cjd";
public static final String MqttTopicName = "mqtt"; public static final String MqttTopicName = "mqtt";
......
...@@ -3,7 +3,7 @@ package org.gidea.scada.monitor.utils; ...@@ -3,7 +3,7 @@ package org.gidea.scada.monitor.utils;
public class MonitorUtils { public class MonitorUtils {
public static String getClientExchangeName(String deviceId) { public static String getClientExchangeName(String deviceId) {
return Constants.MonitorClientExchangeName + Constants.TopicSpacer + deviceId; return RabbitExchangeEnum.MonitorDeviceData.exchangeName + Constants.TopicSpacer + deviceId;
} }
public static String getRequestQueueName(String deviceId, String monitorName) { public static String getRequestQueueName(String deviceId, String monitorName) {
......
package org.gidea.scada.monitor.utils;
public enum RabbitExchangeEnum {
MonitorCommandService("monitor.command.service", "MonitorCommandService"),
MonitorCommandClient("monitor.command.client", "MonitorCommandClient"),
MonitorCjdService("monitor.cjd.service", "MonitorCjdService"),
MonitorCjdClient("monitor.cjd.client", "MonitorCjdClient"),
MonitorDeviceData("monitor.device.data.", "MonitorDeviceData"),
MonitorDeviceJh("monitor.device.jh.", "MonitorDeviceJh"),
;
public final String exchangeName;
public final String queueName;
RabbitExchangeEnum(String exchangeName, String queueName) {
this.exchangeName = exchangeName;
this.queueName = queueName;
}
}
...@@ -29,6 +29,10 @@ ...@@ -29,6 +29,10 @@
<artifactId>activemq-mqtt</artifactId> <artifactId>activemq-mqtt</artifactId>
<version>5.16.2</version> <!-- 使用适当的版本 --> <version>5.16.2</version> <!-- 使用适当的版本 -->
</dependency> </dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
</dependency>
</dependencies> </dependencies>
......
package org.gidea.scada.monitor.mqtt.handler; package org.gidea.scada.monitor.mqtt.handler;
import com.alibaba.fastjson.JSONObject;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.*;
import java.time.LocalDateTime;
@Component @Component
public class MqttHandler { public class MqttHandler {
@Resource(name = "mqttSession")
private Session session;
public void handler(String message) {
MonitorMessage msg = JSONObject.parseObject(message, MonitorMessage.class);
msg.setResponse("响应成功");
msg.setResponseTime(LocalDateTime.now());
// 创建目标
Destination destination = null;
try {
destination = session.createTopic(MonitorUtils.getMqttQueueName(msg.getDeviceId()));
// 创建生产者
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 创建消息
BytesMessage textMessage = session.createBytesMessage();
textMessage.writeBytes(msg.toString().getBytes());
// 发送消息
producer.send(textMessage);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
} }
package org.gidea.scada.monitor.mqtt.listener; package org.gidea.scada.monitor.mqtt.listener;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.command.ActiveMQTopic;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.*;
import java.util.Set;
@Log4j2
@Component
public class MonitorMqttListener { public class MonitorMqttListener {
@Resource(name = "mqttConnection")
private ActiveMQConnection connection;
@Resource(name = "mqttSession")
private Session session;
@Resource(name = "mqttListeners")
private Set<String> mqttListeners;
@Resource
private Channel channel;
@Scheduled(fixedDelay = 10000)
public void listener() throws Exception {
DestinationSource destinationSource = connection.getDestinationSource();
//获取有多少个主题
Set<ActiveMQTopic> topics = destinationSource.getTopics();
for (ActiveMQTopic topic : topics) {
if (!mqttListeners.contains(topic.getTopicName())) {
String deviceId = MonitorUtils.getDeviceIdByMqtt(topic.getTopicName());
channel.exchangeDeclare(MonitorUtils.getClientExchangeName(deviceId), BuiltinExchangeType.FANOUT, false, false, null);
mqttListeners.add(topic.getTopicName());
//队列等待最多为1,后来的会替换新的
Destination destination = session.createTopic(topic.getTopicName() + "?consumer.prefetchSize=1");
// 第六步:使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(destination);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
BytesMessage bm = (BytesMessage) message;
byte[] bys = null;
try {
//获取消息内容
bys = new byte[(int) bm.getBodyLength()];
bm.readBytes(bys);
String msg = new String(bys, "utf-8");
// 第八步:推送消息。
channel.basicPublish(MonitorUtils.getClientExchangeName(deviceId), deviceId, null, msg.toString().getBytes("UTF-8"));
} catch (Exception e) {
log.error("推送消息失败!", e);
}
}
});
}
}
}
} }
package org.gidea.scada.monitor.rabbit.config; package org.gidea.scada.monitor.rabbit.config;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.gidea.scada.monitor.utils.Constants; import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.rabbit.listener.CjdListenerHandler;
import org.gidea.scada.monitor.rabbit.listener.CommandListenerHandler;
import org.gidea.scada.monitor.udp.entity.CjdMessage;
import org.gidea.scada.monitor.udp.utils.CrcUtils;
import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource; import javax.annotation.Resource;
import static org.gidea.scada.monitor.utils.RabbitExchangeEnum.*;
@Log4j2
@Configuration @Configuration
public class MonitorRabbitMQConfig { public class MonitorRabbitMQConfig {
@Resource @Resource
Channel channel; private Channel channel;
@Resource
private CommandListenerHandler commandListenerHandler;
@Resource
private CjdListenerHandler cjdListenerHandler;
@SneakyThrows
@Bean(name = "commandServiceExchange")
public Exchange commandServiceExchange(RabbitAdmin rabbitAdmin, SimpleRabbitListenerContainerFactory factory) {
FanoutExchange commandServiceExchange = new FanoutExchange(MonitorCommandService.exchangeName, false, false);
rabbitAdmin.declareExchange(commandServiceExchange);
// 声明请求主队列
Queue requestQueue = new Queue(MonitorCommandService.queueName, false, false, false);
rabbitAdmin.declareQueue(requestQueue);
channel.queueBind(MonitorCommandService.queueName, MonitorCommandService.exchangeName, "");
SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames(MonitorCommandService.queueName);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
String messageStr = new String(message.getBody());
log.info("MonitorService接收数据:{}",messageStr);
commandListenerHandler.handler(messageStr);
}
});
container.start();
return commandServiceExchange;
}
@SneakyThrows @SneakyThrows
@Bean @Bean(name = "commandClientExchange")
public Exchange exchange(RabbitAdmin rabbitAdmin) { public Exchange commandClientExchange(RabbitAdmin rabbitAdmin) {
FanoutExchange serviceExchange = new FanoutExchange(Constants.MonitorServiceExchangeName, true, true); FanoutExchange commandClientExchange = new FanoutExchange(MonitorCommandClient.exchangeName, false, false);
rabbitAdmin.declareExchange(serviceExchange); rabbitAdmin.declareExchange(commandClientExchange);
return commandClientExchange;
}
@SneakyThrows
@Bean(name = "cjdServiceExchange")
public Exchange cjdServiceExchange(RabbitAdmin rabbitAdmin, SimpleRabbitListenerContainerFactory factory) {
FanoutExchange cjdServiceExchange = new FanoutExchange(MonitorCjdService.exchangeName, false, false);
rabbitAdmin.declareExchange(cjdServiceExchange);
// 声明请求主队列 // 声明请求主队列
Queue requestQueue = new Queue(Constants.MonitorServiceName, true, true, true); Queue requestQueue = new Queue(MonitorCjdService.queueName, false, false, false);
rabbitAdmin.declareQueue(requestQueue); rabbitAdmin.declareQueue(requestQueue);
channel.queueBind(Constants.MonitorServiceName, Constants.MonitorServiceExchangeName, Constants.MonitorServiceExchangeName); channel.queueBind(MonitorCjdService.queueName, MonitorCjdService.exchangeName, "");
return serviceExchange; SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames(MonitorCjdService.queueName);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
String messageStr = new String(message.getBody());
log.info("CjdService接收数据:{}",messageStr);
if (messageStr.length() > 5) {
String str = messageStr.substring(5);
if ((messageStr.substring(0, 4).equalsIgnoreCase(CrcUtils.getCrc(str.getBytes())))) {
CjdMessage cjdMessage = JSONObject.parseObject(str, CjdMessage.class);
cjdListenerHandler.handler(cjdMessage);
}
}
}
});
container.start();
return cjdServiceExchange;
}
@SneakyThrows
@Bean(name = "cjdClientExchange")
public Exchange cjdClientExchange(RabbitAdmin rabbitAdmin) {
FanoutExchange cjdClientExchange = new FanoutExchange(MonitorCjdClient.exchangeName, false, false);
rabbitAdmin.declareExchange(cjdClientExchange);
return cjdClientExchange;
} }
} }
package org.gidea.scada.monitor.rabbit.controller; package org.gidea.scada.monitor.rabbit.controller;
import org.gidea.scada.monitor.rabbit.entity.ExchangeEntity;
import org.gidea.scada.monitor.rabbit.entity.Result; import org.gidea.scada.monitor.rabbit.entity.Result;
import org.gidea.scada.monitor.rabbit.service.MonitorRabbitService; import org.gidea.scada.monitor.rabbit.service.MonitorRabbitService;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
...@@ -15,7 +16,7 @@ public class MonitorRabbitController { ...@@ -15,7 +16,7 @@ public class MonitorRabbitController {
private MonitorRabbitService rabbitService; private MonitorRabbitService rabbitService;
@GetMapping("/applyConnection") @GetMapping("/applyConnection")
public Result<String> applyConnection(@RequestParam String deviceId) { public Result<ExchangeEntity> applyConnection(@RequestParam String deviceId) {
return rabbitService.applyConnection(deviceId); return rabbitService.applyConnection(deviceId);
} }
} }
package org.gidea.scada.monitor.rabbit.entity;
import lombok.Data;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
@Data
public class ExchangeEntity {
private final String monitorCommandService = RabbitExchangeEnum.MonitorCommandService.exchangeName;
private final String monitorCommandClient = RabbitExchangeEnum.MonitorCommandClient.exchangeName;
private final String monitorDeviceData;
private final String monitorDeviceJh;
public ExchangeEntity(String deviceId) {
this.monitorDeviceData = RabbitExchangeEnum.MonitorDeviceData.exchangeName + deviceId;
this.monitorDeviceJh = RabbitExchangeEnum.MonitorDeviceJh.exchangeName + deviceId;
}
}
package org.gidea.scada.monitor.rabbit.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.udp.entity.CjdMessage;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Log4j2
@Component
public class CjdListenerHandler {
@Resource
private Channel channel;
public void handler(CjdMessage message) {
MonitorMessage monitorMessage = new MonitorMessage();
BeanUtils.copyProperties(message, monitorMessage);
try {
channel.basicPublish(RabbitExchangeEnum.MonitorCommandClient.exchangeName, "", null, JSONObject.toJSONString(monitorMessage).getBytes());
} catch (IOException e) {
log.error("响应消息失败!{}", JSONObject.toJSONString(message), e);
throw new RuntimeException(e);
}
}
}
package org.gidea.scada.monitor.rabbit.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.udp.config.UdpServerConfig;
import org.gidea.scada.monitor.udp.entity.CjdMessage;
import org.gidea.scada.monitor.udp.utils.CrcUtils;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Component
@Log4j2
public class CommandListenerHandler {
@Resource
private UdpServerConfig udpServerConfig;
@Resource
private Channel channel;
public void handler(String message) {
MonitorMessage monitorMessage = JSONObject.parseObject(message, MonitorMessage.class);
String cjdIp = udpServerConfig.getCjdIp(monitorMessage.getBscode());
if (cjdIp == null) {
JSONObject data = new JSONObject();
data.put("time", System.currentTimeMillis());
data.put("result", "采集端未连接!");
monitorMessage.setData(data);
try {
channel.basicPublish(RabbitExchangeEnum.MonitorCommandClient.exchangeName, "", null, JSONObject.toJSONString(monitorMessage).getBytes("UTF-8"));
} catch (IOException e) {
log.error("响应消息失败!{}", JSONObject.toJSONString(monitorMessage), e);
throw new RuntimeException(e);
}
} else {
CjdMessage cjdMessage = new CjdMessage();
BeanUtils.copyProperties(monitorMessage, cjdMessage);
cjdMessage.setIp(cjdIp);
cjdMessage.setCode(0);
String string = JSONObject.toJSONString(cjdMessage);
String msg = CrcUtils.getCrc(string.getBytes()) + "=" + string;
try {
channel.basicPublish(RabbitExchangeEnum.MonitorCjdClient.exchangeName, "", null, msg.getBytes());
} catch (IOException e) {
log.error("响应消息失败!{}", msg, e);
throw new RuntimeException(e);
}
}
}
}
package org.gidea.scada.monitor.rabbit.listener; package org.gidea.scada.monitor.rabbit.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.mqtt.handler.MqttHandler;
import org.gidea.scada.monitor.utils.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Log4j2
@Component
public class MonitorRabbitListener { public class MonitorRabbitListener {
@Resource
private MqttHandler mqttHandler;
@Resource
private Channel channel;
@RabbitListener(queues = Constants.MonitorServiceName)
public void deviceTopic(String message) {
log.info("接收消息:{}", message);
mqttHandler.handler(message);
}
} }
package org.gidea.scada.monitor.rabbit.service; package org.gidea.scada.monitor.rabbit.service;
import org.gidea.scada.monitor.rabbit.entity.ExchangeEntity;
import org.gidea.scada.monitor.rabbit.entity.Result; import org.gidea.scada.monitor.rabbit.entity.Result;
public interface MonitorRabbitService { public interface MonitorRabbitService {
Result<String> applyConnection(String deviceId); Result<ExchangeEntity> applyConnection(String deviceId);
} }
package org.gidea.scada.monitor.rabbit.service.impl; package org.gidea.scada.monitor.rabbit.service.impl;
import org.gidea.scada.monitor.rabbit.entity.ExchangeEntity;
import org.gidea.scada.monitor.rabbit.entity.Result; import org.gidea.scada.monitor.rabbit.entity.Result;
import org.gidea.scada.monitor.rabbit.service.MonitorRabbitService; import org.gidea.scada.monitor.rabbit.service.MonitorRabbitService;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -16,11 +16,13 @@ public class MonitorRabbitServiceImpl implements MonitorRabbitService { ...@@ -16,11 +16,13 @@ public class MonitorRabbitServiceImpl implements MonitorRabbitService {
private RabbitAdmin rabbitAdmin; private RabbitAdmin rabbitAdmin;
@Override @Override
public Result<String> applyConnection(String deviceId) { public Result<ExchangeEntity> applyConnection(String deviceId) {
String exchangeName = MonitorUtils.getClientExchangeName(deviceId); ExchangeEntity exchange = new ExchangeEntity(deviceId);
FanoutExchange clientExchange = new FanoutExchange(exchangeName, false, false); FanoutExchange dataExchange = new FanoutExchange(exchange.getMonitorDeviceData(), false, false);
rabbitAdmin.declareExchange(clientExchange); FanoutExchange jhExchange = new FanoutExchange(exchange.getMonitorDeviceJh(), false, false);
return Result.success(exchangeName); rabbitAdmin.declareExchange(dataExchange);
rabbitAdmin.declareExchange(jhExchange);
return Result.success(exchange);
} }
} }
package org.gidea.scada.monitor.udp.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.gidea.scada.monitor.config.RabbitMQConfig;
import org.gidea.scada.monitor.udp.entity.CjdMessage;
import org.gidea.scada.monitor.udp.utils.CrcUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Udp;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.*;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.gidea.scada.monitor.utils.RabbitExchangeEnum.*;
@Slf4j
@Configuration
public class UdpServerConfig {
@Value("${upd.server.port:23005}")
private Integer listeningPort;
@Resource
private RabbitMQConfig rabbitMQConfig;
private Map<String, String> ipMap = new LinkedHashMap<>();
private String cjdFile = System.getProperty("user.dir") + "/config/cjd.json";
@PostConstruct
private void init() {
if (new File(cjdFile).exists()) {
String line;
try {
// Read the JSON file
BufferedReader reader = new BufferedReader(new FileReader(cjdFile));
StringBuilder jsonData = new StringBuilder();
while (true) {
if (!((line = reader.readLine()) != null)) {
break;
}
jsonData.append(line);
}
reader.close();
Map<String, String> data = JSONObject.parseObject(jsonData.toString(), new TypeReference<Map<String, String>>() {
});
ipMap.putAll(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
/**
* UDP消息接收服务
*/
@Bean
public IntegrationFlow integrationFlow() {
log.info("UDP服务启动成功,端口号为: {}", listeningPort);
return IntegrationFlows.from(Udp.inboundAdapter(listeningPort)).channel("udpChannel").get();
}
/**
* 转换器
*/
@Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
public CjdMessage transformer(@Payload byte[] payload) {
String message = new String(payload);
// todo 进行数据转换
try {
if (message.endsWith("\n")) {
message = message.substring(0, message.length() - 1);
}
String tempCrc = message.substring(0, 4);
String tempMessage = message.substring(5);
String crc = CrcUtils.getCrc(tempMessage.getBytes());
// 数据校验
if (crc.equalsIgnoreCase(tempCrc)) {
CjdMessage cjdMessage = JSONObject.parseObject(tempMessage, CjdMessage.class);
return cjdMessage;
}
} catch (Exception e) {
log.error("解析消息失败!", e);
}
log.info("无法解析消息:{}", message);
return null;
}
/**
* 过滤器
*/
@Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
public boolean filter(CjdMessage message) {
if (message == null) {
return false;
}
return true;
}
/**
* 路由分发处理器:可以进行分发消息被那个处理器进行处理
*/
@Router(inputChannel = "udpRouter")
public String router(CjdMessage message) {
// todo 筛选,走那个处理器
switch (message.getCode()) {
case 0:
return "messageHandle0";
case 1:
return "messageHandle1";
}
return "messageHandle";
}
/**
* 消息处理
*/
@ServiceActivator(inputChannel = "messageHandle")
private void messageHandle(CjdMessage message) {
log.info("统一消息处理:{}", JSONObject.toJSONString(message));
}
/**
* 消息处理
*/
@ServiceActivator(inputChannel = "messageHandle0", outputChannel = "responseHandle")
private CjdMessage messageHandle0(CjdMessage message) {
log.info("处理采集端主动推送数据:{}", JSONObject.toJSONString(message));
CjdMessage cjdMessage = new CjdMessage(message);
cjdMessage.setCode(1);
if (10000 == message.getCmd()) {
ipMap.put(message.getBscode(), message.getIp());
File file = new File(cjdFile);
if (!file.exists()) {
try {
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
file.createNewFile();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
String string = JSONObject.toJSONString(ipMap, true);
// 将 JSON 字符串写入文件
try (FileWriter fileWriter = new FileWriter(file)) {
fileWriter.write(string);
fileWriter.flush();
fileWriter.close();
System.out.println("JSON 数据已成功写入文件。");
} catch (IOException e) {
e.printStackTrace();
}
JSONObject data = rabbitMQConfig.getRabbitInfo();
data.put("prefix", MonitorDeviceData.exchangeName);
data.put("serviceExchange", MonitorCjdService.exchangeName);
data.put("clientExchange", MonitorCjdClient.exchangeName);
cjdMessage.put("mqtt", data);
}
return cjdMessage;
}
/**
* 消息处理
*/
@ServiceActivator(inputChannel = "messageHandle1")
private CjdMessage messageHandle1(CjdMessage message) {
log.info("处理采集端响应结果数据:{}", JSONObject.toJSONString(message));
return null;
}
/**
* 消息响应发送
*/
@ServiceActivator(inputChannel = "responseHandle")
private void responseHandle(String message, @Headers Map<String, Object> headers) {
if (message == null || "".equals(message)) {
return;
}
// 获取来源IP,可以进行IP过滤
String ipAddress = headers.get("ip_address").toString();
// 获取来源Port
Integer port = Integer.parseInt(headers.get("ip_port").toString());
// 创建消息
Message<String> udpMessage = MessageBuilder.withPayload(message).setHeader("ip_address", ipAddress).setHeader("ip_port", port).build();
// 创建发送器
UnicastSendingMessageHandler messageHandler = new UnicastSendingMessageHandler(ipAddress, port);
// 发送消息
messageHandler.handleMessage(udpMessage);
}
public String getCjdIp(String cjdId) {
return ipMap.get(cjdId);
}
}
package org.gidea.scada.monitor.udp.entity;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@Data
public class CjdMessage {
private int code;
private int cmd;
private String ip;
private String bscode;
private JSONObject data;
public CjdMessage(CjdMessage message) {
if (message == null) {
return;
}
this.code = message.code;
this.cmd = message.cmd;
this.ip = message.ip;
this.bscode = message.bscode;
this.data = new JSONObject();
}
public CjdMessage put(String key, Object value) {
if (this.data == null) {
this.data = new JSONObject();
}
this.data.put(key, value);
return this;
}
}
package org.gidea.scada.monitor.udp.utils;
public class CrcUtils {
public static String getCrc(byte[] data) {
int crc = 0xFFFF;
int len = data.length;
for (int i = 0; i < len; i++) {
crc = (crc ^ (data[i] & 0xFF));
for (int j = 0; j < 8; j++) {
crc = (crc & 1) != 0 ? ((crc >> 1) ^ 0xA001) : (crc >> 1);
}
}
byte hi = (byte) ((crc & 0xFF00) >> 8); // 高位置
byte lo = (byte) (crc & 0x00FF); // 低位置
String low = String.format("%02X", lo);
String high = String.format("%02X", hi);
return low + high;
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论