Confluent Kafka Python Producer 简介

pythonserver side programmingprogramming

如今,数据是数字生态系统的重要组成部分,每个现代应用程序都依赖于其有效的管理和处理。在这个数据驱动的时代,Apache Kafka 是一种强大的事件流技术,可提供高吞吐量解决方案。使用 Confluent 的 Apache Kafka Python 客户端,这些强大的功能可无缝集成到您的 Python 应用程序中。本文全面概述了 Confluent Kafka Python Producer,并提供了有用的示例来帮助您入门。

什么是 Confluent Kafka Python Producer?

作为 Confluent 的 Kafka Python 客户端库的一个组件,Confluent Kafka Python Producer 为 Apache Kafka 强大的数据流功能提供了 Pythonic 接口。与 Kafka Consumer 结合,它使 Python 程序能够充分参与基于 Kafka 的分布式系统,使它们能够向 Kafka 主题生成数据。

开始使用 Confluent Kafka Python Producer

Pip 是 Python 的软件包安装程序,可用于安装 Confluent Kafka Python Producer。要安装,请发出以下命令 -

pip install confluent-kafka

安装 Kafka Producer 后,您可以在 Python 脚本中导入它 -

from confluent_kafka import Producer

让 Confluent Kafka Python Producer 投入工作

现在让我们探索如何使用 Confluent Kafka Python Producer 向 Kafka 发送消息。

示例 1:生成一条简单消息

如何创建对 Kafka 主题的直接响应,如下所示关注 −

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

p.produce('mytopic', 'Hello, Kafka!')

p.flush()

此脚本通过创建 Kafka 生产器与位于 localhost:9092 的 Kafka 代理建立连接。为确保消息已发送,它首先向主题"mytopic"生成消息"Hello, Kafka!",然后再刷新生产器的消息队列。

示例 2:处理消息传递报告

此外,Confluent Kafka 生产器可以报告向其主题传递消息的成功情况 −

from confluent_kafka import Producer

def delivery_report(err, msg):
   if err is not None:
      print(f'Message delivery failed: {err}')
   else:
      print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

p = Producer({'bootstrap.servers': 'localhost:9092'})

p.produce('mytopic', 'Hello, Kafka!', callback=delivery_report)

p.flush()

此处,调用回调函数 delivery_report 时会给出消息,该函数是 produce 方法的一部分。

示例 3:生成键值消息

Kafka 消息通常包含键和值。创建键值消息的方法如下 

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

p.produce('mytopic', key='mykey', value='myvalue')

p.flush()

此脚本使用键"mykey"和"myvalue"为主题"mytopic"生成消息。

示例 4:生成 Avro 消息

借助数据序列化技术 Avro,您可以加密消息的架构。这在为将由不同消费者使用的主题创建通信时特别有用,每个消费者可能需要不同的格式。要创建 Avro 消息,请按照以下步骤操作

from confluent_kafka import avro, Producer
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('value_schema.avsc')
key_schema = avro.load('key_schema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({
   'bootstrap.servers': 'localhost:9092', 
   'schema.registry.url': 'http://127.0.0.1:8081'
   }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic', value=value, key=key)
avroProducer.flush()

此脚本为主题"my_topic"创建一条消息,其键和值符合所提供的 Avro 架构。

示例 5:配置消息压缩

为了节省带宽,您可以将 Kafka Producer 设置为在发送消息之前对其进行压缩。以下是一个例子 

from confluent_kafka import Producer

p = Producer({
   'bootstrap.servers': 'localhost:9092',
   'compression.type': 'gzip',
})

p.produce('mytopic', 'Hello, Kafka!')
p.flush()

此脚本创建了一个 Kafka Producer,它使用 gzip 压缩消息,然后再将其传递到主题。

结论

Confluent 的 Kafka Python Producer 是一个功能强大且适应性强的解决方案,可使 Python 应用程序充分利用 Kafka 强大的数据流功能。无论您是构建复杂的分布式系统还是只需要可靠的数据流,它都是一个至关重要的工具。

从安装到 Python 应用程序中的实际使用,本分析已解决所有问题。详细介绍了五个示例:构建简单消息、管理传递报告、生成键值消息、构建 Avro 消息和自定义消息压缩。

但请记住,Confluent 的 Kafka Python Producer 提供的内容远不止本书所涵盖的内容。我们建议查阅官方 Confluent 文档并继续尝试高级用法,例如与 Kafka Streams 集成或开发自定义序列化器。


相关文章