代码之家  ›  专栏  ›  技术社区  ›  kellanburket

从Kafka存储更改日志读取时发生OffsetAutoFrangeException

  •  1
  • kellanburket  · 技术社区  · 6 年前

    我有一个kafka streams应用程序,它正在从一个商店的changelog中读取数据,偶尔会抛出以下错误:

    org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {topic-partition=offset}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:928)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:485)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1185)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:319)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
    

    我以为消费者应该默认 latest 是的。即使我尝试使用 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 去哪一个 最新的 earliest ,我仍然看到这个错误。为什么?

    1 回复  |  直到 6 年前
        1
  •  2
  •   Matthias J. Sax    6 年前

    配置使用者重置策略只能用于读取实际输入主题。

    对于变更日志主题(即还原案例),重置策略始终设置为 none 在内部,因为kafka streams需要手动处理这个案例。捕获异常并将其记录为警告级别消息。之后,kafka streams做了一些内部清理和手动操作 #seekToBeginning() 以完全重新启动还原进程。

    没有理由担心这个。但是,会记录一条警告消息,通知您有关事件的信息。