除了这段代码过度使用
topic.get_producer
(在我的回答中提到
here
),第一个示例比第二个示例运行得快得多的原因是第二个示例有效地以同步模式运行。也就是说,生成的每一条消息都会导致在生成下一条消息之前等待传递确认。如果您对编写异步生成的应用程序感兴趣,那么您可能对更接近第一个示例的内容更感兴趣。正确的方法在
pykafka readme
:
with topic.get_producer(delivery_reports=True) as producer:
count = 0
while True:
count += 1
producer.produce('test msg', partition_key='{}'.format(count))
if count % 10 ** 5 == 0: # adjust this or bring lots of RAM ;)
while True:
try:
msg, exc = producer.get_delivery_report(block=False)
if exc is not None:
print 'Failed to deliver msg {}: {}'.format(
msg.partition_key, repr(exc))
else:
print 'Successfully delivered msg {}'.format(
msg.partition_key)
except Queue.Empty:
break
此代码生成
10 ** 5
异步生成消息,然后停止生成以使用传递报告队列,该队列为生成的每条消息包含一个报告。它打印所有报告的传递错误,并允许在使用完整个队列后恢复生产。这个
10**5
数字可以根据您的内存限制进行调整-它有效地限制了传递报告队列的大小。