代码之家  ›  专栏  ›  技术社区  ›  Alexander

PySpark 2.2分解删除空行(如何实现explode\u outer)?[复制]

  •  1
  • Alexander  · 技术社区  · 6 年前

    我正在处理PySpark数据帧中的一些深度嵌套的数据。当我试图将结构扁平化为行和列时,我注意到当我调用 withColumn null 在source列中,则从结果数据帧中删除该行。相反,我想找到一种方法来保留行和 在结果列中。

    要使用的示例数据帧:

    from pyspark.sql.functions import explode, first, col, monotonically_increasing_id
    from pyspark.sql import Row
    
    df = spark.createDataFrame([
      Row(dataCells=[Row(posx=0, posy=1, posz=.5, value=1.5, shape=[Row(_type='square', _len=1)]), 
                     Row(posx=1, posy=3, posz=.5, value=4.5, shape=[]), 
                     Row(posx=2, posy=5, posz=.5, value=7.5, shape=[Row(_type='circle', _len=.5)])
        ])
    ])
    

    def flatten_struct_cols(df):
        flat_cols = [column[0] for column in df.dtypes if 'struct' not in column[1][:6]]
        struct_columns = [column[0] for column in df.dtypes if 'struct' in column[1][:6]]
    
        df = df.select(flat_cols +
                       [col(sc + '.' + c).alias(sc + '_' + c)
                       for sc in struct_columns
                       for c in df.select(sc + '.*').columns])
    
        return df
    

    模式如下:

    df.printSchema()
    
    root
     |-- dataCells: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- posx: long (nullable = true)
     |    |    |-- posy: long (nullable = true)
     |    |    |-- posz: double (nullable = true)
     |    |    |-- shape: array (nullable = true)
     |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |-- _len: long (nullable = true)
     |    |    |    |    |-- _type: string (nullable = true)
     |    |    |-- value: double (nullable = true)
    

    df.show(3)
    
    +--------------------+
    |           dataCells|
    +--------------------+
    |[[0,1,0.5,Wrapped...|
    +--------------------+
    

    我从分解数组开始,因为我想把这个包含一个struct数组的数组转换成行和列。然后,我将结构字段展平为新列。

    df = df.withColumn('dataCells', explode(col('dataCells')))
    df = flatten_struct_cols(df)
    df.show(3)
    

    +--------------+--------------+--------------+---------------+---------------+
    |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
    +--------------+--------------+--------------+---------------+---------------+
    |             0|             1|           0.5|   [[1,square]]|            1.5|
    |             1|             3|           0.5|             []|            4.5|
    |             2|             5|           0.5|[[null,circle]]|            7.5|
    +--------------+--------------+--------------+---------------+---------------+
    

    一切都很好,正如我所期望的那样 explode 这个 dataCells_shape 具有空/空值的列。

    df = df.withColumn('dataCells_shape', explode(col('dataCells_shape')))
    df.show(3)
    

    这将从数据帧中删除第二行:

    +--------------+--------------+--------------+---------------+---------------+
    |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
    +--------------+--------------+--------------+---------------+---------------+
    |             0|             1|           0.5|     [1,square]|            1.5|
    |             2|             5|           0.5|  [null,circle]|            7.5|
    +--------------+--------------+--------------+---------------+---------------+
    

    .withColumn 爆炸 不管怎样都能得到同样的结果。

    我还尝试创建一个 UDF 爆炸 函数,如果行不是空的/空的,但我遇到了JVM错误处理 无效的 .

    from pyspark.sql.functions import udf
    from pyspark.sql.types import NullType, StructType
    
    def explode_if_not_null(trow):
        if trow:
            return explode(trow)
        else:
            return NullType
    
    func_udf = udf(explode_if_not_null, StructType())
    df = df.withColumn('dataCells_shape_test', func_udf(df['dataCells_shape']))
    df.show(3)
    
    AttributeError: 'NoneType' object has no attribute '_jvm'
    

    有谁能给我一个爆炸或扁平化的建议吗 ArrayType 当列为 无效的 ?

    我使用的是PySpark 2.2.0

    编辑:

    尽可能按照提供的链接 dupe .isNotNull().otherwise() 将结构架构提供给 .otherwise 但该行仍从结果集中删除。

    df.withColumn("dataCells_shape_test", explode(when(col("dataCells_shape").isNotNull(), col("dataCells_shape"))
                                                  .otherwise(array(lit(None).cast(df.select(col("dataCells_shape").getItem(0))
                                                                                                                  .dtypes[0][1])
                                                                  )
                                                            )
                                                 )
                 ).show()
    
    +--------------+--------------+--------------+---------------+---------------+--------------------+
    |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|dataCells_shape_test|
    +--------------+--------------+--------------+---------------+---------------+--------------------+
    |             0|             1|           0.5|   [[1,square]]|            1.5|          [1,square]|
    |             2|             5|           0.5|[[null,circle]]|            7.5|       [null,circle]|
    +--------------+--------------+--------------+---------------+---------------+--------------------+
    
    2 回复  |  直到 6 年前
        1
  •  5
  •   pault Tanjin    6 年前

    多亏了 pault 因为你指点我 this question this question 关于将Python映射到Java。我得到了一个有效的解决方案:

    from pyspark.sql.column import Column, _to_java_column
    
    def explode_outer(col):
        _explode_outer = sc._jvm.org.apache.spark.sql.functions.explode_outer 
        return Column(_explode_outer(_to_java_column(col)))
    
    new_df = df.withColumn("dataCells_shape", explode_outer(col("dataCells_shape")))
    
    +--------------+--------------+--------------+---------------+---------------+
    |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
    +--------------+--------------+--------------+---------------+---------------+
    |             0|             1|           0.5|     [1,square]|            1.5|
    |             1|             3|           0.5|           null|            4.5|
    |             2|             5|           0.5|  [null,circle]|            7.5|
    +--------------+--------------+--------------+---------------+---------------+
    
    root
     |-- dataCells_posx: long (nullable = true)
     |-- dataCells_posy: long (nullable = true)
     |-- dataCells_posz: double (nullable = true)
     |-- dataCells_shape: struct (nullable = true)
     |    |-- _len: long (nullable = true)
     |    |-- _type: string (nullable = true)
     |-- dataCells_value: double (nullable = true)
    

    explode_outer 在spark 2.2中定义(但由于某些原因,API包装器直到2.3版才在pyspark中实现)。这个解决方案为已经实现的java函数创建一个包装器。

        2
  •  0
  •   iurii_n    6 年前

    对于这种复杂的结构,编写映射函数并在中使用它会更容易 flatMap

    def flat_arr(row):
        rows = []
        # apply some logic to fill rows list with more "rows"
        return rows
    
    rdd = df.rdd.flatMap(flat_arr)
    schema = StructType(
        StructField('field1', StringType()),
        # define more fields
    )
    df = df.sql_ctx.createDataFrame(rdd, schema)
    df.show()
    

    这个解决方案看起来比应用程序要长一些 withColumn ,但这可能是您的解决方案的第一次迭代,这样您就可以看到如何将其转换为 带列