# Rule SQL Syntax

EMQX uses SQL-based rules to extract, filter, enrich, and transform data in real time. The EMQX rule syntax also supports embedding JQ programs in expressions, which lets you perform complex data transformations when needed.


EMQX provides a rich set of built-in functions, which you can explore by clicking Data Integration -> Rules -> SQL Example in the EMQX Dashboard. For more customized needs, you can also write your own SQL-like statements. This section introduces the SQL-like language.

This SQL-like syntax has two types of statements: SELECT and FOREACH. Each rule can have exactly one statement.

| Statement | Description |
| --- | --- |
| SELECT | For situations where the result of the SQL statement is a single message. |
| FOREACH | For producing zero or more messages from a single input message. |

The rule syntax also supports complex expressions that can be embedded within the SELECT and FOREACH statements.

For the fields that can be referenced in the SELECT and FOREACH statements, see Data Sources and Fields.

# The SELECT Statement

The rule engine SQL in EMQX uses a SELECT statement to select specific fields from an input message, rename fields, transform data, and filter messages based on conditions.

The basic format of a SELECT statement in the rule engine SQL is as follows:

```sql
SELECT <fields_expressions> FROM <topic> [WHERE <conditions>]
```

You can use the SELECT clause to specify which fields (from both the message payload and metadata) to include in the output, and use the WHERE clause to filter messages based on specific conditions.

# The SELECT Clause

The SELECT clause specifies which fields to include in the output, while the FROM clause specifies the data source for the query: you can select data from specific topics, or from events that match a certain condition.

# Select by Topics

You can use the SELECT clause to filter the fields to include in the output message.

For example, to define a rule that applies to all messages published to topics matching the pattern t/# or to the topic my/other/topic, you can use the statement below:

```sql
SELECT clientid, payload.clientid as myclientid FROM "t/#", "my/other/topic"
```

Where,

  • the SELECT clause specifies the fields to be included in the output:

    • clientid: the client ID in the message metadata.

    • payload.clientid: the client ID in the message payload. The payload. prefix distinguishes it from the client ID in the metadata.

    • as myclientid: the as clause renames the payload.clientid field to myclientid to prevent a naming conflict with clientid.
  • the FROM clause sets the data source, topics t/# and my/other/topic in this case.
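Whether a message reaches the rule at all depends on MQTT topic-filter matching against each entry in the FROM clause. The Python sketch below models those wildcard rules (+ matches exactly one level, # matches the remaining levels) as described in the MQTT specification. It is an illustration, not EMQX's own matching code, and topic_matches and rule_accepts are hypothetical names.

```python
def topic_matches(topic: str, topic_filter: str) -> bool:
    """Return True if an MQTT topic name matches a topic filter."""
    t_levels = topic.split("/")
    f_levels = topic_filter.split("/")
    # MQTT: a filter that starts with a wildcard does not match '$' topics
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            # '#' matches the parent level and any number of child levels
            return True
        if i >= len(t_levels):
            return False
        # '+' matches exactly one level; anything else must match literally
        if level != "+" and level != t_levels[i]:
            return False
    return len(t_levels) == len(f_levels)


def rule_accepts(topic: str, from_filters: list) -> bool:
    # A rule with several FROM entries applies if any one of them matches
    return any(topic_matches(topic, f) for f in from_filters)


FROM_FILTERS = ["t/#", "my/other/topic"]  # from the example statement above
for topic in ["t/1", "t", "t/a/b", "my/other/topic", "my/other", "x/1"]:
    print(topic, rule_accepts(topic, FROM_FILTERS))
```

Note that t/# also matches the bare topic t, because # includes the parent level.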

TIP

You can find all available event topics in the EMQX Dashboard for editing rules (Subscriptions -> Topics).

# Select by Events

You can also attach rules to events. For example, to select the IP address and port number of client c1 when it connects to EMQX, you can use the statement below:

```sql
SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
```

# The WHERE Clause

The WHERE clause provides an optional way to refine message filtering by specifying additional conditions that the messages must meet, in addition to the topic/event filter specified in the FROM clause.

For instance, the following SQL statement filters messages under topic t/# that are sent by a client with the username eric:

```sql
SELECT * FROM "t/#" WHERE username = 'eric'
```

TIP

Each field used in the WHERE clause must be available in the message metadata or payload; otherwise, an error occurs.
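To make the evaluation order concrete, here is a minimal Python model of a SELECT statement, assuming each message is a plain dict of metadata fields plus a payload. FROM and WHERE can only drop a message, and SELECT then builds at most one output per input. The function names and sample messages are hypothetical, and SELECT * is simplified to copying the whole dict.

```python
from typing import Callable, Optional


def run_select(message: dict,
               from_ok: Callable[[str], bool],
               where: Callable[[dict], bool],
               select: Callable[[dict], dict]) -> Optional[dict]:
    """Return the single output message, or None if it is filtered out."""
    if not from_ok(message["topic"]):   # FROM: topic filter
        return None
    if not where(message):              # WHERE: extra conditions
        return None
    return select(message)              # SELECT: build the output


def under_t(topic: str) -> bool:
    # Same result as the filter "t/#" for ordinary (non-$) topics
    return topic == "t" or topic.startswith("t/")


messages = [
    {"topic": "t/1", "username": "eric", "payload": "hello"},
    {"topic": "t/2", "username": "alice", "payload": "hi"},
    {"topic": "x/1", "username": "eric", "payload": "hey"},
]

# SELECT * FROM "t/#" WHERE username = 'eric'
results = [
    run_select(msg, under_t,
               where=lambda m: m["username"] == "eric",
               select=lambda m: dict(m))
    for msg in messages
]
print(results)
```

Only the first message passes both FROM and WHERE; the other two produce no output.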

# Work with Expressions

Expressions can also be used to transform data in the SELECT and WHERE clauses. For example, the following SQL statement formats the clientid field value by converting it to uppercase and adding a suffix. The result is named cid in the output message:

```sql
SELECT (upper(clientid) + '_UPPERCASE_LETTERS') as cid FROM "t/#"
```

The following example shows the use of a parenthesized arithmetic expression to transform data:

```sql
SELECT (payload.integer_field + 2) * 2 as num FROM "t/#"
```

You can also use dot notation to access fields in a payload with a complex structure:

```sql
SELECT payload.a.b.c.deep as my_field FROM "t/#"
```
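The three expressions above can be mirrored in Python to show what each one computes. This is an illustration only: it assumes the payload has already been decoded from JSON into a dict, and get_path is a hypothetical helper for dot-notation access, not an EMQX function.

```python
import json


def get_path(data, dotted: str):
    """Resolve a dotted path such as 'payload.a.b.c.deep' in nested dicts.

    Returns None when a level is missing (a simplification of this sketch).
    """
    current = data
    for key in dotted.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


# Hypothetical input message: metadata fields plus a decoded JSON payload
message = {
    "clientid": "c1",
    "topic": "t/1",
    "payload": json.loads('{"integer_field": 3, "a": {"b": {"c": {"deep": "found"}}}}'),
}

# SELECT (upper(clientid) + '_UPPERCASE_LETTERS') as cid
cid = message["clientid"].upper() + "_UPPERCASE_LETTERS"
# SELECT (payload.integer_field + 2) * 2 as num
num = (get_path(message, "payload.integer_field") + 2) * 2
# SELECT payload.a.b.c.deep as my_field
my_field = get_path(message, "payload.a.b.c.deep")

print({"cid": cid, "num": num, "my_field": my_field})
```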

# The FOREACH Statement

The FOREACH statement can be seen as a more general form of the SELECT statement. It can produce zero or more output messages for each input message. You can use the FOREACH statement to filter data based on specific conditions and output the results to different MQTT topics or data storage.

In addition to the FROM and WHERE clauses, the FOREACH statement has two more types of clauses:

| Clause | Optional/Required | Description |
| --- | --- | --- |
| DO | Optional | Transforms each element in the array selected by FOREACH. Corresponds to the SELECT clause in the SELECT statement and accepts the same expressions. |
| INCASE | Optional | Filters out array elements that do not match the specified conditions. Accepts the same expressions as the WHERE clause. |

The basic format of a FOREACH statement in the rule engine SQL is as follows:

```sql
FOREACH <expression_that_evaluates_to_array> [as <name>]
[DO <fields_expressions>]
[INCASE <condition>]
FROM <topic>
[WHERE <condition>]
```

TIP

As all but the FOREACH clause have corresponding clauses in the SELECT statement, the FOREACH statement can be seen as a generalization of the SELECT statement as mentioned earlier. The following two statements are equivalent:

```sql
FOREACH jq('.', payload)
DO item.field_1, item.field_2
FROM "t/#"
```

```sql
SELECT payload.field_1, payload.field_2
FROM "t/#"
```

The following example shows how to use the FOREACH statement to output two messages. Each message contains only one field, called value. In one message, value holds the payload's field_1; in the other, it holds field_2:

```sql
FOREACH jq('[.field_1, .field_2]', payload)
DO item as value
FROM "t/#"
```
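The FOREACH semantics can be modeled in a few lines of Python. In this sketch (hypothetical names, not EMQX code), the array expression and the DO and INCASE clauses are passed in as functions, FROM and WHERE are assumed to have already accepted the message, and the two-element array [field_1, field_2] is built directly rather than through jq.

```python
from typing import Any, Callable, Iterable, List, Optional


def run_foreach(message: dict,
                array_of: Callable[[dict], Iterable[Any]],
                do: Optional[Callable[[dict, Any], Any]] = None,
                incase: Optional[Callable[[dict, Any], bool]] = None) -> List[Any]:
    """Return zero or more output messages for one input message."""
    outputs = []
    for item in array_of(message):
        # INCASE: skip array elements that fail the per-element condition
        if incase is not None and not incase(message, item):
            continue
        # DO: shape each element; without DO the element itself is emitted
        outputs.append(do(message, item) if do is not None else item)
    return outputs


message = {"topic": "t/1", "payload": {"field_1": 10, "field_2": 20}}

# Roughly: FOREACH <[field_1, field_2]> DO item as value FROM "t/#"
outputs = run_foreach(
    message,
    array_of=lambda m: [m["payload"]["field_1"], m["payload"]["field_2"]],
    do=lambda m, item: {"value": item},
)
print(outputs)  # [{'value': 10}, {'value': 20}]
```

An empty array yields no output at all, which is the "zero messages" case that SELECT cannot express.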

The FOREACH statement requires input data to be in an array format. If the input message already contains an array, you can directly apply the FOREACH statement.

For example, for messages published to topics matching t/#, if you want to output the timestamp, client ID, sensor name, and index for each sensor whose idx is 1 or above, you can use the statement below:

```sql
FOREACH
    ## The data must be an array
    payload.sensors as sensor
DO  ## The DO clause selects the fields for each output message
    payload.timestamp,
    payload.client_id,
    upper(sensor.name) as name,
    sensor.idx as idx
INCASE
    sensor.idx >= 1
FROM "t/#"
```

Where,

  • the FOREACH clause specifies the sensors field in the input message's payload as the array to iterate over, and names the current element sensor
  • the DO clause specifies the fields to be included in each output message:
    • payload.timestamp is the timestamp from the message payload
    • payload.client_id is the client ID from the message payload
    • sensor.name is converted to uppercase with the built-in upper function and renamed to name with the as clause
    • sensor.idx is renamed to idx with the as clause
  • the INCASE clause adds another filter condition, keeping only the sensors whose idx value is 1 or above.
  • the FROM clause sets the data source, topic t/# in this case.
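Traced by hand in Python, the statement above turns a three-sensor payload into two output messages. The payload values are made up for illustration, and the output keys used for the unaliased payload.timestamp and payload.client_id fields are a simplification; this sketch does not show the exact key layout EMQX produces for unaliased nested fields.

```python
# Hypothetical payload published to a topic matching "t/#"
payload = {
    "timestamp": 1700000000,
    "client_id": "c_steve",
    "sensors": [
        {"name": "a", "idx": 0},
        {"name": "b", "idx": 1},
        {"name": "c", "idx": 2},
    ],
}

outputs = []
for sensor in payload["sensors"]:        # FOREACH payload.sensors as sensor
    if not sensor["idx"] >= 1:           # INCASE sensor.idx >= 1
        continue
    outputs.append({                     # DO ...
        "timestamp": payload["timestamp"],
        "client_id": payload["client_id"],
        "name": sensor["name"].upper(),  # upper(sensor.name) as name
        "idx": sensor["idx"],            # sensor.idx as idx
    })

for out in outputs:
    print(out)
```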

If the input message does not contain an array, you can use the jq function to wrap the payload in an array, as in the code below:

```sql
FOREACH jq('.', payload)
DO item.field_1, item.field_2
FROM "t/#"
```

EMQX supports using the jq function for advanced operations. For more code examples, refer to the documentation page for the built-in jq function.

After creating your rules, it's always recommended to test your rules before putting them into production. The Dashboard UI contains a test feature that allows you to test your rules with sample messages. For details on how to test the SQL statements, see Test the Rule.

# Expressions and Operations

EMQX rule syntax allows using expressions to transform data and filter messages. Expressions can be used in various clauses, including SELECT, FOREACH, DO, INCASE, and WHERE. This section describes the operators that can be used to form expressions; keep in mind that a wide range of built-in functions can also be used in expressions.

# Arithmetic Operations

| Operator | Purpose | Returned value |
| --- | --- | --- |
| + | Addition, or string concatenation | Sum, or concatenated string |
| - | Subtraction | Difference |
| * | Multiplication | Product |
| / | Division | Quotient |
| div | Integer division | Integer quotient |
| mod | Modulus | Remainder |
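The difference between /, div, and mod is easiest to see with concrete numbers. The Python model below is an illustration under stated assumptions: / is taken to be true division (the table lists a separate integer-division operator), and div and mod are only modeled for non-negative operands, because this sketch makes no assumption about how negative values are rounded.

```python
def apply_arith(op: str, a, b):
    """Illustrative model of the arithmetic operators in the table above."""
    if op == "+":
        # Python's + mirrors both meanings: numeric sum or string concatenation
        return a + b
    if op == "-":
        return a - b
    if op == "*":
        return a * b
    if op == "/":
        return a / b  # assumed true division: 7 / 2 gives 3.5
    if op in ("div", "mod"):
        if a < 0 or b <= 0:
            raise ValueError("this sketch only models non-negative operands")
        return a // b if op == "div" else a % b
    raise ValueError(f"unknown operator: {op}")


for op, a, b in [("+", 1, 2), ("+", "c1", "_suffix"), ("/", 7, 2), ("div", 7, 2), ("mod", 7, 2)]:
    print(a, op, b, "=", apply_arith(op, a, b))
```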

# Logical Operations

| Operator | Purpose | Returned value |
| --- | --- | --- |
| > | Greater than | true/false |
| < | Less than | true/false |
| <= | Less than or equal | true/false |
| >= | Greater than or equal | true/false |
| <> | Not equal | true/false |
| != | Not equal | true/false |
| = | Checks whether the two operands are completely equal; can be used to compare values | true/false |
| =~ | Checks whether a topic matches the topic filter; can only be used for topic matching | true/false |
| and | Logical AND | true/false |
| or | Logical OR | true/false |

# CASE Expressions

The CASE expression can be used to perform conditional operations. A CASE expression corresponds to an if-then-else statement in other languages. The following example illustrates how to use it:

```sql
SELECT
  CASE WHEN payload.x < 0 THEN 0
       WHEN payload.x > 7 THEN 7
       ELSE payload.x
  END as x
FROM "t/#"
```

Suppose the message payload is:

```json
{"x": 8}
```

Then the output will be:

```json
{"x": 7}
```
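The CASE expression above behaves like an if/elif/else chain in which the first matching WHEN branch wins. A Python equivalent, using a hypothetical function name:

```python
def clamp_x(payload: dict) -> dict:
    x = payload["x"]
    if x < 0:             # CASE WHEN payload.x < 0 THEN 0
        result = 0
    elif x > 7:           # WHEN payload.x > 7 THEN 7
        result = 7
    else:                 # ELSE payload.x
        result = x
    return {"x": result}  # END as x


print(clamp_x({"x": 8}))  # {'x': 7}
```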

# More Examples

# Examples of SELECT Statements

  • Extract all fields from the messages with the topic "t/a":
    ```sql
    SELECT * FROM "t/a"
    ```
  • Extract all fields from the messages with the topics "t/a" or "t/b":
    ```sql
    SELECT * FROM "t/a","t/b"
    ```
  • Extract all fields from messages with a topic that matches 't/#':
    ```sql
    SELECT * FROM "t/#"
    ```
  • Extract the qos, username, and clientid fields from input messages with a topic that matches 't/#' (the output message will have a payload with the fields qos, username, and clientid):
    ```sql
    SELECT qos, username, clientid FROM "t/#"
    ```
  • Extract the username field from any message published by a client whose username is 'Steven'. Using the topic filter '#' in the FROM clause is not recommended, because the rule then has to be checked for every message sent to EMQX:
    ```sql
    SELECT username FROM "#" WHERE username='Steven'
    ```
  • Extract the x field from the payload of the input message and rename it to y in the output message. The new alias y for payload.x can also be used in the WHERE clause. A rule with this SQL statement matches messages with the payload {"x": 1} but not messages with the payload {"x": 2}:
    ```sql
    SELECT payload.x as y FROM "tests/test_topic_1" WHERE y = 1
    ```
  • This SQL statement matches messages with the payload {"x": {"y": 1}} (and also, for example, {"x": {"y": 1}, "other": "field"}):
    ```sql
    SELECT * FROM "#" WHERE payload.x.y = 1
    ```
  • If an MQTT client with clientid = 'c1' connects, extract its source IP address and port number:
    ```sql
    SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
    ```
  • Match all subscriptions to the topic 'my/topic' with a quality of service (QoS) level of 1, and extract the clientid to the output message:
    ```sql
    SELECT clientid FROM "$events/session_subscribed" WHERE topic = 'my/topic' and qos = 1
    ```
  • Similar to the above example, but here the topic match operator =~ is used to match the topic filter 't/#':
    ```sql
    SELECT clientid FROM "$events/session_subscribed" WHERE topic =~ 't/#' and qos = 1
    ```
  • Extract the User Property with the key "foo" (user properties were introduced in MQTT 5.0, so this does not apply to older MQTT versions):
    ```sql
    SELECT pub_props.'User-Property'.foo as foo FROM "t/#"
    ```

TIP

  • Topics in the FROM clause need to be enclosed in double quotes ("").

  • The WHERE clause is followed by the filter condition. If a string is used in the condition, it needs to be enclosed in single quotes ('').

  • If there are multiple topics in the FROM clause, they need to be separated by commas (","), for example:

    ```sql
    SELECT * FROM "t/1", "t/2"
    ```
  • You can use the period symbol (.) to access inner fields of the payload. For example, if the payload is a nested JSON structure, you can use payload.outer_field.inner_field to access the inner_field of the outer_field.

# Examples of FOREACH Statements

Suppose there are messages with client ID c_steve coming to the topic t/1. The message body is in JSON format, and the sensors field is an array containing multiple objects, as shown in the following example:

```json
{
    "date": "2020-04-24",
    "sensors": [
        {"name": "a", "idx":0},
        {"name": "b", "idx":1},
        {"name": "c", "idx":2}
    ]
}
```

# Example 1

In this example, each object in the sensors array must be republished to the topic sensors/${idx} (where idx is taken from the object) with the content ${name} (where name is taken from the object). This means that, for the example input given above, the rule engine will issue the following three messages:

  1. Topic: sensors/0 Content: a
  2. Topic: sensors/1 Content: b
  3. Topic: sensors/2 Content: c

For the rule in this example, we need to configure the following action:

  • Action type: message republish
  • Target topic: sensors/${idx}
  • Target QoS: 2
  • Message content template: ${name}

And the following SQL statement:

```sql
FOREACH
    payload.sensors
FROM "t/#"
```

In the SQL statement above, the FOREACH clause specifies the array sensors that needs to be traversed. The FOREACH statement will perform a "message republish" action for each object in the result array, so the republish action will be performed three times.
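To see how the action templates pick up fields from each array element, the following Python sketch renders sensors/${idx} and ${name} for every object in the sample payload. The render helper is a simplified, hypothetical stand-in for the republish action's template engine; for example, it leaves unknown placeholders untouched, which may differ from what EMQX does.

```python
import re


def render(template: str, fields: dict) -> str:
    """Replace ${name} placeholders with values from `fields` (simplified)."""
    return re.sub(r"\$\{(\w+)\}",
                  lambda m: str(fields.get(m.group(1), m.group(0))),
                  template)


payload = {
    "date": "2020-04-24",
    "sensors": [
        {"name": "a", "idx": 0},
        {"name": "b", "idx": 1},
        {"name": "c", "idx": 2},
    ],
}

# FOREACH payload.sensors with no DO clause: each array element itself is
# handed to the action, so ${idx} and ${name} come from that element.
republished = [
    (render("sensors/${idx}", item), render("${name}", item))
    for item in payload["sensors"]
]
print(republished)
```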

# Example 2

In this example, each object in the sensors array whose idx field is greater than or equal to 1 must be republished to the topic sensors/${idx} with the content clientid=${clientid},name=${name},date=${date}. This means that the rule will issue two messages for the example input message specified above (the array element whose idx field is 0 is filtered out):

  1. Topic: sensors/1 Content: clientid=c_steve,name=b,date=2020-04-24
  2. Topic: sensors/2 Content: clientid=c_steve,name=c,date=2020-04-24

For the rule in this example, we need to configure the following action:

  • Action type: message republish
  • Target topic: sensors/${idx}
  • Target QoS: 2
  • Message content template: clientid=${clientid},name=${name},date=${date}

And the following SQL statement:

```sql
FOREACH
    payload.sensors
DO
    clientid,
    item.name as name,
    item.idx as idx
INCASE
    item.idx >= 1
FROM "t/#"
```

In the above SQL statement, the FOREACH clause specifies that the array sensors needs to be traversed. The DO clause selects the fields required for each output message: clientid is selected from the message metadata, while name and idx are selected from the current sensor object. The variable item represents the current object in the sensors array. The INCASE clause specifies a filter condition for the array objects; objects that do not match the filter are ignored.

In DO and INCASE clauses, you can use item to access the current object, or you can customize a variable name by using the as syntax in FOREACH. So the SQL statement in this example can also be written as follows:

```sql
FOREACH
    payload.sensors as s
DO
    clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"
```

# Example 3

This extends Example 2 by also removing the c_ prefix of c_steve in the clientid field.

The rule engine comes with a number of built-in functions that can be called in the FOREACH, DO, and INCASE clauses. To change c_steve into steve, you can modify the SQL in Example 2 as follows:

```sql
FOREACH
    payload.sensors as s
DO
    nth(2, tokens(clientid,'_')) as clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"
```
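The nested call nth(2, tokens(clientid, '_')) first splits the client ID on underscores and then picks the second token, since nth counts from 1. The Python sketch below models both built-ins under the assumption that tokens drops empty tokens, and applies them together with the INCASE filter to the sample sensors. The Python functions are hypothetical stand-ins, not the rule engine's implementation.

```python
def tokens(s: str, separators: str) -> list:
    """Split `s` on any character in `separators`, dropping empty tokens."""
    result, current = [], ""
    for ch in s:
        if ch in separators:
            if current:
                result.append(current)
            current = ""
        else:
            current += ch
    if current:
        result.append(current)
    return result


def nth(n: int, items: list):
    # nth() is 1-based, so nth(2, ...) picks the second token
    return items[n - 1]


clientid = "c_steve"
sensors = [{"name": "a", "idx": 0}, {"name": "b", "idx": 1}, {"name": "c", "idx": 2}]

outputs = [
    {"clientid": nth(2, tokens(clientid, "_")), "name": s["name"], "idx": s["idx"]}
    for s in sensors
    if s["idx"] >= 1  # INCASE s.idx >= 1
]
print(outputs)
```

With this model, tokens splits on every underscore, so a client ID such as c_steve_2 would also yield steve rather than steve_2.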

Multiple expressions can be placed in the FOREACH clause as long as the last expression specifies the array to be traversed. For example, if the input message's payload was formatted like this instead:

```json
{
    "date": "2020-04-24",
    "data": {
        "sensors": [
            {"name": "a", "idx":0},
            {"name": "b", "idx":1},
            {"name": "c", "idx":2}
        ]
    }
}
```

Then the FOREACH clause can give the payload data another name before selecting the array:

```sql
FOREACH
    payload.data as d,
    d.sensors as s
...
```

This is equivalent to:

```sql
FOREACH
    payload.data.sensors as s
...
```

This feature can be useful when you are working with payloads that are structured in complex ways.