MQTT整合

一、MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

要了解 MQTT 的工作原理,首先需要掌握以下几个概念:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。

MQTT 客户端

任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。

MQTT Broker

MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。

发布-订阅模式

发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

下图展示了 MQTT 发布/订阅过程。温度传感器作为客户端连接到 MQTT Broker,并通过发布操作将温度数据发布到一个特定主题(例如 Temperature)。MQTT Broker 接收到该消息后会负责将其转发给订阅了相应主题(Temperature)的订阅者客户端。

主题

MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:

1
2
3
4
5
chat/room/1

sensor/10/temperature

sensor/+/temperature

MQTT 主题支持以下两种通配符:+#

  • +:表示单层通配符,例如 a/+ 匹配 a/xa/y
  • #:表示多层通配符,例如 a/# 匹配 a/xa/b/c/d

注意:通配符主题只能用于订阅,不能用于发布。

QoS

MQTT 提供了三种服务质量(QoS),在不同网络环境下保证消息的可靠性。

  • QoS 0:消息最多传送一次。如果当前客户端不可用,它将丢失这条消息。
  • QoS 1:消息至少传送一次。
  • QoS 2:消息只传送一次。

二、MQTT服务搭建

EMQ X 是一款完全开源,高可用低时延的百万级分布式物联网 MQTT 5.0 消息服务器。EMQX 支持多种协议,包括 MQTT (3.1、3.1.1 和 5.0)、HTTP、QUIC 和 WebSocket 等,保证各种网络环境和硬件设备的可访问性。EMQX 还提供了全面的 SSL/TLS 功能支持,比如双向认证以及多种身份验证机制,为物联网设备和应用程序提供可靠和高效的通信基础设施。

使用Docker可以快速搭建服务

运行以下命令获取 Docker 镜像:

1
docker pull emqx/emqx:5.2.1

运行以下命令启动 Docker 容器。

1
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.2.1

通过浏览器访问 http://localhost:18083/localhost 可替换为您的实际 IP 地址)以访问 EMQX Dashboard 管理控制台,进行设备连接与相关指标监控管理。

默认用户名及密码:

admin

public

三、SpringBoot集成

maven依赖

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

application.yml文件配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
## MQTT配置
mqtt:
#MQTT-用户名
username: admin
#MQTT-密码
password: public
#MQTT-服务器连接地址
url: tcp://ip:1883
#MQTT-连接服务器默认客户端ID
clientId: client-test_server
#MQTT-默认的消息推送主题,实际可在调用接口时指定
defaultTopic: topic
#MQTT-心跳
keepAliveInterval: 60
#MQTT-自动重连
automaticReconnect: true
#MQTT-清除session
cleanSession: false
#MQTT-超时连接
completionTimeout: 30

订阅接受消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {

@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.defaultTopic}")
private String defaultTopic;
@Value("${mqtt.keepAliveInterval}")
private Integer keepAliveInterval;
@Value("${mqtt.automaticReconnect}")
private Boolean automaticReconnect;
@Value("${mqtt.cleanSession}")
private Boolean cleanSession;
@Value("${mqtt.completionTimeout}")
private Integer completionTimeout;


/**
* mqtt连接选项
*
* @return {@link MqttConnectOptions}
*/
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setServerURIs(new String[]{url});
options.setKeepAliveInterval(keepAliveInterval);
options.setAutomaticReconnect(automaticReconnect);
options.setCleanSession(cleanSession);
options.setConnectionTimeout(completionTimeout);
return options;
}

/**
* mqtt-paho客户工厂
*
* @param mqttConnectOptions mqtt连接选项
* @return {@link MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}

// 接收通道1
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}

//接受通道2
@Bean
public MessageChannel mqttInputChannelTmp() {
return new DirectChannel();
}

// 配置client,监听topic
@Bean
public MessageProducer inbound(MqttPahoClientFactory mqttPahoClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-subscribe", mqttPahoClientFactory, "hello1", "topic1");
// 此处设置的是毫秒 10000为十秒
adapter.setCompletionTimeout(10000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}

// 配置client2,监听topic
@Bean
public MessageProducer inboundTmp(MqttPahoClientFactory mqttPahoClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-subscribeTmp", mqttPahoClientFactory, "hello2", "topic2");
// 此处设置的是毫秒 10000为十秒
adapter.setCompletionTimeout(10000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannelTmp());
return adapter;
}

// 通过通道1获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler receiveHandler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String mess = message.getPayload().toString();
log.info("通道1监听收到来自:{}的消息:{}", topic, mess);
};
}

// 通过通道2获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannelTmp")
public MessageHandler receiveHandlerTmp() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String mess = message.getPayload().toString();
log.info("通道2监听收到来自:{}的消息:{}", topic, mess);
};
}


/************************************************************************************************************/

// 推送通道
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-publish", mqttPahoClientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(1);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}

}

效果

1
2
3
4
5
6
2023-10-12 17:01:58.089  INFO 22488 --- [erver-subscribe] c.e.mqtt.configuration.MqttSenderConfig  : 通道1监听收到来自:hello1的消息:{"msg": "hello"}
2023-10-12 17:01:59.502 INFO 22488 --- [erver-subscribe] c.e.mqtt.configuration.MqttSenderConfig : 通道1监听收到来自:hello1的消息:{"msg": "hello"}
2023-10-12 17:02:00.358 INFO 22488 --- [erver-subscribe] c.e.mqtt.configuration.MqttSenderConfig : 通道1监听收到来自:hello1的消息:{"msg": "hello"}
2023-10-12 17:02:03.755 INFO 22488 --- [er-subscribeTmp] c.e.mqtt.configuration.MqttSenderConfig : 通道2监听收到来自:hello2的消息:{"msg": "hello"}
2023-10-12 17:02:04.922 INFO 22488 --- [er-subscribeTmp] c.e.mqtt.configuration.MqttSenderConfig : 通道2监听收到来自:hello2的消息:{"msg": "hello"}
2023-10-12 17:02:05.705 INFO 22488 --- [er-subscribeTmp] c.e.mqtt.configuration.MqttSenderConfig : 通道2监听收到来自:hello2的消息:{"msg": "hello"}

推送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {

/**
* 发送到mqtt
*
* @param data 数据
* @param topic 话题
*/
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
/**
* 你好世界
*
* @return {@link String}
*/
@GetMapping("hello")
public String helloWorld() {
mqttUtil.sendToMqtt("helloworld", "hello1");
return "hello world";
}

四、总结

发现一个更好用的工具参考。

lets-mica/mica-mqtt: 基于 java aio 实现,开源、简单、易用、低延迟、高性能百万级 Java mqtt client 组件和 Java mqtt broker 服务。 (github.com)


MQTT整合
https://xiong-hai.github.io/2023/10/12/MQTT整合/
作者
Xiong-Hai
发布于
2023年10月12日
许可协议