Commit b0c25a19 by 韩斌

[20240315007-ZY20240401006]MES端实现设备MQ定向订阅和推送

parents
.idea/
**/.idea/
log/
**/log/
*.log
**/*.log
*.out
**/*.out
logs/
**/logs/
target/
**/target/
.DS_Store
**/.DS_Store
*.iml
**/*.iml
*.project
**/*.project
*.settings
**/*.settings
*.class
**/*.class
*.classpath
**/*.classpath
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor</artifactId>
<version>1.0.0</version>
</parent>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor-Client</artifactId>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor-Common</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.gidea.scada.monitor.rabbit.config;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.annotation.Configuration;
@Log4j2
@Configuration
public class MonitorClientConfig {
}
package org.gidea.scada.monitor.rabbit.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.rabbit.handler.RabbitMessageHandler;
import org.gidea.scada.monitor.utils.Constants;
import org.gidea.scada.monitor.utils.EmptyUtil;
import org.gidea.scada.monitor.utils.RabbitUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ConcurrentReferenceHashMap;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.locks.Lock;
@Log4j2
@Component
public class MonitorRabbitListener {
@Resource
private RabbitMessageHandler messageHandler;
@Value("${org.gidea.monitor.name}")
private String monitorName;
private Map<String, SimpleMessageListenerContainer> queues = new ConcurrentReferenceHashMap<>();
private Map<String, Lock> queueLocks = new ConcurrentReferenceHashMap<>();
@Resource
private SimpleRabbitListenerContainerFactory containerFactory;
@Resource
private Channel channel;
private void queueListener(String deviceId, SimpleMessageListenerContainer container, String responseQueueName) {
container.setQueueNames(responseQueueName);
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) {
String messageStr = new String(message.getBody());
JSONObject data = JSONObject.parseObject(messageStr);
String client = data.getString("Client");
if (EmptyUtil.isEmpty(client) || client.equals(monitorName)) {
messageHandler.handleMessage(deviceId, messageStr);
}
}
});
container.start();
}
public void addDeviceListener(String deviceId) {
String queueName = RabbitUtils.getRequestQueueName(deviceId, monitorName);
// 创建监听设备请求队列
SimpleMessageListenerContainer container = queues.get(queueName);
if (container != null) {
return;
}
try {
channel.queueDeclare(queueName, true, true, true, null);
channel.queueBind(queueName, Constants.MonitorClientExchangeName, monitorName);
} catch (IOException e) {
throw new RuntimeException(e);
}
container = containerFactory.createListenerContainer();
queueListener(deviceId, container, queueName);
queues.put(queueName, container);
}
public void removeDeviceListener(String queueResponseName) {
// 删除监听设备响应队列
try {
SimpleMessageListenerContainer container = queues.get(queueResponseName);
if (container == null) {
return;
}
container.stop();
queues.remove(queueResponseName);
} finally {
// 删除设备锁信息
queueLocks.remove(queueResponseName);
}
}
}
package org.gidea.scada.monitor.rabbit.template;
import org.gidea.scada.monitor.rabbit.listener.MonitorRabbitListener;
import org.gidea.scada.monitor.utils.Constants;
import org.gidea.scada.monitor.utils.RabbitUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class MonitorRabbitTemplate {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MonitorRabbitListener rabbitListener;
/**
* 发送消息
*
* @param deviceId
* @param message
*/
public void sendMessage(String deviceId, String message) {
rabbitListener.addDeviceListener(deviceId);
rabbitTemplate.convertAndSend(Constants.MonitorServiceName, message);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor</artifactId>
<version>1.0.0</version>
</parent>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor-Common</artifactId>
<dependencies>
<!--httpClient请求工具-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.6</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
\ No newline at end of file
package org.gidea.scada.monitor.config;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.utils.Constants;
import org.gidea.scada.monitor.utils.EmptyUtil;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.io.IOException;
@Log4j2
@EnableRabbit
@Configuration
public class RabbitMQConfig {
@Value("${org.gidea.rabbitmq.host}")
private String host;
@Value("${org.gidea.rabbitmq.port}")
private Integer port;
@Value("${org.gidea.rabbitmq.username}")
private String username;
@Value("${org.gidea.rabbitmq.password}")
private String password;
@Resource
private ApplicationContext applicationContext;
@Bean
public CachingConnectionFactory connectionFactory() {
// 创建 ConnectionFactory 实例
CachingConnectionFactory factory = new CachingConnectionFactory();
// 设置 RabbitMQ 服务器相关参数
factory.setHost(host); // 替换为实际的 RabbitMQ 主机地址
factory.setPort(port); // 默认端口,如有变动请替换
factory.setUsername(username); // 替换为实际的 RabbitMQ 用户名
factory.setPassword(password); // 替换为实际的 RabbitMQ 密码
factory.setVirtualHost("/"); // 替换为实际的 RabbitMQ 虚拟主机名称,默认为 /
return factory;
}
@Bean
public Channel channel(CachingConnectionFactory factory) {
Channel channel = factory.createConnection().createChannel(false);
return channel;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 可选配置:
// 设置确认模式(默认为 AUTO_ACKNOWLEDGE)
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
// 设置消费者数量(默认为 1)
factory.setConcurrentConsumers(5);
// 设置最大消费者数量(默认为 Integer.MAX_VALUE)
factory.setMaxConcurrentConsumers(10);
return factory;
}
@Bean
public RabbitAdmin amqpAdmin(CachingConnectionFactory connectionFactory) {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
return admin;
}
}
package org.gidea.scada.monitor.rabbit.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class DeviceTopicMessage {
/**
* 设备标识
*/
private String deviceId;
}
package org.gidea.scada.monitor.rabbit.entity;
import lombok.Data;
@Data
public class MonitorMessage {
/**
* 客户端ID
*/
private String clientId;
/**
* 0: 下发指令
* 1: 设备绑定
*/
private int type = 0;
/**
* 数据
*/
private String data;
}
package org.gidea.scada.monitor.rabbit.handler;
public interface RabbitMessageHandler {
void handleMessage(String deviceId, String message);
}
package org.gidea.scada.monitor.utils;
public class Constants {
/**
* 监控服务主题
*/
public static final String MonitorServiceName = "MonitorServiceTopic";
public static final String TopicSpacer = ":";
public static final String MonitorServiceExchangeName = "monitor.service";
public static final String MonitorClientExchangeName = "monitor.client";
}
package org.gidea.scada.monitor.utils;
import java.util.Collection;
import java.util.Map;
/**
* @ClassName:
* @Description:
* @Author:三刀 Date:2019/12/23 15:24
* Version:1.0
**/
public class EmptyUtil {
/**
* 判断字符串是否为空
* PS:
* 为空的条件:
* 1. String对象为空
* 2. 没有任何字符的字符串
*
* @param str 需要判断的字符串
* @return 为空(true), 非空(false)
*/
public static boolean isEmpty(String str) {
return null == str || "".equals(str);
}
/**
* 判断字符串是否为空
* PS:
* 为空的条件:
* 1. String对象为空
* 2. 没有任何字符的字符串
*
* @param str 需要判断的字符串
* @param isTrimmed 判断前是否去掉字符串前后的空格:是(true), 否(false)
* @return 为空(true), 非空(false)
*/
public static boolean isEmpty(String str, boolean isTrimmed) {
return isTrimmed ? null == str || "".equals(str.trim()) : null == str || "".equals(str);
}
/**
* 判断对象是否为空
*
* @param obj 需要进行判断的对象
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(Object obj) {
if(null==obj){
return true;
}
return false;
}
/**
* 判断map是否为空
* PS:
* 集合为空的条件:
* 1. map对象为null
* 2. map中没有元素
*
* @param map 需要进行判断的集合
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(Map<?,?> map) {
return null == map || map.size() == 0;
}
/**
* 判断集合是否为空
* PS:
* 集合为空的条件:
* 1. 集合对象为null
* 2. 集合中没有元素
*
* @param collection 需要进行判断的集合
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(Collection<?> collection) {
return null == collection || collection.size() == 0;
}
/**
* 判断对象数组是否为空
* PS:
* 对象数组为空的条件:
* 1. 对象数组为null
* 2. 对象数组中没有元素
*
* @param array 需要进行判断的对象数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(Object[] array) {
return null == array || array.length == 0;
}
/**
* 判断数组是否为空
* PS:
* 数组为空的条件:
* 1. 数组为null
* 2. 数组中没有元素
*
* @param array 需要进行判断的数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(long[] array) {
return array == null || array.length == 0;
}
/**
* 判断数组是否为空
* PS:
* 数组为空的条件:
* 1. 数组为null
* 2. 数组中没有元素
*
* @param array 需要进行判断的数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(int[] array) {
return array == null || array.length == 0;
}
/**
* 判断数组是否为空
* PS:
* 数组为空的条件:
* 1. 数组为null
* 2. 数组中没有元素
*
* @param array 需要进行判断的数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(short[] array) {
return array == null || array.length == 0;
}
/**
* 判断数组是否为空
* PS:
* 数组为空的条件:
* 1. 数组为null
* 2. 数组中没有元素
*
* @param array 需要进行判断的数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(char[] array) {
return array == null || array.length == 0;
}
/**
* 判断数组是否为空
* PS:
* 数组为空的条件:
* 1. 数组为null
* 2. 数组中没有元素
*
* @param array 需要进行判断的数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(byte[] array) {
return array == null || array.length == 0;
}
/**
* 判断数组是否为空
* PS:
* 数组为空的条件:
* 1. 数组为null
* 2. 数组中没有元素
*
* @param array 需要进行判断的数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(double[] array) {
return array == null || array.length == 0;
}
/**
* 判断数组是否为空
* PS:
* 数组为空的条件:
* 1. 数组为null
* 2. 数组中没有元素
*
* @param array 需要进行判断的数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(float[] array) {
return array == null || array.length == 0;
}
/**
* 判断数组是否为空
* PS:
* 数组为空的条件:
* 1. 数组为null
* 2. 数组中没有元素
*
* @param array 需要进行判断的数组
* @return 为空(true), 不为空(false)
*/
public static boolean isEmpty(boolean[] array) {
return array == null || array.length == 0;
}
public static boolean isEmpty(Long obj) {
return null==obj;
}
public static boolean isEmpty(Float obj) {return null==obj || obj == 0;}
}
package org.gidea.scada.monitor.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class HttpClient {
private static Logger logger = LoggerFactory.getLogger(HttpClient.class);
private static int TIME_OUT = 5;
public static String doGet(String url, Map<String, String> haeder, Map<String, String> param) {
return doGet(url, haeder, param, TIME_OUT);
}
public static String doGet(String url, Map<String, String> haeder, Map<String, String> param, int timeOut) {
// 创建Httpclient对象
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = null;
String resultString = "";
try {
URIBuilder builder = new URIBuilder(url);
if (param != null) {
Iterator var7 = param.keySet().iterator();
while (var7.hasNext()) {
String key = (String) var7.next();
builder.addParameter(key, (String) param.get(key));
}
}
URI uri = builder.build();
// 创建Http Post请求
HttpGet httpGet = new HttpGet(uri);
if (haeder != null && haeder.keySet().size() > 0) {
for (String head : haeder.keySet()) {
httpGet.setHeader(head, haeder.get(head));
}
}
RequestConfig config = RequestConfig.custom().setConnectTimeout(timeOut * 1000).setSocketTimeout(timeOut * 1000).setConnectionRequestTimeout(timeOut * 1000).build();
httpGet.setConfig(config);
// 执行http请求
response = httpClient.execute(httpGet);
resultString = EntityUtils.toString(response.getEntity(), "utf-8");
} catch (Exception e) {
logger.error("访问GET地址报错:" + url);
logger.error("访问地址Haeder:" + JSONObject.toJSONString(haeder));
logger.error("访问地址参数:" + JSONObject.toJSONString(param));
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
}
}
resultString = String.valueOf(JSON.parse(resultString));
return resultString;
}
public static String doPost(String url, Map<String, String> haeder, Map<String, String> param) {
// 创建Httpclient对象
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = null;
String resultString = "";
try {
// 创建Http Post请求
HttpPost httpPost = new HttpPost(url);
if (haeder != null && haeder.keySet().size() > 0) {
for (String head : haeder.keySet()) {
httpPost.setHeader(head, haeder.get(head));
}
}
// 创建参数列表
if (param != null) {
List<NameValuePair> paramList = new ArrayList<>();
for (String key : param.keySet()) {
String va1 = param.get(key);
paramList.add(new BasicNameValuePair(key, va1));
}
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(paramList, "UTF-8");
httpPost.setEntity(entity);
}
// 执行http请求
response = httpClient.execute(httpPost);
resultString = EntityUtils.toString(response.getEntity(), "utf-8");
} catch (Exception e) {
logger.error("访问POST地址报错:" + url);
logger.error("访问地址Haeder:" + JSONObject.toJSONString(haeder));
logger.error("访问地址参数:" + JSONObject.toJSONString(param));
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
}
}
resultString = String.valueOf(JSON.parse(resultString));
return resultString;
}
public static String doPostJson(String url, String json, Map<String, String> haeder) {
String s = doPostJson(url, json, haeder, TIME_OUT);
return s;
}
public static String doPostJson(String url, String json, Map<String, String> haeder, int timeOut) {
// 创建Httpclient对象
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = null;
String resultString = "";
try {
// 创建Http Post请求
HttpPost httpPost = new HttpPost(url);
if (haeder != null && haeder.keySet().size() > 0) {
for (String head : haeder.keySet()) {
httpPost.setHeader(head, haeder.get(head));
}
}
// 创建请求内容
StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
RequestConfig config = RequestConfig.custom().setConnectTimeout(timeOut * 1000).setSocketTimeout(timeOut * 1000).setConnectionRequestTimeout(timeOut * 1000).build();
httpPost.setConfig(config);
httpPost.setEntity(entity);
// 执行http请求
response = httpClient.execute(httpPost);
resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
} catch (Exception e) {
logger.error("访问POST地址报错:" + url);
logger.error("访问地址Haeder:" + JSONObject.toJSONString(haeder));
logger.error("访问地址参数:" + json);
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
}
}
return resultString;
}
}
package org.gidea.scada.monitor.utils;
public class RabbitUtils {
public static String getRequestQueueName(String deviceId, String monitorName) {
return monitorName + Constants.TopicSpacer + deviceId;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor</artifactId>
<version>1.0.0</version>
<packaging>pom</packaging>
<modules>
<module>server</module>
<module>client</module>
<module>common</module>
</modules>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<!--继承springboot提供的父工程-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.14</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--activeMq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor</artifactId>
<version>1.0.0</version>
</parent>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor-Server</artifactId>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.gidea</groupId>
<artifactId>Scada-Monitor-Common</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<!--spring boot打包的话需要指定一个唯一的入门-->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.5.14</version>
<configuration>
<!-- 指定该Main Class为全局的唯一入口 -->
<mainClass>org.gidea.scada.monitor.MonitorServerApplication</mainClass>
<layout>ZIP</layout>
</configuration>
<executions>
<execution>
<goals>
<!--可以把依赖的包都打包到生成的Jar包中-->
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package org.gidea.scada.monitor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
public class MonitorServerApplication {
public static void main(String[] args) {
SpringApplication.run(MonitorServerApplication.class, args);
System.out.println("服务已启动完成......");
}
}
\ No newline at end of file
package org.gidea.scada.monitor.mqtt.handler;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.gidea.scada.monitor.utils.Constants;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Component
public class MqttHandler {
@Resource
private Channel channel;
public void handler(String message) {
JSONObject msg = JSONObject.parseObject(message);
msg.put("result", "接收成功");
try {
channel.basicPublish(Constants.MonitorClientExchangeName,"",null, msg.toString().getBytes("UTF-8"));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
;
}
package org.gidea.scada.monitor.rabbit.config;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import org.gidea.scada.monitor.utils.Constants;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MonitorRabbitMQConfig {
@Autowired
Channel channel;
@SneakyThrows
@Bean
public FanoutExchange exchange(RabbitAdmin rabbitAdmin) {
FanoutExchange exchange1 = new FanoutExchange(Constants.MonitorServiceExchangeName, true, false);
FanoutExchange exchange2 = new FanoutExchange(Constants.MonitorClientExchangeName, true, false);
rabbitAdmin.declareExchange(exchange1);
rabbitAdmin.declareExchange(exchange2);
// // 声明请求主队列
Queue requestQueue = new Queue(Constants.MonitorServiceName, true, true, true);
rabbitAdmin.declareQueue(requestQueue);
// channel.queueDeclare(Constants.MonitorServiceName, true, true, true, null);
channel.queueBind(Constants.MonitorServiceName, Constants.MonitorServiceExchangeName, "");
return exchange2;
}
}
package org.gidea.scada.monitor.rabbit.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.gidea.scada.monitor.mqtt.handler.MqttHandler;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.gidea.scada.monitor.utils.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Log4j2
@Component
public class MonitorRabbitListener {
@Resource
private MqttHandler mqttHandler;
@RabbitListener(queues = Constants.MonitorServiceName)
public void deviceTopic(String message) {
log.info("接收消息:{}", message);
mqttHandler.handler(message);
}
}
org.gidea.rabbitmq.host=192.168.110.121
org.gidea.rabbitmq.port=5672
org.gidea.rabbitmq.username=admin
org.gidea.rabbitmq.password=qwerty
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论