代码之家  ›  专栏  ›  技术社区  ›  jesantana

通过在每次应用程序重新启动时创建新的使用者组来重置主题偏移量

  •  0
  • jesantana  · 技术社区  · 6 年前

    我有一个卡夫卡主题和一个消费者,在Spring云应用程序中分配了一个消费者组(必须)。作为一项要求,在每次应用程序重启时,我都需要从头开始读取所有接收到的消息。这应该由 resetOffsets 属性,但从 this issue 它目前不起作用。

    我发现 this workaround 在kafka消费者api中使用,建议在每次重新启动时为消费者组指定一个新的随机名称,作为从最早开始读取的一种方式。在春季云流中是否可能/推荐?如何为消费者组定义动态名称?

    3 回复  |  直到 6 年前
        1
  •  1
  •   Gary Russell    6 年前

    是的,它也可以用于SCSt,但正如您所说,设置随机组id有点棘手,尽管您可以将其设置为 System.property 在启动之前 SpringApplication

    如果您直接使用spring kafka,那么它很容易实现 ConsumerSeekAware 你可以 seekToBeginning 分配分区时。

    但是,使用SCSt,您无法直接访问侦听器。

    一种解决方法是在启动 Spring应用程序 通过创建具有相同组id的消费者,这会有点棘手,但如果你有多个应用程序实例,因为你每次可能会得到不同的分区。

    我们将再次考虑解决该问题(我刚刚对此发表了评论)。

        2
  •  1
  •   Mickael Maison    6 年前

    如果每次都要求应用程序从头开始重新启动,则有几个选项:

    1. 您可以将提交的偏移重置为 earliest 重新启动应用程序之前,请使用 kafka-consumer-groups.sh 工具( kafka.admin.ConsumerGroupCommand.scala )

    2. 重新启动后,应用程序可以查找到开头并手动提交偏移量0。如果你有 auto.offset.reset 设置为 最早的 即使0不是有效的偏移量,它也将从头开始。

    3. 您可以使用其他消费者 group.id 每次都有价值。在您的消费者中 Configuration bean,插入 Properties 对象如下:

      properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
      

    最后,您是否使用委员会补偿?如果没有,只需禁用 enable.auto.commit 然后应用程序将始终遵循 汽车抵消重置 背景

    选项1和2通常是首选的,因为它们保持一致的 组id号 允许轻松地将使用者实例添加到组并监视th组。

        3
  •  0
  •   wideawakening    3 年前

    在里面 春天的卡夫卡, 如果您正在配置消费者va配置文件,如 应用亚马尔 (而不是通过编程)您可以使用 SpEL公司 (spring表达语言)提供 基于UUID的消费者群体 每次执行时 +最早偏移量

    # consume-all-configuration
    auto-offset-reset: earliest
    group: consumer-local-#{ T(java.util.UUID).randomUUID().toString() }