代码之家  ›  专栏  ›  技术社区  ›  Ged

使用Scala将列分配给Spark数据帧中的其他列

  •  0
  • Ged  · 技术社区  · 5 年前

    为了提高我的Scala技能和答案,我考虑了这个很好的问题: Extract a column value and assign it to another column as an array in spark dataframe

    import spark.implicits._   
    import org.apache.spark.sql.functions._
    
    val df = sc.parallelize(Seq(
        ("r1", 1, 1),
        ("r2", 6, 4),
        ("r3", 4, 1),
        ("r4", 1, 2)
      )).toDF("ID", "a", "b")
    
    val uniqueVal = df.select("b").distinct().map(x => x.getAs[Int](0)).collect.toList    
    def myfun: Int => List[Int] = _ => uniqueVal 
    def myfun_udf = udf(myfun)
    
    df.withColumn("X", myfun_udf( col("b") )).show
    
    +---+---+---+---------+
    | ID|  a|  b|        X|
    +---+---+---+---------+
    | r1|  1|  1|[1, 4, 2]|
    | r2|  6|  4|[1, 4, 2]|
    | r3|  4|  1|[1, 4, 2]|
    | r4|  1|  2|[1, 4, 2]|
    +---+---+---+---------+
    

    有效,但是:

    • 我注意到b列放了两次。
    • 我也可以把第二个语句放在a列,得到相同的结果。E、 那又是什么意思呢?

    df.带列(“X”,myfun_udf(col(“a”)))。显示

    • 所以,我想知道为什么第二个col是input?
    • 以及如何使其对所有列通用?

    所以,这是我在别处看到的代码,但我遗漏了一些东西。

    0 回复  |  直到 5 年前
        1
  •  1
  •   user11174953    5 年前

    你所显示的代码没有多大意义:

    • 它是不可伸缩的-在最坏的情况下,每一行的大小与大小成正比
    • 你已经知道这根本不需要争论。
    • 不需要(重要的是不需要) udf 当时(2016年12月23日,Spark 1.6和2.0发布)
    • 如果你还想用 空变量就足够了

    vote accordingly )继续前进。

    所以怎么能做到:

    • 自定义项 自定义项 nullary 功能:

      val uniqueBVal: Seq[Int] = ???
      val addUniqueBValCol = udf(() => uniqueBVal)
      
      df.withColumn("X", addUniqueBValCol())
      

      概括为:

      import scala.reflect.runtime.universe.TypeTag
      
      def addLiteral[T : TypeTag](xs: Seq[T]) = udf(() => xs)
      
      val x = addLiteral[Int](uniqueBVal)
      df.withColumn("X", x())
      
    • 最好不要用 自定义项 :

      import org.apache.spark.sql.functions._
      
      df.withColumn("x", array(uniquBVal map lit: _*))
      
    • 截至

      正如一开始提到的,整个概念很难辩护。任一窗口功能(完全不可扩展)

      import org.apache.spark.sql.expressions.Window
      
      val w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
      df.select($"*" +: df.columns.map(c => collect_set(c).over(w).alias(s"${c}_unique")): _*)
      

      val uniqueValues = df.select(
        df.columns map (c => collect_set(col(c)).alias(s"${c}_unique")):_*
      )
      df.crossJoin(uniqueValues)
      

      不过,一般来说,如果实际应用程序出现这种情况,您必须重新考虑您的方法,除非您确定,列的基数很小并且有严格的上界。

    外卖信息是-不要相信随机人在互联网上发布的随机代码。包括这个。