代码之家  ›  专栏  ›  技术社区  ›  Christos Hadjinikolis

Flink:执行管道java时出错。util。同时发生的TimeoutException:期货在[10000毫秒]后超时

  •  2
  • Christos Hadjinikolis  · 技术社区  · 6 年前

    我正在使用 Flink v1.4.0

    我在利用 Flink 的原生图形API(Gelly),我正在使用它处理1200万个数据点(边)。我目前正在通过 IntelliJ 使用 弗林克 在同一JVM中执行所有TaskManager和JobManager的微型集群。

    当我加载数据时,在获取 弗林克 要运行的作业,我始终会遇到以下异常:

    ...
    Connected to JobManager at Actor[akka://flink/user/jobmanager_1#XXXXXXXXXX] with leader session id XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.
    322062 [main] ERROR com.somecompany.some.domain.name.some.javaClass- Error executing pipeline
    java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
                at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
                at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
                at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
                at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
                at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    

    。。。

    我确保通过以下方式编辑运行配置 IntelliJ公司 要添加:

    -Dakka.client.timeout:600s
    -Dakka.ask.timeout:600s
    

    但例外情况依然存在,至于是什么原因造成的,我没有其他线索。当我减小数据大小时,一切正常。

    当我试图通过网络提交同一份工作时,也会出现同样的问题 Flink UI 在的本地版本上运行 弗林克 我已在群集上安装。在这种情况下,作业永远不会启动,我甚至无法预览自动生成的运算符DAG。当我减少要处理的数据量时,问题再次消失。我还更新了 flink-conf.yaml 包括相同的 akka 配置属性,但这没有什么区别。

    如何修复此问题?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Christos Hadjinikolis    6 年前

    在IntelliJ中运行Flink作业时,依赖Flink微型集群。迷你集群不同于在独立、纱线或Mesos上运行Flink,因为它依赖于单个JVM。此外,微型集群以多种方式进行了预配置,并且并不总是能够更改该配置(至少在某些设置方面)。

    在将作业提交到集群时(而不是在通过小型集群运行作业时),我必须更改的一件事是分配给作业管理器的堆内存的大小。这是必要的,因为加载要处理的数据不是我想要运行的Flink作业的一部分(这在Flink中不是标准做法,实际上这样做是错误的)。通过增加作业管理器的堆,我能够让我的作业运行,但最终我必须为我的Flink作业定义一种新的输入格式,以避免作业管理器不得不预加载数据以供执行——无论如何,这不应该是它的责任。

    对于眼前的问题:无法通过IntelliJ(据我所知)将堆内存分配给作业管理器,因此作业总是会失败。