代码之家  ›  专栏  ›  技术社区  ›  Assafs

如何将Spark客户机提交应用程序翻译为Yarn Rest API?

  •  0
  • Assafs  · 技术社区  · 7 年前

    目前,我有一个工作代码实现,它向yarn提交一个应用程序,使用 spark.deploy.yarn.client公司 . 聚合客户机需要的所有参数很复杂,但提交应用程序很简单:

    ClientArguments cArgs = new ClientArguments(args.toArray(new String[0]));
    client = new Client(cArgs, sparkConf);
    applicationID = client.submitApplication();
    

    在这之前的大部分代码都是 斯帕康 阿尔茨海默病 . 现在我想退休了 顾客 只和休息一起工作。Spark提供了完整的RESTAPI,包括提交应用程序-根据 Spark documentation 这是一个简单的JSON/XML帖子的问题:

    POST http://<rm http address:port>/ws/v1/cluster/apps
    Accept: application/json
    Content-Type: application/json
    {
      "application-id":"application_1404203615263_0001",
      "application-name":"test",
      "am-container-spec":
    {
      "local-resources":
      {
        "entry":
        [
          {
            "key":"AppMaster.jar",
            "value":
            {
              "resource":"hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar",
              "type":"FILE",
              "visibility":"APPLICATION",
              "size": 43004,
              "timestamp": 1405452071209
            }
          }
        ]
      },
      "commands":
      {
        "command":"{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
      },
      "environment":
      {
        "entry":
        [
          {
            "key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
            "value": "1405459400754"
          },
          {
            "key": "CLASSPATH",
            "value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
          },
          {
            "key": "DISTRIBUTEDSHELLSCRIPTLEN",
            "value": "6"
          },
          {
            "key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
            "value": "hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands"
          }
        ]
      }
    },
    "unmanaged-AM":false,
    "max-app-attempts":2,
    "resource":
    {
      "memory":1024,
      "vCores":1
    },
    "application-type":"YARN",
    "keep-containers-across-application-attempts":false,
    "log-aggregation-context":
    {
      "log-include-pattern":"file1",
      "log-exclude-pattern":"file2",
      "rolled-log-include-pattern":"file3",
      "rolled-log-exclude-pattern":"file4",
      "log-aggregation-policy-class-name":"org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy",
      "log-aggregation-policy-parameters":""
    },
    "attempt-failures-validity-interval":3600000,
    "reservation-id":"reservation_1454114874_1",
    "am-black-listing-requests":
    {
      "am-black-listing-enabled":true,
      "disable-failure-threshold":0.01
    }
    }
    

    我试图将我的论点转换成这个POST请求的JSON主体,但这似乎是不可能的。有人知道我是否可以从我提交的JSON负载通过REST发送的正在运行的应用程序进行反向工程吗?或者,我可以使用什么映射来获取客户机参数并将它们放在JSON中?

    1 回复  |  直到 7 年前
        1
  •  0
  •   Assafs    7 年前

    经过一番搜索,我只能从RESTAPI提交一个应用程序。这不是一个文件化的过程,所以我把它贴在这里。

    注: 如果在任何时候希望将请求的内容与客户端发送的请求进行比较,请使用调试断点检查客户端使用的应用程序上下文。 开课 org.apache.hadoop.yarn.client.api.impl.YarnClientImpl 然后转到方法 submitApplication(ApplicationSubmissionContext appContext) .

    首先,替换 spark.deploy.yarn.Client 对于RESTAPI请求,解决方案必须确保配置中提到的所有文件在HDFS上都可用。 稍后,它需要编写和上传一个额外的文件,名为 __spark_conf__.zip .

    步骤1

    查看 SparkConf (the Client 的第二个参数):中提到的文件 阿尔贾斯 标签,中提到的文件 主路径 和中提到的文件 文件列表 “。

    对于每个文件,检查HDFS中是否存在该文件,如果不存在,则从本地计算机上载该文件。对于每个文件 FileStatus 来自HDF。 聚合资源列表,该列表是包含这6个属性的每个文件的属性映射:

    • 大小= GETLIN()
    • 时间戳=GetModificationTime())
    • 类型=文件
    • 可见性=公共

    另外两个属性:键和资源。

    • Alljars列表中的文件:键为 斯帕克利布斯 /文件名,资源是文件名。
    • 文件列表中的文件:key是“localentry”标记,resource是“hdfspath”标记。
    • mainjarpath:key中的文件是“ 应用程序 .jar“,资源是文件名。

    步骤2

    创建 _ spark_u conf_uuuu.zip(火花配置拉链) 文件。您可以直接在hdfs中,在通常为 {{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip . 此存档文件包含两个文件和一个空目录:一个文件 __spark_hadoop_conf__.xml (重命名为 core-site.xml ,另一个文件称为 __spark_conf__.properties 这是稍微修改过的版本 的 斯帕康 配置中的节。

    创造 _ Spark配置属性 您需要从“”中读取JSON映射。 sparkconf”->“org$apache$spark$sparkconf$$设置 “,并从JSON格式转换每行 “spark.safemine.addcontrol.drivermemory”:“5120M” spark.safemine.addcontrol.drivermemory=5120米

    在文件底部添加6行新行:

    • spark.yarn.cache.confArchive=您将上载到的位置 _ spark_u conf_uuuu.zip(火花配置拉链) 在火花台
    • spark.yarn.cache.visibilities=文件的所有可见性,逗号分隔-基本上是“public,public,…”公众“}
    • spark.yarn.cache.timestamps=文件的所有时间戳,逗号分隔
    • spark.yarn.cache.types=所有类型的文件,逗号分隔-基本上是“文件,文件,…”文件“}
    • spark.yarn.cache.filenames=所有文件名和密钥,记录为资源键和逗号分隔的键
    • spark.yarn.cache.sizes=文件的所有大小,逗号分隔

    确保按照各自的顺序编译5个聚合行。我用了这个代码:

        String confArchive = "spark.yarn.cache.confArchive="+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
        String filenames = "spark.yarn.cache.filenames=";
        String sizes = "spark.yarn.cache.sizes=";
        String timestamps = "spark.yarn.cache.timestamps=";
        String types = "spark.yarn.cache.types=";
        String visibilities = "spark.yarn.cache.visibilities=";
        for (Map<String,String> localResource:localResources) {
            filenames+=localResource.get("resource")+"#"+localResource.get("key")+",";
            sizes+=localResource.get("size")+",";
            timestamps+=localResource.get("timestamp")+",";
            types+=localResource.get("type")+",";
            visibilities+=localResource.get("visibility")+",";
    
        }
        properties+=confArchive+"\n";
        properties+=filenames.substring(0,filenames.length()-1)+"\n";
        properties+=sizes.substring(0,sizes.length()-1)+"\n";
        properties+=timestamps.substring(0,timestamps.length()-1)+"\n";
        properties+=types.substring(0,types.length()-1)+"\n";
        properties+=visibilities.substring(0,visibilities.length()-1)+"\n";
    

    这个 _ Spark Hadoop配置xml 文件是的简单重命名 核心站点XML ,用它们创建的文件夹命名为 __hadoop_conf__ 而且是空的。

    您可以直接将文件保存到HDF,如下所示:

    private void generateSparkConfInHdfs(String applicationId, String userName, String sparkConfProperties, String sparkHadoopConf) throws IOException {
        String path = hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
        Path hdfsPath = new Path(path);
        ZipOutputStream os = new ZipOutputStream(getHdfs().create(hdfsPath));
        os.putNextEntry(new ZipEntry("__hadoop_conf__/"));
        os.putNextEntry(new ZipEntry("__spark_conf__.properties"));
        os.write(sparkConfProperties.getBytes(),0,sparkConfProperties.getBytes().length);
        os.putNextEntry(new ZipEntry("__spark_hadoop_conf__.xml"));
        os.write(sparkHadoopConf.getBytes(),0,sparkHadoopConf.getBytes().length);
        os.close();
    }
    

    创建完文件后,使用以下规范将其添加到资源列表中:

    • 大小= GETLIN()
    • 时间戳=GetModificationTime())
    • 类型=存档
    • 可见性=私有
    • 密钥= __spark_conf__
    • 资源是临时目录(通常 hdfs_base_folder/user/username/.sparkstaging/application/u spark_conf_uuuuuu.zip )

    浏览完整的资源列表,并使用我们在占位符中收集的值,从中为每个资源创建一个具有此结构的XML/JSON:

        <entry>
            <key>{{key}}</key>
            <value>
                <resource>{{resource}}</resource>
                <size>{{size}}</size>
                <timestamp>{{timestamp}}</timestamp>
                <type>{{type}}</type>
                <visibility>{{visibility}}</visibility>
            </value>
        </entry>
    

    累积的字符串将是 localResources XML段如下所示。

    步骤3

    生成Java命令。您需要从sparkconfig中提取一些元素:

    • DriverMemory-来自 sparkConf
    • ExtraJavaOptions=来自 spark.driver.extraJavaOptions 在属性集合中
    • mainClass-来自 斯帕康
    • argstr-收集所有 ClientArgs 除了一级。

    包含元素的result命令是:

    String command = "$JAVA_HOME/bin/java -server -Xmx"+driverMemory+" -Djava.io.tmpdir=$PWD/tmp "+extraJavaOptions+" -Dspark.yarn.app.container.log.dir=&lt;LOG_DIR&gt; "
                + "org.apache.spark.deploy.yarn.ApplicationMaster --class "+mainClass+" "+argstr+" "
                + "--properties-file $PWD/__spark_conf__/__spark_conf__.properties 1&gt; &lt;LOG_DIR&gt;/stdout 2&gt; &lt;LOG_DIR&gt;/stderr";
    

    步骤4

    正在编译请求XML。

    注释 :我的实现要求AM容器上有一个标签,因此添加了AM容器节点标签表达式。这并不适用于所有情况。

    sparkconf到rest请求的映射如下(在XML中显示,还支持JSON实现):

    <application-submission-context>
        <application-id>"+applicationId+"</application-id> 
        <application-name>"+appName+"</application-name>
        <queue>default</queue>
        <priority>0</priority>
        <am-container-spec>
           <local-resources>+localResources+</local-resources>
           <environment>
              <entry>
                 <key>SPARK_YARN_STAGING_DIR</key>
                 <value>"+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"</value>
              </entry>
              <entry>
                 <key>CLASSPATH</key>
                 <value>$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:/spark-non-hdfs-storage/spark-assembly-2.3.0-hadoop2.7/*:%HADOOP_CONF_DIR%:%HADOOP_COMMON_HOME%/share/hadoop/common/*:%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__</value>
              </entry>
              <entry>
                 <key>SPARK_USER</key>
                 <value>"+userName+"</value>
              </entry>
           </environment>
           <commands>
              <command>"+command+"</command>
           </commands>
        </am-container-spec>
        <unmanaged-AM>false</unmanaged-AM>
        <max-app-attempts>1</max-app-attempts>
        <resource>
          <memory>5632</memory>
          <vCores>1</vCores>
        </resource>
        <application-type>SPARK</application-type>
        <keep-containers-across-application-attempts>false</keep-containers-across-application-attempts>
        <application-tags>
          <tag>"+sparkYarnTag+"</tag>
        </application-tags>
        <am-container-node-label-expression>appMngr</am-container-node-label-expression>
        <log-aggregation-context/>
        <attempt-failures-validity-interval>1</attempt-failures-validity-interval>
        <reservation-id/>
    </application-submission-context> 
    

    步骤5:

    通过REST HTTP PUT提交应用程序:

    private void submitApplication (String body, String userName) throws SMSparkManagerException {
        HttpClient client = HttpClientBuilder.create().build();
        HttpPost request = new HttpPost(uri+"?user.name="+userName);
        try {
            request.setEntity(new StringEntity(body, ContentType.APPLICATION_XML));
            HttpResponse response = client.execute(request);
            if (response.getStatusLine().getStatusCode()!=202) {
                throw new SMSparkManagerException("The application could not be submitted to Yarn, response http code "+response.getStatusLine().getStatusCode());
            }
        } catch (UnsupportedEncodingException e) {
            logger.error("The application Could not be submitted due to UnsupportedEncodingException in the provided body: "+body, e );
            throw new SMSparkManagerException("Error in submitting application to yarn");
        } catch (ClientProtocolException e) {
            logger.error("The application Could not be submitted due to ClientProtocolException", e);
            throw new SMSparkManagerException("Error in submitting application to yarn");
        } catch (IOException e) {
            logger.error("The application Could not be submitted due to IOException", e);
            throw new SMSparkManagerException("Error in submitting application to yarn");
        }
    }
    
    推荐文章