代码之家  ›  专栏  ›  技术社区  ›  Hephaestus

卡夫卡python 1.3.3:KafkaProducer。使用显式密钥发送无法将消息发送到代理

  •  4
  • Hephaestus  · 技术社区  · 7 年前

    ( 可能地 Can't send a keyedMessage to brokers with partitioner.class=kafka.producer.DefaultPartitioner

    producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
                                   retries=3)
    ...
    msg = json.dumps(some_message)
    res = producer.send(some_topic, value=msg)
    

    producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
                                   key_serializer=str.encode,
                                   retries=3)
    ...
    try: 
        key = some_message[0]
    except:
        key = None
    msg = json.dumps(some_message)
    res = producer.send(some_topic, value=msg, key=key)
    

    但是,使用此代码, 消息总是从程序中发送到代理。我已经验证了从 some_message 始终是有效字符串。大概我不需要定义我自己的 partitioner ,因为,根据文件:

    此外,使用新代码,当我试图确定我的 send 通过呼叫 res.get kafka.FutureRecordMetadata ), 那个 呼叫抛出一个 TypeError 消息异常 descriptor 'encode' requires a 'str' object but received a 'unicode' .

    (作为一个附带问题,我不确定我会用它做什么。) FutureRecordMetadata succeeded failed 方法,但文档对此保持沉默。文件 假设 RecordMetadata

    无论如何:我不是唯一一个使用kafka python 1.3.3的人,他曾经尝试用分区键发送消息,我还没有在交互管上看到任何描述类似问题的东西(除了我在本文顶部提到的SO问题)。

    我当然愿意相信我做错了什么,但我不知道那可能是什么。是否有一些额外的参数需要提供给 KafkaProducer

    2 回复  |  直到 7 年前
        1
  •  3
  •   Hephaestus    7 年前

    根本问题是我的关键价值是 unicode ,尽管我很确信这是一个 str 因此选择 str.encode key_serializer 是不合适的,是什么导致了 res.get 密钥序列化程序 key.encode('utf-8') 足以发布我的消息,并按预期进行分区。

    documentation 没有详细说明 FutureRecordMetadata get 方法可以提高。文档中唯一的使用示例:

    # Asynchronous by default
    future = producer.send('my-topic', b'raw_bytes')
    
    # Block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=10)
    except KafkaError:
        # Decide what to do if produce request failed...
        log.exception()
        pass
    

    KafkaError , 这不是真的 收到 任何 异步发布机制在尝试获取消息时遇到的异常。

        2
  •  2
  •   Vega Stipe    4 年前

    我也面临着同样的错误。有一次我添加了json。发送密钥时转储,它起作用了。

    producer.send(topic="first_topic", key=json.dumps(key)
    .encode('utf-8'), value=json.dumps(msg)
    .encode('utf-8'))
    .add_callback(on_send_success).add_errback(on_send_error)