# Apache Kafka

Apache Kafka 数据桥接实现了 EMQX 客户端消息和事件与 Apache Kafka(包括 Confluent) 的桥接,能够提供 EMQX 与企业应用之间高性能、高可靠的数据集成,有效降低应用复杂度并提升扩展性。

同时,EMQX Apache Kafka 集成提供了极高的数据吞吐能力,支持 Apache Kafka 的 SASL/SCRAM、SASL/GSSAPI 等多种安全认证方式以及 TLS 连接,是物联网数据集成首选方案之一。

前置准备

# 功能清单

# 快速开始

# 安装 Kafka

以 macOS 为例,安装并启动 Apache Kafka:

wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz

tar -xzf  kafka_2.13-3.3.1.tgz

cd kafka_2.13-3.3.1

# 以 KRaft 启动 Kafka(可选)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

bin/kafka-server-start.sh config/kraft/server.properties
1
2
3
4
5
6
7
8
9
10
11
12

更多详细内容请参考 Kafka Quick Start (opens new window)

# 创建 Kafka 主题

在 Kafka 中创建名为 testtopic-intesttopic-out 的两个主题:

bin/kafka-topics.sh --create --topic testtopic-in --bootstrap-server localhost:9092

bin/kafka-topics.sh --create --topic testtopic-out --bootstrap-server localhost:9092
1
2
3

提示

创建数据桥接前必须先在 Kafka 中创建好所需主题。

# 连接到 Apache Kafka

  1. 转到 Dashboard 数据集成 -> 数据桥接页面。
  2. 点击页面右上角的创建。
  3. 在数据桥接类型中选择 Kafka,点击下一步。
  4. 输入数据桥接名称,要求是大小写英文字母或数字组合。
  5. 输入 Kafka 连接信息,主机列表填写 127.0.0.1:9092,其他参数根据实际情况填写。
  6. 配置生产者桥接信息:
    1. MQTT 主题:要桥接的 MQTT 主题,此处填写 t/# 表示将匹配此主题的 MQTT 消息转发至 Kafka。您可以选择将此项留空,通过新建规则指定发往 Kafka 的数据。
    2. Kafka 主题名称:填写 Kafka 中预先创建好的主题 testtopic-in,此处暂不支持使用变量。 模版用于将规则或指定 MQTT 主题的消息转发到我们之前创建的 Kafka 主题。此处您可以使用默认配置,或通过变量构造消息模板。
  7. 高级配置(可选):根据情况配置最大批量字节数、压缩、分区选择策略等参数,详细请参考配置参数

提示

目前 Apache Kafka 集成仅支持单向桥接,即只可以将数据数据发送到 Kafka,无法从 Kafka 消费下发。

# 测试

使用 MQTTX 向 t/1 主题发布消息:

mqttx pub -i emqx_c -t t/1 -m '{ "msg": "Hello Kafka" }'
1

查看数据桥接运行统计,命中、发送成功次数应当 +1。

通过 Kafka 命令 testtopic-in 主题是否写入消息:

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic testtopic-in --from-beginning
1