代码之家  ›  专栏  ›  技术社区  ›  Christos Hadjinikolis

Flink IOException:网络缓冲区数量不足

  •  3
  • Christos Hadjinikolis  · 技术社区  · 6 年前

    我正在使用 Flink v1.4.0 。我正在使用 DataSet API (尽管如此,我认为这无关紧要)。

    我正在12核VM上运行一些重载转换。我正在使用2个核心来实现一个 Flink job 其中,我将一些数据存储到 Flink Queryable State 我正在运行另一个 Flink 剩余10个核的作业。

    当我用10个内核运行第二个作业时,我似乎遇到以下错误:

    java.io.IOException: Insufficient number of network buffers: required 10, but only 9 available. The total number of network buffers is currently set to 4096 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:199)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:618)
                at java.lang.Thread.run(Thread.java:745)
    

    如果我用8核运行它,它就可以正常运行。这是什么原因造成的,为什么我不能使用其他2-->8+2=10芯?

    2 回复  |  直到 6 年前
        1
  •  10
  •   Community uzul    4 年前

    引用Apache Flink常见问题:

    如果以非常高的并行度运行Flink,则可能需要增加网络缓冲区的数量。

    默认情况下,Flink占网络缓冲区JVM堆大小的10%,最小值为64MB,最大值为1GB。您可以通过taskmanager调整所有这些值。网络记忆力分数,taskmanager。网络记忆力min和taskmanager。网络记忆力最大值。

    有关详细信息,请参阅配置参考。

    有一个 dedicated section in the docs for how to configure the network buffers

    总之,您可以在 ./conf/flink-conf.yaml 通过设置 taskmanager.network.numberOfBuffers 参数

    参数应设置为 #slots-per-TM^2 * #TMs * 4 哪里 #slots per TM 是每个TaskManager的插槽数,以及 #TMs 是任务管理器的总数。

    例如,要支持由20台8插槽机器组成的集群,您应该使用大约5000个网络缓冲区以获得最佳吞吐量。默认情况下,每个网络缓冲区的大小为32 KB。在上面的示例中,系统将因此为网络缓冲区分配大约300兆字节。

    详情请参阅文件。

        2
  •  0
  •   RITA KUSHWAHA    2 年前

    我也面临同样的错误

    原因:java。io。IOException:网络数量不足 缓冲区:需要13个,但只有7个可用。的总数 网络缓冲区当前设置为2048,每个缓冲区32768字节。你可以 通过设置配置键来增加此数字 '任务管理器。记忆力网络分数', '任务管理器。记忆力网络最小值',和 '任务管理器。记忆力网络最大值'。

    下面的代码片段解决了我的问题。

    Configuration cfg = new Configuration();
    int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
    cfg.setString("taskmanager.memory.network.max", "1gb");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultLocalParallelism, cfg);