Skip to content

共享订阅

EMQX 实现了 MQTT 的共享订阅功能。共享订阅是一种订阅模式,用于在多个订阅者之间实现负载均衡。客户端可以分为多个订阅组,消息仍然会被转发到所有订阅组,但每个订阅组内只有一个客户端接收消息。您可以为一组订阅者的原始主题添加前缀以启用共享订阅。EMQX 支持两种格式的共享订阅前缀,分别为带群组的共享订阅(前缀为 $share/<group-name>/)和不带群组的共享订阅(前缀为 $queue/)。两种共享订阅格式示例如下:

前缀格式示例前缀真实主题名
带群组格式$share/abc/t/1$share/abc/t/1
不带群组格式$queue/t/1$queue/t/1

您可以使用客户端工具连接 EMQX 并尝试这个消息服务。 本节介绍了共享订阅的机制并演示了如何使用 MQTTX DesktopMQTTX CLI 来模拟客户端尝试通过共享订阅来接收消息。

带群组的共享订阅

您可以通过在原始主题前添加 $share/<group-name> 前缀为分组的订阅者启用共享订阅。组名可以是任意字符串。EMQX 同时将消息转发给不同的组,属于同一组的订阅者可以使用负载均衡接收消息。

例如,如果订阅者 s1s2s3 是组 g1 的成员,订阅者 s4s5 是组 g2 的成员,而所有订阅者都订阅了原始主题 t1。共享订阅的主题必须是 $share/g1/t1$share/g2/t1。当 EMQX 发布消息 msg1 到原始主题 t1 时:

  • EMQX 将 msg1 发送给 g1g2 两个组。
  • s1s2s3 中的一个订阅者将接收 msg1
  • s4s5 中的一个订阅者将接收 msg1
shared_subscription_group

不带群组的共享订阅

$queue/ 为前缀的共享订阅是不带群组的共享订阅。它是 $share 订阅的一种特例。您可以将其理解为所有订阅者都在一个订阅组中,如 $share/$queue

shared_subscription_queue

共享订阅与会话

在 MQTT 客户端中,共享订阅和持久会话功能的概念存在矛盾,因此无法同时使用这两个特性。如果您正在使用共享订阅功能,则必须通过将 clean_session 参数设置为 true 来启用 clean session 功能。

持久会话功能(clean_session=false)确保订阅者在重新连接后可以立即恢复数据流,而不会丢失任何消息。这对于确保消息传递的可靠性至关重要。通过将 clean_session 参数设置为 false,即使客户端离线,会话仍将持续存在,使设备可以继续接收消息。然而,由于设备处于离线状态,可能无法及时处理接收到的消息,导致消息在会话中随着时间的推移不断积累。

当共享订阅功能被启用,同一组内的另一个设备接管离线设备的数据流,但它不会收到任何已积累的消息,因为这些消息属于原始设备的会话。因此,如果设备长时间保持离线状态,持久会话的消息缓冲区可能会溢出,导致消息丢失。这种情况会影响负载平衡,并最终导致内存和存储资源耗尽,对系统的稳定性和性能产生负面影响。

更多关于持久会话功能的信息,参阅 MQTT 持久会话与 Clean Session 详解

使用 MQTTX Desktop 尝试共享订阅

前置准备

  • 了解 MQTT 的共享订阅
  • 能使用 MQTTX 进行基本的发布和订阅操作。

以下步骤演示了如何为原始主题加上 share 前缀让不同组的订阅者共享相同主题的订阅,以及这些订阅者将如何接收来自共享订阅的消息。

在本演示中,您可以创建一个名为 Demo 的客户端连接作为发布者,向主题 t/1 发布消息。然后,您可以创建 4 个客户端连接作为订阅者,例如 Subscriber1Subscriber2 Subscriber3Subscriber4。订阅者可以分为 ab 两个组,并且两个组都订阅主题 t/1

  1. 启动 EMQX 和 MQTTX Desktop。点击新建连接创建一个名为 Demo 的客户端连接作为发布者。

    • 名称栏中输入Demo
    • 在本演示中,服务器地址使用本地主机 127.0.0.1 作为示例。
    • 其它设置保持默认,点击连接

    TIP

    MQTTX Desktop 中介绍了更多详细的连接创建信息。

    retain-message-new-connection-general
  2. 点击连接窗格中的 + -> 新建连接创建 4 个新连接作为订阅者。将名称分别设置为 Subscriber1Subscriber2Subscriber3Subscriber4

  3. 连接窗格中依次选择订阅者客户端,点击添加订阅为各个订阅者创建共享订阅。根据下面的规则在主题栏中输入正确的主题。

    为了给多个订阅者分组,您需要在订阅的主题t/1前加上组名 {group} 。为了使他们同时订阅同一个主题,您还需要在组名前加上前缀 $share

    添加订阅弹出窗口中:

    • Subscribe1Subscriber2 订阅的主题设为 $share/a/t/1
    • Subscriber3Subscriber4的**主题 **设为 $share/b/t/1

    在以上主题示例中,

    • 前缀 $share 表明这是一个共享订阅。
    • {group}ab,也可以是其他自定义的名称。
    • t/1 是原始主题。

    其他选项保留为默认设置。点击确定

    shared-subscription
  4. 连接窗格中选择客户端 Demo发布消息。

    • 发布一条主题为 t/1 的消息。a 组的客户端 Subscriber1b 组的 Subscriber4 都会收到消息。

      shared-subscription-1
    • 再次发送一条相同的消息。a 组的客户端 Subscriber2b 组的客户端 Subscriber3 都会收到消息。

      shared-subscription-2

TIP

当共享订阅的消息被发布, EMQX 会同时将消息转发到不同的组,但是同一个组内一次只有一个订阅者会收到消息。

使用 MQTTX CLI 尝试共享订阅

  1. 将四个订阅者分为两个组,并订阅主题 t/1

    bash
    # 客户端 A 和 B 订阅主题 `$share/my_group1/t/1`
    mqttx sub -t '$share/my_group1/t/1' -h 'localhost' -p 1883
    
    ## 客户端 C 和 D 订阅主题 `$share/my_group2/t/1`
    mqttx sub -t '$share/my_group2/t/1' -h 'localhost' -p 1883
  2. 使用一个新的客户端,向原始主题 t/1 发布 4 条 payload 为 1234 的消息:

    bash
    mqttx pub -t 't/1' -m '1' -h 'localhost' -p 1883
    mqttx pub -t 't/1' -m '2' -h 'localhost' -p 1883
    mqttx pub -t 't/1' -m '3' -h 'localhost' -p 1883
    mqttx pub -t 't/1' -m '4' -h 'localhost' -p 1883
  3. 检查每个订阅组中的客户端接收到的消息:

    • 订阅组 1(A 和 B) 和 订阅组 2 (C 和 D) 同时接收到消息。
    • 同一组中的订阅者每次只有一个接收到消息。