代码之家  ›  专栏  ›  技术社区  ›  GregH

在生成一批消息后,我是否可以从卡夫卡产品中调用get\u delivery\u report一次?

  •  1
  • GregH  · 技术社区  · 7 年前

    我正在使用pykafka,并且有一个生产者正在异步生产并使用delivery\u报告。我知道必须使用“get\u delivery\u report”方法读取交付报告,并且我知道必须在与生成的消息相同的线程中调用它。但是,是否必须在每次调用后调用get\u DelieEvery\u report才能生成,还是可以调用一次?如果发生多个失败的发送,则get\u delivery\u报告将返回所有失败的发送。例如,假设我异步发送100条消息:

    for x in xrange(100):
       with topic.get_producer(delivery_reports=Try, sync=False) as producer:
          producer.produce("Test Message")
    
    msg, exc = producer.get_delivery_report()
    

    或者必须是:

    for x in xrange(100):
       with topic.get_producer(delivery_reports=Try, sync=False) as producer:
          producer.produce("Test Message")
          msg, exc = producer.get_delivery_report()
    

    第一个似乎比第二个跑得快得多。

    1 回复  |  直到 7 年前
        1
  •  0
  •   Emmett Butler    7 年前

    除了这段代码过度使用 topic.get_producer (在我的回答中提到 here ),第一个示例比第二个示例运行得快得多的原因是第二个示例有效地以同步模式运行。也就是说,生成的每一条消息都会导致在生成下一条消息之前等待传递确认。如果您对编写异步生成的应用程序感兴趣,那么您可能对更接近第一个示例的内容更感兴趣。正确的方法在 pykafka readme :

    with topic.get_producer(delivery_reports=True) as producer:
        count = 0
        while True:
            count += 1
            producer.produce('test msg', partition_key='{}'.format(count))
            if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)
                while True:
                    try:
                        msg, exc = producer.get_delivery_report(block=False)
                        if exc is not None:
                            print 'Failed to deliver msg {}: {}'.format(
                                msg.partition_key, repr(exc))
                        else:
                            print 'Successfully delivered msg {}'.format(
                            msg.partition_key)
                    except Queue.Empty:
                        break
    

    此代码生成 10 ** 5 异步生成消息,然后停止生成以使用传递报告队列,该队列为生成的每条消息包含一个报告。它打印所有报告的传递错误,并允许在使用完整个队列后恢复生产。这个 10**5 数字可以根据您的内存限制进行调整-它有效地限制了传递报告队列的大小。