# Plugins

The emqttd broker could be extended by plugins. Users could develop plugins to customize authentication, ACL and functions of the broker, or integrate the broker with other systems.

The plugins that emqtt project released:

PluginDescription
emqttd_plugin_template (opens new window)Template Plugin
emqttd_dashboard (opens new window)Web Dashboard
emqttd_auth_http (opens new window)HTTP Auth/ACL Plugin
emqttd_plugin_mysql (opens new window)MySQL Auth/ACL Plugin
emqttd_plugin_pgsql (opens new window)PostgreSQL Auth/ACL Plugin
emqttd_plugin_redis (opens new window)Redis Auth/ACL Plugin
emqttd_plugin_mongo (opens new window)MongoDB Auth/ACL Plugin
emqttd_stomp (opens new window)STOMP Protocol Plugin
emqttd_sockjs (opens new window)STOMP over SockJS Plugin
emqttd_recon (opens new window)Recon Plugin
emqttd_reloader (opens new window)Reloader Plugin

# emqttd_plugin_template - Template Plugin

A plugin is just a normal Erlang application under the 'emqttd/plugins' folder. Each plugin has e configuration file: 'etc/plugin.config'.

plugins/emqttd_plugin_template is a demo plugin. The folder structure:

FileDescription
etc/plugin.configPlugin config file
ebin/Erlang program files

# Load, unload Plugin

Use 'bin/emqttd_ctl plugins' CLI to load, unload a plugin:

./bin/emqttd_ctl plugins load <PluginName>

./bin/emqttd_ctl plugins unload <PluginName>

./bin/emqttd_ctl plugins list

# emqttd_dashboard - Dashboard Plugin

The Web Dashboard for emqttd broker. The plugin will be loaded automatically when the broker started successfully.

Addresshttp://localhost:18083 (opens new window)
Default Useradmin
Default Passwordpublic

image

# Configure Dashboard

emqttd_dashboard/etc/plugin.config:

[
  {emqttd_dashboard, [
    {listener,
      {emqttd_dashboard, 18083, [
        {acceptors, 4},
        {max_clients, 512}]}
    }
  ]}
].

# emqttd_auth_http - HTTP Auth/ACL Plugin

MQTT Authentication/ACL with HTTP API: https://github.com/emqtt/emqttd_auth_http (opens new window)

Tip

Supported in 1.1 release

# Configure emqttd_auth_http/etc/plugin.config

[

  {emqttd_auth_http, [

    %% Variables: %u = username, %c = clientid, %a = ipaddress, %t = topic

    {super_req, [
      {method, post},
      {url, "http://localhost:8080/mqtt/superuser"},
      {params, [
        {username, "%u"},
        {clientid, "%c"}
      ]}
    ]},

    {auth_req, [
      {method, post},
      {url, "http://localhost:8080/mqtt/auth"},
      {params, [
        {clientid, "%c"},
        {username, "%u"},
        {password, "%P"}
      ]}
    ]},

    %% 'access' parameter: sub = 1, pub = 2

    {acl_req, [
      {method, post},
      {url, "http://localhost:8080/mqtt/acl"},
      {params, [
        {access,   "%A"},
        {username, "%u"},
        {clientid, "%c"},
        {ipaddr,   "%a"},
        {topic,    "%t"}
      ]}
    ]}
  ]}

].

# HTTP API

Return 200 if ok

Return 4xx if unauthorized

# Load emqttd_auth_http plugin

./bin/emqttd_ctl plugins load emqttd_auth_http

# emqttd_plugin_mysql - MySQL Auth/ACL Plugin

MQTT Authentication, ACL with MySQL database.

# MQTT User Table

CREATE TABLE `mqtt_user` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL,
  `password` varchar(100) DEFAULT NULL,
  `salt` varchar(20) DEFAULT NULL,
  `is_superuser` tinyint(1) DEFAULT 0,
  `created` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_username` (`username`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;

# MQTT ACL Table

CREATE TABLE `mqtt_acl` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
  `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
  `username` varchar(100) DEFAULT NULL COMMENT 'Username',
  `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
  `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
  `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
VALUES
    (1,1,NULL,'$all',NULL,2,'#'),
    (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    (3,0,NULL,'$all',NULL,1,'eq #'),
    (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

# Configure emqttd_plugin_mysql/etc/plugin.config

Configure MySQL host, username, password and database:

[

  {emqttd_plugin_mysql, [

    {mysql_pool, [
        %% ecpool options
        {pool_size, 8},
        {auto_reconnect, 3},

        %% mysql options
        {host,     "localhost"},
        {port,     3306},
        {user,     ""},
        {password, ""},
        {database, "mqtt"},
        {encoding, utf8}
    ]},

    %% Variables: %u = username, %c = clientid, %a = ipaddress

    %% Superuser Query
    {superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},

    %% Authentication Query: select password only
    {authquery, "select password from mqtt_user where username = '%u' limit 1"},

    %% hash algorithm: plain, md5, sha, sha256, pbkdf2?
    {password_hash, sha256},

    %% select password with salt
    %% {authquery, "select password, salt from mqtt_user where username = '%u'"},

    %% sha256 with salt prefix
    %% {password_hash, {salt, sha256}},

    %% sha256 with salt suffix
    %% {password_hash, {sha256, salt}},

    %% '%a' = ipaddress, '%u' = username, '%c' = clientid
    %% Comment this query, the acl will be disabled
    {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},

    %% If no ACL rules matched, return...
    {acl_nomatch, allow}

  ]}

].

# Load emqttd_plugin_mysql plugin

./bin/emqttd_ctl plugins load emqttd_plugin_mysql

# emqttd_plugin_pgsql - PostgreSQL Auth/ACL Plugin

MQTT Authentication, ACL with PostgreSQL Database.

# MQTT User Table

CREATE TABLE mqtt_user (
  id SERIAL primary key,
  is_superuser boolean,
  username character varying(100),
  password character varying(100),
  salt character varying(40)
);

# MQTT ACL Table

CREATE TABLE mqtt_acl (
  id SERIAL primary key,
  allow integer,
  ipaddr character varying(60),
  username character varying(100),
  clientid character varying(100),
  access  integer,
  topic character varying(100)
);

INSERT INTO mqtt_acl (id, allow, ipaddr, username, clientid, access, topic)
VALUES
    (1,1,NULL,'$all',NULL,2,'#'),
    (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    (3,0,NULL,'$all',NULL,1,'eq #'),
    (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

# Configure emqttd_plugin_pgsql/etc/plugin.config

Configure host, username, password and database of PostgreSQL:

[

  {emqttd_plugin_pgsql, [

    {pgsql_pool, [
        %% ecpool options
        {pool_size, 8},
        {auto_reconnect, 3},

        %% pgsql options
        {host, "localhost"},
        {port, 5432},
        {ssl, false},
        {username, "feng"},
        {password, ""},
        {database, "mqtt"},
        {encoding,  utf8}
    ]},

    %% Variables: %u = username, %c = clientid, %a = ipaddress

    %% Superuser Query
    {superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},

    %% Authentication Query: select password only
    {authquery, "select password from mqtt_user where username = '%u' limit 1"},

    %% hash algorithm: plain, md5, sha, sha256, pbkdf2?
    {password_hash, sha256},

    %% select password with salt
    %% {authquery, "select password, salt from mqtt_user where username = '%u'"},

    %% sha256 with salt prefix
    %% {password_hash, {salt, sha256}},

    %% sha256 with salt suffix
    %% {password_hash, {sha256, salt}},

    %% Comment this query, the acl will be disabled. Notice: don't edit this query!
    {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl
                 where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},

    %% If no rules matched, return...
    {acl_nomatch, allow}
  ]}
].

# Load emqttd_plugin_pgsql Plugin

./bin/emqttd_ctl plugins load emqttd_plugin_pgsql

# emqttd_plugin_redis - Redis Auth/ACL Plugin

MQTT Authentication, ACL with Redis: https://github.com/emqtt/emqttd_plugin_redis (opens new window)

# Configure emqttd_plugin_redis/etc/plugin.config

[
  {emqttd_plugin_redis, [

    {eredis_pool, [
      %% ecpool options
      {pool_size, 8},
      {auto_reconnect, 2},

      %% eredis options
      {host, "127.0.0.1"},
      {port, 6379},
      {database, 0},
      {password, ""}
    ]},

    %% Variables: %u = username, %c = clientid

    %% HMGET mqtt_user:%u is_superuser
    {supercmd, ["HGET", "mqtt_user:%u", "is_superuser"]},

    %% HMGET mqtt_user:%u password
    {authcmd, ["HGET", "mqtt_user:%u", "password"]},

    %% Password hash algorithm: plain, md5, sha, sha256, pbkdf2?
    {password_hash, sha256},

    %% SMEMBERS mqtt_acl:%u
    {aclcmd, ["SMEMBERS", "mqtt_acl:%u"]},

    %% If no rules matched, return...
    {acl_nomatch, deny},

    %% Load Subscriptions form Redis when client connected.
    {subcmd, ["HGETALL", "mqtt_subs:%u"]}
  ]}
].

# User HASH

Set a 'user' hash with 'password' field, for example:

HSET mqtt_user:\<username> is_superuser 1
HSET mqtt_user:\<username> password "passwd"

# ACL Rule SET

The plugin uses a redis SET to store ACL rules:

SADD mqtt_acl:\<username> "publish topic1"
SADD mqtt_acl:\<username> "subscribe topic2"
SADD mqtt_acl:\<username> "pubsub topic3"

# Subscription HASH

The plugin can store static subscriptions in a redis Hash:

HSET mqtt_subs:\<username> topic1 0
HSET mqtt_subs:\<username> topic2 1
HSET mqtt_subs:\<username> topic3 2

# Load emqttd_plugin_redis Plugin

./bin/emqttd_ctl plugins load emqttd_plugin_redis

# emqttd_plugin_mongo - MongoDB Auth/ACL Plugin

MQTT Authentication, ACL with MongoDB: https://github.com/emqtt/emqttd_plugin_mongo (opens new window)

# Configure emqttd_plugin_mongo/etc/plugin.config

[
  {emqttd_plugin_mongo, [

    {mongo_pool, [
      {pool_size, 8},
      {auto_reconnect, 3},

      %% Mongodb Driver Opts
      {host, "localhost"},
      {port, 27017},
      %% {login, ""},
      %% {password, ""},
      {database, "mqtt"}
    ]},

    %% Variables: %u = username, %c = clientid

    %% Superuser Query
    {superquery, [
      {collection, "mqtt_user"},
      {super_field, "is_superuser"},
      {selector, {"username", "%u"}}
    ]},

    %% Authentication Query
    {authquery, [
      {collection, "mqtt_user"},
      {password_field, "password"},
      %% Hash Algorithm: plain, md5, sha, sha256, pbkdf2?
      {password_hash, sha256},
      {selector, {"username", "%u"}}
    ]},

    %% ACL Query: "%u" = username, "%c" = clientid
    {aclquery, [
      {collection, "mqtt_acl"},
      {selector, {"username", "%u"}}
    ]},

    %% If no ACL rules matched, return...
    {acl_nomatch, deny}

  ]}
].

# MongoDB Database

use mqtt
db.createCollection("mqtt_user")
db.createCollection("mqtt_acl")
db.mqtt_user.ensureIndex({"username":1})

# User Collection

{
    username: "user",
    password: "password hash",
    is_superuser: boolean (true, false),
    created: "datetime"
}

For example:

db.mqtt_user.insert({username: "test", password: "password hash", is_superuser: false})
db.mqtt_user:insert({username: "root", is_superuser: true})

# ACL Collection

{
    username: "username",
    clientid: "clientid",
    publish: ["topic1", "topic2", ...],
    subscribe: ["subtop1", "subtop2", ...],
    pubsub: ["topic/#", "topic1", ...]
}

For example:

db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})

# Load emqttd_plugin_mongo Plugin

./bin/emqttd_ctl plugins load emqttd_plugin_mongo

# emqttd_stomp - STOMP Protocol

Support STOMP 1.0/1.1/1.2 clients to connect to emqttd broker and communicate with MQTT Clients.

# Configure emqttd_stomp/etc/plugin.config

Tip

Default Port for STOMP Protocol: 61613

[
  {emqttd_stomp, [

    {default_user, [
        {login,    "guest"},
        {passcode, "guest"}
    ]},

    {allow_anonymous, true},

    %%TODO: unused...
    {frame, [
      {max_headers,       10},
      {max_header_length, 1024},
      {max_body_length,   8192}
    ]},

    {listeners, [
      {emqttd_stomp, 61613, [
        {acceptors,   4},
        {max_clients, 512}
      ]}
    ]}

  ]}
].

# Load emqttd_stomp Plugin

./bin/emqttd_ctl plugins load emqttd_stomp

# emqttd_sockjs - STOMP/SockJS Plugin

emqttd_sockjs plugin enables web browser to connect to emqttd broker and communicate with MQTT clients.

Tip

Default TCP Port: 61616

# Configure emqttd_sockjs

[
  {emqttd_sockjs, [

    {sockjs, []},

    {cowboy_listener, {stomp_sockjs, 61616, 4}},

  ]}
].

# Load emqttd_sockjs Plugin

Tip

emqttd_stomp Plugin required.

./bin/emqttd_ctl plugins load emqttd_stomp

./bin/emqttd_ctl plugins load emqttd_sockjs

# SockJS Demo Page

http://localhost:61616/index.html (opens new window)

# emqttd_recon - Recon Plugin

The plugin loads recon (opens new window) library on a running emqttd broker. Recon libray helps debug and optimize an Erlang application.

# Load emqttd_recon Plugin

./bin/emqttd_ctl plugins load emqttd_recon

# Recon CLI

./bin/emqttd_ctl recon

recon memory                 #recon_alloc:memory/2
recon allocated              #recon_alloc:memory(allocated_types, current|max)
recon bin_leak               #recon:bin_leak(100)
recon node_stats             #recon:node_stats(10, 1000)
recon remote_load Mod        #recon:remote_load(Mod)

# emqttd_reloader - Reloader Plugin

Erlang Module Reloader for Development

Tip

Don't load the plugin in production!

# Load emqttd_reloader Plugin

./bin/emqttd_ctl plugins load emqttd_reloader

# reload CLI

./bin/emqttd_ctl reload

reload \<Module>             # Reload a Module

# Plugin Development Guide

# Create a Plugin Project

Clone emqttd source from github.com:

git clone https://github.com/emqtt/emqttd.git

Create a plugin project under 'plugins' folder:

cd plugins && mkdir emqttd_my_plugin

cd emqttd_my_plugin && rebar create-app appid=emqttd_my_plugin

Template Plugin: https://github.com/emqtt/emqttd_plugin_template (opens new window)

# Register Auth/ACL Modules

emqttd_auth_demo.erl - demo authentication module:

-module(emqttd_auth_demo).

-behaviour(emqttd_auth_mod).

-include("../../../include/emqttd.hrl").

-export([init/1, check/3, description/0]).

init(Opts) -> {ok, Opts}.

check(#mqtt_client{client_id = ClientId, username = Username}, Password, _Opts) ->
    io:format("Auth Demo: clientId=~p, username=~p, password=~p~n",
              [ClientId, Username, Password]),
    ok.

description() -> "Demo Auth Module".

emqttd_acl_demo.erl - demo ACL module:

-module(emqttd_acl_demo).

-include("../../../include/emqttd.hrl").

%% ACL callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).

init(Opts) ->
    {ok, Opts}.

check_acl({Client, PubSub, Topic}, Opts) ->
    io:format("ACL Demo: ~p ~p ~p~n", [Client, PubSub, Topic]),
    allow.

reload_acl(_Opts) ->
    ok.

description() -> "ACL Module Demo".

emqttd_plugin_template_app.erl - Register the auth/ACL modules:

ok = emqttd_access_control:register_mod(auth, emqttd_auth_demo, []),
ok = emqttd_access_control:register_mod(acl, emqttd_acl_demo, []),

# Register Callbacks for Hooks

The plugin could register callbacks for hooks. The hooks will be run by the broker when a client connected/disconnected, a topic subscribed/unsubscribed or a message published/delivered:

NameDescription
client.connectedRun when a client connected to the broker successfully
client.subscribeRun before a client subscribes topics
client.subscribe.afterRun after a client subscribed topics
client.unsubscribeRun when a client unsubscribes topics
message.publishRun when a message is published
message.deliveredRun when a message is delivered
message.ackedRun when a message(qos1/2) is acked
client.disconnectedRun when a client is disconnnected

emqttd_plugin_template.erl for example:

%% Called when the plugin application start
load(Env) ->
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/3, [Env]),
    emqttd:hook('client.subscribe.after', fun ?MODULE:on_client_subscribe_after/3, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/3, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]).

# Register CLI Modules

emqttd_cli_demo.erl:

-module(emqttd_cli_demo).

-include("../../../include/emqttd_cli.hrl").

-export([cmd/1]).

cmd(["arg1", "arg2"]) ->
    ?PRINT_MSG("ok");

cmd(_) ->
    ?USAGE([{"cmd arg1 arg2", "cmd demo"}]).

emqttd_plugin_template_app.erl - register the CLI module to emqttd broker:

emqttd_ctl:register_cmd(cmd, {emqttd_cli_demo, cmd}, []).

There will be a new CLI after the plugin loaded:

./bin/emqttd_ctl cmd arg1 arg2