
Spark job with multiple contexts fails

  •  2
  • Rahul Koshaley  ·  8 years ago

    In our Spark application we create two Spark contexts:

    1) one to read data from the file system, and

    2) one to join the data and load it into Cassandra.

    Only one SparkContext can run in an application at a time, so we stop the first context and then start the second.

    I am getting the following errors.

    Error 1) 16/03/10 05:40:44 ERROR Utils: Uncaught exception in thread Thread-2
    java.io.IOException: Target log file already exists (hdfs:///var/log/spark/apps/application_1457586850134_0001_2)
        at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:225)

    This happens because Hadoop requires the log directory/file not to exist when a job (the first context) runs; when the second context starts, the log directory/file from the first context already exists, hence the error above.

    I worked around this by setting spark.eventLog.overwrite=true.


    Error 2) WARN executor.CoarseGrainedExecutorBackend: An unknown (ip-10-93-141-13.ec2.internal:48849) driver disconnected.
    16/03/10 06:47:37 ERROR executor.CoarseGrainedExecutorBackend: Driver 10.93.141.13:48849 disassociated! Shutting down.
    

    I tried increasing

    spark.yarn.driver.memoryOverhead=1024

    spark.yarn.executor.memoryOverhead=1024

    but the problem persists.


    Error 3)

    Exception in thread "main" java.io.IOException: Failed to connect to /10.93.141.13:46008
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:200)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:183)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    

    I have checked on the core node, and nothing is listening on port 46008.


    Error 4)

    WARN YarnAllocator: Container marked as failed: container_1457586850134_0006_01_000006 on host: ip-10-164-169-46.ec2.internal. Exit status: 1. Diagnostics: Exception from container-launch.
    Container id: container_1457586850134_0006_01_000006
    Exit code: 1
    Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
    at org.apache.hadoop.util.Shell.run(Shell.java:456)
    at   org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    
    
    Container exited with a non-zero exit code 1
    
    16/03/10 06:47:17 WARN ApplicationMaster: Reporter thread fails 1 time(s) in a row.
    java.lang.IllegalStateException: RpcEnv already stopped.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:159)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:131)
        at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:192)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:516)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1$$anonfun$apply$7.apply(YarnAllocator.scala:531)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1$$anonfun$apply$7.apply(YarnAllocator.scala:512)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1.apply(YarnAllocator.scala:512)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$processCompletedContainers$1.apply(YarnAllocator.scala:442)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.deploy.yarn.YarnAllocator.processCompletedContainers(YarnAllocator.scala:442)
        at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:242)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:368)
    

    The error above occurs because containers keep failing one after another.


    As mentioned on the mailing list, the Spark issues above appear to be caused by running multiple contexts:

    https://mail-archives.apache.org/mod_mbox/spark-issues/201602.mbox/%3CJIRA.12936774.1454603470000.316442.1454707659978@Atlassian.JIRA%3E

    sparkContext.stop() does not release the resources.


    I am submitting the job with the following options:

    --class com.mobi.vserv.driver.Query5kPids                            
      --conf spark.eventLog.overwrite=true  
      --conf spark.yarn.executor.memoryOverhead=1024                       
      --conf spark.yarn.driver.memoryOverhead=1024                             
      --num-executors 4                
      --executor-memory 3g            
      --executor-cores 2                  
      --driver-memory 3g
    

    I am running on EMR with one master and two slave nodes; the master has 8 cores and 16 GB of RAM, and each slave has 4 cores and 5120 MB of available memory.

    Below is my code.
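As a side check (my own arithmetic, not from the original post): assuming YARN on each slave really has the stated 5120 MB to hand out, and ignoring YARN's allocation rounding, the requested four executors cannot all fit on this cluster:

```java
// Rough YARN memory check for the submit options above.
// Assumes each slave NodeManager offers the stated 5120 MB.
public class MemoryCheck {
    public static void main(String[] args) {
        int executorMemoryMb = 3 * 1024;  // --executor-memory 3g
        int overheadMb = 1024;            // spark.yarn.executor.memoryOverhead
        int perExecutorMb = executorMemoryMb + overheadMb; // YARN container size
        int nodeMb = 5120;                // memory per slave
        int executorsPerNode = nodeMb / perExecutorMb;     // integer division
        int slaves = 2;
        System.out.println("Each executor container needs " + perExecutorMb + " MB");
        System.out.println("Executors that fit per node: " + executorsPerNode);
        System.out.println("Cluster fits " + (executorsPerNode * slaves)
                + " executors, but --num-executors asks for 4");
    }
}
```

With 4096 MB per container and 5120 MB per node, only one executor fits per slave, i.e. two in total; the remaining executor requests would stay pending or be killed, which can also show up as container failures like Error 4.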

        import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

        import java.io.File;
        import java.io.IOException;
        import java.io.Serializable;
        import java.net.URISyntaxException;
        import java.util.ArrayList;
        import java.util.List;

        import org.apache.log4j.Logger;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.codehaus.jettison.json.JSONException;

        import com.datastax.spark.connector.japi.CassandraJavaUtil;
        import com.datastax.spark.connector.japi.CassandraRow;

        public class Query5kPids implements Serializable {

        private static final Logger logger = Logger.getLogger(Query5kPids.class);

        static List<UserSetGet> ListFromS3 = new ArrayList<UserSetGet>();

        private final SparkConf conf;

        public Query5kPids(SparkConf conf) {
            this.conf = conf;
        }

        public static void main(String[] args) throws JSONException, IOException, InterruptedException, URISyntaxException {

        // Input file, passed on the command line.
        File file = new File(args[0]);

        SparkConf conf = new SparkConf();
        conf.setAppName("Spark-Cassandra Integration");
        conf.setMaster("yarn-cluster");
        conf.set("spark.cassandra.connection.host", "12.16.193.19");
        conf.set("spark.cassandra.connection.port", "9042");

        SparkConf conf1 = new SparkConf().setAppName("SparkAutomation").setMaster("yarn-cluster");

        Query5kPids app1 = new Query5kPids(conf1);
        app1.run1(file);

        Query5kPids app = new Query5kPids(conf);
        System.out.println("Both RDD has been generated");
        app.run();

       }

       private void run() throws JSONException, IOException, InterruptedException {

       JavaSparkContext sc = new JavaSparkContext(conf);
       query(sc);
       sc.stop();
       }

       private void run1(File file) throws JSONException, IOException, InterruptedException {
       JavaSparkContext sc = new JavaSparkContext(conf);
       getData(sc, file);
       sc.stop();

      }

       private void getData(JavaSparkContext sc, File file) {

       JavaRDD<String> Data = sc.textFile(file.toString());
       System.out.println("RDD Count is " + Data.count());

       // Other map operations (elided) convert Data to a UserSetGet RDD.
       ListFromS3 = Data.collect();

       }

       private void query(JavaSparkContext sc) {

       System.out.println("RDD Count is " + ListFromS3.size());
       // This gets printed, which means the application reaches
       // the second part of the program.

       StringBuilder sb = new StringBuilder();
       for (int i = 0; i < ListFromS3.size(); i++) {

       sb.append(ListFromS3.get(i).getApnid());
       sb.append(',');
       }
       sb.setLength(sb.length() - 1); // drop the trailing comma

       JavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sc)
           .cassandraTable("dmp", "user_profile_spark_test")
           .select("app_day_count", "app_first_seen", "app_last_seen",
                   "app_usage_count", "total_day_count", "total_usage_count")
           .where("apnid IN ('" + sb + "')");

       if (cassandraRDD.isEmpty()) {

       JavaRDD<UserSetGet> rddFromGz = sc.parallelize(ListFromS3);

       CassandraJavaUtil.javaFunctions(rddFromGz)
           .writerBuilder("dmp", "user_profile_spark_test", mapToRow(UserSetGet.class))
           .saveToCassandra();
            logger.info("DataSaved");
       }
       }

       }
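As an aside on the IN-clause building in query() above: manually appending commas and trimming the tail with setLength is easy to get off by one, and the snippet quotes the whole list with a single pair of quotes, whereas a CQL IN over text values needs each value quoted individually. A tidier sketch using String.join (the apnid values here are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

// Build a CQL IN list with one pair of quotes per value,
// avoiding manual trailing-comma bookkeeping entirely.
public class InClause {
    public static void main(String[] args) {
        List<String> apnids = Arrays.asList("id1", "id2", "id3");

        // "','" between values, plus one quote at each end:
        String inList = "'" + String.join("','", apnids) + "'";
        String where = "apnid IN (" + inList + ")";

        System.out.println(where); // apnid IN ('id1','id2','id3')
    }
}
```

Note this still interpolates values into the query string; it is only a sketch of the string building, not an endorsement of unescaped interpolation.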
    

    Below is my POM.

       <dependencies>
       <dependency>
       <groupId>org.apache-extras.cassandra-jdbc</groupId>
       <artifactId>cassandra-jdbc</artifactId>
       <version>1.2.5</version>
       </dependency>
    
       <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>3.8.1</version>
       <scope>test</scope>
       </dependency>
       <dependency>
       <groupId>org.codehaus.jettison</groupId>
       <artifactId>jettison</artifactId>
       <version>1.3.7</version>
       </dependency>
       <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
       </dependency>
    
    
       <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
       <version>1.6.0</version>
       </dependency>
    
       <dependency>
       <groupId>com.datastax.spark</groupId>
       <artifactId>spark-cassandra-connector_2.10</artifactId>
       <version>1.5.0-M1</version>
       </dependency>
    
       <dependency>
       <groupId>com.datastax.cassandra</groupId>
       <artifactId>cassandra-driver-core</artifactId>
       <version>2.1.6</version>
       </dependency>
    
       <dependency>
       <groupId>com.datastax.spark</groupId>
       <artifactId>spark-cassandra-connector-java_2.10</artifactId>
       <version>1.5.0-M3</version>
       </dependency>
    
       <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-collections4</artifactId>
       <version>4.1</version>
       </dependency>
       </dependencies>
    
    1 Answer

        1
  •   Nikolay Vasiliev    4 years ago

    We run our tests with a local Spark context and work around the conflict with the following "hack":

    sc.stop()
    // To avoid Akka rebinding to the same port, since it doesn't unbind 
    // immediately on shutdown
    
    System.clearProperty("spark.driver.port")
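Since the question's code is Java, the same workaround there would look like this (the port value is made up; in the real run the first context sets this property itself):

```java
// Java equivalent of the Scala workaround above: after sc.stop(),
// clear the driver-port system property the first context left behind,
// so the second context does not try to rebind the same port.
public class PortHack {
    public static void main(String[] args) {
        // Simulate the property the first SparkContext leaves set:
        System.setProperty("spark.driver.port", "46008");

        // ... sc.stop() would go here ...

        System.clearProperty("spark.driver.port");
        System.out.println(System.getProperty("spark.driver.port")); // prints null
    }
}
```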
    

    Why are you using two different Spark contexts? Why can't you just use one?