保留消息和最后遗嘱
文章目录
保留消息(Retained)
保留消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息以及它的QoS,当有一个新的订阅者订阅相应主题的时候,Broker 会马上将这个消息发送给订阅者。
保留消息具有以下特点:
一个 Topic 只能有 1 条保留消息,发布新的保留消息将覆盖老的保留消息
如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的保留消息,如下所示
保留消息发送到订阅者时,消息的 Retain 标识仍然是 1,订阅者可以判断这个消息是否是保留消息,以做相应的处理。同时,订阅者收到的消息的QoS与保留消息一致
只有新的订阅者才会收到保留消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到保留消息
删除一个 Retained 消息只需要向这个主题发布一个 Payload 长度为 0 的 Retained 消息即可
注意:保留消息和持久性会话没有关系,保留消息是 Broker为每一个Topic单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的
最后遗嘱(LWT - Last Will and Testament)
遗嘱消息是在客户端连接的时候在CONNETCT报文里设置,详见[CONNECT报文](MQTT控制报文 | Standing on the Shoulder of Giants (jonathanlin.top))。举例来首,下图的CONNECT报文中:
- Will Flag标志打开,表明开启使用LWT
- Will Retain标志打开,表明遗嘱消息作为保留消息发布(即客户端异常断开以后,broker会往遗嘱Topic里写入遗嘱消息,且遗嘱消息以保留消息的方式保存)
- 遗嘱消息的长度,Topic以及内容见下图的红框里的内容
当客户端异常断开连接的时候,Broker会往遗嘱Topic里发送遗嘱消息,这样订阅遗嘱Topic的客户端就能相应的收到遗嘱消息。如果客户端通过DISCONNECT报文正常断开连接,那么Broker不会触发LWT机制,同时Broker把遗嘱消息从服务端删除。
Broker在以下情况认为Client是异常断开连接:
- 服务端检测到了一个 I/O 错误或者网络故障
- 客户端在保持连接(Keep Alive)的时间内未能通讯
- 客户端没有先发送 DISCONNECT 报文直接关闭了网络连接
- 由于协议错误服务端关闭了网络连接
保留消息和最后遗嘱结合使用的场景
如果要实现设备上线和离线通知(下线包括正常离线和异常离线),可以结合保留消息和最后遗嘱来实现。参考下面的golang代码:
1func main() {
2 clientOption := mqtt.NewClientOptions()
3 clientOption.AddBroker("tcp://192.168.60.40:1883")
4 clientOption.SetUsername("user")
5 clientOption.SetPassword("1234")
6 clientOption.SetClientID("jonlimx110")
7 // 通过CONNECT设置遗嘱消息,这里把遗嘱消息的Will Retian标志位打开,使得订阅mqtttest/status主题的客户端,不论是当前是连接的还是后续连接的都能接收到遗嘱消息
8 clientOption.SetWill("mqtttest/status", "lwt - offline", 0, true)
9 client := mqtt.NewClient(clientOption)
10 if token := client.Connect(); token.Wait() && token.Error() != nil {
11 panic(token.Error())
12 }
13
14 // 客户端上线以后,往mqtttest/status主题发布消息,通知该客户端上线
15 token := client.Publish("mqtttest/status", 0, true, "online")
16 token.Wait()
17
18 c := make(chan os.Signal, 1)
19 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
20
21 select {
22 case <-c:
23 // 这里没有调用client.Disconnect就退出,模拟异常离线的情况
24 // 异常离线后,Broker触发LWT,往mqtttest/status发布遗嘱消息,且是以保留消息的方式发布
25 fmt.Println("exit without disconnect from broker")
26 case <-time.After(5 * time.Second):
27 // 客户端如果正常离线,发布保留消息通过客户端离线
28 token = client.Publish("mqtttest/status", 0, true, "retained - offline")
29 token.Wait()
30 client.Disconnect(100)
31 }
32}