Commit f8d8f8ee by 韩斌

[20240315007-ZY20240415002] 完成控制端与采集端的指令下达和反馈通信-调整

parent 4de5136e
package org.gidea.scada.monitor.rabbit.config;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
/**
* 客户端对命令的处理方式,自定义
*/
public interface CommandAnalysis {
/**
* 请求指令,是否需要对指令做存储方便对接收到的响应指令做分析判断是否是自己的指令
*
* @param message
*/
void requestCommand(MonitorMessage message);
/**
* 响应指令,是否需要对指令响应结果做分析判断是否是自己的指令的响应结果
*
* @param message
*/
void responseCommand(MonitorMessage message);
/**
* 获取响应指令
*
* @param requestMessage 请求指令
* @return
*/
MonitorMessage result(MonitorMessage requestMessage);
}
package org.gidea.scada.monitor.rabbit.config;
import com.rabbitmq.client.Channel;
import org.gidea.scada.monitor.rabbit.entity.MonitorMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.io.IOException;
import static org.gidea.scada.monitor.utils.RabbitExchangeEnum.MonitorCommandService;
@Configuration
public class MonitorClient {
@Resource
private Channel channel;
@Autowired(required = false)
private CommandAnalysis analysis;
public void sendCommand(MonitorMessage message) throws IOException {
if (analysis != null) {
analysis.requestCommand(message);
}
channel.basicPublish(MonitorCommandService.exchangeName, "", null, message.toString().getBytes());
}
}
......@@ -15,6 +15,7 @@ import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFacto
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -22,8 +23,6 @@ import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.gidea.scada.monitor.utils.RabbitExchangeEnum.MonitorCommandClient;
import static org.gidea.scada.monitor.utils.RabbitExchangeEnum.MonitorCommandService;
......@@ -38,9 +37,8 @@ public class MonitorClientConfig {
@Resource
private Channel channel;
private Map<Integer, MonitorMessage> commandHistory = new HashMap<>();
private Map<Integer, MonitorMessage> commandResult = new HashMap<>();
@Autowired(required = false)
private CommandAnalysis analysis;
@SneakyThrows
......@@ -61,27 +59,16 @@ public class MonitorClientConfig {
public void onMessage(Message message, Channel channel) {
String messageStr = new String(message.getBody());
MonitorMessage monitorMessage = JSONObject.parseObject(messageStr, MonitorMessage.class);
MonitorMessage history = commandHistory.get(monitorMessage.getCmd());
if (history != null) {
commandResult.put(monitorMessage.getCmd(), monitorMessage);
commandHistory.remove(monitorMessage.getCmd());
if (analysis != null) {
analysis.responseCommand(monitorMessage);
}
log.info("MonitorService接收数据:{}", messageStr);
log.info("MonitorClient接收数据:{}", messageStr);
}
});
container.start();
return commandClientExchange;
}
public void sendCommand(MonitorMessage message) throws IOException {
channel.basicPublish(MonitorCommandService.exchangeName, "", null, message.toString().getBytes());
commandHistory.put(message.getCmd(), message);
if (commandResult.get(message.getCmd()) != null) {
commandResult.remove(message.getCmd());
}
}
public MonitorMessage getResult(int cmd) {
return commandResult.get(cmd);
}
}
......@@ -49,10 +49,23 @@ public class MonitorRabbitListener {
}
/**
* 设备实时数据监听
*
* @param deviceId 设备标识
* @param messageHandler 设备消息处理
*/
public void addDeviceListener(String deviceId, RabbitMessageHandler messageHandler) {
addDeviceListener(deviceId, messageHandler, true);
}
/**
* 设备数据监听
*
* @param deviceId 设备标识
* @param messageHandler 设备消息处理
* @param isReal 是否是实时数据 (true:实时数据,false:聚合数据)
*/
public void addDeviceListener(String deviceId, RabbitMessageHandler messageHandler, boolean isReal) {
String queueName = MonitorUtils.getRequestQueueName(deviceId, monitorName);
// 创建监听设备请求队列
SimpleMessageListenerContainer container = queues.get(queueName);
......@@ -60,7 +73,7 @@ public class MonitorRabbitListener {
return;
}
try {
String exchangeName = MonitorUtils.getDeviceExchangeName(deviceId);
String exchangeName = isReal ? MonitorUtils.getDeviceExchangeName(deviceId) : MonitorUtils.getJhExchangeName(deviceId);
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, false, false, null);
channel.queueDeclare(queueName, true, true, true, null);
channel.queueBind(queueName, exchangeName, deviceId);
......
......@@ -6,7 +6,7 @@ import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class MonitorMessage {
public class MonitorMessage<T> {
/**
* 采集端ID
*/
......@@ -16,9 +16,13 @@ public class MonitorMessage {
*/
private int cmd;
/**
* 是否响应 (-1:响应失败,0:下发,1:响应)
*/
private int code;
/**
* 数据
*/
private JSONObject data;
private Object data;
@Override
public String toString() {
......
package org.gidea.scada.monitor.utils;
import java.util.UUID;
public class MonitorUtils {
public static String getDeviceExchangeName(String deviceId) {
return RabbitExchangeEnum.MonitorDeviceData.exchangeName + Constants.TopicSpacer + deviceId;
}
public static String getJhExchangeName(String deviceId) {
return RabbitExchangeEnum.MonitorDeviceJh.exchangeName + Constants.TopicSpacer + deviceId;
}
public static String getCjdClientExchangeName(String deviceId) {
return RabbitExchangeEnum.MonitorCjdClient.exchangeName + Constants.TopicSpacer + deviceId;
}
......@@ -14,7 +20,7 @@ public class MonitorUtils {
}
public static String getRequestQueueName(String deviceId, String monitorName) {
return monitorName + Constants.TopicSpacer + deviceId;
return monitorName + Constants.TopicSpacer + deviceId + Constants.TopicSpacer + UUID.randomUUID();
}
public static String getQueueName(String deviceId, String monitorName) {
......
package org.gidea.scada.monitor.utils;
public enum RabbitExchangeEnum {
MonitorCommandService("monitor.command.service"), MonitorCommandClient("monitor.command.client"), MonitorCjdService("monitor.cjd.service"), MonitorCjdClient("monitor.cjd.client"), MonitorDeviceData("monitor.device.data"), MonitorDeviceJh("monitor.device.jh"),
MonitorCommandService("monitor.command.service"),
MonitorCommandClient("monitor.command.client"),
MonitorCjdService("monitor.cjd.service"),
MonitorCjdClient("monitor.cjd.client"),
MonitorDeviceData("monitor.device.data"),
MonitorDeviceJh("monitor.device.jh"),
;
public final String exchangeName;
......
......@@ -47,7 +47,7 @@ public class CjdServiceMqtt {
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost. Reason: " + cause);
System.out.println("CjdServiceMqtt Connection lost. Reason: " + cause);
}
/**
......
......@@ -76,7 +76,7 @@ public class MonitorMqttListener {
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost. Reason: " + cause);
System.out.println("MonitorMqttListener Connection lost. Reason: " + cause);
}
/**
......
......@@ -44,7 +44,7 @@ public class RealtimeMqtt {
sampleClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost. Reason: " + cause);
System.out.println("RealtimeMqtt Connection lost. Reason: " + cause);
}
/**
......
......@@ -32,6 +32,7 @@ public class CommandListenerHandler {
data.put("time", System.currentTimeMillis());
data.put("result", "采集端未连接!");
monitorMessage.setData(data);
monitorMessage.setCmd(-1);
try {
channel.basicPublish(RabbitExchangeEnum.MonitorCommandClient.exchangeName, "", null, JSONObject.toJSONString(monitorMessage).getBytes("UTF-8"));
} catch (IOException e) {
......@@ -54,6 +55,7 @@ public class CommandListenerHandler {
data.put("time", System.currentTimeMillis());
data.put("result", "采集端未连接!");
monitorMessage.setData(data);
monitorMessage.setCmd(-1);
channel.basicPublish(RabbitExchangeEnum.MonitorCommandClient.exchangeName, "", null, JSONObject.toJSONString(monitorMessage).getBytes("UTF-8"));
}
} catch (Exception e) {
......
......@@ -159,13 +159,15 @@ public class UdpServerConfig {
} catch (IOException e) {
e.printStackTrace();
}
JSONObject data = new JSONObject();
JSONObject mqtt = new JSONObject();
mqtt.put("url", url);
mqtt.put("username", username);
mqtt.put("password", password);
mqtt.put("service", RabbitExchangeEnum.MonitorCjdService.exchangeName);
mqtt.put("client", RabbitExchangeEnum.MonitorCjdClient.exchangeName);
cjdMessage.put("mqtt", mqtt);
data.put("mqtt", mqtt);
cjdMessage.setData(data);
CjdServiceMqtt serviceMqtt = mqttMap.get(cjdMessage.getBscode());
try {
if (serviceMqtt == null) {
......
......@@ -11,7 +11,7 @@ public class CjdMessage {
private int cmd;
private String ip;
private String bscode;
private JSONObject data;
private Object data;
public CjdMessage(CjdMessage message) {
if (message == null) {
......@@ -23,12 +23,4 @@ public class CjdMessage {
this.bscode = message.bscode;
this.data = new JSONObject();
}
public CjdMessage put(String key, Object value) {
if (this.data == null) {
this.data = new JSONObject();
}
this.data.put(key, value);
return this;
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论