代码之家  ›  专栏  ›  技术社区  ›  Adeel Hashmi

Spark机器学习:RDD变得不可读

  •  1
  • Adeel Hashmi  · 技术社区  · 7 年前

    我正在尝试向 mllib 调用的函数 Word2Vec 在火花中。像 Word2Vec 返回a DataFrame 对于包含所需向量的“result”列,需要一些代码。最后,当代码在Spark中成功运行时,我尝试使用 .foreach println 几行代码。Spark在此步骤崩溃,出现以下错误: NullPointerException . 如果我删除 打印LN 命令我尝试使用RDD的示例方法,但出现了相同的火花错误。不知何故,RDD变得无法读取。

    要了解此ML任务的背景,请参阅 link

    import org.apache.spark._
    import org.apache.spark.rdd._
    import org.apache.spark.SparkContext._
    import org.apache.spark.mllib.feature.HashingTF
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
    import org.apache.spark.mllib.tree.GradientBoostedTrees
    import org.apache.spark.mllib.tree.configuration.BoostingStrategy
    import org.apache.spark._
    import org.apache.spark.rdd._
    import org.apache.spark.SparkContext._
    import scala.util.{Success, Try}
    
    import org.apache.spark.ml.feature.Word2Vec
    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.Row
    
    val input_labelled = labelledTweets.map(
          t => (t._1, word2VecModel2.transform(t._2.toDF("text")).select("result").first().getAs[org.apache.spark.ml.linalg.Vector](0)))
          .map(x => new LabeledPoint((x._1).toDouble, x._2)) 
    
    input_labelled.take(3).foreach(println)
    
    documentDF2: org.apache.spark.sql.DataFrame = [value: array<string>]
    word2Vec2: org.apache.spark.ml.feature.Word2Vec = w2v_643337d9029a
    word2VecModel2: org.apache.spark.ml.feature.Word2VecModel = w2v_643337d9029a
    input_labelled: org.apache.spark.rdd.RDD[org.apache.spark.ml.feature.LabeledPoint] = MapPartitionsRDD[52] at map at <console>:74
    input1: org.apache.spark.sql.DataFrame = [value: array<string>]
    model2: org.apache.spark.ml.linalg.Vector = [9.573533798832813E-5,-1.8443804499634973E-4,3.803069862805999E-5,-4.663512611061804E-5,1.3393058071633097E-4]
    [9.573533798832813E-5,-1.8443804499634973E-4,3.803069862805999E-5,-4.663512611061804E-5,1.3393058071633097E-4]
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 159, sandbox-hdp.hortonworks.com, executor 1): java.lang.NullPointerException
        at $anonfun$1.apply(<console>:73)
        at $anonfun$1.apply(<console>:73)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
      ... 58 elided
    Caused by: java.lang.NullPointerException
      at $anonfun$1.apply(<console>:73)
      at $anonfun$1.apply(<console>:73)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    
    1 回复  |  直到 7 年前
        1
  •  1
  •   hoyland    7 年前

    很可能,您的一个输入有一个空字段。Spark会懒散地评估,所以在你跑步之前 take(3) ,您实际上没有进行任何计算,这就是为什么没有这一行就没有错误。

    此外,转换 RDD 到a DataFrame 然后应用变压器。