跳到主要内容

DataOps博客

欢迎改变的地方

向Amazon S3发送Kafka消息

By 张贴在 工程 2020年12月9日

在这篇文章中, 电子游戏网址大全将看看集成电子游戏厅数据收集器引擎(SDC)的最佳实践。, 一个快速的数据摄取引擎, with Kafka, 和 take a deep dive into the details 和 show how to send Kafka messages to Amazon S3.

Apache Kafka is a distributed event streaming platform used by thous和s of companies for streaming analytics 和 for other mission-critical applications. This has become one of the most common components of the big data ecosystem within organizations.

发送Kafka消息给Amazon S3 | 卡夫卡的消费者

In some scenarios an organization may already have an existing 数据管道 bringing messages to Kafka. 在这种情况下, SDC can take on the role of a consumer 和 h和le all of the logic for taking data from Kafka to wherever it needs to go. 例如, you could deliver data from Kafka message to Amazon S3/HDFS/Elasticsearch or whatever destination you choose without writing any code. 使用streamset 卡夫卡的起源 you can take Kafka messages 和 batch them together into appropriately sized 和 push them to desired destination.

向S3发送Kafka消息Kafka消息消费者起源:

  1. 卡夫卡的消费者
  2. 卡夫卡MultiTopic消费者

为什么有两个卡夫卡起源?

假设你有一个应用程序需要从Kafka主题读取消息, 对它们进行一些验证, 并将结果写入另一个数据存储. 在这种情况下,您将创建一个订阅适当主题的消费者管道, 开始接收信息, 验证它们并编写结果.

向S3发送Kafka消息

这可能会在一段时间内有效, but what if the rate at which producers write messages to the topic exceeds the rate at which your application can validate them?  如果您仅限于一个消费者读取和处理数据, 你的申请可能会越来越落后, 无法跟上传入消息的速度. 显然,需要从主题来衡量消费. 就像多个生产者可以写同一个主题一样, we need to allow multiple consumers to read from the same topic 和 as well read from multiple topics, 在它们之间分割数据. 这就是Kafka多主题消费者起源派上用场的地方. 该源使用多个线程来支持并行处理数据.

消费者起源 can then take advantage of additional processors 和 memory to run several consumer threads in parallel. Kafka将跨分区分发消息, 负载将在消费者线程之间共享.

向S3发送Kafka消息

哪一个对我来说是正确的选择?

如果您的SDC引擎运行在具有高资源可用性的更强大的机器上, 然后使用Kafka多主题原点垂直缩放.

向S3发送Kafka消息然而, 如果您计划水平扩展,这是一个更划算的选择, 然后使用Kafka消费者再次分区Kafka主题, 但这次在多个Data Collector实例上运行管道. This has some extra overhead of maintenance if you are manually running multiple instances of this pipeline over multiple data collector instances, 但这很容易实现 电子游戏厅控制中心 通过同一个作业启动多个管道实例.

卡夫卡对S3先进的功能

让电子游戏网址大全看看在两个Kafka起源中都可以利用的一些高级特性.

卡夫卡的安全 

You can configure both Kafka stages – 卡夫卡的消费者 和 Kafka Multitopic Consumer – to connect securely through the following methods:

  • Kerberos
  • SSL / TLS
  • SSL / TLS和Kerberos

Enabling security requires configuring additional Kafka configuration properties in the stage in addition to completing the prerequisite tasks.

记录头属性 

Both 卡夫卡的起源s creates 记录头属性s that include information about the originating file for the record. 特别是当原始数据处理Avro数据时,它将Avro模式包含在 avroSchema 记录头属性.

  • avroSchema:在处理Avro数据时,提供Avro模式
  • offset:记录产生的偏移量
  • partition:记录所在的分区
  • topic:记录起源的主题

向Amazon S3发送Kafka消息所需的Key

You can configure both origins to capture the message keys included in each Kafka message 和 store them in generated records. Kafka消息键可以是字符串值或者Avro消息, 取决于你的Kafka系统是如何配置的. You can store message key values in a record header/field or both 和 can use the values in pipeline processing logic or to write them to destination systems. 如果不需要消息键,可以丢弃它们. 卡夫卡的消费者和Kafka Multi-topic Consumer源默认丢弃消息键.

数据管道概述与实现

The goal here is to read Avro files from a file system directory 和 write them to a Kafka topic using the 电子游戏厅 Kafka Producer. We’ll then use a second pipeline configured with a 卡夫卡的消费者 to read all the messages from that topic, 执行一组转换以屏蔽数据并确定信用卡的类型. And finally send Kafka messages to Amazon S3 by partitioning the data on credit card type 和 make sure that the data stored on S3 is encrypted. 在本博客的第二部分,电子游戏网址大全将重新设计电子游戏网址大全的 数据管道 for scaling 和 for h和ling huge amounts of data running through during your send of Kafka Messages to Amazon.

以下是JSON格式的数据:

{
  “transaction_date”:“dd / mm / YYYY”,
  “card_number”:"0000-0000-0000-0000",
  “card_expiry_date”:“mm / YYYY”,
  “card_security_code”:"0000",
  “purchase_amount”:"$00.00",
  “描述”:“购买的交易说明”
}

先决条件 

  • 一个电子游戏厅数据收集器引擎的工作实例.18.1+)
  • 一个工作的Kafka实例(参见 快速入门 便于本地设置. 最后一次测试版本1.1.0,但旧的和新的版本也应该工作.)

使用Kafka producer发布数据到Kafka topic

让电子游戏网址大全创建Kafka topic——”demo-topic——通过运行此命令

bin / Kafka-topics.Sh——create——topic demo-topic——bootstrap-server localhost:9092 ——分区 3

现在让电子游戏网址大全使用一个简单的数据管道将一些样本数据推送到这个Kafka主题. We will read Avro files from a file system directory 和 write them to a Kafka topic using the 电子游戏厅 Kafka Producer in SDC记录数据格式. Then use another 数据管道 to read the 署记录 data from Kafka 和 write it to Elasticsearch 和 convert data to Avro for S3.

目录到Kafka生产者

使用Kafka消息并将其存储在Amazon S3中

Kafka消费者到Amazon S3

卡夫卡的消费者

  • 让电子游戏网址大全配置卡夫卡的起源来使用来自本地Kafka setup和o的消息n 数据格式 选项卡选择 署记录.

场转换器

  • 恰巧在Avro中,卡号字段被定义为一个整数. 电子游戏网址大全将把它转换成一个字符串值. 所以类型的/card_number',并在' Convert to type '中将其设置为键入字符串. 其余保持默认值.

Jython评估者

  • In this stage we’ll use a small piece of python code to look at the first few digits of the card number 和 figure out what type of card it is. 电子游戏网址大全会将那个card类型添加到一个名为'credit_card_type‘.

场戴面具的人

  • The last step of the process is to mask the card number so that the last 4 digits of the card is all that makes it to the data stores.

向Amazon S3写入Kafka消息

  • 电子游戏网址大全将数据转换回Avro格式,并将其存储在S3 bucket中.
  • On ‘数据格式“选择”Avro”和“在管道配置“对”Avro模式位置”. 然后为Avro schema指定以下模式:
{“名称” : “有条件现金转移支付.avro”,
 “类型”: “记录”,
 “名称”: “有条件现金转移支付”,
 “医生”: “信用卡交易测试”,
 “字段”: [
            {“名称”: “transaction_date”, “类型”: “字符串”},
            {“名称”: “card_number”, “类型”: “字符串”},
            {“名称”: “card_expiry_date”, “类型”: “字符串”},
            {“名称”: “card_security_code”, “类型”: “字符串”},
            {“名称”: “purchase_amount”, “类型”: “字符串”},
            {“名称”: “描述”, “类型”: “字符串”}
           ]
}
  • 为了节省S3 bucket上的存储空间,让电子游戏网址大全在写入数据时压缩数据. Select BZip2 随着 Avro压缩编解码器.
  • 对分区进行写操作。credit_card_type’田地,电子游戏网址大全要用 ${记录:价值(/ credit_card_type)} 表达式为 分区的前缀. With this expression, the destination will create 和 write records to partitions based on the “credit_card_type” 记录中的值.
  • Protecting Sensitive Data: you can use any of the options below for server-side encryption on Amazon S3 to protect sensitive data. 例如,在电子游戏网址大全的例子中,信用卡号码.
    • Amazon s3管理的加密密钥(SSE-S3)
    • AWS kms托管加密密钥(SSE-KMS)
    • 客户提供的加密密钥(SSE-C)

manage-smart-data-管道

电子游戏厅使数据工程师能够构建端到端智能数据管道. Spend your time building, enabling 和 innovating instead of maintaining, rewriting 和 fixing.

Amazon S3上的输出

注意,S3上的输出将被“credit_card_type”

Amazon S3输出

针对大工作负载的数据管道重新设计

现在让电子游戏网址大全假设你必须这么做 扩展上述解决方案 given the scenario that you are dealing with large amounts of data 和 there are multiple upstream applications which are writing to multiple Kafka topics. So the rate at which producers write messages to the topic has exceeded the rate at which this pipeline consumes. 

Also assume there is one more upstream pipeline/application which generates the similar credit card data 和 stores that info in Kafka topic `demo-topic-2`.

bin / Kafka-topics.Sh——create——topic demo-topic-2 ——bootstrap-server localhost:9092 ——分区 2

So instead of recreating the entire 数据管道 from scratch we can easily redesign the existing pipeline by swapping out the origin with Kafka Multitopic Consumer origin. 电子游戏网址大全可以通过增加线程数来垂直扩展管道. 如果需要的话,电子游戏网址大全可以同时阅读多个主题.

Kafka Multitopic到Amazon S3

请注意 that the number of threads should always be less than or equal to the number of partitions which we are reading to achieve better parallelism. 这里电子游戏网址大全有 demo-topic3个分区demo-topic-22分区. 因此,电子游戏网址大全可以将线程数设置为5,以实现更高的并行度.

请注意:样品 data管道 可以在GitHub上找到. 

结论

在这个博客中,电子游戏网址大全学习了如何使用 电子游戏厅 as a Kafka consumer 和 when to choose 卡夫卡的消费者 origin vs Kafka Multitopic Consumer origin to process large amounts of Kafka messages 和 take advantage of parallelism. We also explored various AWS S3 destination capabilities like partitioning 和 server side encryption. 

Here are some resources that will help jump start your journey with designing 和 deploying 数据管道s:

回到顶部

电子游戏网址大全使用cookie来改善您对电子游戏网址大全网站的体验. 单击“允许所有人同意”并继续访问电子游戏网址大全的网站. 隐私政策