Commit 95490153 by 韩斌

[20240315007-ZY20240415002] 完成控制端与采集端的指令下达和反馈通信

parent d13cfc7c
package org.gidea.scada.monitor.rabbit.config;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.gidea.scada.monitor.utils.RabbitExchangeEnum.MonitorCommandClient;
import static org.gidea.scada.monitor.utils.RabbitExchangeEnum.MonitorCommandService;
@Log4j2
@Configuration
public class MonitorClientConfig {
@Value("${mqtt.client.id}")
private String monitorName;
@Resource
private Channel channel;
private Map<Integer, MonitorMessage> commandHistory = new HashMap<>();
private Map<Integer, MonitorMessage> commandResult = new HashMap<>();
@SneakyThrows
@Bean(name = "commandClientExchange")
public Exchange commandClientExchange(RabbitAdmin rabbitAdmin, SimpleRabbitListenerContainerFactory factory) {
RabbitExchangeEnum exchangeEnum = MonitorCommandClient;
FanoutExchange commandClientExchange = new FanoutExchange(exchangeEnum.exchangeName, false, false);
rabbitAdmin.declareExchange(commandClientExchange);
String queueName = MonitorUtils.getQueueName(exchangeEnum.exchangeName, monitorName);
// 声明请求主队列
Queue requestQueue = new Queue(queueName, false, false, false);
rabbitAdmin.declareQueue(requestQueue);
channel.queueBind(queueName, exchangeEnum.exchangeName, "");
SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames(queueName);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
String messageStr = new String(message.getBody());
MonitorMessage monitorMessage = JSONObject.parseObject(messageStr, MonitorMessage.class);
MonitorMessage history = commandHistory.get(monitorMessage.getCmd());
if (history != null) {
commandResult.put(monitorMessage.getCmd(), monitorMessage);
commandHistory.remove(monitorMessage.getCmd());
}
log.info("MonitorService接收数据:{}", messageStr);
}
});
container.start();
return commandClientExchange;
}
public void sendCommand(MonitorMessage message) throws IOException {
channel.basicPublish(MonitorCommandService.exchangeName, "", null, message.toString().getBytes());
commandHistory.put(message.getCmd(), message);
if (commandResult.get(message.getCmd()) != null) {
commandResult.remove(message.getCmd());
}
}
public MonitorMessage getResult(int cmd) {
return commandResult.get(cmd);
}
}
......@@ -21,8 +21,6 @@ import java.util.concurrent.locks.Lock;
@Log4j2
@Component
public class MonitorRabbitListener {
@Resource
private RabbitMessageHandler messageHandler;
@Value("${org.gidea.monitor.name}")
private String monitorName;
......@@ -33,12 +31,13 @@ public class MonitorRabbitListener {
@Resource
private SimpleRabbitListenerContainerFactory containerFactory;
@Resource
private Channel channel;
private void queueListener(String deviceId, SimpleMessageListenerContainer container, String responseQueueName) {
container.setQueueNames(responseQueueName);
private void queueListener(String deviceId, SimpleMessageListenerContainer container, String queueName, RabbitMessageHandler messageHandler) {
container.setQueueNames(queueName);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
......@@ -49,7 +48,11 @@ public class MonitorRabbitListener {
container.start();
}
public void addDeviceListener(String deviceId) {
/**
* @param deviceId 设备标识
* @param messageHandler 设备消息处理
*/
public void addDeviceListener(String deviceId, RabbitMessageHandler messageHandler) {
String queueName = MonitorUtils.getRequestQueueName(deviceId, monitorName);
// 创建监听设备请求队列
SimpleMessageListenerContainer container = queues.get(queueName);
......@@ -57,7 +60,7 @@ public class MonitorRabbitListener {
return;
}
try {
String exchangeName = MonitorUtils.getClientExchangeName(deviceId);
String exchangeName = MonitorUtils.getDeviceExchangeName(deviceId);
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, false, false, null);
channel.queueDeclare(queueName, true, true, true, null);
channel.queueBind(queueName, exchangeName, deviceId);
......@@ -65,7 +68,7 @@ public class MonitorRabbitListener {
throw new RuntimeException(e);
}
container = containerFactory.createListenerContainer();
queueListener(deviceId, container, queueName);
queueListener(deviceId, container, queueName, messageHandler);
queues.put(queueName, container);
}
......
package org.gidea.scada.monitor.rabbit.template;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.rabbit.listener.MonitorRabbitListener;
import org.gidea.scada.monitor.utils.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class MonitorRabbitTemplate {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MonitorRabbitListener rabbitListener;
/**
* 发送消息
*
* @param deviceId
* @param message
*/
public void sendMessage(String deviceId, MonitorMessage message) {
rabbitListener.addDeviceListener(deviceId);
rabbitTemplate.convertAndSend(Constants.MonitorServiceName, message.toString());
}
}
......@@ -72,13 +72,4 @@ public class RabbitMQConfig {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
return admin;
}
public JSONObject getRabbitInfo() {
JSONObject rabbitInfo = new JSONObject();
rabbitInfo.put("host", host);
rabbitInfo.put("port", port);
rabbitInfo.put("username", username);
rabbitInfo.put("password", password);
return rabbitInfo;
}
}
package org.gidea.scada.monitor.utils;
public class MonitorUtils {
public static String getClientExchangeName(String deviceId) {
public static String getDeviceExchangeName(String deviceId) {
return RabbitExchangeEnum.MonitorDeviceData.exchangeName + Constants.TopicSpacer + deviceId;
}
public static String getCjdClientExchangeName(String deviceId) {
return RabbitExchangeEnum.MonitorCjdClient.exchangeName + Constants.TopicSpacer + deviceId;
}
public static String getCjdServiceExchangeName(String deviceId) {
return RabbitExchangeEnum.MonitorCjdService.exchangeName + Constants.TopicSpacer + deviceId;
}
public static String getRequestQueueName(String deviceId, String monitorName) {
return monitorName + Constants.TopicSpacer + deviceId;
}
public static String getMqttQueueName(String deviceId ) {
return Constants.MqttTopicName + Constants.TopicSpacer + deviceId;
public static String getQueueName(String deviceId, String monitorName) {
return monitorName + Constants.TopicSpacer + Constants.MqttTopicName + Constants.TopicSpacer + deviceId;
}
public static String getDeviceIdByMqtt(String topicName) {
return topicName.replaceFirst(Constants.MqttTopicName + Constants.TopicSpacer, "");
}
}
package org.gidea.scada.monitor.utils;
public enum RabbitExchangeEnum {
MonitorCommandService("monitor.command.service", "MonitorCommandService"),
MonitorCommandClient("monitor.command.client", "MonitorCommandClient"),
MonitorCjdService("monitor.cjd.service", "MonitorCjdService"),
MonitorCjdClient("monitor.cjd.client", "MonitorCjdClient"),
MonitorDeviceData("monitor.device.data.", "MonitorDeviceData"),
MonitorDeviceJh("monitor.device.jh.", "MonitorDeviceJh"),
MonitorCommandService("monitor.command.service"), MonitorCommandClient("monitor.command.client"), MonitorCjdService("monitor.cjd.service"), MonitorCjdClient("monitor.cjd.client"), MonitorDeviceData("monitor.device.data"), MonitorDeviceJh("monitor.device.jh"),
;
public final String exchangeName;
public final String queueName;
RabbitExchangeEnum(String exchangeName, String queueName) {
RabbitExchangeEnum(String exchangeName) {
this.exchangeName = exchangeName;
this.queueName = queueName;
}
}
{
"14022356771278336":{
"bscode":"14022356771278336",
"cmd":10000,
"code":0,
"ip":"192.168.110.101"
},
"14022362316115456":{
"bscode":"14022362316115456",
"cmd":10000,
"code":0,
"ip":"192.168.110.101"
},
"13987640862798080":{
"bscode":"13987640862798080",
"cmd":10000,
"code":0,
"ip":"192.168.110.101"
},
"13901687193801344":{
"bscode":"13901687193801344",
"cmd":10000,
"code":0,
"ip":"192.168.110.101"
}
}
\ No newline at end of file
......@@ -23,15 +23,15 @@
<artifactId>Scada-Monitor-Common</artifactId>
<version>1.0.0</version>
</dependency>
<!--activeMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-mqtt</artifactId>
<version>5.16.2</version> <!-- 使用适当的版本 -->
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version> <!-- 或者最新的版本 -->
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.9.10</version> <!-- 根据您的需求选择合适的版本 -->
</dependency>
......
package org.gidea.scada.monitor.entity;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class ScadaTxsbVo {
/**
* 通讯设备主键
*/
private Long scadaTxsboid;
/**
* 驱动FK
*/
private Long scadaDriverFk;
/**
* 驱动名称
*/
private String drivername;
/**
* 驱动id
*/
private String driverid;
/**
* 驱动类型 网口 串口 都有
*/
private String xytype;
/**
* 设备编号
*/
private String sbbh;
/**
* 设备名称
*/
private String sbmc;
/**
* 备注
*/
private String bz;
/**
* 设备IP
*/
private String sbip;
/**
* 设备名称
*/
private Integer sbport;
/**
* 设备com口
*/
private String com;
/**
* 波特率
*/
private Integer btl;
/**
* 扫描速率(ms)
*/
private Integer smsl;
/**
* 创建时间
*/
private LocalDateTime created;
/**
* 维护人外键
*/
private Long sysLastUpdBy;
/**
* 维护人名称
*/
private String whrmc;
private LocalDateTime sysUpdUpd;
/**
* 接入方式
*/
private String conntype;
/**
* 存储mongo名称
*/
private String savetablename;
/**
* 是否需要校验 对应yhsz
*/
private Boolean needcheck;
/**
* 连接账号
*/
private String uid;
/**
* 连接密码
*/
private String pwd;
/**
* 是否需要个性化配置 对应cpusz
*/
private Boolean needopt;
/**
* cpu机架号
*/
private Integer cpujj;
/**
* cpu插槽号 站号
*/
private Integer cpucc;
/**
* 数据位
*/
private Integer databit;
/**
* 停止位
*/
private Integer stopbit;
/**
* 是否ABCD
*/
private String isabcd;
/**
* 是否颠倒或翻转
*/
private Boolean isfanz;
/**
* 首位从0开始
*/
private Boolean startzero;
/**
* 采集端FK
*/
private Long scadaCjdFk;
/**
* 采集端名称
*/
private String scadaCjdName;
/**
* 项目fk
*/
private Long scadaProjectFk;
/**
* 项目名称
*/
private String scadaProjectName;
/**
* 设备分类外键",dataType = "Long
*/
private Long scadaSbflFk;
/**
* 设备分类名称",dataType = "String
*/
private String scadaSbflName;
/**
* mes设备编码",dataType = "String
*/
private String mesSbid;
}
package org.gidea.scada.monitor.mqtt.config;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.LinkedHashSet;
import java.util.Set;
@Configuration
public class MonitorMqttConfig {
@Value("${org.gidea.mqtt.url}")
@Value("${org.gidea.mqtt.uri}")
private String url;
@Value("${org.gidea.mqtt.username}")
......@@ -23,38 +19,26 @@ public class MonitorMqttConfig {
@Value("${org.gidea.mqtt.password}")
private String password;
@Bean(name = "mqttListeners")
public Set<String> mqttListeners() {
return new LinkedHashSet<>();
}
@Bean(name = "mqttConnection")
public ActiveMQConnection connection() {
ActiveMQConnectionFactory factory = null;
ActiveMQConnection connection = null;
try {
// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
factory = new ActiveMQConnectionFactory (username, password, url);
// 第二步:使用ConnectionFactory对象创建一个Connection对象。
connection = (ActiveMQConnection) factory.createConnection();
// 第三步:开启连接,调用Connection对象的start方法。
connection.start();
} catch (JMSException e) {
throw new RuntimeException(e);
public MqttClient mqttClient(String clientId) throws MqttException {
// 创建 MQTT 客户端实例
MqttClient sampleClient = new MqttClient(url, clientId);
// 设置 MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
// 你可以在这里设置用户名和密码,如果需要的话
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
// 设置自动重连策略
connOpts.setAutomaticReconnect(true);
// connOpts.setInitialReconnectDelay(1000); // 初始重连延迟 1 秒
connOpts.setMaxReconnectDelay(30000); // 最大重连延迟 30 秒
// 连接 MQTT 服务器
System.out.println("【" + clientId + "】连接 MQTT 服务器: " + url);
sampleClient.connect(connOpts);
System.out.println("【" + clientId + "】连接成功");
return sampleClient;
}
return connection;
}
@Bean(name = "mqttSession")
public Session session(ActiveMQConnection connection) {
Session session = null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
throw new RuntimeException(e);
}
return session;
}
}
package org.gidea.scada.monitor.mqtt.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.*;
import org.gidea.scada.monitor.mqtt.config.MonitorMqttConfig;
import org.gidea.scada.monitor.udp.entity.CjdMessage;
import org.gidea.scada.monitor.udp.utils.CrcUtils;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
@Log4j2
public class CjdServiceMqtt {
private String topicName;
private MonitorMqttConfig mqttConfig;
private Channel channel;
private MqttClient sampleClient;
public CjdServiceMqtt(String topic, MonitorMqttConfig mqttConfig, Channel channel) {
this.mqttConfig = mqttConfig;
this.topicName = topic;
this.channel = channel;
// 创建 MQTT 客户端实例
try {
sampleClient = mqttConfig.mqttClient("MonitorMqttService_" + topicName);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
public void publish(String topic, String message) {
try {
sampleClient.publish(topic, new MqttMessage(message.getBytes()));
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
public void listener() throws MqttException {
// 订阅以 "mqtt." 开始的所有主题
int qos = 2; // 设置服务质量级别
sampleClient.subscribe(topicName, qos);
// 设置回调以接收消息
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost. Reason: " + cause);
}
/**
* 接收到采集端返回的设备实时数据
* @param topic name of the topic on the message was published to
* @param message the actual message.
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
if (msg.endsWith("\n")) {
msg = msg.substring(0, msg.length() - 1);
}
String tempCrc = msg.substring(0, 4);
String tempMessage = msg.substring(5);
String crc = CrcUtils.getCrc(tempMessage.getBytes());
// // 数据校验
if (crc.equalsIgnoreCase(tempCrc)) {
CjdMessage cjdMessage = JSONObject.parseObject(tempMessage, CjdMessage.class);
if (cjdMessage.getCode() == 0) {
log.info("【{}】主动上报:{}", topic, msg);
} else if (cjdMessage.getCode() == 1) {
log.info("【{}】响应指令:{}", topic, msg);
}
channel.basicPublish(RabbitExchangeEnum.MonitorCommandClient.exchangeName, "", null, JSONObject.toJSONString(cjdMessage).getBytes());
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete for token: " + token);
}
});
}
}
package org.gidea.scada.monitor.mqtt.listener;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.*;
import org.gidea.scada.monitor.entity.ScadaTxsbVo;
import org.gidea.scada.monitor.mqtt.config.MonitorMqttConfig;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.udp.utils.CrcUtils;
import org.gidea.scada.monitor.utils.EmptyUtil;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
@Log4j2
@Component
public class MonitorMqttListener {
Set<String> txsb = new HashSet<>();
@Resource
private Channel channel;
@Value("${org.gidea.scada.txsb.uri}")
private String txsbUri;
@Resource
private MonitorMqttConfig mqttConfig;
@Resource
private RestTemplateBuilder restTemplate;
@PostConstruct
@Scheduled(cron = "0/10 * * * * ?")
public void queryTxsb() {
RestTemplate template = restTemplate.build();
List<ScadaTxsbVo> data = null;
try {
data = template.postForEntity(txsbUri, null, JSONObject.class).getBody().getObject("data", new TypeReference<List<ScadaTxsbVo>>() {
});
} catch (RestClientException e) {
return;
}
List<String> bss = data.parallelStream().filter(o -> !txsb.contains(o.getSavetablename())).map(ScadaTxsbVo::getSavetablename).collect(Collectors.toList());
if (!EmptyUtil.isEmpty(bss)) {
txsb.addAll(bss);
bss.parallelStream().forEach(bs -> {
try {
new RealtimeMqtt("mqtt." + bs, mqttConfig, channel).listener();
new RealtimeMqtt("mqtt." + bs + ".alarm", mqttConfig, channel).listener();
} catch (MqttException e) {
throw new RuntimeException(e);
}
});
}
}
@Bean
public MqttClient serverLister() throws MqttException {
// 创建 MQTT 客户端实例
MqttClient sampleClient = mqttConfig.mqttClient("MonitorMqttServer");
// 订阅以 "mqtt." 开始的所有主题
String wildcardTopic = "monitor.mqtt.server";
int qos = 2; // 设置服务质量级别
sampleClient.subscribe(wildcardTopic, qos);
// 设置回调以接收消息
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost. Reason: " + cause);
}
/**
* 接收到采集端返回的响应数据
* @param topic name of the topic on the message was published to
* @param message the actual message.
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageStr = new String(message.getPayload());
if (messageStr.length() > 5) {
String str = messageStr.substring(5);
if ((messageStr.substring(0, 4).equalsIgnoreCase(CrcUtils.getCrc(str.getBytes())))) {
MonitorMessage monitorMessage = JSONObject.parseObject(str, MonitorMessage.class);
channel.basicPublish(RabbitExchangeEnum.MonitorCommandClient.exchangeName, "", null, JSONObject.toJSONString(monitorMessage).getBytes());
log.info("接收到采集端消息:{}", messageStr);
return;
}
}
log.info("接收到采集端不合法消息:{}", messageStr);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete for token: " + token);
}
});
return sampleClient;
}
}
package org.gidea.scada.monitor.mqtt.listener;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.*;
import org.gidea.scada.monitor.mqtt.config.MonitorMqttConfig;
import org.gidea.scada.monitor.utils.MonitorUtils;
import java.io.IOException;
@Log4j2
public class RealtimeMqtt {
private String topicName;
private String exchangeName;
private MonitorMqttConfig mqttConfig;
private Channel channel;
public RealtimeMqtt(String topic, MonitorMqttConfig mqttConfig, Channel channel) {
this.mqttConfig = mqttConfig;
this.topicName = topic;
this.channel = channel;
String deviceId = MonitorUtils.getDeviceIdByMqtt(topic);
exchangeName = MonitorUtils.getDeviceExchangeName(deviceId);
try {
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, false, false, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void listener() throws MqttException {
// 创建 MQTT 客户端实例
MqttClient sampleClient = mqttConfig.mqttClient("MonitorMqttRealtime_" + topicName);
// 订阅以 "mqtt." 开始的所有主题
int qos = 2; // 设置服务质量级别
sampleClient.subscribe(topicName, qos);
// 设置回调以接收消息
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost. Reason: " + cause);
}
/**
* 接收到采集端返回的设备实时数据
* @param topic name of the topic on the message was published to
* @param message the actual message.
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
log.info("【{}】接收到实时数据:{}", topic, msg);
channel.basicPublish(exchangeName, "", null, message.getPayload());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete for token: " + token);
}
});
}
}
......@@ -8,6 +8,7 @@ import org.gidea.scada.monitor.rabbit.listener.CjdListenerHandler;
import org.gidea.scada.monitor.rabbit.listener.CommandListenerHandler;
import org.gidea.scada.monitor.udp.entity.CjdMessage;
import org.gidea.scada.monitor.udp.utils.CrcUtils;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
......@@ -36,14 +37,16 @@ public class MonitorRabbitMQConfig {
@SneakyThrows
@Bean(name = "commandServiceExchange")
public Exchange commandServiceExchange(RabbitAdmin rabbitAdmin, SimpleRabbitListenerContainerFactory factory) {
FanoutExchange commandServiceExchange = new FanoutExchange(MonitorCommandService.exchangeName, false, false);
RabbitExchangeEnum exchangeEnum = MonitorCommandService;
FanoutExchange commandServiceExchange = new FanoutExchange(exchangeEnum.exchangeName, false, false);
rabbitAdmin.declareExchange(commandServiceExchange);
// 声明请求主队列
Queue requestQueue = new Queue(MonitorCommandService.queueName, false, false, false);
Queue requestQueue = new Queue(exchangeEnum.exchangeName, false, false, false);
rabbitAdmin.declareQueue(requestQueue);
channel.queueBind(MonitorCommandService.queueName, MonitorCommandService.exchangeName, "");
channel.queueBind(exchangeEnum.exchangeName, exchangeEnum.exchangeName, "");
SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames(MonitorCommandService.queueName);
container.setQueueNames(exchangeEnum.exchangeName);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
......@@ -64,41 +67,4 @@ public class MonitorRabbitMQConfig {
return commandClientExchange;
}
@SneakyThrows
@Bean(name = "cjdServiceExchange")
public Exchange cjdServiceExchange(RabbitAdmin rabbitAdmin, SimpleRabbitListenerContainerFactory factory) {
FanoutExchange cjdServiceExchange = new FanoutExchange(MonitorCjdService.exchangeName, false, false);
rabbitAdmin.declareExchange(cjdServiceExchange);
// 声明请求主队列
Queue requestQueue = new Queue(MonitorCjdService.queueName, false, false, false);
rabbitAdmin.declareQueue(requestQueue);
channel.queueBind(MonitorCjdService.queueName, MonitorCjdService.exchangeName, "");
SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames(MonitorCjdService.queueName);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
String messageStr = new String(message.getBody());
log.info("CjdService接收数据:{}",messageStr);
if (messageStr.length() > 5) {
String str = messageStr.substring(5);
if ((messageStr.substring(0, 4).equalsIgnoreCase(CrcUtils.getCrc(str.getBytes())))) {
CjdMessage cjdMessage = JSONObject.parseObject(str, CjdMessage.class);
cjdListenerHandler.handler(cjdMessage);
}
}
}
});
container.start();
return cjdServiceExchange;
}
@SneakyThrows
@Bean(name = "cjdClientExchange")
public Exchange cjdClientExchange(RabbitAdmin rabbitAdmin) {
FanoutExchange cjdClientExchange = new FanoutExchange(MonitorCjdClient.exchangeName, false, false);
rabbitAdmin.declareExchange(cjdClientExchange);
return cjdClientExchange;
}
}
......@@ -3,10 +3,12 @@ package org.gidea.scada.monitor.rabbit.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.mqtt.listener.CjdServiceMqtt;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.udp.config.UdpServerConfig;
import org.gidea.scada.monitor.udp.entity.CjdMessage;
import org.gidea.scada.monitor.udp.utils.CrcUtils;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
......@@ -24,7 +26,7 @@ public class CommandListenerHandler {
public void handler(String message) {
MonitorMessage monitorMessage = JSONObject.parseObject(message, MonitorMessage.class);
String cjdIp = udpServerConfig.getCjdIp(monitorMessage.getBscode());
CjdMessage cjdIp = udpServerConfig.getCjdIp(monitorMessage.getBscode());
if (cjdIp == null) {
JSONObject data = new JSONObject();
data.put("time", System.currentTimeMillis());
......@@ -39,13 +41,22 @@ public class CommandListenerHandler {
} else {
CjdMessage cjdMessage = new CjdMessage();
BeanUtils.copyProperties(monitorMessage, cjdMessage);
cjdMessage.setIp(cjdIp);
cjdMessage.setIp(cjdIp.getIp());
cjdMessage.setCode(0);
String string = JSONObject.toJSONString(cjdMessage);
String msg = CrcUtils.getCrc(string.getBytes()) + "=" + string;
try {
channel.basicPublish(RabbitExchangeEnum.MonitorCjdClient.exchangeName, "", null, msg.getBytes());
} catch (IOException e) {
CjdServiceMqtt mqtt = udpServerConfig.getMqtt(cjdMessage.getBscode());
if (mqtt != null) {
mqtt.publish(MonitorUtils.getCjdClientExchangeName(cjdMessage.getBscode()), msg);
} else {
JSONObject data = new JSONObject();
data.put("time", System.currentTimeMillis());
data.put("result", "采集端未连接!");
monitorMessage.setData(data);
channel.basicPublish(RabbitExchangeEnum.MonitorCommandClient.exchangeName, "", null, JSONObject.toJSONString(monitorMessage).getBytes("UTF-8"));
}
} catch (Exception e) {
log.error("响应消息失败!{}", msg, e);
throw new RuntimeException(e);
}
......
......@@ -2,47 +2,57 @@ package org.gidea.scada.monitor.udp.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.rabbitmq.client.Channel;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.net.SocketAddress;
import lombok.extern.slf4j.Slf4j;
import org.gidea.scada.monitor.config.RabbitMQConfig;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.gidea.scada.monitor.mqtt.config.MonitorMqttConfig;
import org.gidea.scada.monitor.mqtt.listener.CjdServiceMqtt;
import org.gidea.scada.monitor.udp.entity.CjdMessage;
import org.gidea.scada.monitor.udp.utils.CrcUtils;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.gidea.scada.monitor.utils.RabbitExchangeEnum;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Udp;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.*;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.gidea.scada.monitor.utils.RabbitExchangeEnum.*;
@Slf4j
@Configuration
public class UdpServerConfig {
@Value("${upd.server.port:23005}")
private Integer listeningPort;
@Value("${org.gidea.mqtt.uri}")
private String url;
@Value("${org.gidea.mqtt.username}")
private String username;
@Value("${org.gidea.mqtt.password}")
private String password;
@Resource
private MonitorMqttConfig mqttConfig;
@Resource
private RabbitMQConfig rabbitMQConfig;
private Channel channel;
private Map<String, String> ipMap = new LinkedHashMap<>();
private Map<String, CjdMessage> ipMap = new LinkedHashMap<>();
private String cjdFile = System.getProperty("user.dir") + "/config/cjd.json";
@PostConstruct
private void init() {
private Map<String, CjdServiceMqtt> mqttMap = new HashMap<>();
@Bean
public DatagramSocket socket() {
if (new File(cjdFile).exists()) {
String line;
try {
......@@ -56,96 +66,78 @@ public class UdpServerConfig {
jsonData.append(line);
}
reader.close();
Map<String, String> data = JSONObject.parseObject(jsonData.toString(), new TypeReference<Map<String, String>>() {
Map<String, CjdMessage> data = JSONObject.parseObject(jsonData.toString(), new TypeReference<Map<String, CjdMessage>>() {
});
ipMap.putAll(data);
} catch (IOException e) {
for (CjdMessage cjdMessage : data.values()) {
CjdServiceMqtt serviceMqtt = new CjdServiceMqtt(MonitorUtils.getCjdServiceExchangeName(cjdMessage.getBscode()), mqttConfig, channel);
try {
serviceMqtt.listener();
} catch (MqttException e) {
throw new RuntimeException(e);
}
mqttMap.put(cjdMessage.getBscode(), serviceMqtt);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
/**
* UDP消息接收服务
*/
@Bean
public IntegrationFlow integrationFlow() {
log.info("UDP服务启动成功,端口号为: {}", listeningPort);
return IntegrationFlows.from(Udp.inboundAdapter(listeningPort)).channel("udpChannel").get();
}
/**
* 转换器
*/
@Transformer(inputChannel = "udpChannel", outputChannel = "udpFilter")
public CjdMessage transformer(@Payload byte[] payload) {
String message = new String(payload);
// todo 进行数据转换
try {
}
Vertx vertx = Vertx.vertx();
DatagramSocket socket = vertx.createDatagramSocket();
socket.listen(23005, "192.168.110.51", result -> {
if (result.succeeded()) {
socket.handler(packet -> {
String message = packet.data().toString();
if (message.endsWith("\n")) {
message = message.substring(0, message.length() - 1);
}
String tempCrc = message.substring(0, 4);
String tempMessage = message.substring(5);
String crc = CrcUtils.getCrc(tempMessage.getBytes());
// 数据校验
// // 数据校验
if (crc.equalsIgnoreCase(tempCrc)) {
CjdMessage cjdMessage = JSONObject.parseObject(tempMessage, CjdMessage.class);
return cjdMessage;
}
} catch (Exception e) {
log.error("解析消息失败!", e);
}
log.info("无法解析消息:{}", message);
return null;
switch (cjdMessage.getCode()) {
case 0:
CjdMessage handle0 = messageHandle0(cjdMessage);
SocketAddress sender = packet.sender();
String string = JSONObject.toJSONString(handle0);
String s = CrcUtils.getCrc(string.getBytes()) + "=" + string;
socket.send(Buffer.buffer(s), sender.port(), sender.host(), r -> {
if (r.succeeded()) {
System.out.println("【UDP服务】成功推送消息:" + s);
} else {
System.err.println("Failed to send reply: " + r.cause().getMessage());
}
/**
* 过滤器
*/
@Filter(inputChannel = "udpFilter", outputChannel = "udpRouter")
public boolean filter(CjdMessage message) {
if (message == null) {
return false;
});
return;
case 1:
messageHandle1(cjdMessage);
return;
default:
messageHandle(cjdMessage);
return;
}
return true;
}
/**
* 路由分发处理器:可以进行分发消息被那个处理器进行处理
*/
@Router(inputChannel = "udpRouter")
public String router(CjdMessage message) {
// todo 筛选,走那个处理器
switch (message.getCode()) {
case 0:
return "messageHandle0";
case 1:
return "messageHandle1";
log.info("无法解析消息:{}", message);
});
} else {
System.out.println("Listen failed" + result.cause());
}
return "messageHandle";
});
return socket;
}
/**
* 消息处理
*/
@ServiceActivator(inputChannel = "messageHandle")
private void messageHandle(CjdMessage message) {
log.info("统一消息处理:{}", JSONObject.toJSONString(message));
}
/**
* 消息处理
*/
@ServiceActivator(inputChannel = "messageHandle0", outputChannel = "responseHandle")
private CjdMessage messageHandle0(CjdMessage message) {
log.info("处理采集端主动推送数据:{}", JSONObject.toJSONString(message));
CjdMessage cjdMessage = new CjdMessage(message);
cjdMessage.setCode(1);
if (10000 == message.getCmd()) {
ipMap.put(message.getBscode(), message.getIp());
ipMap.put(message.getBscode(), message);
File file = new File(cjdFile);
if (!file.exists()) {
try {
......@@ -167,46 +159,38 @@ public class UdpServerConfig {
} catch (IOException e) {
e.printStackTrace();
}
JSONObject data = rabbitMQConfig.getRabbitInfo();
data.put("prefix", MonitorDeviceData.exchangeName);
data.put("serviceExchange", MonitorCjdService.exchangeName);
data.put("clientExchange", MonitorCjdClient.exchangeName);
cjdMessage.put("mqtt", data);
JSONObject mqtt = new JSONObject();
mqtt.put("url", url);
mqtt.put("username", username);
mqtt.put("password", password);
mqtt.put("service", RabbitExchangeEnum.MonitorCjdService.exchangeName);
mqtt.put("client", RabbitExchangeEnum.MonitorCjdClient.exchangeName);
cjdMessage.put("mqtt", mqtt);
CjdServiceMqtt serviceMqtt = mqttMap.get(cjdMessage.getBscode());
try {
if (serviceMqtt == null) {
serviceMqtt = new CjdServiceMqtt(MonitorUtils.getCjdServiceExchangeName(cjdMessage.getBscode()), mqttConfig, channel);
serviceMqtt.listener();
mqttMap.put(cjdMessage.getBscode(), serviceMqtt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return cjdMessage;
}
/**
* 消息处理
*/
@ServiceActivator(inputChannel = "messageHandle1")
private CjdMessage messageHandle1(CjdMessage message) {
log.info("处理采集端响应结果数据:{}", JSONObject.toJSONString(message));
return null;
}
/**
* 消息响应发送
*/
@ServiceActivator(inputChannel = "responseHandle")
private void responseHandle(String message, @Headers Map<String, Object> headers) {
if (message == null || "".equals(message)) {
return;
}
// 获取来源IP,可以进行IP过滤
String ipAddress = headers.get("ip_address").toString();
// 获取来源Port
Integer port = Integer.parseInt(headers.get("ip_port").toString());
// 创建消息
Message<String> udpMessage = MessageBuilder.withPayload(message).setHeader("ip_address", ipAddress).setHeader("ip_port", port).build();
// 创建发送器
UnicastSendingMessageHandler messageHandler = new UnicastSendingMessageHandler(ipAddress, port);
// 发送消息
messageHandler.handleMessage(udpMessage);
public CjdServiceMqtt getMqtt(String bscode) {
return mqttMap.get(bscode);
}
public String getCjdIp(String cjdId) {
public CjdMessage getCjdIp(String cjdId) {
return ipMap.get(cjdId);
}
}
......
org.gidea.scada.txsb.uri=http://192.168.110.51:12000/scada-business/bm/scadasbgl/getAllTxsb
org.gidea.rabbitmq.host=192.168.110.121
org.gidea.rabbitmq.port=5672
org.gidea.rabbitmq.username=admin
org.gidea.rabbitmq.password=qwerty
org.gidea.mqtt.url=failover:(tcp://192.168.110.107:19000)?initialReconnectDelay=1000&maxReconnectDelay=30000
org.gidea.mqtt.username=guser
org.gidea.mqtt.password=guser.soft
org.gidea.mqtt.uri=tcp://192.168.110.121:1883
org.gidea.mqtt.username=admin
org.gidea.mqtt.password=qwerty
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论