
"java.lang.NullPointerException" when clustering with Spark

  •  0
  •  Ram · 7 years ago

    I am trying to understand K-means clustering on an input .csv file that consists of 56376 rows and two columns, where the first column is an id and the second column is a group of words. A sample of the data looks like this:

    **1. 1428951621 do rememb arriving at milan from 19 to 16 april 2013
    today**

    The Scala code that processes this data is shown below:

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.feature.{Normalizer, Word2Vec}
    import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
    import org.apache.spark.rdd.RDD

    val inputData = sc.textFile("test.csv")

    // this is a changeable parameter for the number of clusters to use for kmeans
    val numClusters = 4
    // number of iterations for the kmeans
    val numIterations = 10
    // this is the size of the vectors to be created by Word2Vec; this is tunable
    val vectorSize = 600

    // (id, tokens): keep the id column and tokenize the words column
    val filtereddata = inputData.filter(!_.isEmpty).
                                 map(line => line.split(",", -1)).
                                 map(line => (line(0), line(1).split(" ").filter(_.nonEmpty)))

    // all token sequences, used to train the Word2Vec model
    val corpus = inputData.filter(!_.isEmpty).
                           map(line => line.split(",", -1)).
                           map(line => line(1).split(" ").toSeq)

    val values: RDD[Seq[String]] = filtereddata.map(s => s._2)
    val keys = filtereddata.map(s => s._1)

    /*******************Word2Vec and normalisation*****************************/
    val w2vec = new Word2Vec().setVectorSize(vectorSize)
    val model = w2vec.fit(corpus)
    // model.transform throws for words missing from the vocabulary,
    // so fall back to a zero vector for those
    val outtest: RDD[Seq[Vector]] = values.map(x => x.map(m => try {
      model.transform(m)
    } catch {
      case e: Exception => Vectors.zeros(vectorSize)
    }))
    val convertest = outtest.map(m => m.map(x => x.toArray))

    val withkey = keys.zip(convertest)
    val filterkey = withkey.filter(!_._2.isEmpty)

    val keysfinal = filterkey.map(x => x._1)
    val valfinal = filterkey.map(x => x._2)
    // for each collection of vectors that is one tweet, add the vectors element-wise
    val reducetest = valfinal.map(x => x.reduce((a, b) => a.zip(b).map(t => t._1 + t._2)))
    // divide each component by the array length (vectorSize) to scale the sum
    val filtertest = reducetest.map(x => x.map(m => (m, x.length)).map(m => m._1 / m._2))
    val test = filtertest.map(x => new DenseVector(x).asInstanceOf[Vector])
    val normalizer = new Normalizer()
    val data1 = test.map(x => normalizer.transform(x))

    /*********************Clustering Algorithm***********************************/
    val clusters = KMeans.train(data1, numClusters, numIterations)
    val predictions = clusters.predict(data1)
    val clustercount = keysfinal.zip(predictions).distinct.map(s => (s._2, 1)).reduceByKey(_ + _)
    val result = keysfinal.zip(predictions).distinct
    result.saveAsTextFile(fileToSaveResults) // fileToSaveResults: output path, defined elsewhere
    val wsse = clusters.computeCost(data1)
    println(s"The number of clusters is $numClusters")
    println("The cluster counts are:")
    println(clustercount.collect().mkString(" "))
    println(s"The wsse is: $wsse")
    

    However, after a few iterations it throws a "java.lang.NullPointerException" and exits at stage 36. The error looks like this:

    17/10/07 14:42:10 INFO TaskSchedulerImpl: Adding task set 26.0 with 2 tasks
    17/10/07 14:42:10 INFO TaskSetManager: Starting task 0.0 in stage 26.0 (TID 50, localhost, partition 0, ANY, 5149 bytes)
    17/10/07 14:42:10 INFO TaskSetManager: Starting task 1.0 in stage 26.0 (TID 51, localhost, partition 1, ANY, 5149 bytes)
    17/10/07 14:42:10 INFO Executor: Running task 1.0 in stage 26.0 (TID 51)
    17/10/07 14:42:10 INFO Executor: Running task 0.0 in stage 26.0 (TID 50)
    17/10/07 14:42:10 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
    17/10/07 14:42:10 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
    17/10/07 14:42:10 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
    17/10/07 14:42:10 INFO deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
    17/10/07 14:42:10 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    17/10/07 14:42:10 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
    17/10/07 14:42:10 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
    17/10/07 14:42:10 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
    17/10/07 14:42:10 ERROR Executor: Exception in task 0.0 in stage 26.0 (TID 50)
    java.lang.NullPointerException
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    

    Since I cannot figure this out, please help me pinpoint the problem in this code.
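    A minimal way to narrow this down (a sketch of mine, reusing `data1`, `predictions`, `result` and `fileToSaveResults` from the code above) is to force each stage with an action before the write, to see whether the NullPointerException comes from the computation itself or only from `saveAsTextFile`:

    // if these counts succeed, the Word2Vec/KMeans pipeline is fine
    // and the failure is isolated to the Hadoop output path
    println(s"vectors:     ${data1.count()}")
    println(s"predictions: ${predictions.count()}")
    println(s"results:     ${result.count()}")
    // only the write to disk remains
    result.saveAsTextFile(fileToSaveResults)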

    1 Answer  |  7 years ago
        1
  •  0
  •  Community Egal · 4 years ago
    I think this is not related to your code. `ProcessBuilder.start` throws a `NullPointerException` if an element of the command list passed to it is null. So I guess this must be a configuration problem or a bug in Hadoop.
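    For illustration, here is a minimal sketch (mine, not part of the original answer) that reproduces the same exception type: `ProcessBuilder.start` rejects a command list containing a null element, which matches the first frame of the stack trace above.

    import java.util.Arrays

    object ProcessBuilderNpe {
      def main(args: Array[String]): Unit = {
        // a single null element in the command list is enough for
        // ProcessBuilder.start to throw java.lang.NullPointerException,
        // the same frame shown in the executor log above
        val pb = new ProcessBuilder(Arrays.asList("echo", null))
        pb.start()
      }
    }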

    https://www.fachschaft.informatik.tu-darmstadt.de/forum/viewtopic.php?t=34250

    Is it possible to run Hadoop jobs (like the WordCount sample) in the local mode on Windows without Cygwin?
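    If the job runs on Windows (an assumption; the question does not state the OS), these threads point at the usual culprit: Hadoop cannot locate winutils.exe, so the shell command it hands to ProcessBuilder ends up with a null element. A commonly suggested workaround is to point Hadoop at a local installation before the SparkContext is created; the path below is hypothetical:

    // "C:\\hadoop" must contain bin\winutils.exe; setting hadoop.home.dir
    // (or the HADOOP_HOME environment variable) lets Hadoop's Shell class
    // resolve winutils instead of building a command with a null element
    System.setProperty("hadoop.home.dir", "C:\\hadoop")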