# SQL 数据源和字段

规则的 SQL 语句可以处理的数据源有:MQTT 消息、MQTT 事件,或者是数据桥接。

SQL 语句使用 FROM 来指定数据源,在 SELECTWHERE 子句中可以引用相应的字段。 数据源类型不同,可以使用的字段也不同。

# 数据桥接

规则使用 $bridges/ 开头的主题来表示数据桥接的消息或事件。 格式为:$bridges/<type>:<name>

其中 <type>:<name> 部分是数据桥接的 ID,<type> 是数据桥接的类型,<name> 是数据桥接的名字。 比如 $bridges/mqtt:my_mqtt_bridge

# MQTT 桥接事件 ("$bridges/mqtt:*")

当该 MQTT 桥接从远程 MQTT Broker 接收到消息时触发规则

字段解释
idMQTT 消息 ID
server远程 MQTT Broker 的地址,例如 "broker.emqx.io:1883"
payloadMQTT 消息体
topicMQTT 主题
qosMQTT 消息的 QoS
dupMQTT 消息的 DUP Flag
retainMQTT 消息的 Retain Flag
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
message_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)

示例

SELECT
  *
FROM
  "$bridges/mqtt:my_mqtt_bridge"
1
2
3
4

输出:

{
  "id": "0005E27C1D24E44FF440000017520000",
  "server": "broker.emqx.io:1883",
  "payload": "hello",
  "topic": "t/a",
  "qos": 1,
  "dup": false,
  "retain": false,
  "pub_props": {
    "Message-Expiry-Interval": 30,
    "Payload-Format-Indicator": 0,
    "User-Property": {
      "foo": "bar"
    },
    "User-Property-Pairs": [
      {
        "key": "foo"
      },
      {
        "value": "bar"
      }
    ]
  },
  "message_received_at": 1645002753259,
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# MQTT 消息

规则的 SQL 语句可以处理消息发布。 在一个规则语句中,用户可以用 FROM 子句指定一个或者多个主题, 当任何消息发布到指定的主题时都会触发该规则。

字段解释
idMQTT 消息 ID
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  *
FROM
  "t/#"
1
2
3
4

输出

{
  "clientid": "c_emqx",
  "event": "message.publish",
  "event_type": "message_publish",
  "flags": {},
  "id": "0005E27C1D24E44FF440000017520000",
  "metadata": {
    "rule_id": "sql_tester:099ddfa9c466d1ca"
  },
  "node": "emqx@127.0.0.1",
  "payload": "abc",
  "peerhost": "192.168.0.10",
  "pub_props": {
    "Message-Expiry-Interval": 30,
    "Payload-Format-Indicator": 0,
    "User-Property": {
      "foo": "bar"
    },
    "User-Property-Pairs": [
      {
        "key": "foo"
      },
      {
        "value": "bar"
      }
    ]
  },
  "publish_received_at": 1656397576334,
  "qos": 1,
  "timestamp": 1656397576334,
  "topic": "t/a",
  "username": "u_emqx"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# MQTT 事件

规则的 SQL 语句既可以处理消息(消息发布),也可以处理事件(客户端上下线、客户端订阅等)。对于消息,FROM 子句后面直接跟主题名;对于事件,FROM 子句后面跟事件主题。

事件消息的主题以 "$events/" 开头,比如 "$events/client_connected", "$events/session_subscribed"。 如果想让 emqx 将事件消息发布出来,可以在 emqx_rule_engine.conf 文件中配置。

# FROM 子句可用的事件主题

事件主题名释义
$events/message_delivered消息投递
$events/message_acked消息确认
$events/message_dropped消息丢弃
$events/client_connected连接完成
$events/client_disconnected连接断开
$events/client_connack连接确认
$events/client_check_authz_complete鉴权完成
$events/session_subscribed订阅
$events/session_unsubscribed取消订阅

# 消息投递事件 ("$events/message_delivered")

当消息被放入底层socket时触发规则

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_delivered"
1
2
3
4
5
6
7
8
9

输出

{
  "topic": "t/a",
  "timestamp": 1645002753259,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
1
2
3
4
5
6
7
8

# 消息确认事件 ("$events/message_acked")

当消息发送到客户端,并收到客户端回复的ack时触发规则,仅QOS1,QOS2会触发

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
puback_propsPUBACK Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_acked"
1
2
3
4
5
6
7
8
9

输出

{
  "topic": "t/a",
  "timestamp": 1645002965664,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
1
2
3
4
5
6
7
8

# 消息在转发的过程中被丢弃事件 ("$events/message_dropped")

当一条消息无任何订阅者时触发规则

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
no_subscribers:没有订阅者
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  reason,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_dropped"
1
2
3
4
5
6
7
8

输出

{
  "topic": "t/a",
  "timestamp": 1645003103004,
  "reason": "no_subscribers",
  "qos": 1,
  "node": "emqx@127.0.0.1"
}
1
2
3
4
5
6
7

# 消息在投递的过程中被丢弃事件 ("$events/delivery_dropped")

当订阅者的消息队列已满时触发规则

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
queue_full:消息队列已满(QoS>0)
no_local:不允许客户端接收自己发布的消息
expired:消息或者会话过期
qos0_msg:QoS 0 的消息因为消息队列已满被丢弃
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
publish_received_atPUBLISH 消息到达 Broker 的时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  from_clientid,
  from_username,
  reason,
  topic,
  qos
FROM "$events/delivery_dropped"
1
2
3
4
5
6
7

输出

{
  "topic": "t/a",
  "reason": "queue_full",
  "qos": 1,
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
1
2
3
4
5
6
7

# 终端连接成功事件 ("$events/client_connected")

当终端连接成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
mountpoint主题挂载点(主题前缀)
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
is_bridge是否为 MQTT bridge 连接
connected_at终端连接完成时间 (单位:毫秒)
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  keepalive,
  is_bridge
FROM
  "$events/client_connected"
1
2
3
4
5
6
7

输出

{
  "username": "u_emqx",
  "keepalive": 60,
  "is_bridge": false,
  "clientid": "c_emqx"
}
1
2
3
4
5
6

# 终端连接断开事件 ("$events/client_disconnected")

当终端连接断开时触发规则

字段解释
reason终端连接断开原因:
normal:客户端主动断开
kicked:服务端踢出,通过 REST API
keepalive_timeout:keepalive 超时
not_authorized:认证失败,或者 acl_nomatch = disconnect 时没有权限的 Pub/Sub 会主动断开客户端
tcp_closed:对端关闭了网络连接
internal_error:畸形报文或其他未知错误
clientid消息目的 Client ID
username消息目的用户名
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
disconnected_at终端连接断开时间 (单位:毫秒)
disconn_propsDISCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  reason,
  disconnected_at,
  node
FROM
  "$events/client_disconnected"
1
2
3
4
5
6
7
8

输出

{
  "username": "u_emqx",
  "reason": "normal",
  "node": "emqx@127.0.0.1",
  "disconnected_at": 1645003578536,
  "clientid": "c_emqx"
}
1
2
3
4
5
6
7

# 连接确认事件 ("$events/client_connack")

当服务端向客户端发送CONNACK报文时触发规则,reason_code 包含各种错误原因代码

字段解释
reason_code各种原因代码
clientid消息目的 Client ID
username消息目的用户名
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

MQTT v5.0 协议将返回码重命名为原因码,增加了一个原因码来指示更多类型的错误(Reason code and ACK - MQTT 5.0 new features (opens new window))。 因此reason_code 在MQTT v3.1.1与MQTT v5.0中有很大的不同。

MQTT v3.1.1

reason_code描述
connection_accepted已接受连接
unacceptable_protocol_version服务器不支持客户端请求的 MQTT 协议
client_identifier_not_valid客户端 ID 是正确的 UTF-8 字符串,但服务器不允许
server_unavaliable网络连接已建立,但 MQTT 服务不可用
malformed_username_or_password用户名或密码中的数据格式错误
unauthorized_client客户端连接未授权

MQTT v5.0

reason_code描述
success连接成功
unspecified_error未指定的错误
malformed_packet畸形数据包
protocol_error协议错误
implementation_specific_error实现特定错误
unsupported_protocol_version不支持的协议版本
client_identifier_not_valid客户端标识符无效
bad_username_or_password错误的用户名或密码
not_authorized未经授权
server_unavailable服务器无法使用
server_busy服务器繁忙
banned禁止访问
bad_authentication_method错误的身份验证方法
topic_name_invalid主题名称无效
packet_too_large数据包太大
quota_exceeded超出配额
retain_not_supported不支持的retain
qos_not_supported不支持的qos
use_another_server使用另一台服务器
server_moved服务器迁移了
connection_rate_exceeded超出连接速率

示例

SELECT
  clientid,
  username,
  reason,
  node
FROM
  "$events/client_connack"
1
2
3
4
5
6
7

输出

{
  "username": "u_emqx",
  "reason": "success",
  "node": "emqx@127.0.0.1",
  "connected_at": 1645003578536,
  "clientid": "c_emqx"
}
1
2
3
4
5
6
7

# 鉴权完成事件 ("$events/client_check_authz_complete")

当客户端鉴权结束时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
actionpublish or subscribe,发布或者订阅事件
resultallow or deny,鉴权完成
is_cachetrue or false,鉴权时数据的来源
is_cache为true时,鉴权数据来源于cache
is_cache为false时,鉴权数据来源于插件
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  topic,
  action,
  result,
  is_cache,
  node
FROM
  "$events/client_check_authz_complete"
1
2
3
4
5
6
7
8
9
10

输出

{
  "username": "u_emqx",
  "topic": "t/a",
  "action": "publish",
  "result": "allow",
  "is_cache": "false",
  "node": "emqx@127.0.0.1",
  "clientid": "c_emqx"
}
1
2
3
4
5
6
7
8
9

# 终端订阅成功事件 ("$events/session_subscribed")

当终端订阅成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
sub_propsSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_subscribed"
1
2
3
4
5
6
7

输出

{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}
1
2
3
4
5
6

# 取消终端订阅成功事件 ("$events/session_unsubscribed")

当取消终端订阅成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
unsub_propsUNSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (单位:毫秒)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_unsubscribed"
1
2
3
4
5
6
7

输出

{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}
1
2
3
4
5
6