我正在使用
confluent golang
我的卡夫卡客户。我用
AdminClient
在Kafka集群中创建/删除/获取主题。这是我要初始化的代码
管理员客户端
adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": 127.0.0.1:9092,
})
之后,我使用这个类来创建和获取Kafka集群中的所有主题。以下是创建主题的代码:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
results, err := adminClient.CreateTopics(
ctx,
[]kafka.TopicSpecification{{
Topic: topic,
NumPartitions: numPartitions,
ReplicationFactor: replicationFactor}},
kafka.SetAdminOperationTimeout(TimeOut),
)
之后,我再次获得主题信息:
result, err := adminClient.GetMetadata(&topic, false, 1000)
问题是:如果我得到一个以前不存在的主题,卡夫卡会自动创建这个主题。这是我不想要的行为。请告诉我怎么修这个。