# Introduction

An EMQX cluster adopts a distributed architecture that can handle a large number of clients and messages while ensuring high availability, fault tolerance, and scalability.

Compared with the cluster design of previous versions, EMQX 5.0 adopts the new Mria cluster architecture. With Mria, a single EMQX node can support up to 5 million MQTT connections, and an EMQX cluster can support up to 100 million concurrent MQTT connections.

This chapter introduces the distributed EMQX cluster and how it works to help you get started.

# Working principles

In a distributed EMQX cluster, one EMQX instance runs on each node. All nodes within the cluster communicate with each other and share information on client connections, subscriptions, and published messages. The cluster can therefore distribute load across the nodes automatically, ensuring the system's high availability.

EMQX cluster provides data redundancy by maintaining multiple copies of the data across different cluster nodes. If one node malfunctions, the data is still available on other nodes.

In addition, you can keep adding new nodes to a cluster to scale the system, so it can handle an increasing number of clients and messages without the limitations of a single broker.

EMQX is powered by Erlang/OTP, the programming language platform Ericsson developed for telecommunication equipment systems. Before we talk about EMQX clustering, let's first look at what Erlang/OTP is.

# Erlang/OTP and Erlang node

Telecom equipment, such as routers and access gateways, is usually a distributed system in which a main control board and multiple business boards are connected via a backplane.

A distributed Erlang/OTP program consists of multiple distributed yet interconnected Erlang runtime systems. Each Erlang runtime system is called a node. Nodes are interconnected over TCP to form a network (or a cluster).

Erlang nodes are identified by a unique node name, which consists of two parts separated by `@`:

```
<name>@<ip-address-or-FQDN>
```

Communication between nodes is addressed by node name. Next, we will illustrate how to create nodes and clusters in the Erlang REPL.

1. Start four shell terminals locally, then use the `-name` parameter to start four Erlang nodes with the same cookie:

   ```bash
   erl -name node1@127.0.0.1 -setcookie my_nodes
   erl -name node2@127.0.0.1 -setcookie my_nodes
   erl -name node3@127.0.0.1 -setcookie my_nodes
   erl -name node4@127.0.0.1 -setcookie my_nodes
   ```
2. Go to the console of `node1@127.0.0.1` and check the name of the current node and the connected nodes: `node().` returns the current node name, and `nodes().` returns the list of connected nodes.

   ```erlang
   (node1@127.0.0.1) 1> node().
   'node1@127.0.0.1'

   (node1@127.0.0.1) 2> nodes().
   []
   ```
3. Let node1 initiate connections to the other nodes:

   ```erlang
   (node1@127.0.0.1) 3> net_kernel:connect_node('node2@127.0.0.1').
   true
   (node1@127.0.0.1) 4> net_kernel:connect_node('node3@127.0.0.1').
   true
   (node1@127.0.0.1) 5> net_kernel:connect_node('node4@127.0.0.1').
   true
   ```
4. Rerun the command from step 2 and recheck the connected nodes:

   ```erlang
   (node1@127.0.0.1) 6> nodes().
   ['node2@127.0.0.1','node3@127.0.0.1','node4@127.0.0.1']
   ```

We can see that node2, node3, and node4 have established distributed connections with node1, and these four nodes now form a cluster.

Whenever a new node joins the cluster, it establishes a TCP connection with every node already in the cluster. The connections among these four nodes are shown below:

*(Figure: full-mesh TCP connections among node1, node2, node3, and node4)*
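Erlang distribution is transitive by default, so once node1 connects to node2, node3, and node4, those nodes also connect to each other. Checking from node2, for example, should show the full mesh (output order may vary):

```erlang
(node2@127.0.0.1) 1> nodes().
['node1@127.0.0.1','node3@127.0.0.1','node4@127.0.0.1']
```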

# Distributed EMQX cluster

The basic function of a distributed EMQX cluster is to forward and publish messages to different subscribers, as shown below.

*(Figure: messages forwarded across cluster nodes and delivered to subscribers)*

To achieve this, EMQX maintains several data structures in its embedded database:

- Subscription table
- Route table
- Topic tree

# Subscription table: topics-subscribers

EMQX maintains a subscription table to store the topic -> subscriber mapping and to ensure that incoming messages are routed to the correct clients. This data is stored only on the EMQX node where the subscribers are located. The table schema is as follows:

```
node1:
    topic1 -> client1, client2
    topic2 -> client3

node2:
    topic1 -> client4
```
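As an illustration, a per-node subscription table behaves like a multi-valued map from topic to local subscribers. Here is a minimal sketch using an Erlang ETS `bag` table in the shell; the table and client names are made up, and EMQX's real tables live in its embedded database:

```erlang
%% A bag table allows many {Key, Value} entries under the same topic key.
1> Tab = ets:new(subs, [bag]).
2> ets:insert(Tab, {<<"topic1">>, client1}).
true
3> ets:insert(Tab, {<<"topic1">>, client2}).
true
4> ets:insert(Tab, {<<"topic2">>, client3}).
true
5> ets:lookup(Tab, <<"topic1">>).
[{<<"topic1">>,client1},{<<"topic1">>,client2}]
```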

# Route table: topic-node

The route table stores the mapping between topics and nodes, that is, which nodes have subscribers to each topic, and ensures that incoming messages are forwarded to the correct nodes. This data is replicated on all nodes within the cluster. The table schema is as follows:

```
topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4
```

# Topic tree: topic matching with wildcards

The topic tree is a hierarchical data structure. It stores information on topic hierarchies and is used to match messages to subscribed clients, including subscriptions that contain wildcards.

This data is replicated on all nodes within the cluster. Below is a topic tree example:

| Client  | Node  | Subscribed topics |
| ------- | ----- | ----------------- |
| client1 | node1 | `t/+/x`, `t/+/y`  |
| client2 | node2 | `t/#`             |
| client3 | node3 | `t/+/x`, `t/a`    |

When all subscriptions are complete, EMQX maintains the following topic tree and route table:

*(Figure: the resulting topic tree and route table)*
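To make the wildcard rules concrete, here is a minimal Erlang sketch of MQTT topic filter matching with `+` and `#`. It only illustrates the matching rules that the topic tree encodes efficiently; it is not EMQX's implementation:

```erlang
-module(topic_match).
-export([split/1, match/2]).

%% Split a topic or filter into levels, e.g. <<"t/a/x">> -> [<<"t">>,<<"a">>,<<"x">>].
split(Topic) ->
    binary:split(Topic, <<"/">>, [global]).

%% match(TopicLevels, FilterLevels) -> boolean().
match([], []) ->
    true;
match(_Rest, [<<"#">>]) ->
    true;                               %% '#' matches any remaining levels
match([_Level | T1], [<<"+">> | T2]) ->
    match(T1, T2);                      %% '+' matches exactly one level
match([Level | T1], [Level | T2]) ->
    match(T1, T2);                      %% literal levels must be equal
match(_, _) ->
    false.
```

For example, `topic_match:match(topic_match:split(<<"t/a/x">>), topic_match:split(<<"t/+/x">>))` returns `true`, while the same topic does not match the filter `t/+/y`.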

# Message distribution process

When an MQTT client publishes a message, the node it is connected to looks up the route table by the message topic and forwards the message to the target nodes. Each target node then looks up its local subscription table and delivers the message to the target subscribers.

For example, when client1 publishes a message to topic `t/a`, the message is routed and distributed between nodes as follows (see the code sketch after this list):

1. client1 publishes a message with topic `t/a` to node1.

2. node1 queries the topic tree and finds that the filters `t/#` and `t/a` match topic `t/a`.

3. node1 queries the route table and finds that:

   - some clients on node2 have subscribed to topic `t/#`;
   - some clients on node3 have subscribed to topic `t/a`.

   So node1 forwards the message to node2 and node3.

4. node2 receives the forwarded `t/a` message, queries its local subscription table, and distributes the message to the clients subscribed to the topic.

5. node3 receives the forwarded `t/a` message, queries its local subscription table, and distributes the message to the clients subscribed to the topic.

6. Message forwarding and distribution are finished.
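The two-step flow above can be sketched in Erlang as follows. `route/1` stands in for the replicated route table and `subscribers/1` for the local subscription table, both with hard-coded demo data; the module, function, and client names are hypothetical, not EMQX internals:

```erlang
-module(dispatch_sketch).
-export([publish/2, dispatch/2]).

%% Hypothetical route table (replicated on every node): topic -> nodes.
route(<<"t/a">>) -> ['node2@127.0.0.1', 'node3@127.0.0.1'];
route(_)         -> [].

%% Hypothetical local subscription table: topic -> clients on this node.
subscribers(<<"t/a">>) -> [client_x, client_y];
subscribers(_)         -> [].

%% Step 1: the publishing client's node forwards the message to every
%% node that has at least one subscriber for the topic.
publish(Topic, Msg) ->
    [rpc:call(Node, ?MODULE, dispatch, [Topic, Msg]) || Node <- route(Topic)],
    ok.

%% Step 2: each receiving node delivers the message to its local subscribers.
dispatch(Topic, Msg) ->
    [io:format("deliver ~p on ~s to ~p~n", [Msg, Topic, Client])
     || Client <- subscribers(Topic)],
    ok.
```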

# Data partition and sharing

EMQX's subscription table is partitioned across the cluster, while the topic tree and route table are replicated on every node.

# EMQX cluster node discovery and autocluster

EMQX adds an abstraction layer on top of distributed Erlang with the Ekka library.

Ekka is a cluster management library developed for Erlang/OTP applications. It enables features such as auto-discovery of EMQX nodes, autocluster, network partition autoheal, and autoclean.

EMQX supports several node discovery strategies:

| Strategy | Description                             |
| -------- | --------------------------------------- |
| Manual   | Manually create a cluster with commands |
| Static   | Autocluster through a static node list  |
| DNS      | Autocluster through DNS A records       |
| etcd     | Autocluster through etcd                |
| k8s      | Autocluster provided by Kubernetes      |

# EMQX cluster protocol settings

Erlang nodes can be connected via TCP or TLS, and the connection method can be configured in `etc/emqx.conf`:

| Configuration item | Type | Default value | Description |
| ------------------ | ---- | ------------- | ----------- |
| `cluster.proto_dist` | enum | `inet_tcp` | Distribution protocol. Optional values:<br>- `inet_tcp`: TCP over IPv4<br>- `inet6_tcp`: TCP over IPv6<br>- `inet_tls`: TLS |
| `node.ssl_dist_optfile` | file path | `etc/ssl_dist.conf` | When `cluster.proto_dist` is set to `inet_tls`, you need to configure the `etc/ssl_dist.conf` file and specify the TLS certificate. |
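For example, here is a minimal sketch of switching the cluster to TLS-based distribution in `etc/emqx.conf`; it assumes you have prepared `etc/ssl_dist.conf` with valid certificate paths for your deployment:

```
cluster.proto_dist = inet_tls
node.ssl_dist_optfile = etc/ssl_dist.conf
```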

# Network partition autoheal

EMQX supports network partition autoheal, which can be configured in `etc/emqx.conf`:

```
cluster.autoheal = on
```

The work process is as follows:

1. The node receives an `inconsistent_database` event from Mnesia and waits 3 seconds before starting network partition confirmation;
2. After the node confirms the network partition, it reports it to the leader node (the cluster node that started first);
3. After a short delay, the leader node creates a SplitView once all nodes are online;
4. The leader node selects a self-healing coordinator node in the majority partition;
5. The coordinator node restarts the nodes in the minority partition to restore the cluster.

# Autoclean of cluster nodes

EMQX supports cluster autoclean, which can be configured in `etc/emqx.conf`:

```
cluster.autoclean = 5m
```

With this feature enabled, the disconnected node will be automatically removed from the cluster after the configured time interval.

# Sessions across nodes

Persistent MQTT sessions (clean session = false) can span nodes in the cluster.

If a persistent MQTT client first connects to node1, then disconnects and reconnects to node2, the MQTT connection and the session will be located on different nodes.

# Further reading

You can continue to read the following chapters on how to cluster EMQX nodes.