
Incomplete conversion of RDD to DF in PySpark

  •  0
  • kingledion  · 6 years ago

    Using PySpark 1.6.3, I am trying to convert an RDD to a DataFrame. This is test code running in a Zeppelin notebook. The RDD of interest is rdd_ret.

    >>> from pyspark.sql import Row
    >>> rdd_ret.count()
    9301
    >>> rddofrows = rdd_ret.map(lambda x: Row(**x))
    >>> things = rddofrows.take(10000)
    >>> len(things)
    9301
    >>> [type(x) for x in things if type(x) != Row]
    []
    >>> [len(x) for x in things if len(x) != 117]
    []
    

    >>> outdf = rddofrows.toDF(sampleRatio=0.1)
    >>> outdf.count()
    

    TypeError: 'NoneType' object is not iterable

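    The output DataFrame object is created, but any operation I try to run on it (.show(), .count(), .filter()) produces the same stack trace, shown below. I don't understand what could be NoneType in this case. Some of the values inside the Row objects might well be bad, but to count or show you should only need to iterate over the rows of the DataFrame, and those are all there.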

    Traceback (most recent call last):
      File "/tmp/zeppelin_pyspark-5665146503764823323.py", line 360, in <module>
        exec(code, _zcUserQueryNameSpace)
      File "<stdin>", line 1, in <module>
      File "/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py", line 269, in count
        return int(self._jdf.count())
      File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
        return f(*a, **kw)
      File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling o2282.count.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 1256.0 failed 4 times, most recent failure: Lost task 21.3 in stage 1256.0 (TID 62913, usg-kov-e1b-slv005.c.pg-us-n-app-053847.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 111, in main
        process()
      File "/usr/hdp/current/spark-client/python/pyspark/worker.py", line 106, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/hdp/current/spark-client/python/pyspark/serializers.py", line 263, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 924, in convert_struct
        return tuple(conv(v) for v, conv in zip(obj, converters))
      File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 924, in <genexpr>
        return tuple(conv(v) for v, conv in zip(obj, converters))
      File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 900, in <lambda>
        return lambda row: [conv(v) for v in row]
    TypeError: 'NoneType' object is not iterable
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:166)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
        at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1516)
        at org.apache.spark.sql.DataFrame$$anonfun$count$1.apply(DataFrame.scala:1515)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
        at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1515)
        at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
    

    As requested, here is what one of the rows looks like:

    Row(accountType='individual', added='2018-06-05T01:52:34.257+0000', assignment='null', author='noahmagel', authorCity='null', authorCityCode='null', 
    authorContinent='North America', authorContinentCode='n-a', authorCountry='United States', authorCountryCode='us', authorCounty='null', 
    authorCountyCode='null', authorLocation='n-a,us,,,', authorState='null', authorStateCode='null', avatarUrl='https://pbs.twimg.com/profile_images/613069089263718401/P1BWMsFG_normal.jpg', 
    averageDurationOfVisit=20.0, averageVisits=6.0, backlinks=49850734.0, blogComments=0.0, checked=False, city='null', cityCode='null', continent='North America', 
    continentCode='n-a', country='United States', countryCode='us', county='null', countyCode='null', date='2017-12-11T10:58:36.000+0000', 
    displayUrls=[], domain='twitter.com', engagement=0.0, expandedUrls=[], facebookAuthorId='null', facebookComments=0.0, facebookLikes=0.0, 
    facebookRole='null', facebookShares=0.0, facebookSubtype='null', forumPosts=0.0, forumViews=0.0, fullText='@oli_braun @elonmusk @SpaceX Take my money 💰',
    fullname='noah', gender='male', id=167783541878.0, imageMd5s=None, impact=34.0, importanceAmplification=28.0, importanceReach=40.0, 
    impressions=208.0, influence=502.0, insightsHashtag=[], insightsMentioned=['@elonmusk', '@spacex', '@oli_braun'], instagramCommentCount=0.0, 
    instagramFollowerCount=0.0, instagramFollowingCount=0.0, instagramInteractionsCount=0.0, instagramLikeCount=0.0, instagramPostCount=0.0, 
    interest=['Fine arts', 'Business', 'Technology'], language='en', lastAssignmentDate='null', latitude=0.0, lemmatize=['money'], 
    locationName='null', logoImages=None, longitude=0.0, matchPositions=[], mediaFilter='null', mediaUrls=[], monthlyVisitors=6000000000.0, mozRank=9.6, 
    originalUrl='http://twitter.com/noahmagel/statuses/940173969935818752', outreach=0.0, pageType='twitter', pagesPerVisit=22.0, percentFemaleVisitors=46.0, 
    percentMaleVisitors=54.0, priority='null', professions=[], queryId=1999376256.0, queryName='Braun_English', reach=502.0, 
    replyTo='http://twitter.com/oli_braun/statuses/940171345115144192', resourceId=167783541878.0, resourceType='page', retweetOf='null', 
    sentiment='neutral', shortUrls=[], snippet='@oli_braun @elonmusk @SpaceX Take my money 💰', starred=False, state='null', stateCode='null', status='null', 
    subtype='null', tags=[], textlen=44, threadAuthor='oli_braun', threadCreated='null', threadEntryType='reply', threadId='0', threadURL='null',
    title='noah (@noahmagel): @oli_braun @elonmusk @Spac ...', trackedLinkClicks=0.0, trackedLinks='null', twitterAuthorId='2246429194', 
    twitterFollowers=208.0, twitterFollowing=513.0, twitterPostCount=381.0, twitterReplyCount=0.0, twitterRetweets=0.0, twitterRole='null', 
    twitterVerified=False, updated='2018-06-05T01:52:34.257+0000', url='http://twitter.com/noahmagel/statuses/940173969935818752', wordCount='null')
    
    1 reply
        1
  •  1
  •   fizloki    4 years ago

    The error can be reproduced with the following:

    sc = spark.sparkContext
    json_rows = ['{"key1": [{"foo": 1}, {"bar": 2}]}', 
                 '{"key2": 1}']
    rows = sc.parallelize(json_rows)
    df = spark.read.json(rows)
    rdd = df.rdd
    new_df = spark.createDataFrame(rdd, samplingRatio=1)
    new_df.head(2)
    

    File "/usr/hdp/current/spark-client/python/pyspark/sql/types.py", line 900, in <lambda>
        return lambda row: [conv(v) for v in row]
    TypeError: 'NoneType' object is not iterable
    

    By contrast, an array of plain integers instead of structs does not trigger the error:

    json_rows = ['{"key1": [1, 2]}', 
                 '{"key2": 1}']
    

    The problem occurs when you have a list (ArrayType) whose elements are of StructType (Row) type. Those types need to be converted; see the source code:

    def _need_converter(dataType):
        if isinstance(dataType, StructType):
            return True
        elif isinstance(dataType, ArrayType):
            return _need_converter(dataType.elementType)
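
    To make the difference between the two inputs above concrete, here is a toy model of that recursion. The classes are hypothetical stand-ins named after the PySpark types (they are not imported from pyspark), and the explicit False fallback is an assumption added so the sketch is complete.

```python
# Hypothetical stand-ins named after the PySpark types; NOT imported from pyspark.
class LongType:
    pass

class StructType:
    def __init__(self, fields):
        self.fields = fields

class ArrayType:
    def __init__(self, elementType):
        self.elementType = elementType

def need_converter(dataType):
    # Same recursion as the excerpt above, with an explicit False fallback (assumed).
    if isinstance(dataType, StructType):
        return True
    elif isinstance(dataType, ArrayType):
        return need_converter(dataType.elementType)
    return False

# Schema shape for {"key1": [{"foo": 1}, {"bar": 2}]}: an array of structs.
array_of_structs = ArrayType(StructType(["foo", "bar"]))
# Schema shape for {"key1": [1, 2]}: an array of plain integers.
array_of_longs = ArrayType(LongType())

struct_case = need_converter(array_of_structs)   # True: elements must be converted
literal_case = need_converter(array_of_longs)    # False: nothing to convert
print(struct_case, literal_case)
```

    Only the array of structs is flagged for conversion, which matches the observation that the integer-array input does not fail.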
    

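    So it will now try to convert the elements in the array (lambda row: [conv(v) for v in row]). But in a row where that array is None, how can it conv() the elements inside?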
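
    The failure can be reproduced without Spark. The following is a minimal sketch, assuming the array converter has the shape shown in the traceback; make_array_converter and struct_to_tuple are hypothetical names used for illustration, not PySpark functions.

```python
def make_array_converter(conv):
    # Mirrors the shape of `lambda row: [conv(v) for v in row]` from the traceback.
    return lambda row: [conv(v) for v in row]

def struct_to_tuple(d):
    # Hypothetical element converter: turn a dict-like struct into a tuple of values.
    return tuple(d[k] for k in sorted(d))

convert_key1 = make_array_converter(struct_to_tuple)

# A row where key1 is present: every element is converted.
ok = convert_key1([{"foo": 1}, {"bar": 2}])

# A row where key1 is missing, so the value is None: iterating over it fails.
try:
    convert_key1(None)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(ok)
print(error_message)
```

    The second call raises the same kind of TypeError as in the stack trace, because the list comprehension tries to iterate over None.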

    My solution was to flatten the nested Rows/structs via map, so that the values become str literals and no longer need conversion.

    import json
    from pyspark.sql.types import Row
    
    def flatten(x):
      # Turn the Row into a dict and serialize every nested Row (struct) to a
      # JSON string, so no struct values are left for createDataFrame to convert.
      x_dict = x.asDict()
      for k, v in x_dict.items():
        if isinstance(v, Row):
          x_dict[k] = json.dumps(v.asDict())
      return x_dict
    
    sc = spark.sparkContext
    rows = sc.parallelize(json_rows)  # json_rows as defined earlier
    df = spark.read.json(rows)
    flat_rdd = df.rdd.map(flatten)
    flat_df = spark.createDataFrame(flat_rdd, samplingRatio=1)
    flat_df.head(2)
    

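    I am fine with stringifying the whole thing, since it is going to Redshift anyway. YMMV. For the list example above, you would probably check for type list instead. I believe it is perfectly fine to keep a nested list as long as its elements are literals, so you could flatten just the elements of the list rather than the list itself.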
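
    The list variant described above could look roughly like this. It is a sketch that works on plain dicts and lists rather than Row objects, so it runs without Spark; flatten_value and flatten_record are hypothetical helper names, and sorting the JSON keys is an added choice to make the output deterministic.

```python
import json

def flatten_value(v):
    # A nested struct (a dict here) becomes a JSON string literal.
    if isinstance(v, dict):
        return json.dumps(v, sort_keys=True)
    # A list is kept as a list; only its elements are flattened.
    if isinstance(v, list):
        return [flatten_value(e) for e in v]
    # Literals and None pass through unchanged.
    return v

def flatten_record(record):
    return {k: flatten_value(v) for k, v in record.items()}

records = [
    {"key1": [{"foo": 1}, {"bar": 2}], "key2": None},
    {"key1": None, "key2": 1},
]
flat = [flatten_record(r) for r in records]
print(flat)
```

    With PySpark this might be applied as df.rdd.map(lambda r: flatten_record(r.asDict(recursive=True))), assuming Row.asDict accepts recursive=True in the installed version. The array then holds strings only, so its elements need no conversion.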

        2
  •  0
  •   hamza tuna    6 years ago