Commit 25a0d0d3 by 韩斌

[20240315007-ZY20240401006]MES端实现设备MQ定向订阅和推送

parent b0c25a19
package org.gidea.scada.monitor.rabbit.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.rabbit.handler.RabbitMessageHandler;
import org.gidea.scada.monitor.utils.Constants;
import org.gidea.scada.monitor.utils.EmptyUtil;
import org.gidea.scada.monitor.utils.RabbitUtils;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
......@@ -45,30 +45,35 @@ public class MonitorRabbitListener {
@Override
public void onMessage(Message message, Channel channel) {
String messageStr = new String(message.getBody());
JSONObject data = JSONObject.parseObject(messageStr);
String client = data.getString("Client");
if (EmptyUtil.isEmpty(client) || client.equals(monitorName)) {
messageHandler.handleMessage(deviceId, messageStr);
if (messageStr.startsWith("{") && messageStr.endsWith("}")) {
JSONObject data = JSONObject.parseObject(messageStr);
String client = data.getString("Client");
if (EmptyUtil.isEmpty(client) || client.equals(monitorName)) {
messageHandler.handleMessage(deviceId, messageStr);
}
return;
}
messageHandler.handleMessage(deviceId, messageStr);
}
});
container.start();
}
public void addDeviceListener(String deviceId) {
String queueName = RabbitUtils.getRequestQueueName(deviceId, monitorName);
String queueName = MonitorUtils.getRequestQueueName(deviceId, monitorName);
// 创建监听设备请求队列
SimpleMessageListenerContainer container = queues.get(queueName);
if (container != null) {
return;
}
try {
String exchangeName = MonitorUtils.getClientExchangeName(deviceId);
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, false, true, null);
channel.queueDeclare(queueName, true, true, true, null);
channel.queueBind(queueName, Constants.MonitorClientExchangeName, monitorName);
channel.queueBind(queueName, exchangeName, deviceId);
} catch (IOException e) {
throw new RuntimeException(e);
}
container = containerFactory.createListenerContainer();
queueListener(deviceId, container, queueName);
queues.put(queueName, container);
......
......@@ -2,7 +2,6 @@ package org.gidea.scada.monitor.rabbit.template;
import org.gidea.scada.monitor.rabbit.listener.MonitorRabbitListener;
import org.gidea.scada.monitor.utils.Constants;
import org.gidea.scada.monitor.utils.RabbitUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
......
package org.gidea.scada.monitor.rabbit.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class DeviceTopicMessage {
/**
* 设备标识
*/
private String deviceId;
}
package org.gidea.scada.monitor.rabbit.entity;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
public class MonitorMessage {
/**
* 客户端ID
*/
private String clientId;
/**
* 0: 下发指令
* 1: 设备绑定
* 采集端ID
*/
private String collectId;
/**
* 设备标识
*/
private int type = 0;
private String deviceId;
/**
* 数据
* 下发命令
*/
private String data;
private String command;
/**
* 下发命令时间
*/
private LocalDateTime issueTime;
/**
* 命令响应结果
*/
private String response;
/**
* 响应时间
*/
private LocalDateTime responseTime;
public MonitorMessage(String clientId, String collectId, String deviceId, String command) {
this.clientId = clientId;
this.collectId = collectId;
this.deviceId = deviceId;
this.command = command;
this.issueTime = LocalDateTime.now();
}
public MonitorMessage(String clientId, String deviceId, String command) {
this.clientId = clientId;
this.deviceId = deviceId;
this.command = command;
this.issueTime = LocalDateTime.now();
}
@Override
public String toString() {
return JSONObject.toJSONString(this);
}
}
package org.gidea.scada.monitor.rabbit.entity;
import lombok.Data;
@Data
public class Result<R> {
private int code;
private String message;
private R data;
public static <R> Result<R> success(R data) {
Result<R> result = new Result<>();
result.setCode(0);
result.setMessage("success");
result.setData(data);
return result;
}
public static <R> Result<R> fail(String message) {
Result<R> result = new Result<>();
result.setCode(-1);
result.setMessage(message);
return result;
}
public boolean isSuccess() {
return this.code == 0;
}
}
......@@ -7,10 +7,13 @@ public class Constants {
*/
public static final String MonitorServiceName = "MonitorServiceTopic";
public static final String TopicSpacer = ":";
public static final String TopicSpacer = ".";
public static final String MonitorServiceExchangeName = "monitor.service";
public static final String MonitorClientExchangeName = "monitor.client";
public static final String MqttTopicName = "mqtt";
}
package org.gidea.scada.monitor.utils;
public class MonitorUtils {
public static String getClientExchangeName(String deviceId) {
return Constants.MonitorClientExchangeName + Constants.TopicSpacer + deviceId;
}
public static String getRequestQueueName(String deviceId, String monitorName) {
return monitorName + Constants.TopicSpacer + deviceId;
}
public static String getMqttQueueName(String deviceId ) {
return Constants.MqttTopicName + Constants.TopicSpacer + deviceId;
}
public static String getDeviceIdByMqtt(String topicName) {
return topicName.replaceFirst(Constants.MqttTopicName + Constants.TopicSpacer, "");
}
}
package org.gidea.scada.monitor.utils;
public class RabbitUtils {
public static String getRequestQueueName(String deviceId, String monitorName) {
return monitorName + Constants.TopicSpacer + deviceId;
}
}
......@@ -30,11 +30,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--activeMq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
......@@ -50,4 +45,25 @@
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>nexus-releases</id>
<name>Releases</name>
<url>http://115.29.202.246:8288/repository/maven-releases/</url>
</repository>
<snapshotRepository>
<id>nexus-snapshots</id>
<name>Snapshot</name>
<url>http://115.29.202.246:8288/repository/maven-snapshots/</url>
</snapshotRepository>
</distributionManagement>
<!-- 配置私库地址 -->
<repositories>
<repository>
<id>nexus</id>
<name>nexus</name>
<url>http://115.29.202.246:8288/repository/maven-public/</url>
</repository>
</repositories>
</project>
\ No newline at end of file
......@@ -23,6 +23,12 @@
<artifactId>Scada-Monitor-Common</artifactId>
<version>1.0.0</version>
</dependency>
<!--activeMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-mqtt</artifactId>
<version>5.16.2</version> <!-- 使用适当的版本 -->
</dependency>
</dependencies>
......
......@@ -3,7 +3,9 @@ package org.gidea.scada.monitor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication
public class MonitorServerApplication {
public static void main(String[] args) {
......
package org.gidea.scada.monitor.mqtt.config;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.LinkedHashSet;
import java.util.Set;
@Configuration
public class MonitorMqttConfig {
@Value("${org.gidea.mqtt.url}")
private String url;
@Value("${org.gidea.mqtt.username}")
private String username;
@Value("${org.gidea.mqtt.password}")
private String password;
@Bean(name = "mqttListeners")
public Set<String> mqttListeners() {
return new LinkedHashSet<>();
}
@Bean(name = "mqttConnection")
public ActiveMQConnection connection() {
ActiveMQConnectionFactory factory = null;
ActiveMQConnection connection = null;
try {
// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
factory = new ActiveMQConnectionFactory (username, password, url);
// 第二步:使用ConnectionFactory对象创建一个Connection对象。
connection = (ActiveMQConnection) factory.createConnection();
// 第三步:开启连接,调用Connection对象的start方法。
connection.start();
} catch (JMSException e) {
throw new RuntimeException(e);
}
return connection;
}
@Bean(name = "mqttSession")
public Session session(ActiveMQConnection connection) {
Session session = null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
throw new RuntimeException(e);
}
return session;
}
}
package org.gidea.scada.monitor.mqtt.handler;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.gidea.scada.monitor.utils.Constants;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import javax.jms.*;
import java.time.LocalDateTime;
@Component
public class MqttHandler {
@Resource
private Channel channel;
@Resource(name = "mqttSession")
private Session session;
public void handler(String message) {
JSONObject msg = JSONObject.parseObject(message);
msg.put("result", "接收成功");
MonitorMessage msg = JSONObject.parseObject(message, MonitorMessage.class);
msg.setResponse("响应成功");
msg.setResponseTime(LocalDateTime.now());
// 创建目标
Destination destination = null;
try {
channel.basicPublish(Constants.MonitorClientExchangeName,"",null, msg.toString().getBytes("UTF-8"));
} catch (IOException e) {
destination = session.createTopic(MonitorUtils.getMqttQueueName(msg.getDeviceId()));
// 创建生产者
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 创建消息
BytesMessage textMessage = session.createBytesMessage();
textMessage.writeBytes(msg.toString().getBytes());
// 发送消息
producer.send(textMessage);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
;
}
}
package org.gidea.scada.monitor.mqtt.listener;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.command.ActiveMQTopic;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.*;
import java.util.Set;
@Log4j2
@Component
public class MonitorMqttListener {
@Resource(name = "mqttConnection")
private ActiveMQConnection connection;
@Resource(name = "mqttSession")
private Session session;
@Resource(name = "mqttListeners")
private Set<String> mqttListeners;
@Resource
private Channel channel;
@Scheduled(fixedDelay = 10000)
public void listener() throws Exception {
DestinationSource destinationSource = connection.getDestinationSource();
//获取有多少个主题
Set<ActiveMQTopic> topics = destinationSource.getTopics();
for (ActiveMQTopic topic : topics) {
if (!mqttListeners.contains(topic.getTopicName())) {
String deviceId = MonitorUtils.getDeviceIdByMqtt(topic.getTopicName());
channel.exchangeDeclare(MonitorUtils.getClientExchangeName(deviceId), BuiltinExchangeType.FANOUT, false, true, null);
mqttListeners.add(topic.getTopicName());
//队列等待最多为1,后来的会替换新的
Destination destination = session.createTopic(topic.getTopicName() + "?consumer.prefetchSize=1");
// 第六步:使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(destination);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
BytesMessage bm = (BytesMessage) message;
byte[] bys = null;
try {
//获取消息内容
bys = new byte[(int) bm.getBodyLength()];
bm.readBytes(bys);
String msg = new String(bys, "utf-8");
// 第八步:推送消息。
channel.basicPublish(MonitorUtils.getClientExchangeName(deviceId), deviceId, null, msg.toString().getBytes("UTF-8"));
} catch (Exception e) {
log.error("推送消息失败!", e);
}
}
});
}
}
}
}
......@@ -3,32 +3,30 @@ package org.gidea.scada.monitor.rabbit.config;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import org.gidea.scada.monitor.utils.Constants;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class MonitorRabbitMQConfig {
@Autowired
@Resource
Channel channel;
@SneakyThrows
@Bean
public FanoutExchange exchange(RabbitAdmin rabbitAdmin) {
FanoutExchange exchange1 = new FanoutExchange(Constants.MonitorServiceExchangeName, true, false);
FanoutExchange exchange2 = new FanoutExchange(Constants.MonitorClientExchangeName, true, false);
rabbitAdmin.declareExchange(exchange1);
rabbitAdmin.declareExchange(exchange2);
// // 声明请求主队列
public Exchange exchange(RabbitAdmin rabbitAdmin) {
FanoutExchange serviceExchange = new FanoutExchange(Constants.MonitorServiceExchangeName, true, true);
rabbitAdmin.declareExchange(serviceExchange);
// 声明请求主队列
Queue requestQueue = new Queue(Constants.MonitorServiceName, true, true, true);
rabbitAdmin.declareQueue(requestQueue);
// channel.queueDeclare(Constants.MonitorServiceName, true, true, true, null);
channel.queueBind(Constants.MonitorServiceName, Constants.MonitorServiceExchangeName, "");
return exchange2;
channel.queueBind(Constants.MonitorServiceName, Constants.MonitorServiceExchangeName, Constants.MonitorServiceExchangeName);
return serviceExchange;
}
}
package org.gidea.scada.monitor.rabbit.controller;
import org.gidea.scada.monitor.rabbit.entity.Result;
import org.gidea.scada.monitor.rabbit.service.MonitorRabbitService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/monitor/rabbit")
public class MonitorRabbitController {
@Resource
private MonitorRabbitService rabbitService;
@GetMapping("/applyConnection")
public Result<String> applyConnection(@RequestParam String deviceId) {
return rabbitService.applyConnection(deviceId);
}
}
package org.gidea.scada.monitor.rabbit.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.mqtt.handler.MqttHandler;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.utils.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Log4j2
@Component
public class MonitorRabbitListener {
@Resource
private MqttHandler mqttHandler;
@Resource
private Channel channel;
@RabbitListener(queues = Constants.MonitorServiceName)
public void deviceTopic(String message) {
......
package org.gidea.scada.monitor.rabbit.service;
import org.gidea.scada.monitor.rabbit.entity.Result;
public interface MonitorRabbitService {
Result<String> applyConnection(String deviceId);
}
package org.gidea.scada.monitor.rabbit.service.impl;
import org.gidea.scada.monitor.rabbit.entity.Result;
import org.gidea.scada.monitor.rabbit.service.MonitorRabbitService;
import org.gidea.scada.monitor.utils.MonitorUtils;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class MonitorRabbitServiceImpl implements MonitorRabbitService {
@Resource
private RabbitAdmin rabbitAdmin;
@Override
public Result<String> applyConnection(String deviceId) {
String exchangeName = MonitorUtils.getClientExchangeName(deviceId);
FanoutExchange clientExchange = new FanoutExchange(exchangeName, false, true);
rabbitAdmin.declareExchange(clientExchange);
return Result.success(exchangeName);
}
}
......@@ -2,3 +2,6 @@ org.gidea.rabbitmq.host=192.168.110.121
org.gidea.rabbitmq.port=5672
org.gidea.rabbitmq.username=admin
org.gidea.rabbitmq.password=qwerty
org.gidea.mqtt.url=failover:(tcp://192.168.110.107:19000)?initialReconnectDelay=1000&maxReconnectDelay=30000
org.gidea.mqtt.username=guser
org.gidea.mqtt.password=guser.soft
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论