
Spark cluster on EC2 uses only one node

  clstaudt · 6 years ago

    I use flintrock to launch an 8+1 node Spark cluster on Amazon EC2:

    > flintrock --config config.yaml launch cluster-8nodes
    

    Then I log in to the cluster with Flintrock:

    > flintrock --config config.yaml login cluster-8nodes
    

    The job I am running is essentially this simple bigram count over a large text file:

    from contextlib import contextmanager
    from pyspark import SparkConf, SparkContext

    @contextmanager
    def use_spark_context(appName):
        conf = SparkConf().setAppName(appName)
        spark_context = SparkContext(conf=conf)

        try:
            print("starting ", appName)
            yield spark_context
        finally:
            spark_context.stop()
            print("stopping ", appName)

    # text_path and name_bigrams are defined elsewhere in the script
    with use_spark_context("AppName") as spark:
        text_file = spark.textFile(text_path)
        bigrams = text_file.flatMap(lambda line: line.split(".")) \
                           .map(lambda line: line.strip().split(" ")) \
                           .flatMap(lambda xs: zip(xs, xs[1:]))
        # after reduceByKey the elements are (bigram, count) pairs,
        # so the filter must look at the first element of each pair
        counts = bigrams.map(lambda bigram: (bigram, 1)) \
                .reduceByKey(lambda x, y: x + y) \
                .filter(lambda pair: pair[0] in name_bigrams) \
                .collect()
    

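As a sanity check on what the job computes, the same pipeline can be sketched in plain Python without Spark. The sample text and bigram set below are made up for illustration; each step mirrors one stage of the RDD chain:

```python
# Hypothetical sample data, standing in for the large text file
# and the name_bigrams set used in the Spark job.
text = "spark on ec2. spark on ec2 uses one node"
name_bigrams = {("spark", "on"), ("one", "node")}

counts = {}
for sentence in text.split("."):              # flatMap: split on "."
    words = sentence.strip().split(" ")       # map: strip and split into words
    for bigram in zip(words, words[1:]):      # flatMap: adjacent word pairs
        counts[bigram] = counts.get(bigram, 0) + 1   # reduceByKey: sum counts

# filter: keep only the bigrams of interest
result = {b: c for b, c in counts.items() if b in name_bigrams}
```

Running this on the sample text yields `{("spark", "on"): 2, ("one", "node"): 1}`, which is what the `collect()` in the Spark job returns (as a list of pairs) for the real input.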
    It is saved to a .py file and, after logging in via Flintrock, submitted like this:

    > PYSPARK_PYTHON=python3 spark-submit --num-executors 8 my_job.py --input data/bigtext.txt
    

    The program appears to run fine and produces the output below. However, all nodes except one are idle. Shouldn't this setup distribute the job across the 8 nodes of the cluster?

    18/06/08 09:50:48 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 1998 bytes result sent to driver
    18/06/08 09:50:48 INFO TaskSetManager: Starting task 12.0 in stage 0.0 (TID 12, localhost, executor driver, partition 12, PROCESS_LOCAL, 4851 bytes)
    18/06/08 09:50:48 INFO Executor: Running task 12.0 in stage 0.0 (TID 12)
    18/06/08 09:50:48 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 30285 ms on localhost (executor driver) (11/382)
    18/06/08 09:50:48 INFO HadoopRDD: Input split: file:/home/ec2-user/data/enwiki-extract.txt:402653184+33554432
    18/06/08 09:50:53 INFO PythonRunner: Times: total = 32160, boot = -586, init = 588, finish = 32158
    18/06/08 09:50:54 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 1998 bytes result sent to driver
    18/06/08 09:50:54 INFO TaskSetManager: Starting task 13.0 in stage 0.0 (TID 13, localhost, executor driver, partition 13, PROCESS_LOCAL, 4851 bytes)
    18/06/08 09:50:54 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 32785 ms on localhost (executor driver) (12/382)
    18/06/08 09:50:54 INFO Executor: Running task 13.0 in stage 0.0 (TID 13)
    18/06/08 09:50:54 INFO HadoopRDD: Input split: file:/home/ec2-user/data/enwiki-extract.txt:436207616+33554432
    18/06/08 09:51:19 INFO PythonRunner: Times: total = 30232, boot = -571, init = 578, finish = 30225
    18/06/08 09:51:19 INFO Executor: Finished task 12.0 in stage 0.0 (TID 12). 1998 bytes result sent to driver
    18/06/08 09:51:19 INFO TaskSetManager: Starting task 14.0 in stage 0.0 (TID 14, localhost, executor driver, partition 14, PROCESS_LOCAL, 4851 bytes)
    18/06/08 09:51:19 INFO Executor: Running task 14.0 in stage 0.0 (TID 14)
    18/06/08 09:51:19 INFO TaskSetManager: Finished task 12.0 in stage 0.0 (TID 12) in 30794 ms on localhost (executor driver) (13/382)
    18/06/08 09:51:19 INFO HadoopRDD: Input split: file:/home/ec2-user/data/enwiki-extract.txt:469762048+33554432
    18/06/08 09:51:25 INFO PythonRunner: Times: total = 31385, boot = -608, init = 611, finish = 31382
    18/06/08 09:51:26 INFO Executor: Finished task 13.0 in stage 0.0 (TID 13). 1998 bytes result sent to driver
    18/06/08 09:51:26 INFO TaskSetManager: Starting task 15.0 in stage 0.0 (TID 15, localhost, executor driver, partition 15, PROCESS_LOCAL, 4851 bytes)
    18/06/08 09:51:26 INFO TaskSetManager: Finished task 13.0 in stage 0.0 (TID 13) in 32061 ms on localhost (executor driver) (14/382)
    18/06/08 09:51:26 INFO Executor: Running task 15.0 in stage 0.0 (TID 15)
    18/06/08 09:51:26 INFO HadoopRDD: Input split: file:/home/ec2-user/data/enwiki-extract.txt:503316480+33554432
    

    Edit: If I pass the master URL printed by flintrock launch to spark-submit --master, the job starts but fails because the input file, which is stored locally on the login node, cannot be found:

    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 30, 172.31.28.28, executor 5): java.io.FileNotFoundException: File file:/home/ec2-user/data/enwiki-extract.txt does not exist
    

    Isn't the login node also the master node? My assumption was that the master would read the file and distribute its partitions to the worker nodes.

    1 Answer
  clstaudt · 6 years ago

    By default, spark-submit launches Spark in local mode. What worked was to pass --master spark://<masterURL>:7077 and to set --num-executors to at least the number of worker nodes, depending on the cluster configuration.
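Concretely, the submission that worked looked roughly like this. The master hostname is whatever flintrock launch (or flintrock describe) reports, and 7077 is the default port of the Spark standalone master; treat this as a sketch to adapt to your cluster:

```shell
# Replace <master-hostname> with the master address reported by Flintrock.
PYSPARK_PYTHON=python3 spark-submit \
    --master spark://<master-hostname>:7077 \
    --num-executors 8 \
    my_job.py --input data/bigtext.txt
```

Without the --master flag, the driver runs everything in a single local JVM, which is exactly the "localhost, executor driver" pattern visible in the log output above.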

    In addition, in this setup every node of the cluster needs a full local copy of the input file. At first this surprised me, because I had assumed that Spark would automatically distribute the file's partitions to the workers over the network.
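One way to satisfy that requirement with Flintrock itself is its copy-file command, which copies a local file to the same path on every node of the cluster. The paths below reuse the ones from this question and are illustrative:

```shell
# Copy the input file to the same path on every node (master and workers).
flintrock --config config.yaml copy-file cluster-8nodes \
    data/bigtext.txt /home/ec2-user/data/bigtext.txt
```

For larger inputs, a shared store such as S3 or HDFS avoids keeping a full copy on every node, since each executor then reads only its own splits.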