代码之家  ›  专栏  ›  技术社区  ›  Val Bonn

如何在CRON驱动的DetectDuplicate中摄取所有流文件?

  •  3
  • Val Bonn  · 技术社区  · 6 年前

    在NiFi中,我有一个cron驱动的处理器序列,它每天提供一组流文件,其中包含我感兴趣的两个属性: product_code publication_date

    我需要的是每个 product\u代码 :最新的 发布\u日期

    例如:

    对于此输入:

    flow_1: product_code: A / publication_date : 2018-01-01
    flow_2: product_code: B / publication_date : 2018-01-01
    flow_3: product_code: C / publication_date : 2018-01-01
    flow_4: product_code: A / publication_date : 2018-04-12
    flow_5: product_code: A / publication_date : 2000-12-31
    flow_6: product_code: B / publication_date : 2018-02-02
    flow_7: product_code: B / publication_date : 2018-03-03
    

    预期输出应为:

    flow_3: product_code: C / publication_date : 2018-01-01
    flow_4: product_code: A / publication_date : 2018-04-12
    flow_7: product_code: B / publication_date : 2018-03-03
    

    我测试的算法

    1. 使用 UpdateAttribute 添加属性的处理器 priority 到每个流文件,基于 发布\u日期
    2. 这些更新的流文件被重定向到 PriorityAttributePrioritizer 队列
    3. 流文件保留在此队列中,因为只有一个使用cron驱动的处理器。通过这种方式,我确信队列中的流文件是根据 发布\u日期
    4. 然后CRON触发下一个处理器 DetectDuplicate 基于 product\u代码 属性由于流文件是从最新的项目处理到最旧的项目,我确信当 product\u代码 被检测为重复,这是因为 product\u代码 对于最近的 发布\u日期

    问题

    可悲的是,当cron触发 检测到重复 处理器,仅使用一条消息,其他消息留在队列中。

    如果我将“调度策略”更改为“计时器驱动”,并且“运行调度”为0,那么我的所有流文件都将被消耗,并且输出符合预期。

    有没有办法问我 检测到重复 处理器在队列开始工作时使用队列中的所有消息(而不仅仅是一条消息)?

    或者有没有一种方法可以设置一种调度策略,比如“凌晨2:00开始工作,凌晨4:00停止”?

    你认为有更好的策略来满足需求吗?

    当做

    Val。


    更新1

    (2018-04-13)更多信息,除了布莱恩·本德的评论。

    我知道CRON不是最好的解决方案,但我不知道如何改进我的算法来摆脱它。

    在我的例子中,排队等待重复数据消除的流文件是通过3个REST调用序列生成的:

    • 第一次调用“GetAllCategories”,
    • 然后,对于每个类别,调用“GetSubCategories”,
    • 对于每个子类别,调用“GetProducts”。

    此流文件生成部分通常持续5分钟左右:昨晚,第一个流文件在凌晨2:00:16到达队列,最后一个流文件在凌晨2:04:58到达队列。(这就是我安排 检测到重复 在凌晨3:00运行。)

    如果我的 检测到重复 处理器将被“计时器驱动”调度,到达队列的第一个流文件将在所有流文件到达之前由处理器消耗。

    这将打破全套流文件的顺序。

    我觉得我必须等待所有流文件在 检测到重复 处理器开始工作。

    您是否有改进我的算法的潜在建议?

    2 回复  |  直到 6 年前
        1
  •  4
  •   Bryan Bende    6 年前

    您通常应该对启动流的源处理器使用CRON调度,然后所有其他处理器都应该使用运行调度为0的计时器驱动。

    例如,如果您每天凌晨2:00从目录中提取文件,那么应该使用CRON表达式对GetFile进行调度,以在凌晨2:00启动流,但除此之外,任何内容都不需要CRON调度,因为除非运行GetFile,否则它们将永远不会接收数据。

    如果您希望处理器等待执行,直到所有流文件都可用,您可以使用等待/通知处理器,以便所有流文件在释放到DetectDuplicate处理器之前在等待处理器前面建立。

        2
  •  1
  •   Sivaprasanna Sethuraman    6 年前

    只有一条消息被使用的原因是 CRON 在所有处理器(源处理器和消费处理器/数据流处理器)中启用调度,其执行方式如下:

    例如: 您已经在所有处理器中设置了一个CRON时间表,以便在每天下午2点运行,因此在触发期间,它将使用来自其上游处理器的一个流文件,例如: GetFile 下午2点,其余的流文件将在队列中,下一个流文件将只在第二天下午2点消费,以此类推。这适用于进一步的下游处理器,也就是说,他们每天下午2点一次只消耗flowfile,这本质上是一场灾难。谁希望处理速度慢得像蜗牛?

    这就是为什么你必须遵循布莱恩提到的方法。流量管道的源处理器应仅为 CRON driven ,其余处理器应为 Timer driven 有你想要的跑步时间表,但一般来说 0 sec 用于在流文件出现时使用它。