我已经编写了一个代码,可以在主题中以关键值的方式生成消息:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
deviceId = "4bc03533ccc94065"
responseId = "c03c4851-701f-4265-aafd-eb133c09c08e"
print deviceId
print responseId
producer.send('collect-response-devices', {'deviceId': deviceId})
producer.send('collect-response-responses', {'responseId' : responseId})
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
print excp
# handle exception
# block until all async messages are sent
producer.flush()
# configure multiple retries
producer = KafkaProducer(retries=5)
但是,我的消费者会消费消息,不分配任何内容(
None
在
key
)以及价值部分的所有内容。
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['collect-response-devices'])
for message in consumer:
print (message.key, message.value)
这是消费者的输出:
(无,u'deviceid':u'4bc03533cc94065')