代码之家  ›  专栏  ›  技术社区  ›  Yanick Salzmann

无法打开Kafka流的存储,因为状态无效

  •  0
  • Yanick Salzmann  · 技术社区  · 6 年前

    我正在尝试使用Kafka Streams,并创建了以下拓扑:

        KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(),
                historyEventSerde));
    
        eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
                .groupByKey()
                .reduce((e1, e2) -> e2, Materialized.as(streamByKeyStoreName));
    

    我稍后会像这样启动流:

    private void startKafkaStreams(KafkaStreams streams) {
        CompletableFuture<KafkaStreams.State> stateFuture = new CompletableFuture<>();
        streams.setStateListener((newState, oldState) -> {
            if(stateFuture.isDone()) {
                return;
            }
    
            if(newState == KafkaStreams.State.RUNNING || newState == KafkaStreams.State.ERROR) {
                stateFuture.complete(newState);
            }
        });
    
        streams.start();
        try {
            KafkaStreams.State finalState = stateFuture.get();
            if(finalState != KafkaStreams.State.RUNNING) {
                // ...
            }
        } catch (InterruptedException ex) {
            // ...
        } catch(ExecutionException ex) {
            // ...
        }
    }
    

    RUNNING

    public KafkaFlowHistory createFlowHistory(String flowId) {
        ReadOnlyKeyValueStore<HistoryEventKey, HistoryEvent> store = streams.store(streamByKeyStoreName,
                QueryableStoreTypes.keyValueStore());
        return new KafkaFlowHistory(flowId, store, event -> topicProducer.send(new ProducerRecord<>(applicationTopicName, flowId, event)));
    }
    

    createFlowHistory 在中完成未来的初始化后调用 跑步

    org.apache.kafka.streams.errors.InvalidStateStoreException:无法获取 按键列出的状态存储流事件流文件服务测试实例 因为流线程是分配的,没有运行

    显然,线程的状态已更改。在尝试查询存储并等待Kafka的内部线程进入正确状态时,是否需要手动处理此问题?

    1 回复  |  直到 6 年前
        1
  •  8
  •   Matthias J. Sax    5 年前

    旧版本 ( 之前 2.2.0)

    启动时,Kafka Streams执行以下状态转换:

    CREATED -> RUNNING -> REBALANCING -> RUNNING
    

    您需要等待第二个运行状态,然后才能查询。

    新版本:

    启动时的状态转换行为已更改(通过 https://issues.apache.org/jira/browse/KAFKA-7657

    CREATED -> REBALANCING -> RUNNING
    

    因此,您不应该再讨论这个问题。