# Hooks

# Definition

Hooks is a mechanism provided by EMQX Broker, which modifies or extends system functions by intercepting function calls, message passing, and event passing between modules.

In simple terms, the purpose of this mechanism is to enhance the flexibility of the software system, facilitate integration with the 3rd-party systems, or change the original default behavior of the system. Such as:

Hooks-In-System

When the Hooks mechanism does not exist in the system, the entire event processing flow (from the input of the event, to the handler and the result) is invisible and cannot be modified for the external system .

In the process, if a HookPoint where a function can be mounted is added, it will allow external plugins to mount multiple callback functions to form a call chain. Then, the internal event processing can be extended and modified. There are several functions that are implemented using the hook feature as follows.

  1. You can use the hook system to perform multi-step streaming processing (encoding/decoding, etc.) of messages when they are published. That is, the rules engine provides the message codec function.
  2. Caching of messages at message publishing time according to configuration.
  3. Delayed message publishing using the hook's blocking mechanism.

The authentication/authorization commonly used in the system is implemented according to this logic. Take the ExHook as an example:

When only the Built-in Database authentication is enabled, according to the processing logic of the event (see figure above), the logic of the authentication module at this time is:

  1. Receive user authentication request (Authenticate)
  2. Execute the hook of the authentication event with ClientInfo and default AccIn.
%% Default AccIn
{ok, #{is_superuser => false}}
1
2
  1. Call back to the emqx_exhook module, assume this authentication is valid, and get allow, is_superuser result.
%% AuthNResult
{ok, #{is_superuser => true}}
1
2
  1. Return Authentication succeeded, and successfully access the system as a superuser

It is shown in the following figure:

                     EMQX Core          Hooks & Plugins
                |<---  Scope  --->|<-------  Scope  -------->|
                |                 |       emqx_exhook        |
  Authenticate  |    InitAcc      |      (user's code)       |   Authenticate
 =============> > - - - - - - - - > - - - - - - - - - - - - Yes ==============> Success
     Request    |  (Init Result)  |      authenticate?       |      Result
                |                 |                          |
                +-----------------+--------------------------+
1
2
3
4
5
6
7
8

Therefore, in EMQX Broker, the mechanism of Hooks greatly enhances the flexibility of the system. We can only hook a function on HookPoint that EMQX provided on specific location, to extend EMQX Broker with various behaviors. And no need to modify the emqx (opens new window) core code.

The implementer of the plugin only needs to pay attention to:

  1. The location of HookPoint: Including its role, timing of execution, and how to mount and cancel mount.
  2. Implementation of callback function: including the number of input parameters, role, data structure of the callback function, and the meaning of the returned value.
  3. Understand the mechanism of callback function execution on the chain: including the order in which callback functions are executed, and how to terminate the execution of the chain in advance.

If you used hooks in the development of extension plugin, you should be able to fully understand these three above points, and try not to use blocking functions inside the hooks, which will affect system throughput.

# Callback Functions Chain

There may be multiple plugins on a single HookPoint that need to care about the event and perform the corresponding operation, so there may be multiple callback functions on each HookPoint.

We call this chain composed of multiple callback functions executed sequentially Callback Functions Chain.

Callback Functions Chain is Currently implemented according to the concept of Chain-of-Responsibility (opens new window). In order to satisfy the functionality and flexibility of the hook, it have the following attributes:

  • Ordered: The callback functions on the Callback Functions Chain must be executed in certain order.
  • In Parameter There must be one/some initialization parameters and, optionally, a cumulative value for modification by the Callback Functions Chain.
  • Output Result Each function in the chain must have an output, and ok should be used as the function return value for callback functions that do not care about the result of execution. For example, in a notification class event, "A client has successfully logged in" does not require a return value.
  • Transitive The result of callback functions in the chain is transitive. And to allow more flexibility in the use of hooks, we have designed two modes of handling the return values of callback functions in the chain.
    • Result Transitive
      It means that each callback function in the chain accepts the entry of the chain, and the return of the previous callback function (which can be interpreted as a cumulative value) as arguments. Until the last function, the return value of the last function is then the return value of the whole chain. Specially, the chain is called with an initial value for the cumulative value to be used by the first callback function in the chain.
    • Result Transparent
      Each function in the chain only cares about the chain's entry, and will ignore the return value of the previous callback function. And the chain will have a fixed return value of ok.
      This is actually a special case of the first type of Result Transitive. That is, the initial accumulation value is ok, and each callback function in the chain only cares about the incoming parameters of the chain and keeps the accumulation value as ok unchanged.
      Most of notification event follows this logic. So that we provide the general Callback Functions Chain execution module.
  • Callback Functions Chain needs to allow the functions with it to terminate the chain in advance and ignore this operation.
    • Termination in advance: After the execution of this function is completed, the execution of the chain is directly terminated. All subsequent callback functions on the chain are ignored.
      For example, an authentication believes that such clients do not need to check other authentication plug-ins after they are allowed to log in, so they need to be terminated in advance.
    • Ignore this operation: Do not modify the processing result on the chain, and pass it directly to the next callback function.
      For example, when there are multiple authentication plug-ins, an authentication plug-in believes that such clients do not belong to its authentication scope, and it does not need to modify the authentication results. This operation should be ignored and the returned value of the previous function should be passed directly to the next function on the chain.

Therefore, we can obtain two program flow diagrams for the execution chain based on the two ways of handling the return value of the callback function on the chain.

# Result Transitiv

Chain execution direction ===>>>

                                 ┌------ ok ------┐                 ┌------ ok ------┐                 ┌------ ok ------┐
                                 |   (Args, Acc)  |                 |   (Args, Acc)  |                 |   (Args, Acc)  |
(Args, InitAcc) -----> Fun1 ---->┤                ├-----> Fun2 ---->┤                ├-----> Fun3 ---->┤                ├-> [NO MORE FUNCS]
                        |        |                |        |        |                |        |        |                |        |
                        |        └- {ok, NewAcc} -┘        |        └- {ok, NewAcc} -┘        |        └- {ok, NewAcc} -┘        |
                  ┌-----┴-----┐    (Args, NewAcc)    ┌-----┴-----┐    (Args, NewAcc)    ┌-----┴-----┐    (Args, NewAcc)    ┌-----┴-----┐
                  |           |                      |           |                      |           |                      |           |
                stop     {stop, NewAcc}            stop     {stop, NewAcc}            stop     {stop, NewAcc}             ok      {ok, NewAcc}
                  |           |                      |           |                      |           |                      |           |
                 Acc       NewAcc                   Acc        NewAcc                  Acc        NewAcc                  Acc        NewAcc
                  └-----┬-----┘                      └-----┬-----┘                      └-----┬-----┘                      └-----┬-----┘
                        |                                  |                                  |                                  |
 Result <---------------┴<---------------------------------┴<---------------------------------┴<---------------------------------┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

The meaning of the figure is:

  1. A total of three callback functions are registered on the chain in the figure,Fun1 Fun2 Fun3 , which are executed in the order indicated
  2. The callback function execution order is determined by the priority, and the same priority is executed in the order of mounting
  3. The input parameters of the chain are read-only Args and the parameter InitAcc for function modification on the chain.
  4. Regardless of how the execution of chain is terminated, the chain always returns a value, which depends on the return form.
    • The callback function returns with:
      • ok: ignore this operation, continue the chain execution with read-only Args and Acc returned by the previous function
      • {ok, NewAcc}: perform some operations, modify Acc content, continue chain execution with read-only Args and new NewAcc
    • The callback function also returns with:
      • stop: Stop the transfer of the chain and immediately return the result of Acc from the previous function
      • {stop, NewAcc}: it means to stop the transfer of the chain and immediately return the result of NewAcc from this modification

# Result Transparent

Chain execution direction ===>>>

(Args) --------------> Fun1 -----------> ok ------------> Fun2 -----------> ok ------------> Fun3 -----------> ok --------> [NO MORE FUNCS]
                        |              (Args)              |              (Args)              |              (Args)              |
                       stop                               stop                               stop                               ok
                        |                                  |                                  |                                  |
                       ok                                 ok                                 ok                                 ok
 Result <---------------┴<---------------------------------┴<---------------------------------┴<---------------------------------┘
1
2
3
4
5
6
7
8

Comparing this with the first execution mode, you can see that the execution mode that ignores the return value in the chain is actually a special case of the pass return value mode. This is equivalent to the case where the InitAcc value is ok and every callback function mounted on the chain returns ok | {ok, ok} | stop | {stop, ok}.

The above is the main design concept of the callback function chain, which regulates the execution logic of the callback function on the hook.

In the following two sections of HookPoint and callback function, all operations on hooks depend on Erlang code-level API provided by emqx (opens new window). They are the basis for the entire hook logic implementation.

# HookPoint

EMQX Broker is based on a client's key activities during its life cycle, and presets a large number of HookPoints. The preset mount points in the system are:

NameDescriptionExecution Timing
client.connectProcess connection packetWhen the server receives the connection packet from the client
client.connackIssue connection responseWhen the server is ready to issue a connection response message
client.connectedConnection succeedAfter client authentication is completed and successfully connected to the system
client.disconnectedDisconnectConnection layer of client is ready to close
client.authenticateConnection authenticationAfter client.connect is executed
client.authorizePub/Sub authorizationBefore publish/subscribe operation is executed
client.subscribeSubscribe to topicAfter receiving the subscription message, and before executing client.authorize
client.unsubscribeUnsubscribeAfter receiving the unsubscribe packet
session.createdSession creationWhen a client.connected is completed and a new session is created
session.subscribedSession subscription topicsAfter the subscription operation is completed
session.unsubscribedSession unsubscriptionAfter the unsubscription operation is completed
session.resumedSession resumewhen client.connected is executed and the old session information is successfully resumed
session.discardedSession discardedAfter the session was terminated due to discarded
session.takenoverSession takenoverAfter the session was terminated due to takenover
session.terminatedSession terminatedAfter the session was terminated due to other reason
message.publishMessage publishedBefore the server publishes (routes) the message
message.deliveredMessage deliveredBefore the message is ready to be delivered to the client
message.ackedMessage ackedAfter the message ACK is received from the client
message.droppedMessage droppedAfter the published messages are discarded

TIP

  • The session is discarded: When the client logs in with the method of clean session, if the client's session already exists on the server, the old session will be discarded.
  • The Session is taken over: When the client logs in with the method of Reserved Session, if the client's session already exists on the server, the old session will be taken over by the new connection

# Hook and unhook

EMQX Broker provides an API for the operation of hooking and unhooking.

Hook:

%% Name: name of hook(hook point), such as 'client.authenticate'
%% {Module, Function, Args}: Modules, methods, and additional parameters for callback functions
%% Priority:integar, 0 by default
emqx:hook(Name, {Module, Function, Args}, Priority).
1
2
3
4

After the hook is completed, the callback functions will be executed in the order of priority, or the order of hook for the same priority. All official plugin mount hooks have a priority of 0.

Unhook

%% Name:name of hook(hook point), such as'client.authenticate'
%% {Module, Function}: Modules and methods for callback functions
emqx:unhook(Name, {Module, Function}).
1
2
3

# Callback function

The input parameters and returned value of the callback function are shown in the following table:

(For parameter data structure, see:emqx_types.erl (opens new window))

Nameinput parameterReturned value
client.connectConnInfo:Client connection layer parameters
Props:Properties of MQTT v5.0 connection packets
New Props
client.connackConnInfo:Client connection layer parameters
Rc:returned code
Props: Properties of MQTT v5.0 connection response packets
New Props
client.connectedClientInfo: Client information parameters
ConnInfo: Client connection layer parameters
-
client.disconnectedClientInfo:Client information parameters
ConnInfo:Client connection layer parameters
ReasonCode:Reason code
-
client.authenticateClientInfo:Client information parameters
AuthNResult:Authentication results
New AuthNResult
client.authorizeClientInfo:Client information parameters
Topic:Publish/subscribe topic
PubSub: Publish/subscribe
AuthZResult:Authentication result
New AuthZResult
client.subscribeClientInfo:Client information parameters
Props:Properties parameters of MQTT v5.0 subscription messages
TopicFilters:List of topics of subscription
New TopicFilters
client.unsubscribeClientInfo:Client information parameters
Props:Properties parameters of MQTT v5.0 unsubscription messages
TopicFilters:List of topics of unsubscription
New TopicFilters
session.createdClientInfo:Client information parameters
SessInfo:Session information
-
session.subscribedClientInfo:Client information parameters
Topic:subscribed topic
SubOpts:Configuration options for subscribe operations
-
session.unsubscribedClientInfo:Client information parameters
Topic:unsubscribed topic
SubOpts:Configuration options for unsubscribe operations
-
session.resumedClientInfo:Client information parameters
SessInfo:Session information
-
session.discardedClientInfo:Client information parameters
SessInfo:Session information
-
session.takenoverClientInfo:Client information parameters
SessInfo:Session information
session.terminatedClientInfo:Client information parameters
Reason:Termination reason
SessInfo:Session information
-
message.publishMessage:Message objectNew Message
message.deliveredClientInfo:Client information parameters
Message:Message object
New Message
message.ackedClientInfo:Client information parameters
Message:Message object
-
message.droppedMessage:Message object
By:Dropped by
Reason:Drop reason
-

For the application of these hooks, see:emqx_plugin_template (opens new window)