EMQ X Rule Engine Series - Store Messages to InfluxDB Time Series Database


InfluxDB is an open source database for storing and analyzing time series data. It has a built-in HTTP API, supports SQL-like query statements, and has a schemaless data model, all of which make it easy to work with. Its high write throughput and stable performance make it well suited to IoT workloads.

With the EMQ X rule engine, we can customize a template that converts JSON-formatted MQTT messages into a Measurement and writes them to InfluxDB.


Scenario introduction

In this scenario, messages on EMQ X that meet a given condition must be stored in the InfluxDB time series database. To simplify later analysis and retrieval, the message content is split into separate fields before it is stored.

The data reported by the device in this scenario is as follows:

  • Topic: data/sensor

  • Payload:

    {
      "location": "bedroom",
      "data": {
        "temperature": 25,
        "humidity": 46.4,
        "pm2_5": 0.5
      }
    }


Database installation and initialization

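Start InfluxDB in Docker, create a database named db, and open UDP port 8089. The influx_udp repository is cloned only for the influxdb.conf file that is mounted into the container.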

$ docker pull influxdb

$ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git

$ cd influx_udp

$ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest

Configuration instructions

Create a resource

Open the EMQ X Dashboard, go to the Resources page in the left menu, click the New button, select the InfluxDB resource type, and complete the relevant configuration to create the resource.


Create a rule

Go to the Rules page in the left menu and click the New button to create a rule. Select the trigger event message.publish, which fires when a message is published, so that the rule can process the message data.

After selecting the trigger event, the available fields and a sample SQL statement are shown on the interface.


Filter the required fields

The rule engine uses SQL statements to filter and process data. In the scenario above, we need to extract fields from the payload, which can be done with payload.<fieldName>. Because we only want to handle the data/sensor topic, we use the topic match operator =~ in the WHERE clause: topic =~ 'data/sensor'. The resulting SQL is as follows:

SELECT
  payload.location as location,
  payload.data.temperature as temperature,
  payload.data.humidity as humidity,
  payload.data.pm2_5 as pm2_5
FROM
  "message.publish"
WHERE
  topic =~ 'data/sensor'
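To make the effect of the rule concrete, the following is a minimal Python sketch of what the field selection and topic filter do to one message. It is not EMQ X code: the function names topic_matches and apply_rule are hypothetical, and the wildcard handling is a simplified reading of MQTT topic filters (+ for one level, # for the remaining levels).

```python
import json


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter that may use + and #."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches everything from here on.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing #, both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


def apply_rule(topic: str, payload: str):
    """Mimic the rule: filter on topic, then flatten the selected payload fields."""
    if not topic_matches("data/sensor", topic):
        return None  # the message does not hit the rule
    body = json.loads(payload)
    data = body.get("data", {})
    return {
        "location": body.get("location"),
        "temperature": data.get("temperature"),
        "humidity": data.get("humidity"),
        "pm2_5": data.get("pm2_5"),
    }


message = (
    '{"location": "bedroom", '
    '"data": {"temperature": 25, "humidity": 46.4, "pm2_5": 0.5}}'
)
result = apply_rule("data/sensor", message)
print(result)
```

A message published to any other topic, for example data/other, returns None in this sketch, which corresponds to a message that does not hit the rule.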

SQL Test

The SQL test function lets us quickly confirm whether the SQL statement we just entered does what we want. First, fill in the payload and other test data, using the sample message from the scenario above.


Then click the Test button and get the following output, which is as expected.

{
  "humidity": 46.4,
  "location": "bedroom",
  "pm2_5": 0.5,
  "temperature": 25
}

Add a response action and store the message to InfluxDB

Once the SQL input and output are confirmed to be correct, we add the corresponding action and configure it to store the filtered result in InfluxDB.

Click the Add button under Response Action, select the Save Data to InfluxDB action, and select the InfluxDB resource just created. Then fill in Field Keys, Tag Keys, and Timestamp Key with ${fieldName} entries according to actual needs. Measurement is the name of the Measurement that the data is written to in InfluxDB. Finally, click the New button to complete the rule creation.
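For readers unfamiliar with how a Measurement, Tag Keys, and Field Keys fit together, here is a minimal Python sketch that formats the rule output as InfluxDB line protocol and can send it over UDP. It illustrates the data format only and is not how EMQ X implements the action. The helper names are hypothetical, and treating location as a tag and the three readings as fields is an assumption made for this example.

```python
import socket


def escape_key(text: str) -> str:
    # Commas, equals signs, and spaces are backslash-escaped in tag/field keys and tag values.
    return text.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def format_field(value) -> str:
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        # Write every number as a float so a field never switches type between writes.
        return repr(float(value))
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_line_protocol(measurement, tags, fields, timestamp_ns=None) -> str:
    name = measurement.replace(",", "\\,").replace(" ", "\\ ")
    tag_part = "".join(
        f",{escape_key(k)}={escape_key(str(v))}" for k, v in sorted(tags.items())
    )
    field_part = ",".join(f"{escape_key(k)}={format_field(v)}" for k, v in fields.items())
    line = f"{name}{tag_part} {field_part}"
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


def send_udp(line: str, host: str = "127.0.0.1", port: int = 8089) -> None:
    """Send one line-protocol point to the UDP listener opened earlier."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("utf-8"), (host, port))


line = to_line_protocol(
    "sensor_data",
    tags={"location": "bedroom"},  # assumption: location is used as a tag
    fields={"temperature": 25, "humidity": 46.4, "pm2_5": 0.5},
    timestamp_ns=1561535778444457348,
)
print(line)
# send_udp(line)  # uncomment with the InfluxDB container from this tutorial running
```

Converting all numbers to floats is a deliberate simplification: InfluxDB rejects a write whose field type differs from the type already stored for that field, so a sensor that reports 25 one time and 25.5 the next would otherwise cause a conflict.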



Expected result

We have created a rule with one processing action. The expected result of the action is as follows:

  1. When a client publishes a message to the data/sensor topic, the message hits the rule and the hit count in the rule list increases by 1.
  2. A data point is added to the db database in InfluxDB, and its content matches the processed message content.

Test with the Websocket tool in Dashboard

Switch to the Tools --> Websocket page and connect to EMQ X with any Client ID. After the connection succeeds, send the following message from the Message card:

  • Topic: data/sensor

  • Payload:

    {
      "location": "bedroom",
      "data": {
        "temperature": 25,
        "humidity": 46.4,
        "pm2_5": 0.5
      }
    }


Click the Send button. After the message is sent, the number of hits for the current rule changes to 1.

Then check InfluxDB to see whether the new data point was added:

$ docker exec -it influxdb influx

> use db
Using database db
> select * from "sensor_data"
name: sensor_data
time                humidity location pm2_5 temperature
----                -------- -------- ----- -----------
1561535778444457348 46.4     bedroom  0.5   25
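The same check can also be scripted against the HTTP API exposed on port 8086. The sketch below assumes the InfluxDB 1.x /query endpoint with its db, q, and epoch parameters. The helper names are hypothetical, and the sample response is hand-written to mirror the row above rather than captured from a real server.

```python
import json
import urllib.parse
import urllib.request


def build_query_url(base_url: str, database: str, statement: str) -> str:
    # epoch=ns asks InfluxDB 1.x to return timestamps as integer nanoseconds.
    params = urllib.parse.urlencode({"db": database, "q": statement, "epoch": "ns"})
    return f"{base_url}/query?{params}"


def rows_from_response(response: dict) -> list:
    """Flatten the results/series/columns/values layout into a list of dicts."""
    rows = []
    for result in response.get("results", []):
        for series in result.get("series", []):
            for values in series.get("values", []):
                rows.append(dict(zip(series["columns"], values)))
    return rows


def query_influx(base_url: str, database: str, statement: str) -> list:
    """Run a query over HTTP; requires a reachable InfluxDB 1.x instance."""
    with urllib.request.urlopen(build_query_url(base_url, database, statement)) as resp:
        return rows_from_response(json.load(resp))


# Hand-written sample in the 1.x response layout, mirroring the CLI output above.
sample = {
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "sensor_data",
                    "columns": ["time", "humidity", "location", "pm2_5", "temperature"],
                    "values": [[1561535778444457348, 46.4, "bedroom", 0.5, 25]],
                }
            ],
        }
    ]
}
rows = rows_from_response(sample)
print(rows)
# rows = query_influx("http://localhost:8086", "db", 'SELECT * FROM "sensor_data"')
```

With the container from this tutorial running, uncommenting the last line should return the same kind of row list as the sample.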

At this point, we have used the rule engine to store messages in InfluxDB.

This tutorial assumes that you already have basic knowledge of MQTT and EMQ X.
