代码之家  ›  专栏  ›  技术社区  ›  Arjun

基于逻辑的spark df追加新列

  •  0
  • Arjun  · 技术社区  · 6 年前

    需要基于其他列向下面的df添加新列。这是df模式

    scala> a.printSchema()
    root
     |-- ID: decimal(22,0) (nullable = true)
     |-- NAME: string (nullable = true)
     |-- AMOUNT: double (nullable = true)
     |-- CODE: integer (nullable = true)
     |-- NAME1: string (nullable = true)
     |-- code1: array (nullable = true)
     |    |-- element: integer (containsNull = true)
     |-- revised_code string (nullable = true)
    

    现在我想根据下面的条件添加一个列say标志

     1=> if code == revised_code, than flag is P
     2 => if code != revised code than I  
     3=> if both code and revised_code is null than no flag.
    

    这是我正在尝试的自定义项,但它给出了 I 对于案例1和案例3。

     def tagsUdf =
        udf((code: String, revised_code: String) =>
          if (code == null  && revised_code == null ) ""
          else if (code == revised_code) "P" else "I")
    
    
    tagsUdf(col("CODE"), col("revised_code"))
    

    有人能指出我犯了什么错吗

    I/P DF
    +-------------+-------+------------+
    |NAME         |   CODE|revised_code|
    +-------------+-------+------------+
    |       amz   |   null|       null|
    |   Watch     |   null|       5812|
    |   Watch     |   null|       5812|
    |   Watch     |   5812|       5812|
    |       amz   |   null|       null|
    |   amz       | 9999  |       4352|
    +-------------+-------+-----------+
    Schema:
    root
     |-- MERCHANT_NAME: string (nullable = true)
     |-- CODE: integer (nullable = true)
     |-- revised_mcc: string (nullable = true)
    
    O/P DF    
    +-------------+-------+-----------------+
    |NAME         |   CODE|revised_code|flag|
    +-------------+-------+-----------------+
    |   amz       |   null|       null| null|
    |   Watch     |   null|       5812|  I  |
    |   Watch     |   null|       5812|  I  |
    |   Watch     |   5812|       5812|  P  |
    |   amz       |   null|       null| null|
    |amz          | 9999  |       4352|  I  |
    +-------------+-------+-----------------+
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   Ramesh Maharjan    6 年前

    你不需要 udf 功能。简单的嵌套 when 内置函数应该起作用。

    import org.apache.spark.sql.functions._
    df.withColumn("CODE", col("CODE").cast("string"))
      .withColumn("flag", when(((isnull(col("CODE")) || col("CODE") === "null") && (isnull(col("revised_code")) || col("revised_code") === "null")), "").otherwise(when(col("CODE") === col("revised_code"), "P").otherwise("I")))
      .show(false)
    

    在这里, CODE 柱子铸成 stringType 在逻辑应用之前使用when,以便 代码 revised_code 比较时在数据类型中匹配。

    注: 代码 列是 IntegerType 在任何情况下都不能为空。