经过一番搜索,我只能从RESTAPI提交一个应用程序。这不是一个文件化的过程,所以我把它贴在这里。
注:
如果在任何时候希望将请求的内容与客户端发送的请求进行比较,请使用调试断点检查客户端使用的应用程序上下文。
开课
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
然后转到方法
submitApplication(ApplicationSubmissionContext appContext)
.
首先,替换
spark.deploy.yarn.Client
对于RESTAPI请求,解决方案必须确保配置中提到的所有文件在HDFS上都可用。
稍后,它需要编写和上传一个额外的文件,名为
__spark_conf__.zip
.
步骤1
查看
SparkConf
(the
Client
的第二个参数):中提到的文件
阿尔贾斯
标签,中提到的文件
主路径
和中提到的文件
文件列表
“。
对于每个文件,检查HDFS中是否存在该文件,如果不存在,则从本地计算机上载该文件。对于每个文件
FileStatus
来自HDF。
聚合资源列表,该列表是包含这6个属性的每个文件的属性映射:
-
大小= GETLIN()
-
时间戳=GetModificationTime())
-
类型=文件
-
可见性=公共
另外两个属性:键和资源。
-
Alljars列表中的文件:键为
斯帕克利布斯
/文件名,资源是文件名。
-
文件列表中的文件:key是“localentry”标记,resource是“hdfspath”标记。
-
mainjarpath:key中的文件是“
应用程序
.jar“,资源是文件名。
步骤2
创建
_ spark_u conf_uuuu.zip(火花配置拉链)
文件。您可以直接在hdfs中,在通常为
{{HDFS_base_folder}}/user/{{username}}/.sparkStaging/{{application_id}}/__spark_conf__.zip
.
此存档文件包含两个文件和一个空目录:一个文件
__spark_hadoop_conf__.xml
(重命名为
core-site.xml
,另一个文件称为
__spark_conf__.properties
这是稍微修改过的版本
的
斯帕康
配置中的节。
创造
_ Spark配置属性
您需要从“”中读取JSON映射。
sparkconf”->“org$apache$spark$sparkconf$$设置
“,并从JSON格式转换每行
“spark.safemine.addcontrol.drivermemory”:“5120M”
到
spark.safemine.addcontrol.drivermemory=5120米
在文件底部添加6行新行:
-
spark.yarn.cache.confArchive=您将上载到的位置
_ spark_u conf_uuuu.zip(火花配置拉链)
在火花台
-
spark.yarn.cache.visibilities=文件的所有可见性,逗号分隔-基本上是“public,public,…”公众“}
-
spark.yarn.cache.timestamps=文件的所有时间戳,逗号分隔
-
spark.yarn.cache.types=所有类型的文件,逗号分隔-基本上是“文件,文件,…”文件“}
-
spark.yarn.cache.filenames=所有文件名和密钥,记录为资源键和逗号分隔的键
-
spark.yarn.cache.sizes=文件的所有大小,逗号分隔
确保按照各自的顺序编译5个聚合行。我用了这个代码:
String confArchive = "spark.yarn.cache.confArchive="+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
String filenames = "spark.yarn.cache.filenames=";
String sizes = "spark.yarn.cache.sizes=";
String timestamps = "spark.yarn.cache.timestamps=";
String types = "spark.yarn.cache.types=";
String visibilities = "spark.yarn.cache.visibilities=";
for (Map<String,String> localResource:localResources) {
filenames+=localResource.get("resource")+"#"+localResource.get("key")+",";
sizes+=localResource.get("size")+",";
timestamps+=localResource.get("timestamp")+",";
types+=localResource.get("type")+",";
visibilities+=localResource.get("visibility")+",";
}
properties+=confArchive+"\n";
properties+=filenames.substring(0,filenames.length()-1)+"\n";
properties+=sizes.substring(0,sizes.length()-1)+"\n";
properties+=timestamps.substring(0,timestamps.length()-1)+"\n";
properties+=types.substring(0,types.length()-1)+"\n";
properties+=visibilities.substring(0,visibilities.length()-1)+"\n";
这个
_ Spark Hadoop配置xml
文件是的简单重命名
核心站点XML
,用它们创建的文件夹命名为
__hadoop_conf__
而且是空的。
您可以直接将文件保存到HDF,如下所示:
private void generateSparkConfInHdfs(String applicationId, String userName, String sparkConfProperties, String sparkHadoopConf) throws IOException {
String path = hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"/__spark_conf__.zip";
Path hdfsPath = new Path(path);
ZipOutputStream os = new ZipOutputStream(getHdfs().create(hdfsPath));
os.putNextEntry(new ZipEntry("__hadoop_conf__/"));
os.putNextEntry(new ZipEntry("__spark_conf__.properties"));
os.write(sparkConfProperties.getBytes(),0,sparkConfProperties.getBytes().length);
os.putNextEntry(new ZipEntry("__spark_hadoop_conf__.xml"));
os.write(sparkHadoopConf.getBytes(),0,sparkHadoopConf.getBytes().length);
os.close();
}
创建完文件后,使用以下规范将其添加到资源列表中:
-
大小= GETLIN()
-
时间戳=GetModificationTime())
-
类型=存档
-
可见性=私有
-
密钥=
__spark_conf__
-
资源是临时目录(通常
hdfs_base_folder/user/username/.sparkstaging/application/u spark_conf_uuuuuu.zip
)
浏览完整的资源列表,并使用我们在占位符中收集的值,从中为每个资源创建一个具有此结构的XML/JSON:
<entry>
<key>{{key}}</key>
<value>
<resource>{{resource}}</resource>
<size>{{size}}</size>
<timestamp>{{timestamp}}</timestamp>
<type>{{type}}</type>
<visibility>{{visibility}}</visibility>
</value>
</entry>
累积的字符串将是
localResources
XML段如下所示。
步骤3
生成Java命令。您需要从sparkconfig中提取一些元素:
-
DriverMemory-来自
sparkConf
-
ExtraJavaOptions=来自
spark.driver.extraJavaOptions
在属性集合中
-
mainClass-来自
斯帕康
-
argstr-收集所有
ClientArgs
除了一级。
包含元素的result命令是:
String command = "$JAVA_HOME/bin/java -server -Xmx"+driverMemory+" -Djava.io.tmpdir=$PWD/tmp "+extraJavaOptions+" -Dspark.yarn.app.container.log.dir=<LOG_DIR> "
+ "org.apache.spark.deploy.yarn.ApplicationMaster --class "+mainClass+" "+argstr+" "
+ "--properties-file $PWD/__spark_conf__/__spark_conf__.properties 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr";
步骤4
正在编译请求XML。
注释
:我的实现要求AM容器上有一个标签,因此添加了AM容器节点标签表达式。这并不适用于所有情况。
sparkconf到rest请求的映射如下(在XML中显示,还支持JSON实现):
<application-submission-context>
<application-id>"+applicationId+"</application-id>
<application-name>"+appName+"</application-name>
<queue>default</queue>
<priority>0</priority>
<am-container-spec>
<local-resources>+localResources+</local-resources>
<environment>
<entry>
<key>SPARK_YARN_STAGING_DIR</key>
<value>"+hdfs+"/user/"+userName+"/.sparkStaging/"+applicationId+"</value>
</entry>
<entry>
<key>CLASSPATH</key>
<value>$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:/spark-non-hdfs-storage/spark-assembly-2.3.0-hadoop2.7/*:%HADOOP_CONF_DIR%:%HADOOP_COMMON_HOME%/share/hadoop/common/*:%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*:%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/*:%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/*:%HADOOP_MAPRED_HOME%/share/hadoop/mapreduce/lib/*:$PWD/__spark_conf__/__hadoop_conf__</value>
</entry>
<entry>
<key>SPARK_USER</key>
<value>"+userName+"</value>
</entry>
</environment>
<commands>
<command>"+command+"</command>
</commands>
</am-container-spec>
<unmanaged-AM>false</unmanaged-AM>
<max-app-attempts>1</max-app-attempts>
<resource>
<memory>5632</memory>
<vCores>1</vCores>
</resource>
<application-type>SPARK</application-type>
<keep-containers-across-application-attempts>false</keep-containers-across-application-attempts>
<application-tags>
<tag>"+sparkYarnTag+"</tag>
</application-tags>
<am-container-node-label-expression>appMngr</am-container-node-label-expression>
<log-aggregation-context/>
<attempt-failures-validity-interval>1</attempt-failures-validity-interval>
<reservation-id/>
</application-submission-context>
步骤5:
通过REST HTTP PUT提交应用程序:
private void submitApplication (String body, String userName) throws SMSparkManagerException {
HttpClient client = HttpClientBuilder.create().build();
HttpPost request = new HttpPost(uri+"?user.name="+userName);
try {
request.setEntity(new StringEntity(body, ContentType.APPLICATION_XML));
HttpResponse response = client.execute(request);
if (response.getStatusLine().getStatusCode()!=202) {
throw new SMSparkManagerException("The application could not be submitted to Yarn, response http code "+response.getStatusLine().getStatusCode());
}
} catch (UnsupportedEncodingException e) {
logger.error("The application Could not be submitted due to UnsupportedEncodingException in the provided body: "+body, e );
throw new SMSparkManagerException("Error in submitting application to yarn");
} catch (ClientProtocolException e) {
logger.error("The application Could not be submitted due to ClientProtocolException", e);
throw new SMSparkManagerException("Error in submitting application to yarn");
} catch (IOException e) {
logger.error("The application Could not be submitted due to IOException", e);
throw new SMSparkManagerException("Error in submitting application to yarn");
}
}