代码之家  ›  专栏  ›  技术社区  ›  ZygD

将PySpark数据帧与条件结果列联接

  •  0
  • ZygD  · 技术社区  · 4 年前

    我有这些桌子:

    df1                  df2
    +---+------------+   +---+---------+
    | id|   many_cols|   | id|criterion|
    +---+------------+   +---+---------+
    |  1|lots_of_data|   |  1|    false|
    |  2|lots_of_data|   |  1|     true|
    |  3|lots_of_data|   |  1|     true|
    +---+------------+   |  3|    false|
                         +---+---------+
    

    我打算在中创建其他列 df1

    +---+------------+------+
    | id|   many_cols|result|
    +---+------------+------+
    |  1|lots_of_data|     1|
    |  2|lots_of_data|  null|
    |  3|lots_of_data|     0|
    +---+------------+------+
    

    result 应该是 1 true 在里面 df2
    结果 应该是 0 如果没有对应的 在里面
    结果 null 如果没有对应的 id 在里面

    我想不出一个有效的方法来做这件事。我在加入后仅遇到第三个条件:

    df = df1.join(df2, 'id', 'full')
    df.show()
    
    #  +---+------------+---------+
    #  | id|   many_cols|criterion|
    #  +---+------------+---------+
    #  |  1|lots_of_data|    false|
    #  |  1|lots_of_data|     true|
    #  |  1|lots_of_data|     true|
    #  |  3|lots_of_data|    false|
    #  |  2|lots_of_data|     null|
    #  +---+------------+---------+
    

    PySpark数据帧的创建方式如下:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    spark = SparkSession.builder.getOrCreate()
    
    df1cols = ['id', 'many_cols']
    df1data = [(1, 'lots_of_data'),
               (2, 'lots_of_data'),
               (3, 'lots_of_data')]
    df2cols = ['id', 'criterion']
    df2data = [(1, False),
               (1, True),
               (1, True),
               (3, None)]
    df1 = spark.createDataFrame(df1data, df1cols)
    df2 = spark.createDataFrame(df2data, df2cols)
    
    0 回复  |  直到 4 年前
        1
  •  2
  •   blackbishop    4 年前

    一个简单的方法是使用groupby df2 获得最大值 criterion 通过 id 加入 df1

    from pyspark.sql import functions as F
    
    df2_group = df2.groupBy("id").agg(F.max("criterion").alias("criterion"))
    
    result = df1.join(df2_group, ["id"], "left").withColumn(
        "result",
        F.col("criterion").cast("int")
    ).drop("criterion")
    
    result.show()
    #+---+------------+------+
    #| id|   many_cols|result|
    #+---+------------+------+
    #|  1|lots_of_data|     1|
    #|  3|lots_of_data|     0|
    #|  2|lots_of_data|  null|
    #+---+------------+------+
    
        2
  •  2
  •   mck    4 年前

    您可以尝试使用相关子查询从df2中获取最大布尔值,并将其转换为整数。

    df1.createOrReplaceTempView('df1') 
    df2.createOrReplaceTempView('df2') 
    
    df = spark.sql("""
        select
            df1.*,
            (select int(max(criterion)) from df2 where df1.id = df2.id) as result
        from df1
    """)
    
    df.show()
    +---+------------+------+
    | id|   many_cols|result|
    +---+------------+------+
    |  1|lots_of_data|     1|
    |  3|lots_of_data|     0|
    |  2|lots_of_data|  null|
    +---+------------+------+
    
        3
  •  1
  •   kites    4 年前

    看看这个解决方案。加入后。您可以根据需求使用多个条件检查,并使用when子句相应地分配值,然后根据id和其他列获取结果分组的最大值。如果只对分区使用id,还可以使用window函数来计算结果的最大值。

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    df1cols = ['id', 'many_cols']
    df1data = [(1, 'lots_of_data'),
               (2, 'lots_of_data'),
               (3, 'lots_of_data')]
    df2cols = ['id', 'criterion']
    df2data = [(1, False),
               (1, True),
               (1, True),
               (3, False)]
    df1 = spark.createDataFrame(df1data, df1cols)
    df2 = spark.createDataFrame(df2data, df2cols)
    
    df2_mod =df2.withColumnRenamed("id", "id_2")
    
    df3=df1.join(df2_mod, on=df1.id== df2_mod.id_2, how='left')
    
    cond1 = (F.col("id")== F.col("id_2"))& (F.col("criterion")==1)
    cond2 = (F.col("id")== F.col("id_2"))& (F.col("criterion")==0)
    cond3 = (F.col("id_2").isNull())
    
    df3.select("id", "many_cols", F.when(cond1, 1).when(cond2,0).when(cond3, F.lit(None)).alias("result"))\
        .groupBy("id", "many_cols").agg(F.max(F.col("result")).alias("result")).orderBy("id").show()
    
    Result:
    ------
    
    +---+------------+------+
    | id|   many_cols|result|
    +---+------------+------+
    |  1|lots_of_data|     1|
    |  2|lots_of_data|  null|
    |  3|lots_of_data|     0|
    +---+------------+------+
    
    

    w=Window().partitionBy("id")
    
    df3.select("id", "many_cols", F.when(cond1, 1).when(cond2,0).when(cond3, F.lit(None)).alias("result"))\
        .select("id", "many_cols", F.max("result").over(w).alias("result")).drop_duplicates().show()
    
        4
  •  1
  •   ZygD    4 年前

    我不得不将建议答案的想法合并起来,以得到最适合我的解决方案。

    # The `cond` variable is very useful, here it represents several complex conditions
    cond = F.col('criterion') == True
    df2_grp = df2.select(
        'id',
        F.when(cond, 1).otherwise(0).alias('c')
    ).groupBy('id').agg(F.max(F.col('c')).alias('result'))
    df = df1.join(df2_grp, 'id', 'left')
    
    df.show()
    #+---+------------+------+
    #| id|   many_cols|result|
    #+---+------------+------+
    #|  1|lots_of_data|     1|
    #|  3|lots_of_data|     0|
    #|  2|lots_of_data|  null|
    #+---+------------+------+