我想实现一个分布式微服务架构上的消息传递的kafka。
我正在使用
皮卡夫卡
实现了虚拟生产者和(平衡的)消费者。我把所有的消费者分配给
同一消费群体
. 我可以使用
python和控制台
同时,甚至运行时添加它们。
不过,我对消费者有意见。我可以创建多个Python用户,甚至运行时添加它们。但是当我将控制台用户(卡夫卡控制台用户)添加到Python消费者的组时,我得到了互斥错误:
从用户ID 'BiMalHalsMaBooBo.Pro。本地:1722EEA073-4BE4-9D978B7FB15B0B30B'(错误:{ PykaFak.Exist.unNeNeMeNid ID::(0, 1)})提交主题“B'MigalySimPayPosiple主题”偏移的错误
此外,这两个(即使它们属于同一个消费群体)正在消耗消息(Python消费者在它们自己和控制台消费者之间平衡它们)。
现在,我对卡夫卡是新的,但我的第一印象是,卡夫卡不应该对消费者的实施不可知,所以结合他们应该是可能的。我的理解是PykaFKA还是PikaFKA的实现?
生产者:
from pykafka import KafkaClient
from time import sleep
client = KafkaClient(hosts="localhost:9092")
print(client.brokers)
print(client.topics)
topic = client.topics[b'michal_sample_topic']
with topic.get_sync_producer() as producer:
while True:
producer.produce(
bytes(
input('Send test message:'),
'utf-8'
)
)
消费者:
from pykafka import KafkaClient
client = KafkaClient(hosts="localhost:9092")
print(client.brokers)
print(client.topics)
topic = client.topics[b'michal_sample_topic']
balanced_consumer = topic.get_balanced_consumer(
consumer_group=b'testing',
auto_commit_enable=True,
zookeeper_connect='localhost:2181'
)
for message in balanced_consumer:
if message is not None:
print(f'{message.offset} {message.value}')