Hi sapiens!
I am testing a few MQTT brokers for our needs with these simple Python scripts.
This one consumes messages:
from datetime import datetime as dt
from pickle import loads
import paho.mqtt.client as mqtt

# State shared with the callbacks: message counter and smoothed latency
user_data = {'npp': 0, 'avg': 0, 'fcnt': 0}
client = mqtt.Client(client_id="XXX2", userdata=None, protocol=mqtt.MQTTv5, transport="tcp")
client.user_data_set(user_data)
client.connect(host="127.0.0.1", port=1883)

@client.connect_callback()
def on_connect(client, userdata, flags, rc, properties):
    print("Connected with result code " + str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(topic="xxxx01", qos=0)

@client.message_callback()
def on_message(client, userdata, msg):
    # Payload is a pickled tuple: (text, random bytes, publish timestamp)
    x = loads(msg.payload)
    # Smoothed latency: the newest sample is averaged with the previous value
    userdata['avg'] = ((dt.now().timestamp() - x[2]) + userdata['avg']) / 2
    if userdata['npp'] % 100 == 0:
        # df is printed in UTC, now() in local time
        print(f' msg.topic: {msg.topic} avg_ts: {userdata["avg"]:.12f} df>> {dt.utcfromtimestamp(x[2]).strftime("%Y-%m-%d %H:%M:%S.%f") } now() >>{dt.now().strftime("%Y-%m-%d %H:%M:%S.%f")}')
    # Count the message; without this the check above is true for every message
    userdata['npp'] += 1

client.loop_forever()
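A side note on the `avg` value computed in `on_message`: the update `(sample + avg) / 2` gives the newest sample a weight of 50%, so it behaves like an exponentially weighted average, not the arithmetic mean over all messages. A minimal sketch comparing the two, with hypothetical helper names (`smoothed`, `cumulative_mean`) and made-up latency values:

```python
def smoothed(samples, start=0.0):
    """Replicates the update used in on_message: avg = (sample + avg) / 2."""
    avg = start
    for s in samples:
        avg = (s + avg) / 2
    return avg


def cumulative_mean(samples):
    """Plain arithmetic mean over all samples."""
    total = 0.0
    count = 0
    for s in samples:
        total += s
        count += 1
    return total / count if count else 0.0


# Made-up latencies in seconds, for illustration only
latencies = [0.010, 0.010, 0.010, 0.090]
print(f"smoothed: {smoothed(latencies):.6f}")
print(f"mean:     {cumulative_mean(latencies):.6f}")
```

With a single slow message at the end, the smoothed value jumps much further than the mean does, so the printed `avg_ts` mostly reflects the last few messages.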
This one produces messages:
import os
import paho.mqtt.client as mqtt
import paho.mqtt.properties as props
from datetime import datetime
from pickle import dumps

SIZE = 1024 * 1024 * 7  # 7 MiB of random bytes per message

client = mqtt.Client(client_id="XXX", userdata=None, protocol=mqtt.MQTTv5, transport="tcp")
client.max_inflight_messages_set(1000)
client.connect(host="127.0.0.1", port=1883)

i = 0
while i <= 100000:
    # Payload is a pickled tuple: (text, random bytes, publish timestamp)
    message_info = client.publish(topic='xxxx01', payload=dumps((f'my message numbero: {i}', os.urandom(SIZE), datetime.now().timestamp())), retain=False, qos=0)
    i += 1

client.disconnect()
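Note what the timestamp in the payload brackets: it is taken before `dumps()` on the producer side and compared after `loads()` on the consumer side, so the measured latency includes pickling, sending (producer to broker, then broker to consumer), and unpickling roughly 7 MiB per message, not only the broker's forwarding time. A rough sketch to check the payload size and the local serialization cost without any broker; the helper name `build_payload` is hypothetical and the timings depend on the machine:

```python
import os
import time
from datetime import datetime
from pickle import dumps, loads

SIZE = 1024 * 1024 * 7  # same payload size as in the producer script


def build_payload(i, size=SIZE):
    """Builds the same (text, random bytes, timestamp) tuple the producer pickles."""
    return (f'my message numbero: {i}', os.urandom(size), datetime.now().timestamp())


payload = build_payload(0)

t0 = time.perf_counter()
raw = dumps(payload)            # producer-side cost after the timestamp is taken
t1 = time.perf_counter()
text, blob, ts = loads(raw)     # consumer-side cost before the latency is computed
t2 = time.perf_counter()

print(f"pickled size: {len(raw) / (1024 * 1024):.2f} MiB")
print(f"dumps: {(t1 - t0) * 1000:.2f} ms")
print(f"loads: {(t2 - t1) * 1000:.2f} ms")
```

Running the same script with a much smaller `SIZE` would show how much of the measured time depends on the payload size.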
We got the following results:
......
msg.topic: xxxx01 avg_ts: 0.052382999198 df>> 2022-08-03 08:05:41.531240 now() >>2022-08-03 11:05:41.584121
msg.topic: xxxx01 avg_ts: 0.058017518886 df>> 2022-08-03 08:05:41.571821 now() >>2022-08-03 11:05:41.635525
msg.topic: xxxx01 avg_ts: 0.057231320326 df>> 2022-08-03 08:05:41.609001 now() >>2022-08-03 11:05:41.665505
So the average latency between publish and consume is about 55 ms, which is even slower than with Mosquitto.
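The per-message latency can also be read directly from the printed lines. `df` is formatted with `utcfromtimestamp()` while `now()` is local time, so the two columns differ by the local UTC offset plus the actual latency. A small sketch that parses the three lines above and removes the offset; the regular expression and the 3-hour offset are assumptions based on this output:

```python
import re
from datetime import datetime, timedelta

LINES = [
    "msg.topic: xxxx01 avg_ts: 0.052382999198 df>> 2022-08-03 08:05:41.531240 now() >>2022-08-03 11:05:41.584121",
    "msg.topic: xxxx01 avg_ts: 0.058017518886 df>> 2022-08-03 08:05:41.571821 now() >>2022-08-03 11:05:41.635525",
    "msg.topic: xxxx01 avg_ts: 0.057231320326 df>> 2022-08-03 08:05:41.609001 now() >>2022-08-03 11:05:41.665505",
]

UTC_OFFSET = timedelta(hours=3)  # assumption: local time is UTC+3, as the output suggests
FMT = "%Y-%m-%d %H:%M:%S.%f"
PATTERN = re.compile(r"df>> (.+?) now\(\) >>(.+)$")


def latency_seconds(line):
    """Returns receive time minus publish time, with the timezone offset removed."""
    sent_s, recv_s = PATTERN.search(line).groups()
    sent = datetime.strptime(sent_s.strip(), FMT)
    recv = datetime.strptime(recv_s.strip(), FMT)
    return (recv - sent - UTC_OFFSET).total_seconds()


latencies = [latency_seconds(line) for line in LINES]
for value in latencies:
    print(f"{value * 1000:.3f} ms")
print(f"mean: {sum(latencies) / len(latencies) * 1000:.3f} ms")
```

For these three lines the individual latencies come out at roughly 53, 64, and 57 ms, in line with the `avg_ts` column.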
My dream is to find an MQTT broker with a latency of about 1-2 ms.
Perhaps I need to do some tuning?
Additional info:
...sysctl:
fs.file-max = 9223372036854775807
fs.nr_open = 2097152
net.core.somaxconn = 32768
net.ipv4.tcp_max_syn_backlog = 16384
net.core.netdev_max_backlog = 16384
net.core.rmem_default = 262144
net.core.wmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.core.optmem_max = 16777216
net.ipv4.tcp_rmem = 1024 4096 16777216
net.ipv4.tcp_wmem = 1024 4096 16777216
net.ipv4.tcp_max_tw_buckets = 1048576
... limits:
root - nofile unlimited
* - nofile unlimited
My PC has 64 GB RAM and a 24-core Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz.
EMQX 5.0.4 on Debian 11.
Also, I don't need persistence in my broker, but I could not find how to turn it off.
What am I doing wrong?