Since v4.1, the EMQ X MQTT broker has provided a dedicated multi-language plugin, `emqx_extension_hook`, which allows the hook events of EMQ X to be handled in other programming languages. Developers can use Python or Java to quickly write their own plugins, or to extend the official functions to fit their business scenarios.
Note: the message hooks are only supported in the Enterprise edition.
The Python and Java drivers communicate with EMQ X through Erlang/OTP port processes, which gives them very high throughput. This article uses the Java extension as an example to show how to use the cross-language extension of EMQ X.
- Add `io.emqx.extension.jar` and `erlport.jar` to the dependencies of the project.
- Copy `examples/SampleHandler.java` to your project.
- Adjust `SampleHandler.java` if necessary so that it compiles successfully.

After compiling all the source code, you need to deploy the SDK and the compiled code files into EMQ X:
- Copy `io.emqx.extension.jar` into the `emqx/data/extension` directory.
- Copy the compiled `.class` files, such as `SampleHandler.class`, into the `emqx/data/extension` directory.
- Modify the configuration file `emqx/etc/plugins/emqx_extension_hook.conf`:

exhook.drivers = java
## Search path for scripts or library
exhook.drivers.java.path = data/extension/
exhook.drivers.java.init_module = SampleHandler
Enable the `emqx_extension_hook` plugin. If the configuration is wrong or the Java code contains errors, the plugin cannot be enabled normally. Once it is enabled, try to establish an MQTT connection and observe how your business logic behaves.
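Because a wrong configuration prevents the plugin from being enabled, it can help to check the settings before enabling it. The following is a minimal sketch of such a check, not part of EMQ X or its SDK: the class `ExhookConfCheck` and its methods are hypothetical, and it assumes that the driver looks for `<path>/<init_module>.class`, as the deployment steps above suggest.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical pre-flight helper; not part of EMQ X or the extension SDK.
class ExhookConfCheck {

    // Parse "key = value" lines. java.util.Properties treats lines
    // starting with '#' as comments, which matches the "##" comment style.
    static Properties parse(String confText) {
        Properties conf = new Properties();
        try {
            conf.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return conf;
    }

    // Assumption: the driver loads <path>/<init_module>.class.
    static Path expectedClassFile(Properties conf) {
        String dir = conf.getProperty("exhook.drivers.java.path");
        String module = conf.getProperty("exhook.drivers.java.init_module");
        if (dir == null || module == null) {
            throw new IllegalArgumentException("path or init_module is not set");
        }
        return Paths.get(dir.trim(), module.trim() + ".class");
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
            "exhook.drivers = java",
            "## Search path for scripts or library",
            "exhook.drivers.java.path = data/extension/",
            "exhook.drivers.java.init_module = SampleHandler");
        Path classFile = expectedClassFile(parse(sample));
        // The path is relative, so run this from the EMQ X root directory.
        System.out.println(classFile + (Files.exists(classFile) ? " found" : " is missing"));
    }
}
```

Running the check from the EMQ X root directory reports whether the compiled handler is where the configuration points.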
The example program `SampleHandler.java` is shown below. It extends the class `DefaultCommunicationHandler` of the SDK and demonstrates how to mount all the hooks of the EMQ X system.
import emqx.extension.java.handler.*;
import emqx.extension.java.handler.codec.*;
import emqx.extension.java.handler.ActionOptionConfig.Keys;

public class SampleHandler extends DefaultCommunicationHandler {

    // Topics for which each message hook takes effect
    @Override
    public ActionOptionConfig getActionOption() {
        ActionOptionConfig option = new ActionOptionConfig();
        option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#");
        option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#");
        option.set(Keys.MESSAGE_ACKED_TOPICS, "#");
        option.set(Keys.MESSAGE_DROPPED_TOPICS, "#");
        return option;
    }

    // Clients
    // Arrays are printed with java.util.Arrays.toString so that their
    // elements are shown instead of the array's identity hash.
    @Override
    public void onClientConnect(ConnInfo connInfo, Property[] props) {
        System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, java.util.Arrays.toString(props));
    }

    @Override
    public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) {
        System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, java.util.Arrays.toString(props));
    }

    @Override
    public void onClientConnected(ClientInfo clientInfo) {
        System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onClientDisconnected(ClientInfo clientInfo, Reason reason) {
        System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason);
    }

    // Determine the authentication result, return true or false
    @Override
    public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) {
        System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult);
        return true;
    }

    // Determine the ACL check result, return true or false
    @Override
    public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) {
        System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result);
        return true;
    }

    @Override
    public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
        System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, java.util.Arrays.toString(topic), java.util.Arrays.toString(props));
    }

    @Override
    public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
        System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, java.util.Arrays.toString(topic), java.util.Arrays.toString(props));
    }

    // Sessions
    @Override
    public void onSessionCreated(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) {
        System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
    }

    @Override
    public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) {
        System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
    }

    @Override
    public void onSessionResumed(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionDiscarded(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionTakeovered(ClientInfo clientInfo) {
        System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo);
    }

    @Override
    public void onSessionTerminated(ClientInfo clientInfo, Reason reason) {
        System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason);
    }

    // Messages
    @Override
    public Message onMessagePublish(Message message) {
        System.err.printf("[Java] onMessagePublish: message: %s\n", message);
        return message;
    }

    @Override
    public void onMessageDropped(Message message, Reason reason) {
        System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason);
    }

    @Override
    public void onMessageDelivered(ClientInfo clientInfo, Message message) {
        System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message);
    }

    @Override
    public void onMessageAcked(ClientInfo clientInfo, Message message) {
        System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message);
    }
}
`SampleHandler` mainly consists of two parts:

- Override the method `getActionOption`. This method configures the message-related hooks and specifies the list of topics for which each of them takes effect.

Configuration items | Corresponding hook |
---|---|
MESSAGE_PUBLISH_TOPICS | message_publish |
MESSAGE_DELIVERED_TOPICS | message_delivered |
MESSAGE_ACKED_TOPICS | message_acked |
MESSAGE_DROPPED_TOPICS | message_dropped |

- Override the `on<hookName>` methods. These methods are the callbacks that handle hook events. A callback name is formed by removing the underscores from the hook name, converting it to CamelCase, and adding the prefix `on`; for example, the hook `client_connect` corresponds to the method `onClientConnect`. Events generated by EMQ X, such as connect, publish, and subscribe, are eventually dispatched to these callbacks, which can then work with the attributes and state passed in as parameters. The example program only prints each parameter. If you only care about some of the hook events, override only the corresponding callbacks instead of all of them.
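
The naming rule described above can be sketched as a small helper. This is only an illustration of the rule: the class `HookNames` and the method `toCallbackName` are hypothetical and are not part of the SDK, which does not require you to compute these names yourself.

```java
// Hypothetical helper that illustrates the hook-to-callback naming rule.
class HookNames {

    // "client_connect" -> "onClientConnect"
    static String toCallbackName(String hookName) {
        StringBuilder name = new StringBuilder("on");
        for (String part : hookName.split("_")) {
            if (part.isEmpty()) {
                continue; // skip empty parts caused by doubled underscores
            }
            // Capitalize the first letter of each underscore-separated part
            name.append(Character.toUpperCase(part.charAt(0)));
            name.append(part.substring(1));
        }
        return name.toString();
    }

    public static void main(String[] args) {
        String[] hooks = {"client_connect", "client_check_acl", "session_takeovered"};
        for (String hook : hooks) {
            System.out.println(hook + " -> " + toCallbackName(hook));
        }
    }
}
```

The results match the method names used in `SampleHandler`, for example `onClientCheckAcl` for the hook `client_check_acl`.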
The execution timing of each callback and the list of supported hooks are the same as for the built-in hooks of EMQ X; please refer to Hooks - EMQ X.
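
The example passes `#` as the topic list for every message hook in `getActionOption`. Assuming these values are ordinary MQTT topic filters, the sketch below shows which topics a filter covers under the standard MQTT wildcard rules (`+` matches one level, `#` matches all remaining levels). It is an illustration only, not the matcher used by EMQ X; the class `TopicFilters` is hypothetical.

```java
// Hypothetical illustration of MQTT topic filter matching; not EMQ X code.
class TopicFilters {

    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps empty trailing levels, e.g. "a/" has two levels
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        // Topics starting with '$' are not matched by a leading wildcard
        if (topic.startsWith("$") && (f[0].equals("#") || f[0].equals("+"))) {
            return false;
        }
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' covers this level and everything below it, including
                // the parent itself. Filter validity is not checked here.
                return true;
            }
            if (i >= t.length) {
                return false; // the topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "sensors/room1/temp"));
        System.out.println(matches("sensors/+/temp", "sensors/room1/temp"));
        System.out.println(matches("sensors/+", "sensors/room1/temp"));
    }
}
```

With a narrower filter than `#`, a message hook would presumably only fire for the matching topics, which can reduce the number of events sent to the Java process.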
The simplest way to implement your extension is to extend the superclass `DefaultCommunicationHandler`. This superclass wraps the binding between each hook and its callback, and also wraps the parameter data structures used by the callbacks, so that you can get started quickly.
If you need finer control over the Java extension and the class `DefaultCommunicationHandler` cannot meet your requirements, you can control the code logic at a lower level by implementing the interface `CommunicationHandler`.
package emqx.extension.java.handler;

public interface CommunicationHandler {
    public Object init();
    public void deinit();
}

- `init()`: for initialization; it declares which hooks the extension requires and how they are mounted.
- `deinit()`: for logout, that is, when the extension is unloaded.

For a detailed introduction to the data format, please refer to the design documentation.