代码之家  ›  专栏  ›  技术社区  ›  katty

动态地将参数传递给scala中的函数

  •  -1
  • katty  · 技术社区  · 6 年前

    在数据框中,我有1000个字段的记录为字符串,分隔符为逗号,如

    “a、b、c、d、e……高达1000”——第一次记录 “p,q,r,s,t……高达1000”—第二项记录

    我正在使用下面建议的stackoverflow解决方案

    Split 1 column into 3 columns in spark scala

    df.withColumn("_tmp", split($"columnToSplit", "\\.")).select($"_tmp".getItem(0).as("col1"),$"_tmp".getItem(1).as("col2"),$"_tmp".getItem(2).as("col3")).drop("_tmp")
    

    然而,在我的例子中,我有1000个列,在JSON模式中,我可以像这样检索

    column_seq:Seq[Array]=Schema_func.map(_.name)
    for(i <-o to column_seq.length-1){println(i+" " + column_seq(i))}
    

    结果是

    0 col1 1 col2 2 col3 3 col4

    现在我需要将所有这些索引和列名传递给DataFrame的下面函数

    df。使用列(“\u tmp”,拆分($“columnToSplit”,“\\”)。选择($”_tmp).getItem(0)。作为(“col1”),$“_tmp”。获取项目(1)。作为(“col2”),$“_tmp”。获取项目(2)。作为(“col3”))。放下(“tmp”)
    

    在里面

    $"_tmp".getItem(0).as("col1"),$"_tmp".getItem(1).as("col2"),
    

    由于我无法创建包含所有1000列的长语句,有没有有效的方法将上述json模式中的所有参数传递给select函数,这样我就可以拆分列,添加标题,然后将DF转换为parquet。

    1 回复  |  直到 6 年前
        1
  •  0
  •   Tzach Zohar    6 年前

    你可以建立一系列 org.apache.spark.sql.Column ,其中每一项都是选择正确的项目并具有正确的名称的结果,然后 select 这些专栏:

    val columns: Seq[Column] = Schema_func.map(_.name)
      .zipWithIndex // attach index to names
      .map { case (name, index) => $"_tmp".getItem(index) as name }
    
    val result = df
      .withColumn("_tmp", split($"columnToSplit", "\\."))
      .select(columns: _*)
    

    例如,对于此输入:

    case class A(name: String)
    val Schema_func = Seq(A("c1"), A("c2"), A("c3"), A("c4"), A("c5"))
    val df = Seq("a.b.c.d.e").toDF("columnToSplit")
    

    这个 result 将是:

    // +---+---+---+---+---+
    // | c1| c2| c3| c4| c5|
    // +---+---+---+---+---+
    // |  a|  b|  c|  d|  e|
    // +---+---+---+---+---+