代码之家  ›  专栏  ›  技术社区  ›  Soheil Pourbafrani

Flink流处理窗口中如何采集后期数据

  •  0
  • Soheil Pourbafrani  · 技术社区  · 6 年前

    假设我有一个数据流,其中包含事件时间数据。我希望在8毫秒的窗口时间内收集输入数据流,并减少每个窗口数据。我用下面的代码来完成:

    aggregatedTuple
              .keyBy( 0).timeWindow(Time.milliseconds(8))
              .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
    

    : 例如,数据流的关键是映射到处理毫秒时间戳的最后8个子倍数的处理时间戳。 1531569851297 将映射到 1531569851296 .

    但数据流可能延迟到达并进入错误的窗口时间。例如,假设我将窗口时间设置为8毫秒。如果数据按顺序进入Flink引擎,或者至少在延迟小于窗口时间(8毫秒)的情况下进入Flink引擎,这将是最好的情况。但是假设数据流事件时间(也就是数据流中的一个字段)以30毫秒的延迟到达。因此,它将进入错误的窗口,我认为如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤如此晚的数据。 所以我有两个问题:

    • 如何根据数据流进入窗口的需要过滤数据流,并检查在窗口的正确时间戳上创建的数据?
    • 如何在变量中收集这样晚的数据以对其进行一些处理?
    1 回复  |  直到 6 年前
        1
  •  0
  •   David Anderson    6 年前

    Flink有两个不同的、相关的抽象,它们处理具有事件时间戳的流上计算窗口分析的不同方面: 水印 允许迟到 .

    第一, 水印 ,在处理事件时间数据(无论您是否使用Windows)时都会发挥作用。水印提供了有关事件时间进度的信息,并为应用程序编写者提供了处理无序数据的方法。水印和数据流一起流动,每个水印在流中标记一个位置并携带一个时间戳。水印用作断言,在流中的该点上,流现在(可能)已完成到该时间戳--换句话说,水印后面的事件不太可能是在水印指示的时间之前发生的。最常见的水印策略是使用 BoundedOutOfOrdernessTimestampExtractor 假定事件在某个固定的有界延迟内到达。

    这现在提供了延迟的定义——考虑时间戳小于水印时间戳的水印后面的事件 晚的 .

    窗口API提供了 允许迟到 ,默认设置为零。如果允许的延迟大于零,则事件时间窗口的默认触发器将在其相应窗口中接受延迟事件,直至达到允许的延迟的限制。窗口操作将在通常的时间触发一次,然后对于每个延迟事件再次触发,直到允许的延迟间隔结束。在这之后,迟发事件就被丢弃了。

    How can I filter data stream as it wants to enter the window and check 
    if the data created at the right timestamp for the window?
    

    Flink的窗口分配程序负责将事件分配给适当的窗口——正确的事情将自动发生。将根据需要创建新的窗口实例。

    How can I gather such late data in a variable to do some processing on them?
    

    您可以在水印中足够大方以避免出现任何延迟数据,和/或将允许的延迟配置为足够长以适应延迟事件。但是,请注意,Flink将被迫保持所有仍接受延迟事件的窗口处于打开状态,这将延迟垃圾收集旧窗口,并可能消耗大量内存。

    请注意,本讨论假设您希望使用时间窗口——例如,您正在使用的8msec长窗口。Flink还支持计数窗口(例如将事件分组为100个批次)、会话窗口和自定义窗口逻辑。例如,如果使用计数窗口,则水印和延迟不起任何作用。

    如果您希望每个键的分析结果,那么在应用窗口之前,请使用key by按键(例如,按userid)对流进行分区。例如

    stream
      .keyBy(e -> e.userId)
      .timeWindow(Time.seconds(10))
      .reduce(...)
    

    将为每个用户ID生成单独的结果。

    一些相关文件:

    Event Time and Watermarks
    Allowed Lateness

    推荐文章