代码之家  ›  专栏  ›  技术社区  ›  Jary zhen

日志使用kafka 0.11.0调试(“组{}的协调器发现失败,刷新元数据”,groupId)。x个

  •  0
  • Jary zhen  · 技术社区  · 7 年前

    我正在使用Kafka(版本0.11.0.2)服务器API在localhost中启动Kafka代理。因为它运行没有任何问题。生产者还可以发送消息成功。 但消费者无法获取信息 控制台中没有错误日志。所以我调试了代码并循环 “刷新元数据” .

    这是源代码

    while (coordinatorUnknown()) {
            RequestFuture<Void> future = lookupCoordinator();
            client.poll(future, remainingMs);
    
            if (future.failed()) {
                if (future.isRetriable()) {
                    remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
                    if (remainingMs <= 0)
                        break;
    
                    log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId);
                    client.awaitMetadataUpdate(remainingMs);
                } else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
    
            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
            if (remainingMs <= 0)
                break;
        }
    

    添加:我将卡夫卡版本更改为 0.10.x个 运行正常。

    这是我的卡夫卡服务器代码。

       private static void startKafkaLocal() throws Exception {
        final File kafkaTmpLogsDir = File.createTempFile("zk_kafka", "2");
        if (kafkaTmpLogsDir.delete() && kafkaTmpLogsDir.mkdir()) {
            Properties props = new Properties();
            props.setProperty("host.name", KafkaProperties.HOSTNAME);
            props.setProperty("port", String.valueOf(KafkaProperties.KAFKA_SERVER_PORT));
            props.setProperty("broker.id", String.valueOf(KafkaProperties.BROKER_ID));
            props.setProperty("zookeeper.connect", KafkaProperties.ZOOKEEPER_CONNECT);
            props.setProperty("log.dirs", kafkaTmpLogsDir.getAbsolutePath());
            //advertised.listeners=PLAINTEXT://xxx.xx.xx.xx:por
      // flush every message.
    
            // flush every 1ms
            props.setProperty("log.default.flush.scheduler.interval.ms", "1");
            props.setProperty("log.flush.interval", "1");
            props.setProperty("log.flush.interval.messages", "1");
            props.setProperty("replica.socket.timeout.ms", "1500");
            props.setProperty("auto.create.topics.enable", "true");
            props.setProperty("num.partitions", "1");
    
            KafkaConfig kafkaConfig = new KafkaConfig(props);
    
            KafkaServerStartable kafka = new KafkaServerStartable(kafkaConfig);
            kafka.startup();
            System.out.println("start kafka ok "+kafka.serverConfig().numPartitions());
        }
    }
    

    谢谢

    1 回复  |  直到 7 年前
        1
  •  1
  •   Mickael Maison    7 年前

    使用kafka 0.11,如果设置 num.partitions 对于1,您还需要设置以下3个设置:

    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    

    运行0.11时,从服务器日志中可以明显看出这一点。