# 规则引擎 SQL 语句中可用的字段

SELECT 和 WHERE 子句可用的字段与事件的类型相关。其中 clientid, usernameevent 是通用字段,每种事件类型都有。

# 使用规则引擎 SQL 语句处理消息发布

规则引擎的 SQL 语句可以处理消息发布。 在一个规则语句中,用户可以用 FROM 子句指定一个或者多个主题,当任何消息发布到指定的主题时都会触发该规则。

字段解释
idMQTT 消息 ID
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
headersMQTT 消息内部与流程处理相关的额外数据
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

SELECT
  payload.msg as msg,
  clientid,
  username,
  payload,
  topic,
  qos
FROM
  "t/#"
1
2
3
4
5
6
7
8
9

输出

{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "payload": "{\"msg\":\"hello\"}",
  "msg": "hello",
  "clientid": "c_emqx"
}
1
2
3
4
5
6
7
8

# 使用规则引擎 SQL 语句处理事件

规则引擎的 SQL 语句既可以处理消息(消息发布),也可以处理事件(客户端上下线、客户端订阅等)。对于消息,FROM 子句后面直接跟主题名;对于事件,FROM 子句后面跟事件主题。

事件消息的主题以 "$events/" 开头,比如 "$events/client_connected", "$events/session_subscribed"。 如果想让 emqx 将事件消息发布出来,可以在 emqx_rule_engine.conf 文件中配置。

# FROM 子句可用的事件主题

事件主题名释义
$events/message_delivered消息投递
$events/message_acked消息确认
$events/message_dropped消息丢弃
$events/client_connected连接完成
$events/client_disconnected连接断开
$events/client_connack连接确认
$events/client_check_acl_complete鉴权结果
$events/session_subscribed订阅
$events/session_unsubscribed取消订阅

# $events/message_delivered (消息投递)

当消息被放入底层socket时触发规则

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_delivered"
1
2
3
4
5
6
7
8
9

输出

{
  "topic": "t/a",
  "timestamp": 1645002753259,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
1
2
3
4
5
6
7
8

# $events/message_acked (消息确认)

当消息发送到客户端,并收到客户端回复的ack时触发规则,仅QOS1,QOS2会触发

字段解释
idMQTT 消息 ID
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
puback_propsPUBACK Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

SELECT
  from_clientid,
  from_username,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_acked"
1
2
3
4
5
6
7
8
9

输出

{
  "topic": "t/a",
  "timestamp": 1645002965664,
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
1
2
3
4
5
6
7
8

# $events/message_dropped (消息在转发的过程中被丢弃)

当一条消息无任何订阅者时触发规则

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
no_subscribers: 没有订阅者
clientid消息来源 Client ID
username消息来源用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

SELECT
  reason,
  topic,
  qos,
  node,
  timestamp
FROM
  "$events/message_dropped"
1
2
3
4
5
6
7
8

输出

{
  "topic": "t/a",
  "timestamp": 1645003103004,
  "reason": "no_subscribers",
  "qos": 1,
  "node": "emqx@127.0.0.1"
}
1
2
3
4
5
6
7

# $events/delivery_dropped (消息在投递的过程中被丢弃)

当订阅者的消息队列已满时触发规则

字段解释
idMQTT 消息 ID
reason消息丢弃原因,可能的原因:
queue_full: 消息队列已满(QoS>0)
no_local: 不允许客户端接收自己发布的消息
expired: 消息或者会话过期
qos0_msg: QoS0 的消息因为消息队列已满被丢弃
from_clientid消息来源 Client ID
from_username消息来源用户名
clientid消息目的 Client ID
username消息目的用户名
payloadMQTT 消息体
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
flagsMQTT 消息的 Flags
pub_propsPUBLISH Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
publish_received_atPUBLISH 消息到达 Broker 的时间 (ms)
node事件触发所在节点

示例

SELECT
  from_clientid,
  from_username,
  reason,
  topic,
  qos
FROM "$events/delivery_dropped"
1
2
3
4
5
6
7

输出

{
  "topic": "t/a",
  "reason": "queue_full",
  "qos": 1,
  "from_username": "u_emqx_1",
  "from_clientid": "c_emqx_1"
}
1
2
3
4
5
6
7

# $events/client_connected (终端连接成功)

当终端连接成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
mountpoint主题挂载点(主题前缀)
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
is_bridge是否为 MQTT bridge 连接
connected_at终端连接完成时间 (s)
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  keepalive,
  is_bridge
FROM
  "$events/client_connected"
1
2
3
4
5
6
7

输出

{
  "username": "u_emqx",
  "keepalive": 60,
  "is_bridge": false,
  "clientid": "c_emqx"
}
1
2
3
4
5
6

# $events/client_disconnected (终端连接断开)

当终端连接断开时触发规则

字段解释
reason终端连接断开原因:
normal:客户端主动断开
kicked:服务端踢出,通过 REST API
keepalive_timeout: keepalive 超时
not_authorized: 认证失败,或者 acl_nomatch = disconnect 时没有权限的 Pub/Sub 会主动断开客户端
tcp_closed: 对端关闭了网络连接
internal_error: 畸形报文或其他未知错误
clientid消息目的 Client ID
username消息目的用户名
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
disconnected_at终端连接断开时间 (s)
disconn_propsDISCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  reason,
  disconnected_at,
  node
FROM
  "$events/client_disconnected"
1
2
3
4
5
6
7
8

输出

{
  "username": "u_emqx",
  "reason": "normal",
  "node": "emqx@127.0.0.1",
  "disconnected_at": 1645003578536,
  "clientid": "c_emqx"
}
1
2
3
4
5
6
7

# $events/client_connack (连接确认)

当服务端向客户端发送CONNACK报文时触发规则, reason_code 包含各种错误原因代码

字段解释
reason_code各种原因代码
clientid消息目的 Client ID
username消息目的用户名
peername终端的 IPAddress 和 Port
socknameemqx 监听的 IPAddress 和 Port
proto_name协议名字
proto_ver协议版本
keepaliveMQTT 保活间隔
clean_startMQTT clean_start
expiry_intervalMQTT Session 过期时间
conn_propsCONNECT Properties (仅适用于 MQTT 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

MQTT v5.0 协议将返回码重命名为原因码,增加了一个原因码来指示更多类型的错误(Reason code and ACK - MQTT 5.0 new features (opens new window))。 因此reason_code 在MQTT v3.1.1与MQTT v5.0中有很大的不同。

MQTT v3.1.1

reason_code描述
connection_accepted已接受连接
unacceptable_protocol_version服务器不支持客户端请求的 MQTT 协议
client_identifier_not_valid客户端 ID 是正确的 UTF-8 字符串,但服务器不允许
server_unavaliable网络连接已建立,但 MQTT 服务不可用
malformed_username_or_password用户名或密码中的数据格式错误
unauthorized_client客户端连接未授权

MQTT v5.0

reason_code描述
success连接成功
unspecified_error未指定的错误
malformed_packet畸形数据包
protocol_error协议错误
implementation_specific_error实现特定错误
unsupported_protocol_version不支持的协议版本
client_identifier_not_valid客户端标识符无效
bad_username_or_password错误的用户名或密码
not_authorized未经授权
server_unavailable服务器无法使用
server_busy服务器繁忙
banned禁止访问
bad_authentication_method错误的身份验证方法
topic_name_invalid主题名称无效
packet_too_large数据包太大
quota_exceeded超出配额
retain_not_supported不支持的retain
qos_not_supported不支持的qos
use_another_server使用另一台服务器
server_moved服务器迁移了
connection_rate_exceeded超出连接速率

示例

SELECT
  clientid,
  username,
  reason,
  node
FROM
  "$events/client_connack"
1
2
3
4
5
6
7

输出

{
  "username": "u_emqx",
  "reason": "success",
  "node": "emqx@127.0.0.1",
  "connected_at": 1645003578536,
  "clientid": "c_emqx"
}
1
2
3
4
5
6
7

# $events/client_check_acl_complete (鉴权结果)

当客户端鉴权结束时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
actionpublish or subscribe, 发布或者订阅事件
resultallow or deny,鉴权结果
is_cachetrue or false,鉴权时数据的来源
is_cache为true时,鉴权数据来源于cache
is_cache为false时,鉴权数据来源于插件
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  topic,
  action,
  result,
  is_cache,
  node
FROM
  "$events/client_check_acl_complete"
1
2
3
4
5
6
7
8
9
10

输出

{
  "username": "u_emqx",
  "topic": "t/a",
  "action": "publish",
  "result": "allow",
  "is_cache": "false",
  "node": "emqx@127.0.0.1",
  "clientid": "c_emqx"
}
1
2
3
4
5
6
7
8
9

# $events/session_subscribed (终端订阅成功)

当终端订阅成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
sub_propsSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_subscribed"
1
2
3
4
5
6
7

输出

{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}
1
2
3
4
5
6

# $events/session_unsubscribed (取消终端订阅成功)

当取消终端订阅成功时触发规则

字段解释
clientid消息目的 Client ID
username消息目的用户名
peerhost客户端的 IPAddress
topicMQTT 主题
qosMQTT 消息的 QoS
unsub_propsUNSUBSCRIBE Properties (仅适用于 5.0)
timestamp事件触发时间 (ms)
node事件触发所在节点

示例

SELECT
  clientid,
  username,
  topic,
  qos
FROM
  "$events/session_unsubscribed"
1
2
3
4
5
6
7

输出

{
  "username": "u_emqx",
  "topic": "t/a",
  "qos": 1,
  "clientid": "c_emqx"
}
1
2
3
4
5
6

下一部分,规则引擎内置函数