Source Code Analysis: How Spark Creates the ApplicationMaster in Yarn Cluster Mode
Original post: https://www.leahy.club/archives/spark-core-applicationmaster
Source file: SparkSubmit.scala
SparkSubmit is a companion object, so its fields and methods can be accessed statically. SparkSubmit is the first process started after a Spark application is launched or a Spark Shell is opened; you can confirm this with jps, which shows a SparkSubmit process running in the background.
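As a side note on the companion-object idea, here is a minimal sketch (with a made-up Launcher name, not Spark code) showing why members of such an object can be reached "statically" through the type name:

// Hypothetical example: a class plus its companion object.
// The object's main can be called without creating an instance,
// which is exactly how SparkSubmit.main is reached.
class Launcher(val mode: String)

object Launcher {
  def main(args: Array[String]): Unit = {
    val launcher = new Launcher(args.headOption.getOrElse("client"))
    println(s"launching in ${launcher.mode} mode")
  }
}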
The walkthrough below targets cluster mode on a Yarn deployment.
-- SparkSubmit.scala
   // the process that gets started
   -- main(args: Array[String])  // args are the arguments passed when the application is submitted
      -- new SparkSubmitArguments()  // parse and wrap args
      -- submit()  // submit the application
         -- prepareSubmitEnvironment  // prepare the submission environment
            -- childMainClass = "org.apache.spark.deploy.yarn.Client"  // Yarn cluster deploy mode
            -- childMainClass = args.mainClass  // Yarn client deploy mode: the main class configured when submitting the Spark job
         -- doRunMain(runMain())  // run the main program
            -- mainClass = Utils.classForName(childMainClass)  // load the class via reflection: the Yarn Client class or the submitted job's main class
            -- mainClass.getMethod("main", new Array[String](0).getClass)  // look up the main method via reflection
            -- mainMethod.invoke()  // invoke the main method via reflection
-- Client.scala  // entered once mainMethod.invoke() runs
   -- main()
      -- new ClientArguments(argStrings)  // wrap the run arguments
      -- new Client()  // create the Client object
         -- yarnClient = YarnClient.createYarnClient()
      -- client.run()
         -- submitApplication()  // run the client and submit the application
            // builds the command: command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster (cluster mode)
            //                  or command = bin/java org.apache.spark.deploy.yarn.ExecutorLauncher   (client mode)
            -- createContainerLaunchContext
            -- createApplicationSubmissionContext
            -- yarnClient.submitApplication(appContext)  // the Yarn client submits the application to Yarn (in effect, to the ResourceManager)
Detailed walkthrough of the source:
The entry point of a Spark application is SparkSubmit.scala, whose main method is shown below:

override def main(args: Array[String]): Unit = {
  // wrap the arguments of the submitted application
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    // print the argument information
    printStream.println(appArgs)
    // scalastyle:on println
  }
  // pattern match on the action field of appArgs
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
First, look at SparkSubmitArguments.scala, which main uses to wrap the arguments; the relevant code is shown below. In this configuration class you can see fields such as master (which cluster manager to use) and executorCores (the number of cores per Executor), among others.

private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser {
  var master: String = null
  var deployMode: String = null
  var executorMemory: String = null
  var executorCores: String = null
  var totalExecutorCores: String = null
  ....
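To make the mapping from command-line options to these fields concrete, here is a deliberately simplified, hypothetical sketch (MiniSubmitArguments is invented; the real class inherits its parser from SparkSubmitArgumentsParser and handles far more options):

// Simplified illustration of how --master / --deploy-mode / --executor-cores
// could end up in fields like the ones above. Not Spark's real parsing logic.
class MiniSubmitArguments(args: Seq[String]) {
  var master: String = null
  var deployMode: String = null
  var executorCores: String = null

  args.grouped(2).foreach {
    case Seq("--master", v)         => master = v
    case Seq("--deploy-mode", v)    => deployMode = v
    case Seq("--executor-cores", v) => executorCores = v
    case _                          => () // ignore everything else in this sketch
  }
}

object MiniSubmitArguments {
  def main(args: Array[String]): Unit = {
    val parsed = new MiniSubmitArguments(
      Seq("--master", "yarn", "--deploy-mode", "cluster", "--executor-cores", "2"))
    println(s"${parsed.master} / ${parsed.deployMode} / ${parsed.executorCores}")
  }
}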
Next, look at the submit method that handles the SparkSubmitAction.SUBMIT case, shown below. The key piece is the runMain method, because every execution path eventually goes through it.

private def submit(args: SparkSubmitArguments): Unit = {
  // prepare the arguments for submission
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
The core of runMain uses reflection: it resolves the concrete class named by childMainClass (one of the values prepared earlier), sets up a class loader for it, and looks up its main method. To see what childMainClass actually is, we then need to go into prepareSubmitEnvironment.

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // pick the class loader
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)
  try {
    // resolve the class named by childMainClass
    mainClass = Utils.classForName(childMainClass)
  // look up the main method
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  // invoke the main method
  try {
    mainMethod.invoke(null, childArgs.toArray)
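The reflective call in runMain boils down to the standard Class.forName / getMethod / invoke sequence. Here is a self-contained sketch of just that mechanism, using a hypothetical DemoApp as the "main class":

object DemoApp {
  def main(args: Array[String]): Unit =
    println("DemoApp.main got: " + args.mkString(", "))
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // Same three steps as runMain: load the class, find main, invoke it.
    // A Scala object exposes a static forwarder for main, so invoke(null, ...)
    // works here just as it does for a Java main class.
    val mainClass  = Class.forName("DemoApp")
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // The Array[String] is passed as the single argument of main, mirroring
    // mainMethod.invoke(null, childArgs.toArray) in the Spark source.
    mainMethod.invoke(null, Array("--master", "yarn"))
  }
}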
Inside prepareSubmitEnvironment there is a series of checks that determine childMainClass. In Yarn client mode it is args.mainClass; in cluster mode it is "org.apache.spark.deploy.yarn.Client".

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
    : (Seq[String], Seq[String], Map[String, String], String) = {
  // Return values
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sysProps = new HashMap[String, String]()
  var childMainClass = ""
  ...
  // check the deploy mode: in client mode, childMainClass is simply args.mainClass
  if (deployMode == CLIENT || isYarnCluster) {
    childMainClass = args.mainClass
    if (isUserJar(args.primaryResource)) {
      childClasspath += args.primaryResource
    }
    if (args.jars != null) { childClasspath ++= args.jars.split(",") }
  }
  ...
  if (isYarnCluster) {
    childMainClass = "org.apache.spark.deploy.yarn.Client"
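Reduced to its essence, the choice can be modelled as a small pure function. This is only a simplified sketch of the Yarn branches (the real method also handles standalone, REST, Mesos, Python, R, and more; com.example.MyApp is a made-up user class):

object ChildMainClassSketch {
  // Simplified model of the childMainClass decision for the Yarn cases only.
  def childMainClassFor(deployMode: String, appMainClass: String): String =
    deployMode match {
      case "cluster" => "org.apache.spark.deploy.yarn.Client" // Yarn Client wraps the user app
      case _         => appMainClass                          // client mode: run the user class directly
    }

  def main(args: Array[String]): Unit = {
    println(childMainClassFor("cluster", "com.example.MyApp"))
    println(childMainClassFor("client", "com.example.MyApp"))
  }
}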
Since we are discussing the Yarn cluster deploy mode, once the Client class has been loaded via reflection, its main method is invoked. So the next stop is the org.apache.spark.deploy.yarn.Client class. In the code below you can see a ClientArguments class that wraps the arguments, much like SparkSubmitArguments did earlier. After that, the Client's run method is called; let's step into run next.

private object Client extends Logging {

  def main(argStrings: Array[String]) {
    if (!sys.props.contains("SPARK_SUBMIT")) {
      logWarning("WARNING: This client is deprecated and will be removed in a " +
        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
    }

    // Set an env variable indicating we are running in YARN mode.
    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConf = new SparkConf

    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    sparkConf.remove("spark.jars")
    sparkConf.remove("spark.files")

    // wrap the arguments
    val args = new ClientArguments(argStrings)
    // run the client
    new Client(args, sparkConf).run()
  }
The first thing run does is call submitApplication, so let's step into that next.

def run(): Unit = {
  this.appId = submitApplication()
The following code is the crucial part: createContainerLaunchContext builds the launch context Yarn uses to start a container, and the ApplicationMaster process is launched inside that container; createApplicationSubmissionContext then wraps it into the application submission that yarnClient hands to the ResourceManager.

def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  ...
  // Set up the appropriate contexts to launch our AM
  val containerContext = createContainerLaunchContext(newAppResponse)
  val appContext = createApplicationSubmissionContext(newApp, containerContext)

  // submit the application to the ResourceManager
  yarnClient.submitApplication(appContext)
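For reference, the yarnClient used here follows the standard Hadoop YarnClient lifecycle. Below is a minimal sketch of that API usage (assuming the Hadoop YARN client libraries on the classpath and a reachable ResourceManager); it is not Spark's code, only the skeleton Spark's Client drives:

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object YarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    // create, init, start -- then ask the RM for a new application
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    val newApp = yarnClient.createApplication()
    println(s"got application id ${newApp.getNewApplicationResponse.getApplicationId}")

    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("sketch")
    // A real submission must also set the ContainerLaunchContext (the AM command)
    // and resource requirements here -- exactly what createContainerLaunchContext
    // and createApplicationSubmissionContext do in Spark -- before the RM accepts it.
    yarnClient.submitApplication(appContext)

    yarnClient.stop()
  }
}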
Below is the relevant code from createContainerLaunchContext. You can see that in cluster mode the AM class is ApplicationMaster, while in client mode it is ExecutorLauncher.

private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
  val appId = newAppResponse.getApplicationId

  // Each container runs a single JVM process; the JVM options are set here.
  // Note that the CMS garbage collector is used.
  if (useConcurrentAndIncrementalGC) {
    // In our expts, using (default) throughput collector has severe perf ramifications in
    // multi-tenant machines
    javaOpts += "-XX:+UseConcMarkSweepGC"
    javaOpts += "-XX:MaxTenuringThreshold=31"
    javaOpts += "-XX:SurvivorRatio=8"
    javaOpts += "-XX:+CMSIncrementalMode"
    javaOpts += "-XX:+CMSIncrementalPacing"
    javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
    javaOpts += "-XX:CMSIncrementalDutyCycle=10"
  }
  ...
  // choose the AM class to launch (resolved via reflection)
  val amClass =
    if (isClusterMode) {
      Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
      Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
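The container context built here ultimately boils down to "run bin/java with the chosen AM class". As a rough sketch of constructing such a context with the plain YARN API (simplified and not Spark's exact command line; the heap size and the {{JAVA_HOME}} / <LOG_DIR> placeholders are illustrative):

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.util.Records

object AmContainerSketch {
  def main(args: Array[String]): Unit = {
    // In cluster mode the class would be ApplicationMaster, in client mode
    // ExecutorLauncher; pick one here just to show the shape of the command.
    val amClass = "org.apache.spark.deploy.yarn.ApplicationMaster"

    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    val command = Seq(
      "{{JAVA_HOME}}/bin/java",   // expanded by YARN on the node that runs the container
      "-Xmx1024m",                // illustrative heap size, not Spark's computed value
      amClass,
      "1>", "<LOG_DIR>/stdout",
      "2>", "<LOG_DIR>/stderr")
    amContainer.setCommands(Seq(command.mkString(" ")).asJava)

    println(amContainer.getCommands)
  }
}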
The ApplicationMaster is a process of its own; the code that runs after the AM starts will be analyzed in the next section.