保留消息和最后遗嘱

文章目录

保留消息(Retained)

保留消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息以及它的QoS,当有一个新的订阅者订阅相应主题的时候,Broker 会马上将这个消息发送给订阅者。

Retained Message

保留消息具有以下特点:

  • 一个 Topic 只能有 1 条保留消息,发布新的保留消息将覆盖老的保留消息

  • 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的保留消息,如下所示

    通配符订阅

  • 保留消息发送到订阅者时,消息的 Retain 标识仍然是 1,订阅者可以判断这个消息是否是保留消息,以做相应的处理。同时,订阅者收到的消息的QoS与保留消息一致

    image-20230412170420371

  • 只有新的订阅者才会收到保留消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到保留消息

  • 删除一个 Retained 消息只需要向这个主题发布一个 Payload 长度为 0 的 Retained 消息即可

注意:保留消息和持久性会话没有关系,保留消息是 Broker为每一个Topic单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的

最后遗嘱(LWT - Last Will and Testament)

遗嘱消息是在客户端连接的时候在CONNETCT报文里设置,详见[CONNECT报文](MQTT控制报文 | Standing on the Shoulder of Giants (jonathanlin.top))。举例来首,下图的CONNECT报文中:

  • Will Flag标志打开,表明开启使用LWT
  • Will Retain标志打开,表明遗嘱消息作为保留消息发布(即客户端异常断开以后,broker会往遗嘱Topic里写入遗嘱消息,且遗嘱消息以保留消息的方式保存)
  • 遗嘱消息的长度,Topic以及内容见下图的红框里的内容

image-20230523101516636

当客户端异常断开连接的时候,Broker会往遗嘱Topic里发送遗嘱消息,这样订阅遗嘱Topic的客户端就能相应的收到遗嘱消息。如果客户端通过DISCONNECT报文正常断开连接,那么Broker不会触发LWT机制,同时Broker把遗嘱消息从服务端删除。

Broker在以下情况认为Client是异常断开连接:

  • 服务端检测到了一个 I/O 错误或者网络故障
  • 客户端在保持连接(Keep Alive)的时间内未能通讯
  • 客户端没有先发送 DISCONNECT 报文直接关闭了网络连接
  • 由于协议错误服务端关闭了网络连接

保留消息和最后遗嘱结合使用的场景

如果要实现设备上线和离线通知(下线包括正常离线和异常离线),可以结合保留消息和最后遗嘱来实现。参考下面的golang代码:

 1func main() {
 2	clientOption := mqtt.NewClientOptions()
 3	clientOption.AddBroker("tcp://192.168.60.40:1883")
 4	clientOption.SetUsername("user")
 5	clientOption.SetPassword("1234")
 6	clientOption.SetClientID("jonlimx110")
 7  // 通过CONNECT设置遗嘱消息,这里把遗嘱消息的Will Retian标志位打开,使得订阅mqtttest/status主题的客户端,不论是当前是连接的还是后续连接的都能接收到遗嘱消息
 8	clientOption.SetWill("mqtttest/status", "lwt - offline", 0, true)
 9	client := mqtt.NewClient(clientOption)
10	if token := client.Connect(); token.Wait() && token.Error() != nil {
11		panic(token.Error())
12	}
13  
14  // 客户端上线以后,往mqtttest/status主题发布消息,通知该客户端上线
15	token := client.Publish("mqtttest/status", 0, true, "online")
16	token.Wait()
17
18	c := make(chan os.Signal, 1)
19	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
20
21	select {
22	case <-c:
23    // 这里没有调用client.Disconnect就退出,模拟异常离线的情况
24    // 异常离线后,Broker触发LWT,往mqtttest/status发布遗嘱消息,且是以保留消息的方式发布
25		fmt.Println("exit without disconnect from broker")
26	case <-time.After(5 * time.Second):
27    // 客户端如果正常离线,发布保留消息通过客户端离线
28		token = client.Publish("mqtttest/status", 0, true, "retained - offline")
29		token.Wait()
30		client.Disconnect(100)
31	}
32}