以下python代码在创建的实例时出错
KafkaAdminClient
.
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers=bootstrap_servers,
client_id='test',
security_protocol="SSL"
)
得到以下错误:
---------------------------------------------------------------------------
KafkaConnectionError Traceback (most recent call last)
<ipython-input-10-edd53455dcb0> in <module>()
4 admin_client = KafkaAdminClient(
5 bootstrap_servers=bootstrap_servers,
----> 6 client_id='test', security_protocol="SSL"
7 )
8
/apps/external/4/anaconda3/lib/python3.6/site-packages/kafka/admin/client.py in __init__(self, **configs)
216
217 self._closed = False
--> 218 self._refresh_controller_id()
219 log.debug("KafkaAdminClient started.")
220
/apps/external/4/anaconda3/lib/python3.6/site-packages/kafka/admin/client.py in _refresh_controller_id(self)
271 future = self._send_request_to_node(self._client.least_loaded_node(), request)
272
--> 273 self._wait_for_futures([future])
274
275 response = future.value
/apps/external/4/anaconda3/lib/python3.6/site-packages/kafka/admin/client.py in _wait_for_futures(self, futures)
1340
1341 if future.failed():
-> 1342 raise future.exception # pylint: disable-msg=raising-bad-type
KafkaConnectionError: KafkaConnectionError: socket disconnected
但是,以下代码运行时没有任何错误。
from kafka import KafkaConsumer
consumer = KafkaConsumer(
group_id='test',
bootstrap_servers=bootstrap_servers,
security_protocol="SSL")
print(consumer.topics())