EMQ X Enterprise + Apache Kafka: Build a high-performance IoT message processing backend

2019-12-23

Background

In many IoT projects, the messages generated by devices are not only consumed by other devices but are also needed by business systems for functions such as security audits, traffic accounting, data statistics, and notification triggers. These needs can be met with a prototype system like the following:

Artboard.png

In this prototype, EMQ X must maintain a separate data channel for each business link, and each link pulls message data from EMQ X according to its own needs. This approach has several problems:

  • Each business link needs its own data channel to EMQ X. Establishing and maintaining these channels adds resource overhead, and slow data synchronization can seriously degrade EMQ X's high-speed message exchange;
  • As the business grows, every new business link requires changes across the entire system;
  • Because each link processes data at a different speed and time, some services will block under heavy message load, which can lead to serious consequences such as data loss and reduced system stability.

These problems closely mirror those found in today's Internet applications: data integration and data synchronization between multiple business systems. Internet applications typically introduce a message queue for peak shaving, rate limiting, and queued processing, which decouples data from services. Using the bridging functions that EMQ X provides for message and stream middleware such as RabbitMQ, Kafka, RocketMQ, and Pulsar, IoT projects can apply the same model to solve the problems above.

This article uses a common IoT scenario to show how to use the EMQ X message middleware together with Kafka, an open source stream processing platform, to process massive volumes of IoT message data: storing the data streams in a highly reliable, fault-tolerant manner, preserving their order, and serving the message data efficiently to multiple business links at the same time.

Business scenario

Suppose there is a smart door lock project in which every lock reports its information once per minute and whenever its status changes (such as on/off). Reports are published to the following MQTT topic (QoS = 1):

devices/{client_id}/state
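
The reporting rule above can be sketched from the device's point of view. This is a minimal illustration only: the helper names `state_topic` and `should_report` are hypothetical, and the article does not describe how the lock firmware actually schedules its reports.

```python
REPORT_INTERVAL_SECONDS = 60  # "every 1 minute" from the scenario


def state_topic(client_id: str) -> str:
    """Build the upstream topic for one door lock."""
    return f"devices/{client_id}/state"


def should_report(now, last_report_at, last_lock_state, lock_state) -> bool:
    """Report on first run, on any state change, or when the interval has elapsed."""
    if last_report_at is None:
        return True
    if lock_state != last_lock_state:
        return True
    return now - last_report_at >= REPORT_INTERVAL_SECONDS


print(state_topic("10083618796833171"))  # devices/10083618796833171/state
print(should_report(now=30, last_report_at=0, last_lock_state=1, lock_state=1))  # False
print(should_report(now=30, last_report_at=0, last_lock_state=1, lock_state=0))  # True
```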

Each device sends data in JSON format, including the door lock's battery level, unlocking status, operation result, and so on. The content is as follows:

{
  "process_id": "7802441525528958",
  "action": "unlock",
  "battery": 83.4,
  "lock_state": 1,
  "version": 1.1,
  "client_id": "10083618796833171"
}
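
As a minimal sketch of how a consumer might read such a report, the Python snippet below parses the JSON and flags a low battery. The function name `check_state` and the 20% threshold are assumptions made for illustration; the article does not specify alarm rules or the meaning of individual `lock_state` values.

```python
import json

LOW_BATTERY_THRESHOLD = 20.0  # hypothetical threshold, not from the article


def check_state(raw: str) -> list:
    """Return a list of alarm strings for one state report (empty if none)."""
    report = json.loads(raw)
    alarms = []
    battery = report.get("battery")
    if not isinstance(battery, (int, float)):
        alarms.append("missing battery reading")
    elif battery < LOW_BATTERY_THRESHOLD:
        alarms.append(f"low battery: {battery}%")
    return alarms


sample = json.dumps({
    "process_id": "7802441525528958",
    "action": "unlock",
    "battery": 83.4,
    "lock_state": 1,
    "version": 1.1,
    "client_id": "10083618796833171",
})
print(check_state(sample))  # prints []
```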

Each door lock subscribes to its own topic to receive remote unlocking commands. Commands are issued on the following MQTT topic (QoS = 1):

devices/{client_id}/command

The issued data includes the unlocking instruction, message encryption and verification information, and so on:

{
  "process_id": "7802441525528958",
  "action": "unlock",
  "nonce_str": "u7u4p0n8",
  "ts": 1574744434,
  "sign": "e9f5af7deaa28563"
}

The upstream and downstream message data need to be used by the following three business links:

  • Message notification: Send the unlocking status to the user through the notification channel (mobile phone message, email) bound to the door lock;
  • Status monitoring: Analyze and process the status information reported by the door lock. If the battery level or status is abnormal, trigger an alarm to notify the user;
  • Security audit: Analyze the uplink and downlink message data, record the user's unlocking behavior, and guard against attacks such as tampering with or replaying downlink instructions.
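
For the security audit link, one common way to detect replayed downlink commands is to reject any command whose `ts` is too old or whose `nonce_str` has already been seen. The sketch below illustrates that idea only. The class name `ReplayGuard`, the 300-second window, and the reading of `ts` as Unix seconds are assumptions; the article does not describe how `sign` is computed, so signature verification is left out.

```python
class ReplayGuard:
    """Track nonces seen within a time window (hypothetical helper)."""

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self._seen = {}  # nonce_str -> ts of the command that first used it

    def accept(self, command: dict, now: int) -> bool:
        """Return True if the command looks fresh, False if stale or replayed."""
        ts = command["ts"]
        nonce = command["nonce_str"]
        # Drop nonces that fell out of the window so memory stays bounded.
        self._seen = {n: t for n, t in self._seen.items()
                      if now - t <= self.window_seconds}
        if abs(now - ts) > self.window_seconds:
            return False  # timestamp outside the accepted window
        if nonce in self._seen:
            return False  # same nonce already used: likely a replay
        self._seen[nonce] = ts
        return True


guard = ReplayGuard()
cmd = {"process_id": "7802441525528958", "action": "unlock",
       "nonce_str": "u7u4p0n8", "ts": 1574744434, "sign": "e9f5af7deaa28563"}
print(guard.accept(cmd, now=1574744440))  # True: first sighting, fresh
print(guard.accept(cmd, now=1574744450))  # False: nonce reused
```

A real audit service would keep the nonce set in shared storage so that every consumer instance sees the same history.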

In this solution, EMQ X bridges the messages of the above topics to Kafka for use by the business systems, which decouples the business systems from EMQ X.

client_id is the ID of the door lock, which is the MQTT client ID used to connect the door lock to EMQ X.

Solution introduction

Kafka is an open source stream processing platform developed by the Apache Software Foundation, written in Scala and Java. The goal of the project is to provide a unified, high-throughput, low-latency platform for processing real-time data.

Kafka has the following characteristics:

  • High throughput: Throughput of up to hundreds of thousands of messages per second and high concurrency, with thousands of clients able to read and write at the same time;
  • Low latency: The minimum latency is only a few milliseconds, making it easy to build real-time streaming applications;
  • Data reliability: Message data is stored safely across a distributed, fault-tolerant cluster and processed in strict queue order (ordering is guaranteed within a partition), and message transaction support ensures data integrity and consumption reliability;
  • Cluster fault tolerance: With n replicas across multiple nodes, up to n-1 node failures can be tolerated;
  • Scalability: The cluster can be expanded dynamically.

In this solution, Kafka serves as the message queue and message bus between the EMQ X message server and the applications. The producer (EMQ X) only appends data to the end of the queue, and each consumer (business link) reads the data in order and processes it independently. This architecture balances performance and data reliability, reduces system complexity, and improves scalability. The prototype of the solution is as follows:

Artboard Copy 12.png
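
The append-and-read-independently model described above can be illustrated with a toy in-memory log. This is not Kafka's API; `MessageLog` and its methods are hypothetical names used only to show that each consumer keeps its own offset, so a slow consumer does not hold back the others.

```python
class MessageLog:
    """Toy append-only log with per-consumer read offsets."""

    def __init__(self):
        self._records = []
        self._offsets = {}  # consumer name -> index of the next record to read

    def append(self, record):
        """Producer side: add a record to the end of the log."""
        self._records.append(record)

    def poll(self, consumer, max_records=10):
        """Consumer side: return unread records in order and advance the offset."""
        start = self._offsets.get(consumer, 0)
        batch = self._records[start:start + max_records]
        self._offsets[consumer] = start + len(batch)
        return batch


log = MessageLog()
for i in range(3):
    log.append(f"state-{i}")

print(log.poll("notification"))          # ['state-0', 'state-1', 'state-2']
print(log.poll("audit", max_records=1))  # ['state-0']
print(log.poll("audit"))                 # ['state-1', 'state-2']
```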

EMQ X Enterprise installation

Installation

If you are new to EMQ X, we recommend starting with the EMQ X Guide.

Visit the EMQ website to download the installation package for your operating system. Because data persistence is an enterprise feature, you need to download EMQ X Enterprise Edition (you can apply for a trial license). At the time of writing, the latest version of EMQ X Enterprise Edition is v3.4.4. After downloading the zip package, start EMQ X as follows:

## Extract the downloaded installation package
unzip emqx-ee-macosx-v3.4.4.zip
cd emqx

## Copy the license file to the EMQ X etc/ directory. You need to obtain the license by applying for a trial or through a purchase
cp ../emqx.lic ./etc

## Launch EMQ X in console mode
./bin/emqx console

Modify configuration

The configuration files needed in this article are as follows:

  1. License file: the EMQ X Enterprise license file; overwrite it with your valid license:
etc/emqx.lic
  2. EMQ X Kafka message storage plug-in configuration file, used to configure the Kafka connection information and the topics to bridge:
etc/plugins/emqx_bridge_kafka.conf

Fill in the plug-in configuration as follows according to your actual deployment. For other configuration items, read the configuration file carefully and adjust them as needed, or simply keep the defaults.

## Connection address
bridge.kafka.servers = 127.0.0.1:9092

## Hooks to process. Because messages are transmitted with QoS 1, the acked hook can be used
## Comment out other unrelated event and message hooks

## bridge.kafka.hook.client.connected.1     = {"topic":"client_connected"}
## bridge.kafka.hook.client.disconnected.1  = {"topic":"client_disconnected"}
## bridge.kafka.hook.session.subscribed.1   = {"filter":"#", "topic":"session_subscribed"}
## bridge.kafka.hook.session.unsubscribed.1 = {"filter":"#", "topic":"session_unsubscribed"}
## bridge.kafka.hook.message.deliver.1      = {"filter":"#", "topic":"message_deliver"}

## filter is the MQTT topic to be processed, and topic is the Kafka topic to be written
## Register multiple hooks for upstream and downstream message processing

## Use the publish hook for reported status data (upstream)
bridge.kafka.hook.message.publish.1        = {"filter":"devices/+/state", "topic":"message_state"}

## Use the acked hook for issued commands (downstream) so that a message is stored only after it has been delivered
bridge.kafka.hook.message.acked.1       = {"filter":"devices/+/command", "topic":"message_command"}
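
To make the `filter` values concrete, the sketch below implements MQTT topic filter matching for the `+` (single-level) and `#` (multi-level) wildcards and shows which topics `devices/+/state` selects. The function `topic_matches` is a hypothetical helper written for illustration; it is not part of EMQ X or the plug-in, and it ignores special cases such as `$`-prefixed topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter containing + or # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # matches all remaining levels, including none
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    return len(filter_levels) == len(topic_levels)


print(topic_matches("devices/+/state", "devices/10083618796833171/state"))    # True
print(topic_matches("devices/+/state", "devices/10083618796833171/command"))  # False
print(topic_matches("#", "devices/10083618796833171/state"))                  # True
```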

Kafka installation and initialization

Install Kafka with Docker and map port 9092 for data connections. Kafka depends on Zookeeper. The complete installation commands are provided below:

## Install Zookeeper
docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper

## Install and configure Kafka
docker run -d --name kafka --publish 9092:9092 \
        --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
        --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 \
        --env KAFKA_ADVERTISED_PORT=9092 \
        wurstmeister/kafka:latest

Create the required topics in Kafka in advance:

## Enter Kafka Docker container
docker exec -it kafka bash

## Upstream data topic message_state
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic message_state

## Downstream data topic message_command
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic message_command

At this point, restart EMQ X and load the plugin to apply the above configuration:

./bin/emqx stop

./bin/emqx start

## Or use console mode for more information
./bin/emqx console

## Launch plugin
./bin/emqx_ctl plugins load emqx_bridge_kafka

## After successful startup, there will be the following prompt
Plugin load emqx_bridge_kafka loaded successfully.

Mock test

Use kafka-console-consumer to start consumption

This article does not cover the detailed implementation of the three business links; it only needs to confirm that messages are written to Kafka. You can use Kafka's built-in consumer command to view the data in each topic:

## Enter Kafka Docker container
docker exec -it kafka bash

## Upstream data topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic message_state --from-beginning

## Open another window to view the topic of downstream data
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic message_command --from-beginning

Once started, each command blocks and waits for data to arrive on its topic. Leave them running and continue with the following steps.

Simulating the sending and receiving of test data

Through the WebSocket tool in the EMQ X management console, you can simulate the upstream and downstream business data of a smart door lock. Open http://127.0.0.1:18083 in a browser to enter the EMQ X management console, open Tool -> WebSocket, and enter the connection information to establish an MQTT connection that simulates the door lock device. Set the Client ID in the connection information according to your business rules; this article uses 10083618796833171.

Subscribe to Downstream Control Topics

According to the business design, the door lock must subscribe to its own downstream control topic, devices/{client_id}/command. Here, subscribe to the topic devices/10083618796833171/command with QoS = 1:

image20191126150024089.png

Simulate issuing a command

Send an unlocking command to the door lock control topic devices/{client_id}/command. The data used here is:

  • Topic: devices/10083618796833171/command

  • QoS: 1

  • payload:

    {
    "process_id": "7802441525528958",
    "action": "unlock",
    "nonce_str": "u7u4p0n8",
    "ts": 1574744434,
    "sign": "e9f5af7deaa28563"
    }
    

After the command is published successfully, the message is received in the management console:

image20191126150044511.png

At the same time, the consumer of the Kafka message_command topic receives one or more messages (the number of times the EMQ X acked hook fires depends on the actual number of clients that receive the message). The message is in JSON format. The formatted content is as follows:

{
  "client_id": "10083618796833171",
  "username": "",
  "from": "10083618796833171",
  "topic": "devices/10083618796833171/command",
  "payload": "eyAgICJwcm9jZXNzX2lkIjogIjc4MDI0NDE1MjU1Mjg5NTgiLCAgICJhY3Rpb24iOiAidW5sb2NrIiwgICAibm9uY2Vfc3RyIjogInU3dTRwMG44IiwgICAidHMiOiAxNTc0NzQ0NDM0LCAgICJzaWduIjogImU5ZjVhZjdkZWFhMjg1NjMiIH0=",
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "ts": 1574751635845
}

This message contains the MQTT receiving and publishing client information and the Base64-encoded payload data:

  • client_id: client_id of the receiving client
  • username: username of the receiving client
  • from: client_id of the publishing client
  • topic: target topic the message was published to
  • payload: Base64-encoded message payload
  • qos: message QoS
  • node: node that processed the message
  • ts: timestamp at which the hook was triggered, in milliseconds
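
Because `payload` arrives Base64-encoded, a consumer has to decode it before it can read the original command. The following is a minimal sketch using only the Python standard library. The function name `decode_bridge_message` is hypothetical, and the code assumes the decoded payload is JSON, as it is in this scenario.

```python
import base64
import json


def decode_bridge_message(raw: str) -> dict:
    """Parse one bridged record and replace the Base64 payload with decoded JSON."""
    record = json.loads(raw)
    payload_bytes = base64.b64decode(record["payload"])
    record["payload"] = json.loads(payload_bytes.decode("utf-8"))
    return record


# Build a sample record shaped like the one above (payload shortened).
command = {"process_id": "7802441525528958", "action": "unlock"}
raw = json.dumps({
    "client_id": "10083618796833171",
    "from": "10083618796833171",
    "topic": "devices/10083618796833171/command",
    "payload": base64.b64encode(json.dumps(command).encode("utf-8")).decode("ascii"),
    "qos": 1,
    "ts": 1574751635845,
})
decoded = decode_bridge_message(raw)
print(decoded["payload"]["action"])  # unlock
```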

Simulate reporting status

Send status data to the door lock status topic devices/{client_id}/state. The data published here is:

  • Topic: devices/10083618796833171/state

  • QoS: 1

  • payload:

    {
    "process_id": "7802441525528958",
    "action": "unlock",
    "battery": 83.4,
    "lock_state": 1,
    "version": 1.1,
    "client_id": "10083618796833171"
    }
    

After the report is published successfully, the Kafka message_state consumer receives one message (the EMQ X publish hook fires once per published message, regardless of whether the topic has subscribers or how many there are). The message is in JSON format, and the formatted content is as follows:

{
  "client_id": "10083618796833171",
  "username": "",
  "topic": "devices/10083618796833171/state",
  "payload": "eyAgICJwcm9jZXNzX2lkIjogIjc4MDI0NDE1MjU1Mjg5NTgiLCAgICJhY3Rpb24iOiAidW5sb2NrIiwgICAiYmF0dGVyeSI6IDgzLjQsICAgImxvY2tfc3RhdGUiOiAxLCAgICJ2ZXJzaW9uIjogMS4xLCAgICJjbGllbnRfaWQiOiAiMTAwODM2MTg3OTY4MzMxNzEiIH0=",
  "qos": 1,
  "node": "emqx@127.0.0.1",
  "ts": 1574753026269
}

This message contains only the MQTT publishing client information and the Base64-encoded payload data:

  • client_id: client_id of the publishing client
  • username: username of the publishing client
  • topic: target topic the message was published to
  • payload: Base64-encoded message payload
  • qos: message QoS
  • node: node that processed the message
  • ts: timestamp at which the hook was triggered, in milliseconds

At this point, we have completed all the steps for bridging EMQ X messages to Kafka. Once connected to Kafka, a business system can make business decisions based on the number of messages consumed, the client_id of the message publisher or subscriber, and the content of the message payload to implement the required business functions.
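
As a rough sketch of how a business system might act on these records, the snippet below routes each record to a handler by Kafka topic name and keeps simple per-lock tallies. It assumes each record's Base64 payload has already been decoded into a dict. The handler names, the `dispatch` function, and the 20% battery threshold are all hypothetical and not part of the article's solution.

```python
from collections import Counter

unlock_counts = Counter()  # client_id -> number of acknowledged unlock commands
alarms = []                # (client_id, reason) tuples


def handle_command(record: dict) -> None:
    """Audit and notification: count unlock commands acknowledged by each lock."""
    if record["payload"].get("action") == "unlock":
        unlock_counts[record["client_id"]] += 1


def handle_state(record: dict) -> None:
    """Status monitoring: raise an alarm below a hypothetical 20% battery level."""
    battery = record["payload"].get("battery", 100.0)  # missing value treated as full
    if battery < 20.0:
        alarms.append((record["client_id"], "low battery"))


HANDLERS = {
    "message_command": handle_command,
    "message_state": handle_state,
}


def dispatch(kafka_topic: str, record: dict) -> None:
    """Route one decoded record to its handler; unknown topics are ignored."""
    handler = HANDLERS.get(kafka_topic)
    if handler is not None:
        handler(record)


dispatch("message_command", {"client_id": "10083618796833171", "payload": {"action": "unlock"}})
dispatch("message_state", {"client_id": "10083618796833171", "payload": {"battery": 12.5}})
print(unlock_counts["10083618796833171"])  # 1
print(alarms)  # [('10083618796833171', 'low battery')]
```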

Performance Test

Readers interested in the performance of this solution can test it with the MQTT-JMeter plugin. Note that the EMQ X cluster, the Kafka cluster, the Kafka consumers, and the JMeter test cluster must all be properly configured and tuned before testing; otherwise the results will not reflect the best performance achievable under the given configuration.

Summary

Through this article, readers can understand the important role that the EMQ X + Kafka IoT message processing solution plays in message communication and business processing. With this solution, you can build a loosely coupled, high-performance, highly fault-tolerant IoT message processing platform that processes data efficiently and safely.

This article does not implement specific business logic; readers can extend it based on the business prototype and system architecture provided here. Because integrating the other message and stream processing systems supported by EMQ X (RabbitMQ, RocketMQ, Pulsar) into an IoT project follows an architecture similar to Kafka's, readers can also use this article as a reference and choose the components that fit their own technology stack.