
pyspark, elasticsearch input, cannot show the dataframe

  • eugene · 5 years ago

    I can run df.head(), but after I apply withColumn I can no longer run df.head() or df.show().

    I have no idea what is going on. Likewise, the same withColumn step works fine if I create a DataFrame by hand: df2 = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
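
    For contrast, a minimal sketch of that hand-built case, assuming the sqlContext created in the setup below (the to_upper UDF and the x2_upper column name are illustrative, not from the original post):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # A DataFrame built from plain Python values: the same withColumn + UDF
    # pattern that fails on the Elasticsearch-backed df works fine here.
    df2 = sqlContext.createDataFrame(
        [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
    to_upper = udf(lambda s: s.upper(), StringType())
    df2.withColumn("x2_upper", to_upper("x2")).show()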

    I used the official demo data (accounts.zip) from https://www.elastic.co/guide/en/kibana/current/tutorial-load-dataset.html

    import findspark
    findspark.init('/usr/local/spark')
    from pyspark import SparkContext, SparkConf
    
    import pyspark
    
    # Stop any existing SparkContext so a new one can pick up this conf.
    if 'sc' in locals():
        sc.stop()
    conf = pyspark.SparkConf()
    conf.set("spark.driver.allowMultipleContexts", "true")
    # The elasticsearch-hadoop connector jar must be on the driver classpath.
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.0.jar")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    #conf.set("es.nodes.wan.only", "true")
    sc = pyspark.SparkContext(conf=conf)
    
    sqlContext = pyspark.SQLContext(sc)
    
    # Load the "relay-foods" index from Elasticsearch as a DataFrame.
    df = sqlContext.read.option("es.resource", "relay-foods").format("org.elasticsearch.spark.sql").load()
    
    # ==============> 
    # at this state I can print df fine
    df.show() # works
    
    
    from dateutil import parser
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # Reformat an OrderDate string such as "03/12/2009" into "YYYY-MM".
    def order_period(order_date):
        order_date = parser.parse(order_date)
        result = order_date.strftime('%Y-%m')
        return result

    udf_order_period = udf(order_period, StringType())
    df_new2 = df.withColumn("OrderPeriod", udf_order_period("OrderDate"))
    # ==================> 
    df_new2.head() # this results in error
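
    Not part of the original post, but one quick check: project the DataFrame down to the UDF's input column before applying it, so no other Elasticsearch-backed field has to be converted. A minimal sketch, assuming the df and udf_order_period defined above (df_probe is a hypothetical name):

    # Hedged debugging sketch: if this succeeds, the failure comes from
    # converting some other column, not from OrderDate or the UDF itself.
    df_probe = df.select("OrderDate").withColumn("OrderPeriod", udf_order_period("OrderDate"))
    df_probe.head()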
    

    The error message is:

    Py4JJavaError: An error occurred while calling o152.collectToPython.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): java.lang.IllegalArgumentException: The value (Buffer(_jsonparsefailure, _split_type_failure)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
    
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3255)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3255)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalArgumentException: The value (Buffer(_jsonparsefailure, _split_type_failure)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
    

    Below is the output of df.show() and df.dtypes:

    +--------------------+--------+--------+----------+-------+----------+-----+------------+------+-----------------+--------------------+--------------------+------+----+
    |          @timestamp|@version|CommonId| OrderDate|OrderId|PickupDate|PupId|TotalCharges|UserId|             host|             message|                path|sentAt|tags|
    +--------------------+--------+--------+----------+-------+----------+-----+------------+------+-----------------+--------------------+--------------------+------+----+
    |2019-01-28 22:02:...|       1|   2N3WF|03/12/2009|   1937|04/12/2009|    5|    $147.24 | 39588|devserver-MS-7758|1937,03/12/2009,3...|/tmp/relay-foods.csv|  null|null|
    
    
    [('@timestamp', 'timestamp'),
     ('@version', 'string'),
     ('CommonId', 'string'),
     ('OrderDate', 'string'),
     ('OrderId', 'string'),
     ('PickupDate', 'string'),
     ('PupId', 'string'),
     ('TotalCharges', 'string'),
     ('UserId', 'string'),
     ('host', 'string'),
     ('message', 'string'),
     ('path', 'string'),
     ('sentAt', 'timestamp'),
     ('tags', 'string')]
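
    Not part of the original question, but note the mismatch: df.dtypes reports tags as string, while the failing value in the trace, Buffer(_jsonparsefailure, _split_type_failure), is a list (and those entries look like Logstash failure tags). If some documents store tags as an array, elasticsearch-hadoop's documented es.read.field.as.array.include option makes the connector map the field as an array instead. A sketch:

    # Hedged sketch: declare "tags" as an array field so the inferred schema
    # matches the list values actually stored in some documents.
    df = (sqlContext.read
          .format("org.elasticsearch.spark.sql")
          .option("es.resource", "relay-foods")
          .option("es.read.field.as.array.include", "tags")
          .load())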
    
    0 replies  |  5 years ago