
java.lang.StackOverflowError in spark-submit but not when running in the IDE

  • tom10271  ·  6 years ago

    I developed a Spark 2.2 collaborative filtering application. It runs and debugs fine in IntelliJ, and I can also open the Spark Web UI to inspect the job. But when I try to deploy it to EMR and first test spark-submit locally, the program does not work properly.

    Part of the spark-submit command:

    spark-submit -v --master local[*] --deploy-mode client --executor-memory 4G --num-executors 10 --conf spark.executor.extraJavaOptions="-Xss200M " --conf spark.executor.memory="500M" 
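
    Note that the -Xss200M above only goes through spark.executor.extraJavaOptions, which does not touch the driver JVM (and with --master local[*] there is no separate executor process at all). For reference only, a variant that also raises the driver-side stack would look roughly like this (untested sketch, still a partial command like the one above):

    spark-submit -v --master local[*] --deploy-mode client --executor-memory 4G --num-executors 10 --driver-java-options "-Xss200M" --conf spark.executor.extraJavaOptions="-Xss200M " --conf spark.executor.memory="500M" 

    The relevant code: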
    
    import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics
    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

    import scala.collection.mutable

    def finalStep(sc: SparkContext): Unit = {
            val sameModel = MatrixFactorizationModel.load(sc, "CollaborativeFilter")
            val globalInterestStats = mutable.Map[
                Int, (DescriptiveStatistics, mutable.MutableList[Rating])
            ]()
    
            val taxonsForUsers = sameModel.recommendProductsForUsers(200)
    
            taxonsForUsers
                .collect()
                .flatMap(userToInterestArr => {
                    userToInterestArr._2.map(rating => {
                        if (globalInterestStats.get(rating.product).isEmpty) {
                            globalInterestStats(rating.product) = (
                                new DescriptiveStatistics(),
                                mutable.MutableList[Rating]()
                            )
                        }
    
                        globalInterestStats(rating.product)._1.addValue(rating.rating)
    
                        (rating, userToInterestArr._2)
                    })
                })
                .foreach(ratingToUserInterestArr => {
                    val rating = ratingToUserInterestArr._1
    
                    if (globalInterestStats.get(rating.product).isDefined) {
                        val interestStats = globalInterestStats(rating.product)
                        val userInterests = ratingToUserInterestArr._2
    
                        if (rating.rating >= interestStats._1.getPercentile(75)) {
                            userInterests.foreach(each => interestStats._2 += each)
                        }
                    }
                })
    
            println(globalInterestStats.toSeq.length) // ~300
    
            val globalInterestRDD = sc.parallelize(globalInterestStats.toSeq, 100)// No. of partition does not matter
            val nGlobalInterests = globalInterestStats.map(each => each._2._2.length).sum
    
    // It was not working in spark-submit but I managed to convert this part of code to simplify code before creating the RDD
            val taxonIDFMap = sc.parallelize(
                    globalInterestStats
                        .toSeq
                        .flatMap(each => {
                            each._2._2
                                .foldLeft(mutable.Map[Int, Double]())(op = (accu, value) => {
                                    if (accu.get(value.product).isEmpty) {
                                        accu(value.product) = 1
                                    } else {
                                        accu(value.product) += 1
                                    }
    
                                    accu
                                })
                                .toList
                    }), 100)
                .reduceByKey((accu, value) => accu + value)
                .map(each => {
                    val a: Double = Math.log10(nGlobalInterests / (1 + each._2)) / Math.log10(2)
    
                    (
                        each._1,
                        a
                    )
                })
                .collect()
                .toMap
    
    // Yet I have a way more complicated task need to operate on globalInterestRDD which I cannot simplify the size for Spark to handle
            val result = globalInterestRDD
                .count()
    
            sc.stop()
    
            println(result)
        }
    

    Exception in thread "dispatcher-event-loop-1" java.lang.StackOverflowError
        at java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        ...
    

    I guess it is related to: http://asyncified.io/2016/12/10/mutablelist-and-the-short-path-to-a-stackoverflowerror/
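
    A minimal standalone sketch of what I suspect is happening (hypothetical repro, not part of the application): parallelizing globalInterestStats presumably forces Java serialization of every mutable.MutableList stored in the map, and MutableList is a linked structure, so ObjectOutputStream descends one nested writeObject call per node, which matches the repeating writeOrdinaryObject / defaultWriteFields frames above. An ArrayBuffer of the same size is array-backed and serializes without the deep recursion:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import scala.collection.mutable

    object SerializationDepthDemo {
        // Serialize into an in-memory buffer and discard the bytes
        def serialize(obj: AnyRef): Unit = {
            val out = new ObjectOutputStream(new ByteArrayOutputStream())
            out.writeObject(obj)
            out.close()
        }

        def main(args: Array[String]): Unit = {
            val n = 1000000

            val flat = mutable.ArrayBuffer[Int]()
            (1 to n).foreach(flat += _)
            serialize(flat) // array-backed: no per-element recursion
            println("ArrayBuffer serialized fine")

            val linked = mutable.MutableList[Int]()
            (1 to n).foreach(linked += _)
            try {
                serialize(linked) // one nested writeObject frame per linked node
                println("MutableList serialized fine")
            } catch {
                case e: StackOverflowError => println("MutableList blew the stack: " + e)
            }
        }
    }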

    1 Answer  ·  5 years ago
  •   tom10271  ·  5 years ago

    The problem is that

    val globalInterestStats = mutable.Map[
        Int, (DescriptiveStatistics, mutable.MutableList[Rating])
    ]()
    

    should be

    val globalInterestStats = mutable.Map[
        Int, (DescriptiveStatistics, mutable.ArrayBuffer[Rating])
    ]()
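
    The swap is drop-in for the rest of finalStep, since ArrayBuffer supports the same append / length / foldLeft calls the method performs on the per-product list (a minimal sketch, not taken from the original code):

    import scala.collection.mutable

    object ArrayBufferDropIn {
        def main(args: Array[String]): Unit = {
            val buf = mutable.ArrayBuffer[Double]()
            Seq(1.0, 2.5, 4.0).foreach(each => buf += each) // as in interestStats._2 += each
            println(buf.length)                             // as in each._2._2.length
            println(buf.foldLeft(0.0)(_ + _))               // as in the taxonIDFMap fold
        }
    }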
    

    Though it still doesn't make sense why the Spark application works in the IDE but not with spark-submit.