代码之家  ›  专栏  ›  技术社区  ›  uh_big_mike_boi

具有缓存和动作的奇怪Spark行为

  •  1
  • uh_big_mike_boi  · 技术社区  · 7 年前

    我一直很想知道为什么我在做一份有火花的工作时会有奇怪的行为。如果我执行操作(A),作业将出错 .show(1) 方法)在缓存数据帧之后或将数据帧写回hdfs之前。

    这里有一个非常类似的帖子:

    Spark SQL SaveMode.Overwrite, getting java.io.FileNotFoundException and requiring 'REFRESH TABLE tableName' .

    基本上,另一篇文章解释说,当你从正在写入的同一个HDFS目录中读取时 SaveMode "overwrite" ,则您将获得 java.io.FileNotFoundException .

    但在这里,我发现,只要在程序中移动操作的位置,就可以得到非常不同的结果——要么完成程序,要么给出这个例外。

    我想知道是否有人能解释为什么Spark在这里不一致?

     val myDF = spark.read.format("csv")
        .option("header", "false")
        .option("delimiter", "\t")
        .schema(schema)
        .load(myPath)
    
    // If I cache it here or persist it then do an action after the cache, it will occasionally 
    // not throw the error. This is when completely restarting the SparkSession so there is no
    // risk of another user interfering on the same JVM.
    
          myDF.cache()
          myDF.show(1)
    
    // Just an example.
    // Many different transformations are then applied...
    
    val secondDF = mergeOtherDFsWithmyDF(myDF, otherDF, thirdDF)
    
    val fourthDF = mergeTwoDFs(thirdDF, StringToCheck, fifthDF)
    
    // Below is the same .show(1) action call as was previously done, only this below
    // action ALWAYS results in a successful completion and the above .show(1) sometimes results
    // in FileNotFoundException and sometimes results in successful completion. The only
    // thing that changes among test runs is only one is executed. Either
    // fourthDF.show(1) or myDF.show(1) is left commented out
    
    fourthDF.show(1)
    fourthDF.write
        .mode(writeMode)
        .option("header", "false")
        .option("delimiter", "\t")
        .csv(myPath)
    
    2 回复  |  直到 7 年前
        1
  •  3
  •   Shaido Aman    7 年前

    尝试使用 count 而不是 show(1) ,我认为问题是由于Spark试图变得聪明,而不是加载整个数据帧(因为 show 不需要一切)。跑步 计数 强制Spark加载并正确缓存所有数据,这有望消除不一致性。

        2
  •  1
  •   Achilleus    7 年前

    Spark只按需实现RDD,大多数操作需要读取DF的所有分区,例如us count(),但take()和first()等操作不需要所有分区。

    在您的情况下,它需要一个分区,因此只有一个分区被物化和缓存。然后,在执行count()时,所有分区都需要具体化并缓存到可用内存允许的范围内。