Specific Rate Limiting for Publish Messages on Topics in EMQX

Hello EMQX Community,

I hope this message finds everyone well. I am currently working on a project involving EMQX, and I’ve encountered a specific requirement that I need some guidance on.

My project requires the implementation of a rate limit on the number of messages published to specific topics by each client. The key aspect of this requirement is that the rate limiting should apply exclusively to the publish messages, not including other types of messages like heartbeats or QoS messages.

I have explored the existing rate_limiting feature in EMQX, but it appears to encompass all types of messages, including heartbeat and QoS messages. This broader scope does not align with my project’s requirements, where I need a more targeted approach.

To be more specific, I am looking for a solution where:

- A rate limit can be set for publish messages on specific topics.
- Once a client exceeds this publish message limit, further messages should not be distributed to the subscribers and should instead be dropped.
- The rate limit should not consider non-publish messages like heartbeats or QoS messages.

I am reaching out to ask if anyone in this community has faced a similar challenge or has insights into how this can be achieved in EMQX. Any guidance, whether it’s a direct solution, a workaround, or pointing towards relevant documentation, would be immensely helpful.

In addition to the above, I have also started exploring a custom plugin. Using the provided template, I’ve managed to set up a basic plugin and started working with the on_message_publish hook. However, apart from writing custom logs, I have no idea how to prevent messages from being routed to the subscribers. As a temporary solution, I’m able to change the topic of the message. Also, as far as I can tell, the on_message_publish hook doesn’t receive client information. How can I get the client information for the message in that hook?

Thank you in advance for taking the time to read and respond to my query.

Hello,

I think this is possible with a plugin. The internal message structure is the following:

%% See 'Application Message' in MQTT Version 5.0
-record(message, {
    %% Global unique message ID
    id :: binary(),
    %% Message QoS
    qos = 0,
    %% Message from
    from :: atom() | binary(),
    %% Message flags
    flags = #{} :: emqx_types:flags(),
    %% Message headers. May contain any metadata. e.g. the
    %% protocol version number, username, peerhost or
    %% the PUBLISH properties (MQTT 5.0).
    headers = #{} :: emqx_types:headers(),
    %% Topic that the message is published to
    topic :: emqx_types:topic(),
    %% Message Payload
    payload :: emqx_types:payload(),
    %% Timestamp (Unit: millisecond)
    timestamp :: integer(),
    %% Miscellaneous extensions, currently used for OpenTelemetry context propagation
    extra = #{} :: term()
}).

The from field is the client ID of the publisher.


Hello,

Thank you very much for your response. I appreciate the suggestion and plan to explore that option as well. I wanted to share that I’ve made some progress today by deploying a gRPC server. This approach allowed me to retrieve client information through the ‘from’ field, and I have completed the initial development of the topic-based rate limiter. It works fine for now, but it is probably not the most scalable solution.

I am certainly interested in trying a similar approach in plugin development. Previously, I hadn’t delved into it in detail, primarily because I’m not very familiar with Erlang and faced some challenges while exploring.

Thanks again for your valuable input!

Hello,

Indeed, the gRPC plugin may not be the optimal solution for this particular task, because ideally rate limiting should happen as close to the source of the request as possible. For maximum efficiency, an Erlang plugin is the way to go.
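
For illustration, here is a minimal sketch of what a per-client, per-topic limiter inside an Erlang plugin could look like. It is not EMQX code: the module name, the table name and the allow/4 function are all made up for this example, and it uses a simple fixed-window counter in ETS rather than any limiter that ships with the broker. The idea is that the hook would call allow/4 with the message's from and topic fields and drop the message when it returns false.

```erlang
-module(topic_rate_limit_sketch).
-export([init/0, allow/4]).

%% Hypothetical table name; not part of EMQX.
-define(TAB, topic_rate_limit_counters).

%% Create the counter table once, e.g. when the plugin starts.
%% The table belongs to the calling process and disappears when it exits.
init() ->
    ets:new(?TAB, [named_table, public, set, {write_concurrency, true}]),
    ok.

%% Returns true while ClientId has published at most Limit messages to
%% Topic in the current window of WindowMs milliseconds, false otherwise.
allow(ClientId, Topic, Limit, WindowMs) ->
    %% Index of the current fixed window.
    Window = erlang:system_time(millisecond) div WindowMs,
    Key = {ClientId, Topic, Window},
    %% Atomically increment the counter; {Key, 0} is inserted first if absent.
    Count = ets:update_counter(?TAB, Key, {2, 1}, {Key, 0}),
    Count =< Limit.
```

Counters for old windows are never deleted in this sketch, so a real plugin would need some periodic cleanup. Because only the publish hook calls allow/4, heartbeats and acknowledgements are not counted.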

You are correct. I’ve successfully retrieved client information using the following code in the on_message_publish hook:

on_message_publish(Message = #message{from = ClientId, topic = <<"test">>}, _Env)

Now, I have a question regarding controlling message delivery to subscribers. In typical usage of on_message_publish, I see it concludes with {ok, Message}:

on_message_publish(Message, _Env) ->
    io:format("Publish ~p~n", [emqx_message:to_map(Message)]),
    {ok, Message}.

However, I’m looking to prevent the message from being processed further and delivered to subscribers. I tried returning {stop} instead of {ok, Message}, thinking it would halt the message’s propagation, but the message is still being transmitted.

In the gRPC approach I set headers['allow_publish'] = 'false' to stop the transmission. Should I follow a similar approach here? What’s the correct way to achieve this in a plugin? So far I have tried this with no luck:

BlockedMessage = Message#message{headers = maps:put("allow_publish", false, Message#message.headers)},
{stop, BlockedMessage}

As a temporary workaround, I’ve considered altering the message content to an empty string and redirecting it to a different topic. However, I’m eager to learn about the best practices for this situation. Additionally, it would be very helpful if you could share the source or reference for your solution. This would allow me to delve deeper into the matter on my own, saving you from any further questions on this topic. Your guidance is much appreciated!

Hello,

Your investigation is correct. Simply returning {stop, ...} from the hook aborts further processing of the hooks, but the message might still be dispatched. In order to drop it, you do indeed need to set the allow_publish header of the message to false. Here’s the relevant code of the broker:

The code you’ve written is almost correct, except that "allow_publish" in quotes creates a string, whereas an atom is expected. Writing allow_publish without quotes creates an atom.
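
To make the difference concrete, here is a small sketch of the corrected return value. The module and the drop/1 helper are hypothetical, and the record below is a cut-down stand-in for the message record shown earlier in this thread so that the snippet compiles on its own; in a real plugin you would use the record definition that comes with EMQX instead.

```erlang
-module(drop_publish_sketch).
-export([drop/1]).

%% Stand-in for EMQX's #message{} record (same field order as quoted above),
%% declared here only so the sketch is self-contained.
-record(message, {id, qos = 0, from, flags = #{}, headers = #{},
                  topic, payload, timestamp, extra = #{}}).

%% Mark the message as not publishable and stop the hook chain.
%% The key is the atom allow_publish (no quotes), not the string "allow_publish".
drop(Message = #message{headers = Headers}) ->
    {stop, Message#message{headers = maps:put(allow_publish, false, Headers)}}.
```

In the hook itself this would amount to returning drop(Message) from on_message_publish for messages that exceed the limit, and {ok, Message} otherwise.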

Hello again,

Your quick and accurate responses were incredibly helpful in completing my plugin. Thank you once again for your support. Looking forward to more interactions in the future!
