# Introduction

An EMQX cluster uses a distributed architecture that can handle a large number of clients and messages while ensuring high availability, fault tolerance, and scalability.

Compared with the cluster design of previous versions, EMQX 5.0 adopts the new Mria cluster architecture. With Mria, a single EMQX node can support up to 5 million MQTT connections, and an EMQX cluster can support up to 100 million concurrent MQTT connections.

This chapter introduces the distributed EMQX cluster and how it works, to help you get started.

# Working principles

In a distributed EMQX cluster, one EMQX instance runs on each node. All nodes within the cluster communicate with each other and share information on client connections, subscriptions, and published messages. So the EMQX cluster can automatically distribute the load across the nodes, thus ensuring the system's high availability.

EMQX cluster provides data redundancy by maintaining multiple copies of the data across different cluster nodes. If one node malfunctions, the data is still available on other nodes.

In addition, you can continuously add new nodes to one cluster to scale up the system's capabilities, so you can better handle an increasing number of clients and messages without worrying about the limitations of a single broker.

EMQX is powered by Erlang/OTP, the programming language platform Ericsson developed for telecommunication equipment systems. Before we talk about EMQX clustering, let's first look at what Erlang/OTP is.

# Erlang/OTP and Erlang node

Telecom equipment, such as routers and access gateways, is usually a distributed system with a main control board and multiple business boards connected via a backplane.

The distributed programs of the Erlang/OTP platform are multiple distributed yet interconnected Erlang runtime systems. Each Erlang runtime system is called a node. Nodes are interconnected with TCP to form a network (or a cluster).

Erlang nodes are identified by a unique node name, which consists of two parts separated by `@`: the node name and the hostname or IP address, e.g. `node1@127.0.0.1`.

Communication between nodes is addressed by node name. Next, we will illustrate how to create nodes and clusters in the Erlang REPL.

1. Start four shell terminals locally, then use the `-name` parameter to start four Erlang nodes with the same cookie:

   ```bash
   erl -name node1@127.0.0.1 -setcookie my_nodes
   erl -name node2@127.0.0.1 -setcookie my_nodes
   erl -name node3@127.0.0.1 -setcookie my_nodes
   erl -name node4@127.0.0.1 -setcookie my_nodes
   ```
2. Go to the console of `node1@127.0.0.1` and check the name of the current node and the connected nodes: `node().` returns the current node name, and `nodes().` lists the connected nodes.

   ```erlang
   (node1@127.0.0.1)1> node().
   'node1@127.0.0.1'
   (node1@127.0.0.1)2> nodes().
   []
   ```
3. Let node1 initiate connections to the other nodes:

   ```erlang
   (node1@127.0.0.1)3> net_kernel:connect_node('node2@127.0.0.1').
   true
   (node1@127.0.0.1)4> net_kernel:connect_node('node3@127.0.0.1').
   true
   (node1@127.0.0.1)5> net_kernel:connect_node('node4@127.0.0.1').
   true
   ```
4. Rerun the command from step 2 and recheck the connected nodes:

   ```erlang
   (node1@127.0.0.1)6> nodes().
   ['node2@127.0.0.1','node3@127.0.0.1','node4@127.0.0.1']
   ```

We can see that node2, node3, and node4 have established a distributed connection with node1, and these four nodes form a cluster.

Whenever a new node joins the cluster, it establishes a TCP connection with every node already in the cluster, so these four nodes form a fully meshed network.
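Because every member holds a direct TCP connection to every other member, an n-node cluster maintains n(n-1)/2 pairwise links, which is one reason very large clusters become expensive to fully mesh. A quick sketch in Python:

```python
def mesh_links(n: int) -> int:
    """Pairwise TCP links in a fully meshed n-node Erlang cluster."""
    return n * (n - 1) // 2

print(mesh_links(4))   # the 4-node example above -> 6 links
print(mesh_links(64))  # -> 2016 links
```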


# Distributed EMQX cluster

The basic function of a distributed EMQX cluster is to forward and publish messages to different subscribers.


To achieve this, EMQX maintains several data structures in its built-in database:

  • Subscription table
  • Routing table
  • Topic tree

# Subscription table: topics-subscribers

EMQX maintains a subscription table to store the topic-to-subscriber mapping and ensure incoming messages are routed to the correct clients. This data is stored only on the EMQX node where the subscribers are located. The table schema is as follows:

node1:

    topic1 -> client1, client2
    topic2 -> client3

node2:

    topic1 -> client4
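Conceptually, each node's local subscription table behaves like a topic-to-clients map. Below is a minimal Python sketch of that idea (the names and helpers are illustrative, not EMQX internals):

```python
from collections import defaultdict

# Per-node subscription table: topic -> set of local client IDs.
subscriptions = defaultdict(set)

def subscribe(client: str, topic: str) -> None:
    subscriptions[topic].add(client)

def local_subscribers(topic: str) -> set:
    # .get() avoids creating an empty entry for unknown topics.
    return subscriptions.get(topic, set())

# Rebuild the node1 example above:
subscribe("client1", "topic1")
subscribe("client2", "topic1")
subscribe("client3", "topic2")
print(sorted(local_subscribers("topic1")))  # ['client1', 'client2']
```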

# Route table: topic-node

The route table stores the mapping between topics and nodes, that is, which nodes hold subscribers for each topic, and ensures incoming messages are forwarded to the correct nodes. This data is replicated across all nodes in the cluster. The table schema is as follows:

    topic1 -> node1, node2
    topic2 -> node3
    topic3 -> node2, node4
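The route table is the cluster-wide counterpart of the subscription table: a topic-to-nodes map replicated on every node. A sketch in the same illustrative style (not EMQX internals):

```python
from collections import defaultdict

# Replicated route table: topic -> set of nodes with at least one subscriber.
routes = defaultdict(set)

def add_route(topic: str, node: str) -> None:
    routes[topic].add(node)

def nodes_for(topic: str) -> set:
    return routes.get(topic, set())

# Rebuild the example above:
for topic, node in [("topic1", "node1"), ("topic1", "node2"),
                    ("topic2", "node3"),
                    ("topic3", "node2"), ("topic3", "node4")]:
    add_route(topic, node)

print(sorted(nodes_for("topic1")))  # ['node1', 'node2']
```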

# Topic tree: topic matching with wildcards

The topic tree is a hierarchical data structure. It stores information on topic hierarchies for matching messages to subscribed clients.

This data will be duplicated among all nodes within the cluster. Below is a topic tree example:

| Client | Node | Subscribed topics |
| ------- | ----- | ----------------- |
| client1 | node1 | t/+/x, t/+/y |
| client2 | node2 | t/# |
| client3 | node3 | t/+/x, t/a |

When all subscriptions are completed, EMQX maintains a topic tree and a route table built from them.
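The wildcard matching that the topic tree performs can be illustrated with a flat matcher. A real topic tree is a trie walked level by level, but the semantics are the same; this Python sketch is illustrative only:

```python
def topic_matches(filter_: str, topic: str) -> bool:
    """MQTT-style matching: '+' matches one level, '#' matches the rest."""
    f_parts, t_parts = filter_.split("/"), topic.split("/")
    for i, part in enumerate(f_parts):
        if part == "#":            # multi-level wildcard: matches everything below
            return True
        if i >= len(t_parts):      # filter is deeper than the topic
            return False
        if part not in ("+", t_parts[i]):
            return False
    return len(f_parts) == len(t_parts)

# Filters from the subscription example, matched against topic t/a:
filters = ["t/+/x", "t/+/y", "t/#", "t/a"]
print([f for f in filters if topic_matches(f, "t/a")])  # ['t/#', 't/a']
```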


# Message distribution process

When an MQTT client publishes a message, the node where it is located retrieves the route table and forwards the message to the target node according to the message topic. The target node then retrieves the local subscription table and sends the message to the target subscribers.

For example, when client1 publishes a message to topic t/a, the routing and distribution of the message between nodes are as follows:

1. client1 publishes a message with topic t/a to node1.

2. node1 queries the topic tree and finds that t/# and t/a match topic t/a.

3. node1 queries the route table and finds that:

   - some clients on node2 subscribed to topic t/#;
   - some clients on node3 subscribed to topic t/a.

   So node1 forwards the message to node2 and node3.

4. node2 receives the forwarded t/a message, queries the local subscription table, and distributes the message to the clients subscribed to the matching topics.

5. node3 receives the forwarded t/a message, queries the local subscription table, and distributes the message to the clients subscribed to the matching topics.

6. Message forwarding and distribution are finished.
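The whole flow above can be sketched end to end in Python. The tables are hard-coded to mirror the example, and every name is illustrative rather than an EMQX internal:

```python
# Replicated route table: topic filter -> nodes holding matching subscribers.
routes = {"t/#": {"node2"}, "t/a": {"node3"}}

# Per-node local subscription tables: node -> {filter: clients}.
subs = {
    "node2": {"t/#": {"client2"}},
    "node3": {"t/a": {"client3"}},
}

def topic_matches(filter_: str, topic: str) -> bool:
    """MQTT-style matching: '+' matches one level, '#' matches the rest."""
    f_parts, t_parts = filter_.split("/"), topic.split("/")
    for i, part in enumerate(f_parts):
        if part == "#":
            return True
        if i >= len(t_parts) or part not in ("+", t_parts[i]):
            return False
    return len(f_parts) == len(t_parts)

def publish(topic: str, payload: str) -> list:
    """Return (node, client, payload) deliveries for a published message."""
    # Steps 2-3: match the route table and pick the target nodes.
    targets = {n for filt, nodes in routes.items()
               if topic_matches(filt, topic) for n in nodes}
    deliveries = []
    # Steps 4-5: each target node consults its local subscription table.
    for node in sorted(targets):
        for filt, clients in subs[node].items():
            if topic_matches(filt, topic):
                deliveries += [(node, c, payload) for c in sorted(clients)]
    return deliveries

print(publish("t/a", "hello"))
# [('node2', 'client2', 'hello'), ('node3', 'client3', 'hello')]
```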

# Data partition and sharing

EMQX's subscription table is partitioned in the cluster, while the topic tree and routing table are replicated within the cluster.

# EMQX cluster node discovery and autocluster

EMQX adds an abstraction layer on top of distributed Erlang with the Ekka library.

Ekka is a cluster management library developed for Erlang/OTP applications, enabling features like auto discovery of EMQX nodes, auto cluster, network partition autoheal and autoclean.

EMQX supports several node discovery strategies:

| Strategy | Description |
| -------- | ----------- |
| Manual | Manually create a cluster with commands |
| Static | Autocluster through a static node list |
| DNS | Autocluster through DNS A records |
| etcd | Autocluster through etcd |
| k8s | Autocluster provided by Kubernetes |
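For example, static discovery might be configured like this in etc/emqx.conf, following the flat configuration style used elsewhere in this chapter (the seed node names are placeholders; adjust them to your deployment):

```
cluster.discovery = static
cluster.static.seeds = emqx1@192.168.0.10,emqx2@192.168.0.11
```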

# EMQX cluster protocol settings

Each Erlang node can be connected via TCP or TLS, and the connection method can be configured in etc/emqx.conf:

| Configuration item | Type | Default value | Description |
| ------------------ | ---- | ------------- | ----------- |
| cluster.proto_dist | enum | inet_tcp | Distribution protocol. Optional values:<br>- inet_tcp: use TCP over IPv4<br>- inet6_tcp: use TCP over IPv6<br>- inet_tls: use TLS |
| node.ssl_dist_optfile | file path | etc/ssl_dist.conf | When cluster.proto_dist is set to inet_tls, you need to configure the etc/ssl_dist.conf file and specify the TLS certificate. |
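When inet_tls is selected, etc/ssl_dist.conf holds the Erlang distribution TLS options as an Erlang term list. A minimal sketch (the certificate paths are placeholders):

```erlang
[{server, [{certfile, "etc/certs/cert.pem"},
           {keyfile,  "etc/certs/key.pem"}]},
 {client, [{certfile, "etc/certs/cert.pem"},
           {keyfile,  "etc/certs/key.pem"}]}].
```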

# Network partition autoheal

EMQX supports network partition autoheal, which can be configured in etc/emqx.conf:

```
cluster.autoheal = on
```

The work process is as follows:

  1. The node receives inconsistent_database from Mnesia and waits 3 seconds before starting the network partition confirmation;
  2. After the node confirms the network partition, it reports to the Leader node (the cluster node that starts first);
  3. After the Leader node delays for a while, it creates a SplitView when all nodes are online;
  4. The Leader node selects the self-healing coordinator node in the majority partition;
  5. The coordinator node restarts the nodes in the minority partition to restore the cluster.

# Autoclean of cluster nodes

EMQX supports cluster autoclean, which can be configured in etc/emqx.conf :

```
cluster.autoclean = 5m
```

With this feature enabled, the disconnected node will be automatically removed from the cluster after the configured time interval.

# Session across Nodes

Persistent MQTT sessions (clean session = false) can move across nodes in the cluster.

If a persistent MQTT client first connects to node1, then disconnects and reconnects to node2, the MQTT connection and the session will be located on different nodes.

# Further reading

You can continue to read the following chapters on how to cluster EMQX nodes.