Using Python to develop EMQ X plugins

Since v4.1, the EMQ X MQTT broker has provided a dedicated multi-language plugin, emqx_extension_hook. It currently allows the hook events of EMQ X to be processed in other programming languages. Developers can use Python or Java to quickly build their own plugins, or extend the official features to fit their business scenarios. For example:

  • Verify a client's login permission: when a client connects, the corresponding function is triggered and receives the client information as parameters. It can then decide whether the client may log in, for example by reading a database and comparing credentials.
  • Record the online status and the online/offline history of clients: when a client's status changes, the corresponding function is triggered, receives the client information as parameters, and updates the client's online status in the database.
  • Verify a client's PUB/SUB permission: when a client publishes or subscribes, the corresponding function is triggered and receives the client information and the current topic as parameters, so it can decide whether the operation is permitted.
  • Handle session and message events to implement subscription relationships and message processing or storage: when messages are published or statuses change, the corresponding function is triggered, and the current client information, status information and message content can be forwarded to Kafka or a database for storage.
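The first scenario above can be sketched independently of the SDK. The following is a minimal sketch, assuming a hypothetical SQLite table `mqtt_user` that stores a salt and a SHA-256 password hash. The table, its columns, and the helpers `hash_password`, `verify_login` and `make_demo_db` are illustrative names and are not part of EMQ X or the Python SDK. An authentication hook could delegate to a function of this shape.

```python
import hashlib
import hmac
import sqlite3


def hash_password(password: str, salt: str) -> str:
    # Hypothetical scheme for illustration only: SHA-256 over salt + password.
    return hashlib.sha256((salt + password).encode("utf-8")).hexdigest()


def verify_login(db: sqlite3.Connection, username: str, password: str) -> bool:
    """Return True if the stored hash for `username` matches `password`."""
    row = db.execute(
        "SELECT salt, password_hash FROM mqtt_user WHERE username = ?",
        (username,),
    ).fetchone()
    if row is None:
        return False  # unknown user: deny the login
    salt, stored_hash = row
    # compare_digest avoids leaking information through timing differences
    return hmac.compare_digest(stored_hash, hash_password(password, salt))


def make_demo_db() -> sqlite3.Connection:
    # In-memory database holding one demo user (hypothetical schema)
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE mqtt_user ("
        "username TEXT PRIMARY KEY, salt TEXT, password_hash TEXT)"
    )
    db.execute(
        "INSERT INTO mqtt_user VALUES (?, ?, ?)",
        ("alice", "s1", hash_password("secret", "s1")),
    )
    return db
```

Keeping the database lookup in a plain function like this makes it possible to test the permission logic without a running broker.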

Note: message hooks are only supported in the Enterprise edition.

The Python and Java drivers communicate with the broker through Erlang/OTP port processes and offer very high throughput. This article uses the Python extension as an example to show how to use the EMQ X cross-language extension.
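To give a feel for port-based communication, here is a minimal sketch of length-prefixed message framing over a byte stream. Erlang ports opened with the `{packet, 4}` option prefix every message with a 4-byte big-endian length; whether emqx_extension_hook uses exactly this framing is an assumption, and the real driver also serializes Erlang terms, which this sketch leaves out. The helper names `write_packet`, `read_packet` and `roundtrip_demo` are hypothetical.

```python
import io
import struct
from typing import BinaryIO, Optional


def write_packet(stream: BinaryIO, payload: bytes) -> None:
    # 4-byte big-endian length header followed by the payload
    stream.write(struct.pack(">I", len(payload)) + payload)
    stream.flush()


def read_packet(stream: BinaryIO) -> Optional[bytes]:
    # Returns None when the stream ends before a full packet is available
    header = stream.read(4)
    if len(header) < 4:
        return None
    (length,) = struct.unpack(">I", header)
    payload = stream.read(length)
    if len(payload) < length:
        return None
    return payload


def roundtrip_demo() -> list:
    # In-memory stand-in for the pipe between the broker and the driver
    pipe = io.BytesIO()
    write_packet(pipe, b"client.connected")
    write_packet(pipe, b"client.disconnected")
    pipe.seek(0)
    return [read_packet(pipe), read_packet(pipe), read_packet(pipe)]
```

In a real port program the stream would be the process's standard input and output rather than an in-memory buffer.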


Example of using the Python extension

Requirements

  • Python 3.6 or later must be installed on the host that runs the EMQ X broker

Steps for usage

  1. Install the Python SDK with pip
  2. Adjust the EMQ X configuration so that the relevant configuration items point to the Python project
  3. Import the SDK and write the code

Install the Python SDK

Install the SDK locally with pip, making sure to use pip3:

pip3 install emqx-extension-sdk
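After installing, it can be useful to confirm that the interpreter meets the version requirement and can see the SDK. The following is a minimal sketch; `check_environment` is a hypothetical helper, and the module name `emqx_extension` is taken from the import statements used later in this article. It must be run with the same python3 interpreter that EMQ X will use.

```python
import importlib.util
import sys
from typing import List, Tuple


def check_environment(module_name: str = "emqx_extension",
                      min_version: Tuple[int, int] = (3, 6)) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    if sys.version_info[:2] < min_version:
        problems.append(
            "Python {}.{} or later is required, found {}.{}".format(
                min_version[0], min_version[1],
                sys.version_info[0], sys.version_info[1]))
    # find_spec returns None when a top-level module cannot be located
    if importlib.util.find_spec(module_name) is None:
        problems.append(
            "module '{}' is not importable with this interpreter".format(
                module_name))
    return problems


if __name__ == "__main__":
    for problem in check_environment():
        print(problem)
```

The messages use `str.format` rather than f-strings so the script still parses, and can report the problem, on an interpreter older than 3.6.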

Modify configuration

Modify the configuration of the emqx_extension_hook plugin so that it uses the Python extension correctly.

## Setup the supported drivers
##
## Value: python2 | python3 | java
exhook.drivers = python3

## Search path for scripts/library
exhook.drivers.python3.path = data/extension/hooks.py

## Call timeout
##
## Value: Duration
##exhook.drivers.python3.call_timeout = 5s

## Initial module name
## Your filename or module name
exhook.drivers.python3.init_module = hooks
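A quick way to catch configuration mistakes before loading the plugin is to parse these settings and check where the initial module is expected to live. The following is a minimal sketch: `parse_conf` and `expected_module_file` are hypothetical helpers, the installation root `/opt/emqx` in the usage note is an assumption, and how the plugin itself resolves the search path is not specified in this article, so the sketch accepts both a directory and a direct path to the .py file.

```python
from pathlib import Path
from typing import Dict

# The settings shown above, embedded for demonstration
SAMPLE_CONF = """
exhook.drivers = python3
exhook.drivers.python3.path = data/extension/hooks.py
##exhook.drivers.python3.call_timeout = 5s
exhook.drivers.python3.init_module = hooks
"""


def parse_conf(text: str) -> Dict[str, str]:
    """Parse `key = value` lines, skipping blanks and `#` comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            settings[key.strip()] = value.strip()
    return settings


def expected_module_file(settings: Dict[str, str], emqx_root: str) -> Path:
    """Guess the file that should contain the initial module."""
    driver = settings["exhook.drivers"]
    search_path = Path(emqx_root) / settings[f"exhook.drivers.{driver}.path"]
    module = settings[f"exhook.drivers.{driver}.init_module"]
    # Assumption: the path may name the script itself or its directory
    if search_path.suffix == ".py":
        return search_path
    return search_path / (module + ".py")
```

For example, `expected_module_file(parse_conf(SAMPLE_CONF), "/opt/emqx").exists()` reports whether the script is where the configuration points.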

Write code

Create the file hooks.py in the directory emqx/data/extension and import the SDK to write your business logic. An example follows.

## data/extension/hooks.py

from emqx_extension.hooks import EmqxHookSdk, hooks_handler
from emqx_extension.types import EMQX_CLIENTINFO_PARSE_T, EMQX_MESSAGE_PARSE_T


# Inherit from the SDK class EmqxHookSdk
class CustomHook(EmqxHookSdk):

    # use decorator to register hooks
    @hooks_handler()
    def on_client_connect(self,
                          conninfo: EMQX_CLIENTINFO_PARSE_T = None,
                          props: dict = None,
                          state: list = None):
        print(f'[Python SDK] [on_client_connect] {conninfo.clientid} connecting')

    @hooks_handler()
    def on_client_connected(self,
                            clientinfo: EMQX_CLIENTINFO_PARSE_T,
                            state: list = None):
        print(
            f'[Python SDK] [on_client_connected] {clientinfo.clientid} connected')

    @hooks_handler()
    def on_client_check_acl(self, clientinfo: EMQX_CLIENTINFO_PARSE_T,
                            pubsub: str,
                            topic: str,
                            result: bool,
                            state: tuple) -> bool:
        print(
            f'[Python SDK] [on_client_check_acl] {clientinfo.username} check ACL: {pubsub} {topic}')
        # when the username is empty, ACL verification failed
        if clientinfo.username == '':
            return False
        return True

    @hooks_handler()
    def on_client_authenticate(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, authresult,
                               state) -> bool:
        print(
            f'[Python SDK] [on_client_authenticate] {clientinfo.clientid} authenticate')
        # when the clientid is not empty, verification passed
        if clientinfo.clientid != '':
            return True
        return False

    # on_message_* hooks are only supported in the Enterprise edition
    @hooks_handler()
    def on_message_publish(self, message: EMQX_MESSAGE_PARSE_T, state):
        print(
            f'[Python SDK] [on_message_publish] {message.topic} {message.payload}')


emqx_hook = CustomHook(hook_module=f'{__name__}.emqx_hook')


def init():
    return emqx_hook.start()


def deinit():
    return
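The ACL check in the example only rejects empty usernames. A more realistic rule set could be kept in a plain function that `on_client_check_acl` delegates to. The following is a minimal sketch with several assumptions: the rule table `ACL_RULES` and the helpers `topic_matches` and `is_allowed` are hypothetical, the `pubsub` argument is assumed to be the string "publish" or "subscribe", and the wildcard matching is simplified (it does not treat topics starting with `$` specially, and it treats a subscription topic as a plain topic rather than a filter).

```python
from typing import Dict, List, Tuple

# Hypothetical rule table: username -> list of (action, topic filter)
ACL_RULES: Dict[str, List[Tuple[str, str]]] = {
    "alice": [
        ("publish", "sensors/alice/#"),
        ("subscribe", "commands/alice/+"),
    ],
}


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Match a topic against an MQTT filter with '+' and '#' wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches the remainder
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def is_allowed(username: str, pubsub: str, topic: str,
               rules: Dict[str, List[Tuple[str, str]]] = ACL_RULES) -> bool:
    if not username:
        return False  # same behaviour as the example: empty username is denied
    return any(
        action == pubsub and topic_matches(topic_filter, topic)
        for action, topic_filter in rules.get(username, [])
    )
```

Inside the hook, the return statement would then become something like `return is_allowed(clientinfo.username, pubsub, topic)`.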

Enable

Enable the emqx_extension_hook plugin. If the configuration is wrong or the Python code contains errors, the plugin cannot be enabled normally. Once it is enabled, establish an MQTT connection and observe how the business logic runs.

./bin/emqx_ctl plugins load emqx_extension_hook
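To establish a test connection without installing an MQTT client library, a CONNECT packet can be built by hand. The following is a minimal sketch using only the standard library. It assumes the broker listens on 127.0.0.1:1883 without TLS and speaks MQTT 3.1.1; the function names and the client ID `hook-smoke-test` are hypothetical.

```python
import socket
import struct


def encode_remaining_length(length: int) -> bytes:
    # MQTT variable-length encoding: 7 bits per byte, high bit = "more"
    out = bytearray()
    while True:
        byte = length % 128
        length //= 128
        if length > 0:
            byte |= 0x80
        out.append(byte)
        if length == 0:
            return bytes(out)


def encode_string(text: str) -> bytes:
    # UTF-8 string prefixed with its 2-byte big-endian length
    data = text.encode("utf-8")
    return struct.pack(">H", len(data)) + data


def build_connect_packet(client_id: str, keepalive: int = 60) -> bytes:
    # Protocol name "MQTT", level 4 (3.1.1), flags 0x02 (clean session)
    variable_header = (encode_string("MQTT") + bytes([0x04, 0x02])
                       + struct.pack(">H", keepalive))
    body = variable_header + encode_string(client_id)
    return b"\x10" + encode_remaining_length(len(body)) + body


def try_connect(host: str = "127.0.0.1", port: int = 1883,
                client_id: str = "hook-smoke-test",
                timeout: float = 5.0) -> int:
    """Send CONNECT and return the CONNACK return code (0 = accepted)."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(build_connect_packet(client_id))
        connack = sock.recv(4)
    if len(connack) != 4 or connack[0] != 0x20:
        raise ConnectionError("unexpected reply: {!r}".format(connack))
    return connack[3]
```

Calling `try_connect()` against a running broker should trigger the connection-related hooks. With the sample code above, a non-empty client ID passes `on_client_authenticate`, so a return code of 0 would be expected.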

Advanced development

The EMQ X Python extension SDK is currently open source. If you need more control or higher performance, or need to run on Python 2.7, you are welcome to contribute code or build on the original examples.
