代码之家  ›  专栏  ›  技术社区  ›  TKW

如何在for循环中单独处理多个拼花地板文件?

  •  0
  • TKW  · 技术社区  · 5 年前

    我有多个拼花文件(大约1000个)。我需要加载它们中的每一个,对其进行处理并将结果保存到配置单元表中。我有一个for循环,但它似乎只能处理2或5个文件,而不能处理1000个文件,因为sparks试图同时加载所有文件,我需要它在同一个spark会话中单独执行。

    我尝试使用for循环,然后使用for-each,并使用了unpersist(),但无论如何都失败了。

    val ids = get_files_IDs()
    ids.foreach(id => {
    println("Starting file " + id)
    var df = load_file(id)
    var values_df = calculate_values(df)
    values_df.write.mode(SaveMode.Overwrite).saveAsTable("table.values_" + id)
    df.unpersist()
    })
    
    def get_files_IDs(): List[String] = {
    var ids = sqlContext.sql("SELECT CAST(id AS varchar(10)) FROM  table.ids WHERE id IS NOT NULL")
    var ids_list = ids.select("id").map(r => r.getString(0)).collect().toList
    return ids_list
    }
    
    def calculate_values(df:org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame ={
    val values_id = df.groupBy($"id", $"date", $"hr_time").agg(avg($"value_a") as "avg_val_a", avg($"value_b") as "avg_value_b")
    return values_id
    }
    
    def load_file(id:String): org.apache.spark.sql.DataFrame = {
    val df = sqlContext.read.parquet("/user/hive/wh/table.db/parquet/values_for_" + id + ".parquet")
    return df
    }
    

    我所期望的是spark加载文件id 1,处理数据,将其保存到配置单元表中,然后关闭该日期并继续使用第二个id,依此类推,直到完成1000个文件。而不是同时加载所有内容。

    任何帮助都将不胜感激!我已经坚持了好几天了。我在用Spark 1.6和Scala谢谢!!

    编辑:添加了定义。希望能有助于获得更好的视野。谢谢您!

    0 回复  |  直到 5 年前
        1
  •  0
  •   TKW    5 年前

    好吧,所以经过多次检查,我意识到这个过程运行良好。它对每个文件进行单独处理并保存结果。问题是,在一些非常具体的情况下,这一过程是漫长的。

    因此,我可以告诉您,使用for循环或for-each,您可以处理多个文件并毫无问题地保存结果。取消持久化和清除缓存确实有助于提高性能。