1. SparkSubmit.scala

SparkSubmit.scala mainly calls the method prepareSubmitEnvironment, which matches a client application class against the user-supplied arguments and decides which client app to invoke. (This post walks through ClientApp, i.e. standalone cluster mode.) runMain then loads that class reflectively via Utils.classForName and invokes its main method. Note: in local or client mode, the user-defined main class is invoked reflectively instead. The candidate submission classes:

// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"

private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], SparkConf, String)
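The reflective dispatch described above comes down to loading the chosen class by name and invoking its static main. A minimal self-contained sketch of that pattern (com.example.MyApp is a hypothetical application class, not part of Spark):

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // Load the class through the context classloader, as runMain does.
    val clazz = Class.forName(
      "com.example.MyApp", true, Thread.currentThread.getContextClassLoader)
    // Look up the static main(String[]) entry point and invoke it.
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args)
  }
}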
2. ClientApp.scala

In the end the coarse-grained driver is DriverWrapper: ClientEndpoint sets it as the main class of the driver command and submits the request to the Master over RPC.

override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
      // ... build the Command and DriverDescription ...
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))
    // ...
  }
}
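The helper asyncSendToMasterAndForwardReply asks each known master and, when a reply arrives, re-sends it to the local endpoint so that it is handled on the endpoint's own message loop rather than on an RPC callback thread. A minimal sketch of that pattern, assuming a hypothetical EndpointRef stand-in for Spark's RpcEndpointRef:

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object ForwardReplyPattern {
  // Hypothetical stand-in for Spark's RpcEndpointRef.
  trait EndpointRef {
    def ask[T](message: Any): Future[T]
    def send(message: Any): Unit
  }

  // Ask every master; forward any successful reply back to `self`.
  def asyncSendAndForwardReply[T](masters: Seq[EndpointRef], self: EndpointRef,
      message: Any)(implicit ec: ExecutionContext): Unit = {
    for (master <- masters) {
      master.ask[T](message).onComplete {
        case Success(reply) => self.send(reply)
        case Failure(e) => println(s"Error sending message to master: $e")
      }
    }
  }
}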
3. Master.scala

After the Master receives the request, it caches the driver in its in-memory structures, persists it, and calls schedule(), which selects a worker with sufficient resources and sends that worker a LaunchDriver message.

case RequestSubmitDriver(description) =>
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()
    // TODO: It might be good to instead have the submission client poll the master to determine
    //       the current status of the driver. For now it's simply "fire and forget".
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
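To see the round-robin placement in isolation, here is a small self-contained simulation of the same loop. Worker and Driver below are simplified stand-ins (in the real code the resource accounting happens inside launchDriver via worker.addDriver):

import scala.util.Random

// Simplified stand-ins for WorkerInfo / DriverInfo with only the fields
// the placement loop reads.
case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
case class Driver(id: String, mem: Int, cores: Int)

object RoundRobinDemo {
  def main(args: Array[String]): Unit = {
    val workers = Random.shuffle(Seq(
      Worker("w1", 4096, 4), Worker("w2", 2048, 2), Worker("w3", 8192, 8)))
    val waiting = List(Driver("d1", 2048, 2), Driver("d2", 4096, 4))

    var curPos = 0
    for (driver <- waiting) {
      var launched = false
      var visited = 0
      // Probe each worker at most once, continuing from the last position used.
      while (visited < workers.size && !launched) {
        val w = workers(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          w.memoryFree -= driver.mem   // mimic the reservation done by addDriver
          w.coresFree -= driver.cores
          println(s"launch ${driver.id} on ${w.id}")
          launched = true
        }
        curPos = (curPos + 1) % workers.size
      }
      if (!launched) println(s"${driver.id} stays in waitingDrivers")
    }
  }
}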
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
4. Worker.scala

After the Worker receives the message, it creates a new DriverRunner() and calls that object's start() method.

case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()

5. DriverRunner.scala

start() spawns a new thread that calls prepareAndRunDriver, which in the end launches the main method of DriverWrapper (the class chosen in step 2) through a ProcessBuilder.

private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // prepare driver jars and run driver
        val exitCode = prepareAndRunDriver()

        // set final state depending on if forcibly killed and process exit code
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // notify worker of final driver state, possible exception
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
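The shape of start() can be reproduced with plain JVM primitives: a named thread, a shutdown hook that would kill the child process if the worker JVM dies, and a mapping from the exit code to a final state. A minimal sketch under those assumptions (the states and the kill behavior are simplified placeholders):

object SuperviseDemo {
  sealed trait State
  case object Finished extends State
  case object Failed extends State
  case object Error extends State

  def runSupervised(name: String)(body: => Int): Unit = {
    new Thread(name) {
      override def run(): Unit = {
        // Stand-in for DriverRunner's kill(): the real hook kills the child process.
        val hook = new Thread(() => println(s"$name: JVM shutting down, killing child"))
        Runtime.getRuntime.addShutdownHook(hook)
        val state: State =
          try { if (body == 0) Finished else Failed }
          catch { case _: Exception => Error }
          finally { Runtime.getRuntime.removeShutdownHook(hook) }
        // DriverRunner reports this back to the Worker via DriverStateChanged.
        println(s"$name finished with state $state")
      }
    }.start()
  }
}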
private[worker] def prepareAndRunDriver(): Int = {
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // TODO: If we add ability to submit multiple jars they should also be added here
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
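Under the hood, CommandUtils.buildProcessBuilder assembles a java command line from the DriverDescription, and runDriver executes it as a child process and waits for the exit code. A stripped-down sketch of that step with java.lang.ProcessBuilder (the command line and paths here are hypothetical):

import java.io.File
import java.lang.ProcessBuilder.Redirect

object LaunchChildJvm {
  def main(args: Array[String]): Unit = {
    // Hypothetical command; the real one carries the driver's java opts,
    // classpath, and the DriverWrapper arguments.
    val builder = new ProcessBuilder(
      "java", "-cp", "user.jar", "org.apache.spark.deploy.worker.DriverWrapper")
    builder.directory(new File("/tmp/driver-workdir"))  // the driver's working dir
    builder.redirectOutput(Redirect.to(new File("/tmp/driver-workdir/stdout")))
    builder.redirectError(Redirect.to(new File("/tmp/driver-workdir/stderr")))
    val process = builder.start()
    val exitCode = process.waitFor()  // runDriver blocks until the driver exits
    println(s"driver process exited with code $exitCode")
  }
}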
6. DriverWrapper.scala

The coarse-grained driver (the client-side driver process). It loads the user-specified jar and main class; from this point the code we actually wrote starts executing.
def main(args: Array[String]) {
  args.toList match {
    /*
     * IMPORTANT: Spark 1.3 provides a stable application submission gateway that is both
     * backward and forward compatible across future Spark versions. Because this gateway
     * uses this class to launch the driver, the ordering and semantics of the arguments
     * here must also remain consistent across versions.
     */
    case workerUrl :: userJar :: mainClass :: extraArgs =>
      val conf = new SparkConf()
      val host: String = Utils.localHostName()
      val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt
      val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
      logInfo(s"Driver address: ${rpcEnv.address}")
      rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

      val currentLoader = Thread.currentThread.getContextClassLoader
      val userJarUrl = new File(userJar).toURI().toURL()
      val loader =
        if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
          new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
        } else {
          new MutableURLClassLoader(Array(userJarUrl), currentLoader)
        }
      Thread.currentThread.setContextClassLoader(loader)
      setupDependencies(loader, userJar)

      // Delegate to supplied main class
      val clazz = Utils.classForName(mainClass)
      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
      mainMethod.invoke(null, extraArgs.toArray[String])

      rpcEnv.shutdown()

    case _ =>
      // scalastyle:off println
      System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
      // scalastyle:on println
      System.exit(-1)
  }
}
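The classloader choice is the notable part of main: MutableURLClassLoader delegates to the parent first (standard JVM behavior), while ChildFirstURLClassLoader prefers classes from the user jar, which matters when the application bundles a dependency version that conflicts with Spark's. A toy child-first loader to illustrate the idea (a sketch, not Spark's actual implementation, which also handles resources and other edge cases):

import java.net.{URL, URLClassLoader}

class ChildFirstLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, null) {  // null parent: skip the normal delegation

  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    try {
      super.loadClass(name, resolve)  // bootstrap classes + the user jar URLs
    } catch {
      case _: ClassNotFoundException =>
        parent.loadClass(name)  // fall back to the parent, e.g. Spark's classes
    }
  }
}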
Reposted from: https://www.cnblogs.com/chouc/p/10885678.html