
Running the Spark driver in a Docker container - no connection between executors and driver?

  •  7
  • tashoyan  · 7 years ago

    Update: the problem is solved. A Docker image is available here: docker-spark-submit

    I run spark-submit with a fat jar inside a Docker container. My standalone Spark cluster runs on 3 virtual machines - one master and two workers. From the executor logs on a worker machine I see that the executors get the following driver URL:

    "--driver-url" "spark://CoarseGrainedScheduler@172.17.0.2:5001"

    As I can see from the source code of StandaloneSchedulerBackend, it builds driverUrl from the spark.driver.host setting:

    val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    

    It does not take the SPARK_PUBLIC_DNS environment variable into account - is that correct? Inside the container I cannot set spark.driver.host to anything other than the container's "internal" IP address (172.17.0.2 in this example). When I try to set spark.driver.host to the host machine's IP address, I get errors like:

    WARN Utils: Service 'sparkDriver' could not bind on port 5001. Attempting port 5002.

    I also tried setting spark.driver.bindAddress to the host machine's IP address, but got the same error. So how can I configure Spark to use the host machine's IP address, instead of the Docker container's address, to communicate with the driver?

    UPD: stack trace from the executor:

    ERROR RpcOutboxMessage: Ask timeout before connecting successfully
    Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
    Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
        at scala.util.Try$.apply(Try.scala:192)
        at scala.util.Failure.recover(Try.scala:216)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
        at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.complete(Promise.scala:55)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
        at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
        at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
        at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
        ... 8 more
    
    3 answers  |  latest 4 years ago
        1
  •  6
  •   tashoyan    7 years ago

    So the working configuration is:

    • set spark.driver.host to the IP address of the host machine

    A working Docker image is available here: docker-spark-submit .
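    As a rough sketch (not taken from the image itself), that configuration amounts to publishing the driver ports and advertising the host's IP; the host IP, image name, master URL, and jar name below are placeholders:

    ```shell
    # HOST_IP: the LAN IP of the machine running the container (placeholder).
    # The driver and block manager ports are fixed so they can be published
    # with -p; 5001 matches the port seen in the executor logs above.
    HOST_IP=192.168.1.10

    docker run --rm \
      -p 5001:5001 -p 5002:5002 \
      my-spark-submit-image \
      spark-submit \
        --master spark://spark-master:7077 \
        --conf spark.driver.host="$HOST_IP" \
        --conf spark.driver.port=5001 \
        --conf spark.driver.blockManager.port=5002 \
        my-app.jar
    ```

    The executors then connect back to HOST_IP:5001, and Docker's port mapping forwards the traffic into the container.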

        2
  •  5
  •   Thomas Decaux    6 years ago

    • Run the Spark 1.6.3 master + worker in the same Docker container
    • Run the Java application from macOS (via an IDE)

    ports:
    - 7077:7077
    - 20002:20002
    - 6060:6060
    

            esSparkConf.setMaster("spark://127.0.0.1:7077");
            esSparkConf.setAppName("datahub_dev");
    
            esSparkConf.setIfMissing("spark.driver.port", "20002");
            esSparkConf.setIfMissing("spark.driver.host", "MAC_OS_LAN_IP");
            esSparkConf.setIfMissing("spark.driver.bindAddress", "0.0.0.0");
            esSparkConf.setIfMissing("spark.blockManager.port", "6060");
    
        3
  •  5
  •   OneCricketeer    4 years ago

    I notice the other answers use Spark Standalone (on VMs, as the OP does, or on 127.0.0.1).

    I'd like to show what worked for me running the jupyter/pyspark-notebook container locally in Docker on a Mac, against a remote AWS Mesos cluster.

    In that case these instructions apply, however --net=host is not an option here.
    The important step is to create the notebook user on the OS of the Mesos slaves, as described in the link.

    This diagram is helpful for debugging the network, but it does not mention spark.driver.blockManager.port, which was actually the last parameter needed to make this work; I had missed it in the Spark documentation. Without it, the executors on the Mesos slaves try to bind that block manager port as well, and Mesos refuses to allocate it.


    These ports are needed to reach the UIs from the Mac:

    • Jupyter UI ( 8888 )
    • Spark UI ( 4040 )

    And these ports are needed for Mesos to reach back to the driver (important):

    • the "libprocess" address + port, which seems to be set via LIBPROCESS_PORT (see the Mesos documentation)
    • the Spark driver port (random: 33139), plus up to spark.port.maxRetries (16) more
    • the Spark block manager port (random: 45029), plus up to spark.port.maxRetries (16) more
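    The arithmetic behind those ranges, as a quick sketch: when a port is already taken, Spark retries on the next one, up to spark.port.maxRetries (default 16) increments, so the whole window has to be published:

    ```shell
    # Spark binds the configured port or, on failure, tries the next one,
    # up to spark.port.maxRetries (default 16) increments. A published
    # Docker range therefore has to cover base through base+16.
    max_retries=16
    for base in 33139 45029; do
      echo "${base}-$((base + max_retries))"
    done
    # prints 33139-33155 and 45029-45045
    ```

    These are exactly the -p ranges published in the docker run command.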

    Not really related, but I am using the JupyterLab interface.

    export EXT_IP=<your external IP>
    
    docker run \
      -p 8888:8888 -p 4040:4040 \
      -p 37899:37899 \
      -p 33139-33155:33139-33155 \
      -p 45029-45045:45029-45045 \
      -e JUPYTER_ENABLE_LAB=y \
      -e EXT_IP \
      -e LIBPROCESS_ADVERTISE_IP=${EXT_IP} \
      -e LIBPROCESS_PORT=37899 \
      jupyter/pyspark-notebook
    

    Once it is up, I go to localhost:8888 for Jupyter and simply open a terminal for a quick spark-shell session. I could also add volume mounts for actually packaged code, but that is the next step.

    I did not edit spark-env.sh or spark-defaults.conf, so for now I pass all the relevant confs to spark-shell. Reminder: this runs inside the container.

    spark-shell --master mesos://zk://quorum.in.aws:2181/mesos \
      --conf spark.executor.uri=https://path.to.http.server/spark-2.4.2-bin-hadoop2.7.tgz \
      --conf spark.cores.max=1 \
      --conf spark.executor.memory=1024m \
      --conf spark.driver.host=$LIBPROCESS_ADVERTISE_IP \
      --conf spark.driver.bindAddress=0.0.0.0 \
      --conf spark.driver.port=33139 \
      --conf spark.driver.blockManager.port=45029
    

    This loads the Spark REPL, and after some output about finding the Mesos master and registering the framework, I then read some files from HDFS using the NameNode IP (though I suspect any other reachable filesystem or database should work):

    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
          /_/
    
    Using Scala version 2.12.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_202)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.read.text("hdfs://some.hdfs.namenode:9000/tmp/README.md").show(10)
    +--------------------+
    |               value|
    +--------------------+
    |      # Apache Spark|
    |                    |
    |Spark is a fast a...|
    |high-level APIs i...|
    |supports general ...|
    |rich set of highe...|
    |MLlib for machine...|
    |and Spark Streami...|
    |                    |
    |<http://spark.apa...|
    +--------------------+
    only showing top 10 rows