代码之家  ›  专栏  ›  技术社区  ›  Data_101

如何将pyspark数据帧分为两行

  •  18
  • Data_101  · 技术社区  · 6 年前

    我在Databricks工作。

    我有一个包含500行的数据框,我想创建两个包含100行的数据框,另一个包含其余400行的数据框。

    +--------------------+----------+
    |              userid| eventdate|
    +--------------------+----------+
    |00518b128fc9459d9...|2017-10-09|
    |00976c0b7f2c4c2ca...|2017-12-16|
    |00a60fb81aa74f35a...|2017-12-04|
    |00f9f7234e2c4bf78...|2017-05-09|
    |0146fe6ad7a243c3b...|2017-11-21|
    |016567f169c145ddb...|2017-10-16|
    |01ccd278777946cb8...|2017-07-05|
    

    我尝试了以下操作,但收到一个错误

    df1 = df[:99]
    df2 = df[100:499]
    
    
    TypeError: unexpected item type: <type 'slice'>
    
    6 回复  |  直到 6 年前
        1
  •  17
  •   pault Tanjin    6 年前

    起初我误解了你的意思,以为你想把柱子切成薄片。如果要选择行的子集,一种方法是使用 monotonically_increasing_id() . 从文档中:

    生成的ID保证是单调递增的,并且 唯一,但不连续。

    您可以使用此ID对数据帧进行排序,并使用 limit() 以确保准确获取所需的行。

    例如:

    import pyspark.sql.functions as f
    import string
    
    # create a dummy df with 500 rows and 2 columns
    N = 500
    numbers = [i%26 for i in range(N)]
    letters = [string.ascii_uppercase[n] for n in numbers]
    
    df = sqlCtx.createDataFrame(
        zip(numbers, letters),
        ('numbers', 'letters')
    )
    
    # add an index column
    df = df.withColumn('index', f.monotonically_increasing_id())
    
    # sort ascending and take first 100 rows for df1
    df1 = df.sort('index').limit(100)
    
    # sort descending and take 400 rows for df2
    df2 = df.sort('index', ascending=False).limit(400)
    

    只是为了验证这是否符合您的要求:

    df1.count()
    #100
    df2.count()
    #400
    

    我们还可以验证索引列是否不重叠:

    df1.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
    #+---+---+
    #|min|max|
    #+---+---+
    #|  0| 99|
    #+---+---+
    
    df2.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
    #+---+----------+
    #|min|       max|
    #+---+----------+
    #|100|8589934841|
    #+---+----------+
    
        2
  •  12
  •   Michail N    6 年前

    Spark数据帧不能像您编写的那样进行索引。您可以使用head方法创建以获取n个最上面的行。这将返回Row()对象的列表,而不是数据帧。因此,您可以将它们转换回数据帧,并使用原始数据帧的减法来获取其余的行。

    #Take the 100 top rows convert them to dataframe 
    #Also you need to provide the schema also to avoid errors
    df1 = sqlContext.createDataFrame(df.head(100), df.schema)
    
    #Take the rest of the rows
    df2 = df.subtract(df1)
    

    如果使用spark 2.0+,还可以使用SparkSession代替spark sqlContext。此外,如果您对前100行不感兴趣,并且希望进行随机拆分,则可以使用 randomSplit 这样地:

    df1,df2 = df.randomSplit([0.20, 0.80],seed=1234)
    
        3
  •  5
  •   Bala    6 年前

    如果我不介意在两个数据帧中都有相同的行,那么我可以使用 sample . 例如,我有一个354行的数据帧。

    >>> df.count()
    354
    
    >>> df.sample(False,0.5,0).count() //approx. 50%
    179
    
    >>> df.sample(False,0.1,0).count() //approx. 10%
    34
    

    或者,如果我想在不存在重复项的情况下严格拆分,我可以这样做

    df1 = df.limit(100)     //100 rows
    df2 = df.subtract(df1)  //Remaining rows
    
        4
  •  1
  •   ou_ryperd    4 年前

    通过以下方式尝试:

    df1_list = df.collect()[:99] #this will return list    
    df1 = spark.createDataFrame(df1) #convert it to spark dataframe
    

    同样,对于这一点:

    df2_list = df.collect()[100:499]
    df2 = spark.createDataFrame(df2)
    
        5
  •  0
  •   richardec    2 年前

    在这两种解决方案中,我认为我们都需要改变 df1 df1_list ,和更改 df2 df2_list 在第二句话中。

        6
  •  -2
  •   Statmonger    4 年前

    此处提供的解决方案不太复杂,更类似于要求的解决方案:

    (适用于Spark 2.4以上版本)

    # Starting
    print('Starting row count:',df.count())
    print('Starting column count:',len(df.columns))
    
    # Slice rows
    df2 = df.limit(3)
    print('Sliced row count:',df2.count())
    
    # Slice columns
    cols_list = df.columns[0:1]
    df3 = df.select(cols_list)
    print('Sliced column count:',len(df3.columns))