代码之家  ›  专栏  ›  技术社区  ›  hasherBaba

无法在kafka python使用者中获取密钥

  •  0
  • hasherBaba  · 技术社区  · 6 年前

    我已经编写了一个代码,可以在主题中以关键值的方式生成消息:

    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import json
    
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    
    # produce json messages
    producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
    
    deviceId = "4bc03533ccc94065"
    responseId = "c03c4851-701f-4265-aafd-eb133c09c08e"
    
    print deviceId
    print responseId
    
    producer.send('collect-response-devices', {'deviceId': deviceId})
    producer.send('collect-response-responses', {'responseId' : responseId})
    
    def on_send_success(record_metadata):
        print(record_metadata.topic)
        print(record_metadata.partition)
        print(record_metadata.offset)
    
    def on_send_error(excp):
        log.error('I am an errback', exc_info=excp)
        print excp
        # handle exception
    
    # block until all async messages are sent
    producer.flush()
    
    # configure multiple retries
    producer = KafkaProducer(retries=5)
    

    但是,我的消费者会消费消息,不分配任何内容( None key )以及价值部分的所有内容。

    from kafka import KafkaConsumer
    import json
    
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    consumer.subscribe(['collect-response-devices'])
    for message in consumer:
      print (message.key, message.value)
    

    这是消费者的输出:

    (无,u'deviceid':u'4bc03533cc94065')

    1 回复  |  直到 6 年前
        1
  •  1
  •   Robin Moffatt    6 年前

    根据 docs ,生成消息时需要指定密钥:

    # produce keyed messages to enable hashed partitioning
    producer.send('my-topic', key=b'foo', value=b'bar')
    

    目前,在生成消息时没有指定密钥,因此 None