
Spark ML ALS collaborative filtering always fails when iterating more than 20 times [duplicate]

  •   Muz  ·  asked 6 years ago

    My dataset is about 3 GB, with roughly 380 million ratings. Training always fails once I increase the number of iterations. Increasing executor memory, increasing or decreasing the number of blocks, and lowering the checkpoint interval have not solved the problem.

    Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)

    Setting a small checkpoint interval does not solve my problem either, as suggested in: StackOverflow-error when applying pyspark ALS's "recommendProductsForUsers" (although cluster of >300GB Ram available)

    This is the DataFrame used for ALS training, about 380 million rows in total:

    +---------+-----------+------+
    |  user_id|    item_id|rating|
    +---------+-----------+------+
    |154317644|      58866|     6|
    | 69669214|     601866|     7|
    |126094876|     909352|     3|
    | 45246613|    1484481|     3|
    |123317968|    2101977|     3|
    |   375928|    2681933|     1|
    |136939309|    3375806|     2|
    |  3150751|    4198976|     2|
    | 87648646|    1030196|     3|
    | 57672425|    5385142|     2|
    +---------+-----------+------+
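
    For context, a sketch of how a ratings DataFrame with this schema might be produced (the Parquet path is hypothetical, not from the post; ALS requires the ID columns to fit in an Int):

    // Hypothetical loading step; the source path is illustrative only.
    val ratings = spark.read.parquet("hdfs://datalake/ratings")
      .selectExpr(
        "cast(user_id as int) as user_id",    // ALS needs integer-range IDs
        "cast(item_id as int) as item_id",
        "cast(rating as float) as rating")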
    

    This is the code that trains the ALS model:

    val als = new ALS()
      .setMaxIter(setMaxIter)                        // fails once this exceeds ~20
      .setRegParam(setRegParam)
      .setUserCol("user_id")
      .setItemCol("item_id")
      .setRatingCol("rating")
      .setImplicitPrefs(false)                       // explicit feedback
      .setCheckpointInterval(setCheckpointInterval)
      .setRank(setRank)
      .setNumItemBlocks(setNumItemBlocks)
      .setNumUserBlocks(setNumUserBlocks)

    val Array(training, test) = ratings.randomSplit(Array(0.9, 0.1))
    val model = als.fit(training)   // fails at this step
    

    This is the part of the ALS source code where the error occurs:

    // For each source block, ship the factor vectors that every destination
    // block needs; each ALS iteration adds another join + flatMap like this
    // to the RDD lineage.
    val srcOut = srcOutBlocks.join(srcFactorBlocks).flatMap {
      case (srcBlockId, (srcOutBlock, srcFactors)) =>
        srcOutBlock.view.zipWithIndex.map { case (activeIndices, dstBlockId) =>
          (dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx))))
        }
    }
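
    The error does not come from this code itself but from the lineage it accumulates: without an effective checkpoint, every iteration chains more stages onto the factor RDDs, and serializing a lineage that is dozens of stages deep overflows the stack. A minimal illustration of the same failure mode, independent of ALS (the loop depth is arbitrary):

    // Illustrative sketch only: a long, un-checkpointed RDD lineage can
    // overflow the stack when tasks are (de)serialized, just like 20+ ALS
    // iterations without a working checkpoint.
    var rdd = spark.sparkContext.parallelize(1 to 1000)
    for (_ <- 1 to 1000) {
      rdd = rdd.map(_ + 1)   // each map adds one level to the lineage
    }
    rdd.count()              // may throw java.lang.StackOverflowError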
    

    Here are the exceptions and error logs:

    18/08/23 15:05:43 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    18/08/23 15:13:35 WARN scheduler.TaskSetManager: Lost task 20.0 in stage 56.0 (TID 31322, 6.ai.bjs-datalake.p1staff.com, executor 9): java.lang.StackOverflowError
        at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2669)
        at java.io.ObjectInputStream$BlockDataInputStream.readInt(ObjectInputStream.java:3170)
        at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1678)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1739)
    
    18/08/23 15:13:35 WARN server.TransportChannelHandler: Exception in connection from /10.191.161.108:23300
    java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    
    18/08/23 15:13:36 ERROR cluster.YarnClusterScheduler: Lost executor 15 on 2.ai.bjs-datalake.p1staff.com: Container marked as failed: container_e04_1533096025492_4001_01_000016 on host: 2.ai.bjs-datalake.p1staff.com. Exit status: 50. Diagnostics: Exception from container-launch.
    Container id: container_e04_1533096025492_4001_01_000016
    Exit code: 50
    Stack trace: ExitCodeException exitCode=50: 
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
        at org.apache.hadoop.util.Shell.run(Shell.java:482)
    
    
    18/08/23 15:13:36 ERROR cluster.YarnClusterScheduler: Lost executor 10 on 5.ai.bjs-datalake.p1staff.com: Container marked as failed: container_e04_1533096025492_4001_01_000011 on host: 5.ai.bjs-datalake.p1staff.com. Exit status: 50. Diagnostics: Exception from container-launch.
    Container id: container_e04_1533096025492_4001_01_000011
    Exit code: 50
    Stack trace: ExitCodeException exitCode=50: 
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
        at org.apache.hadoop.util.Shell.run(Shell.java:482)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    

    Has anyone run into this error?

    1 Reply  |  6 years ago
  •   Muz  ·  answered 6 years ago

    It worked after setting the checkpoint directory. Thanks @eliasah!

    spark.sparkContext.setCheckpointDir("hdfs://datalake/check_point_directory/als")

    Checkpointing does not take effect unless a checkpoint directory is set.
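
    Putting the fix together, a minimal sketch (the HDFS path comes from the answer; the parameter values are placeholders, not from the original post):

    import org.apache.spark.ml.recommendation.ALS

    // Set the checkpoint directory BEFORE fitting; otherwise
    // setCheckpointInterval is silently ignored and the lineage keeps growing.
    spark.sparkContext.setCheckpointDir("hdfs://datalake/check_point_directory/als")

    val als = new ALS()
      .setMaxIter(50)              // placeholder: more than 20 iterations now works
      .setCheckpointInterval(5)    // truncate the lineage every 5 iterations
      .setUserCol("user_id")
      .setItemCol("item_id")
      .setRatingCol("rating")

    val model = als.fit(training)  // no more StackOverflowError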