# Amazon Kinesis

EMQX 支持与 Amazon Kinesis Data Streams (opens new window) 无缝集成,从而能进一步与其他 AWS 服务集成,实时提取、处理和分析 MQTT 数据。

提示

EMQX 企业版功能。EMQX 企业版可以为您带来更全面的关键业务场景覆盖、更丰富的数据集成支持,更高的生产级可靠性保证以及 24/7 的全球技术支持,欢迎免费试用 (opens new window)

前置准备

# 功能清单

# 快速开始教程

本节介绍如何配置 Amaxon Kinesis 数据桥接,包括如何设置 Kinesis 服务、创建数据桥接和转发数据到 Kinesis Data Stream 的规则以及测试数据桥接和规则等主题。

# 在 Amazon Kinesis Data Streams 中创建数据流

按照以下步骤通过 AWS 管理控制台创建数据流(详细信息请参阅本教程 (opens new window))。

  1. 登录 AWS 管理控制台并打开 Kinesis 控制台 (opens new window)
  2. 在导航栏中,展开区域选择器并选择一个区域。
  3. 选择创建数据流
  4. 创建 Kinesis 流页面,为您的数据流输入名称,然后选择按需容量模式。

# 在本地模拟 Amazon Kinesis Data Streams

为了便于开发和测试,您可以通过 LocalStack (opens new window) 在本地模拟 Amazon Kinesis Data Streams 服务。有了 LocalStack,您可以在本地机器上运行 AWS 应用,无需连接到远程云提供商。

  1. 安装 LocalStack 并使用 Docker Image 运行:

    # To start the LocalStack docker image locally
    docker run --name localstack -p '4566:4566' -e 'KINESIS_LATENCY=0' -d localstack/localstack:2.1
    
    # Access the container
    docker exec -it localstack bash
    
    1
    2
    3
    4
    5
  2. 创建一个只有一个分片的流,名称设为 my_stream

    awslocal kinesis create-stream --stream-name "my_stream" --shard-count 1
    
    1

# 创建 Amazon Kinesis 数据桥接

  1. 转到 EMQX Dashboard,点击集成->数据桥接

  2. 点击页面右上角的创建

  3. 创建数据桥接页面,点击选择 Amazon Kinesis,然后点击下一步

  4. 为数据桥接输入一个名称。名称应为大写/小写字母和数字的组合。

  5. 输入 Amazon Kinesis Data Streams 服务的连接信息:

  6. Payload Template 字段中,将其留空或定义模板。

    • 如果留空,它将使用 JSON 格式编码 MQTT 消息中的所有可见输入,例如 clientid、topic、payload 等。
    • 如果使用定义的模板,${variable_name} 形式的占位符将使用 MQTT 上下文中的相应值进行填充。例如,如果 MQTT 消息主题是 my/topic${topic} 将被替换为 my/topic
  7. 高级配置(可选),根据情况配置队列与批量等参数,详细请参考数据桥接简介中的配置参数。

  8. 在点击创建之前,您可以点击测试连接性以测试桥接。

  9. 点击创建按钮完成数据桥接创建。

    在弹出的创建成功对话框中您可以点击创建规则,继续创建规则以指定需要写入 Amazon Kinesis 的数据。您也可以按照创建 Amazon Kinesis 数据桥接规则中的步骤来创建规则。

# 创建 Amazon Kinesis 数据桥接规则

接下来您可以创建一条规则以指定需要写入 Amazon Kinesis 的数据。

  1. 在 EMQX Dashboard 左侧导航栏中点击集成 -> **规则 **.

  2. 点击页面右上角的创建

  3. Input my_rule as the rule ID.

  4. 输入规则 ID my_rule,在 SQL 编辑器中输入规则。例如将 t/# 主题的 MQTT 消息存储至 Amazon Kinesis Data Streams,需输入以下 SQL 语法:

    注意:如果您希望制定自己的 SQL 语法,需要确保规则选出的字段(SELECT 部分)包含所有 SQL 模板中用到的变量。

    SELECT
      *
    FROM
      "t/#"
    
    1
    2
    3
    4
  5. 点击添加动作,在动作下拉框中选择使用数据桥接转发选项,选择先前创建好的 Amazon Kinesis 数据桥接。点击添加

  6. 点击最下方创建按钮完成规则创建。

至此您已经完成整个 Amazon Kinesis 数据桥接创建过程,可以前往 集成 -> Flows 页面查看拓扑图,此时应当看到 t/# 主题的消息经过名为 my_rule 的规则处理,处理结果交由 Amazon Kinesis 存储。

# 测试桥接和规则

  1. 使用 MQTTX 向 t/my_topic 主题发布一条消息:

    mqttx pub -i emqx_c -t t/my_topic -m '{ "msg": "hello Amazon Kinesis" }'
    
    1
  2. 查看 Amazon Kinesis 的数据桥接中的运行统计,命中、发送成功次数均 +1。

  3. 转到 Amazon Kinesis 数据查看器 (opens new window)。您应该可以看到数据流指定分片内的数据记录。

# 使用 LocalStack 查看数据

如果您使用 LocalStack,通过以下步骤查看接收到的数据。

  1. 在发送数据到桥接之前,使用以下命令获取 ShardIterator

    awslocal kinesis get-shard-iterator --stream-name my_stream --shard-id shardId-000000000000 --shard-iterator-type LATEST
    {
    "ShardIterator": "AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk="
    }
    
    1
    2
    3
    4
  2. 使用 MQTTX 向 t/my_topic 主题发布一条消息:

    mqttx pub -i emqx_c -t t/my_topic -m '{ "msg": "hello Amazon Kinesis" }'
    
    1
  3. 查看数据记录并解码接收到的数据:

    awslocal kinesis get-records --shard-iterator="AAAAAAAAAAG3YjBK9sp0uSIFGTPIYBI17bJ1RsqX4uJmRllBAZmFRnjq1kPLrgcyn7RVigmH+WsGciWpImxjXYLJhmqI2QO/DrlLfp6d1IyJFixg1s+MhtKoM6IOH0Tb2CPW9NwPYoT809x03n1zL8HbkXg7hpZjWXPmsEvkXjn4UCBf5dBerq7NLKS3RtAmOiXVN6skPpk="
    {
        "Records": [
            {
                "SequenceNumber": "49642650476690467334495639799144299020426020544120356866",
                "ApproximateArrivalTimestamp": 1689389148.261,
                "Data": "eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9",
                "PartitionKey": "key",
                "EncryptionType": "NONE"
            }
        ],
        "NextShardIterator": "AAAAAAAAAAFj5M3+6XUECflJAlkoSNHV/LBciTYY9If2z1iP+egC/PtdVI2t1HCf3L0S6efAxb01UtvI+3ZSh6BO02+L0BxP5ssB6ONBPfFgqvUIjbfu0GOmzUaPiHTqS8nNjoBtqk0fkYFDOiATdCCnMSqZDVqvARng5oiObgigmxq8InciH+xry2vce1dF9+RRFkKLBc0=",
        "MillisBehindLatest": 0
    }
    
    echo 'eyAibXNnIjogImhlbGxvIEFtYXpvbiBLaW5lc2lzIiB9' | base64 -d
    { "msg": "hello Amazon Kinesis" }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17