Handle (optionally compressed) batch of MQTT messages

I would like to implement batching of MQTT messages (to the same topic). Sometimes we need to notify a lot of messages and publishing each of them separately takes too much time. I’ve done some tests with compressing and batching these messages. That compressed byte array is then used as the payload for publishing a single MQTT message. The compression algorithm and the separator value are passed as headers on the message. That speeds up things quite a lot.

Of course this should be seamless to the consumer. Therefore I was wondering if I could use the hook functionality to write a server hook that decompresses the batch of messages again and delivers them as separate messages on the topic.

The code for the onMessagePublish() function would be something like this:

        @Override
        public void onMessagePublish(MessagePublishRequest request, StreamObserver<ValuedResponse> responseObserver) {
            DEBUG("onMessagePublish", request);

            Message requestMessage = request.getMessage();
			// Check compression and list separator headers.
			String compressionAlgorithmHeaderValue = requestMessage.getHeadersOrDefault(Header.compressionAlgorithm, null);
			String listSeparatorHeaderValue = requestMessage.getHeadersOrDefault(Header.listSeparator, null);
			Compression compression = compressionAlgorithmHeaderValue == null ? null : Compression.valueOf(compressionAlgorithmHeaderValue);
			String listSeparator = listSeparatorHeaderValue == null ? null : listSeparatorHeaderValue;
			
			List<String> messages = null;
			try {
				if (compression != null && listSeparator == null) {
					// Payload is a compressed string.
					messages = List.of(CompressionUtils.decompressString(requestMessage.getPayload().toByteArray(), compression));
				}
				else if (compression != null && listSeparator != null) {
					// Payload is a compressed string list.
					messages = CompressionUtils.decompressStrings(requestMessage.getPayload().toByteArray(), compression, listSeparator.charAt(0));
				}
				else if (compression == null && listSeparator != null) {
					// Payload is a string list.
					messages = new ArrayList<>();
					byte[] payloadBytes = requestMessage.getPayload().toByteArray();
					int startIndex = 0;
					for (int index = 0; index < payloadBytes.length; index++) {
						if (payloadBytes[index] == listSeparator.charAt(0)) {
							messages.add(new String(payloadBytes, startIndex, index - startIndex, Charsets.UTF_8));
							startIndex = index + 1;
						};
					}
					if (startIndex < payloadBytes.length) messages.add(new String(payloadBytes, startIndex, payloadBytes.length - startIndex, Charsets.UTF_8));
				}
				else {
					// Payload is a string.
					messages = List.of(new String(requestMessage.getPayload().toByteArray(), Charsets.UTF_8));
				}
			}
			catch (IOException e) {
				// Failed to decompress message(s).
				System.out.printf("onMessagePublish eception. %s. (request: %s)".formatted(e.getMessage(), request));
				responseObserver.onCompleted();
				return;
			}

			// Serve each message.
			for (Iterator<String> iterator = messages.iterator(); iterator.hasNext();) {
            	ByteString bstr = ByteString.copyFromUtf8(iterator.next());
            	
            	Message message = Message.newBuilder()
            			.setId(request.getMessage().getId())
            			.setNode(request.getMessage().getNode())
            			.setFrom(request.getMessage().getFrom())
            			.setTopic(request.getMessage().getTopic())
            			.setPayload(bstr).build();
            	// Not sure about the ValuedResponse.ResponsedType. Assuming CONTINUE until the last message and then STOP_AND_RETURN?
            	ValuedResponse reply = ValuedResponse.newBuilder()
            			.setType(iterator.hasNext() ? ValuedResponse.ResponsedType.CONTINUE : ValuedResponse.ResponsedType.STOP_AND_RETURN)
            			.setMessage(message).build();
            	responseObserver.onNext(reply);
			}

            responseObserver.onCompleted();
        }

Would something like that work?

Thanks,
Frank

As far as I see now the server hooks work on a single message. Nevertheless would it be possible to support server hooks that allow me to split one message back into multiple? Is that something that the developers from EMQX would be willing to consider (see above for the use case).

All input is welcome.

Thanks,
Frank