~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~ http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
  ---
  Hadoop Map Reduce Next Generation-${project.version} - Writing YARN Applications
  ---
  ---
  ${maven.build.timestamp}

Hadoop MapReduce Next Generation - Writing YARN Applications

  \[ {{{./index.html}Go Back}} \]

%{toc|section=1|fromDepth=0}
* Purpose

  This document describes, at a high level, the way to implement new
  applications for YARN.
* Concepts and Flow

  The general concept is that an 'Application Submission Client' submits an
  'Application' to the YARN Resource Manager. The client communicates with the
  ResourceManager using the 'ClientRMProtocol' to first acquire a new
  'ApplicationId', if needed, via ClientRMProtocol#getNewApplication and then
  submit the 'Application' to be run via ClientRMProtocol#submitApplication. As
  part of the ClientRMProtocol#submitApplication call, the client needs to
  provide sufficient information to the ResourceManager to 'launch' the
  application's first container, i.e. the ApplicationMaster.

  You need to provide information such as the details about the local
  files/jars that need to be available for your application to run, the actual
  command that needs to be executed (with the necessary command line
  arguments), any Unix environment settings (optional), etc. Effectively, you
  need to describe the Unix process(es) that need to be launched for your
  ApplicationMaster.

  The YARN ResourceManager will then launch the ApplicationMaster (as
  specified) on an allocated container. The ApplicationMaster is then expected
  to communicate with the ResourceManager using the 'AMRMProtocol'. Firstly,
  the ApplicationMaster needs to register itself with the ResourceManager. To
  complete the task assigned to it, the ApplicationMaster can then request and
  receive containers via AMRMProtocol#allocate. After a container is allocated
  to it, the ApplicationMaster communicates with the NodeManager using
  ContainerManager#startContainer to launch the container for its task. As
  part of launching this container, the ApplicationMaster has to specify the
  ContainerLaunchContext which, similar to the ApplicationSubmissionContext,
  has the launch information such as the command line specification,
  environment, etc. Once the task is completed, the ApplicationMaster has to
  signal its completion to the ResourceManager via
  AMRMProtocol#finishApplicationMaster.

  Meanwhile, the client can monitor the application's status by querying the
  ResourceManager or by directly querying the ApplicationMaster if it supports
  such a service. If needed, it can also kill the application via
  ClientRMProtocol#forceKillApplication.
* Interfaces

  The interfaces you'd most likely be concerned with are:

  * ClientRMProtocol - Client\<--\>ResourceManager\

    The protocol for a client that wishes to communicate with the
    ResourceManager to launch a new application (i.e. the ApplicationMaster),
    check on the status of the application or kill the application. For
    example, a job-client (a job launching program from the gateway) would use
    this protocol.

  * AMRMProtocol - ApplicationMaster\<--\>ResourceManager\

    The protocol used by the ApplicationMaster to register/unregister itself
    to/from the ResourceManager as well as to request resources from the
    Scheduler to complete its tasks.

  * ContainerManager - ApplicationMaster\<--\>NodeManager\

    The protocol used by the ApplicationMaster to talk to the NodeManager to
    start/stop containers and get status updates on the containers if needed.
* Writing a Simple Yarn Application

** Writing a simple Client

  * The first step a client needs to take is to connect to the
    ResourceManager or, to be more specific, the ApplicationsManager (AsM)
    interface of the ResourceManager.
+---+
    ClientRMProtocol applicationsManager;
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress =
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS));
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    Configuration appsManagerServerConf = new Configuration(conf);
    appsManagerServerConf.setClass(
        YarnConfiguration.YARN_SECURITY_INFO,
        ClientRMSecurityInfo.class, SecurityInfo.class);
    applicationsManager = ((ClientRMProtocol) rpc.getProxy(
        ClientRMProtocol.class, rmAddress, appsManagerServerConf));
+---+
  * Once a handle is obtained to the ASM, the client needs to request the
    ResourceManager for a new ApplicationId.

+---+
    GetNewApplicationRequest request =
        Records.newRecord(GetNewApplicationRequest.class);
    GetNewApplicationResponse response =
        applicationsManager.getNewApplication(request);
    LOG.info("Got new ApplicationId=" + response.getApplicationId());
+---+
  * The response from the ASM for a new application also contains information
    about the cluster, such as the minimum/maximum resource capabilities of
    the cluster. This is needed so that you can correctly set the
    specifications of the container in which the ApplicationMaster will be
    launched. Please refer to GetNewApplicationResponse for more details.
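
    For example, the reported limits can be used to sanity-check the amount
    of memory the client is about to request for the ApplicationMaster's
    container. A minimal sketch, assuming the minimum/maximum capability
    accessors on GetNewApplicationResponse and that <<<amMemory>>> is the
    value set later on the AM container's Resource:

+---+
    // Clamp the requested AM memory to the limits reported by the cluster.
    // amMemory is the value used further below when setting the AM
    // container's Resource capability.
    int minMem = response.getMinimumResourceCapability().getMemory();
    int maxMem = response.getMaximumResourceCapability().getMemory();
    if (amMemory < minMem) {
      amMemory = minMem;
    } else if (amMemory > maxMem) {
      amMemory = maxMem;
    }
+---+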
  * The main crux of a client is to set up the ApplicationSubmissionContext
    which defines all the information needed by the ResourceManager to launch
    the ApplicationMaster. A client needs to set the following into the
    context:

    * Application info: id, name

    * Queue, priority info: the queue to which the application will be
      submitted, and the priority to be assigned to the application.

    * User: the user submitting the application.

    * ContainerLaunchContext: the information defining the container in which
      the ApplicationMaster will be launched and run. The
      ContainerLaunchContext, as mentioned previously, defines all the
      required information needed to run the ApplicationMaster such as the
      local resources (binaries, jars, files etc.), security tokens,
      environment settings (CLASSPATH etc.) and the command to be executed.

    []
+---+
    // Create a new ApplicationSubmissionContext
    ApplicationSubmissionContext appContext =
        Records.newRecord(ApplicationSubmissionContext.class);
    // set the ApplicationId
    appContext.setApplicationId(appId);
    // set the application name
    appContext.setApplicationName(appName);

    // Create a new container launch context for the AM's container
    ContainerLaunchContext amContainer =
        Records.newRecord(ContainerLaunchContext.class);

    // Define the local resources required
    Map<String, LocalResource> localResources =
        new HashMap<String, LocalResource>();
    // Let's assume the jar we need for our ApplicationMaster is available in
    // HDFS at a certain known path to us and we want to make it available to
    // the ApplicationMaster in the launched container
    Path jarPath; // <- known path to jar file
    FileStatus jarStatus = fs.getFileStatus(jarPath);
    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
    // Set the type of resource - file or archive
    // archives are untarred at the destination by the framework
    amJarRsrc.setType(LocalResourceType.FILE);
    // Set visibility of the resource
    // Setting to most private option i.e. this file will only
    // be visible to this instance of the running application
    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    // Set the location of the resource to be copied over into the
    // working directory
    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    // Set timestamp and length of file so that the framework
    // can do basic sanity checks for the local resource
    // after it has been copied over to ensure it is the same
    // resource the client intended to use with the application
    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
    amJarRsrc.setSize(jarStatus.getLen());
    // The framework will create a symlink called AppMaster.jar in the
    // working directory that will be linked back to the actual file.
    // The ApplicationMaster, if it needs to reference the jar file, would
    // need to use the symlink filename.
    localResources.put("AppMaster.jar", amJarRsrc);
    // Set the local resources into the launch context
    amContainer.setLocalResources(localResources);

    // Set up the environment needed for the launch context
    Map<String, String> env = new HashMap<String, String>();
    // For example, we could set up the classpath needed.
    // Assuming our classes or jars are available as local resources in the
    // working directory from which the command will be run, we need to append
    // "." to the path.
    // By default, all the hadoop-specific classpaths will already be
    // available in $CLASSPATH, so we should be careful not to overwrite it.
    String classPathEnv = "$CLASSPATH:./*:";
    env.put("CLASSPATH", classPathEnv);
    amContainer.setEnvironment(env);

    // Construct the command to be executed on the launched container
    String command =
        "${JAVA_HOME}" + "/bin/java" +
        " MyAppMaster" +
        " arg1 arg2 arg3" +
        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    // add additional commands if needed

    // Set the command array into the container spec
    amContainer.setCommands(commands);

    // Define the resource requirements for the container
    // For now, YARN only supports memory so we set the memory
    // requirements.
    // If the process takes more than its allocated memory, it will
    // be killed by the framework.
    // Memory being requested for should be less than the max capability
    // of the cluster and all asks should be a multiple of the min capability.
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(amMemory);
    amContainer.setResource(capability);

    // Set the container launch context into the ApplicationSubmissionContext
    appContext.setAMContainerSpec(amContainer);
+---+
  * After the setup process is complete, the client is finally ready to submit
    the application to the ASM.

+---+
    // Create the request to send to the ApplicationsManager
    SubmitApplicationRequest appRequest =
        Records.newRecord(SubmitApplicationRequest.class);
    appRequest.setApplicationSubmissionContext(appContext);

    // Submit the application to the ApplicationsManager
    // Ignore the response as either a valid response object is returned on
    // success or an exception is thrown to denote failure
    applicationsManager.submitApplication(appRequest);
+---+
  * At this point, the ResourceManager will have accepted the application and,
    in the background, will go through the process of allocating a container
    with the required specifications and then eventually setting up and
    launching the ApplicationMaster on the allocated container.

  * There are multiple ways a client can track progress of the actual task.

    * It can communicate with the ResourceManager and request a report of the
      application via ClientRMProtocol#getApplicationReport.

+-----+
      GetApplicationReportRequest reportRequest =
          Records.newRecord(GetApplicationReportRequest.class);
      reportRequest.setApplicationId(appId);
      GetApplicationReportResponse reportResponse =
          applicationsManager.getApplicationReport(reportRequest);
      ApplicationReport report = reportResponse.getApplicationReport();
+-----+
      The ApplicationReport received from the ResourceManager consists of the
      following:

      * General application information: the ApplicationId, the queue to
        which the application was submitted, the user who submitted the
        application and the start time of the application.

      * ApplicationMaster details: the host on which the ApplicationMaster is
        running, the rpc port (if any) on which it is listening for requests
        from clients and a token that the client needs to communicate with
        the ApplicationMaster.

      * Application tracking information: if the application supports some
        form of progress tracking, it can set a tracking url which is
        available via ApplicationReport#getTrackingUrl that a client can look
        at to monitor progress.

      * ApplicationStatus: the state of the application as seen by the
        ResourceManager is available via
        ApplicationReport#getYarnApplicationState. If the
        YarnApplicationState is set to FINISHED, the client should refer to
        ApplicationReport#getFinalApplicationStatus to check for the actual
        success/failure of the application task itself. In case of failures,
        ApplicationReport#getDiagnostics may be useful to shed some more
        light on the failure, as illustrated in the sketch below.
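
      For instance, a client that simply polls until the application
      terminates might interpret the report along these lines (a minimal
      sketch using the accessors listed above):

+---+
      YarnApplicationState state = report.getYarnApplicationState();
      if (YarnApplicationState.FINISHED == state) {
        // The application finished; check whether the task itself succeeded
        FinalApplicationStatus finalStatus =
            report.getFinalApplicationStatus();
        if (FinalApplicationStatus.SUCCEEDED == finalStatus) {
          LOG.info("Application completed successfully");
        } else {
          LOG.info("Application finished unsuccessfully"
              + ", diagnostics=" + report.getDiagnostics());
        }
      } else if (YarnApplicationState.KILLED == state
          || YarnApplicationState.FAILED == state) {
        LOG.info("Application did not finish"
            + ", state=" + state
            + ", diagnostics=" + report.getDiagnostics());
      }
+---+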
    * If the ApplicationMaster supports it, a client can directly query the
      ApplicationMaster itself for progress updates via the host:rpcport
      information obtained from the ApplicationReport. It can also use the
      tracking url obtained from the report if available.

  * In certain situations, if the application is taking too long or due to
    other factors, the client may wish to kill the application. The
    ClientRMProtocol supports the forceKillApplication call that allows a
    client to send a kill signal to the ApplicationMaster via the
    ResourceManager. An ApplicationMaster, if so designed, may also support an
    abort call via its rpc layer that a client may be able to leverage.

+---+
    KillApplicationRequest killRequest =
        Records.newRecord(KillApplicationRequest.class);
    killRequest.setApplicationId(appId);
    applicationsManager.forceKillApplication(killRequest);
+---+
** Writing an ApplicationMaster

  * The ApplicationMaster is the actual owner of the job. It will be launched
    by the ResourceManager and will be provided, via the client, all the
    necessary information and resources about the job that it has been tasked
    with overseeing and completing.

  * As the ApplicationMaster is launched within a container that may (and
    likely will) be sharing a physical host with other containers, given the
    multi-tenancy nature, amongst other issues, it cannot make any assumptions
    about things like pre-configured ports that it can listen on.

  * When the ApplicationMaster starts up, several parameters are made
    available to it via the environment. These include the ContainerId for
    the ApplicationMaster container, the application submission time and
    details about the NodeManager host running the ApplicationMaster. Refer
    to ApplicationConstants for parameter names.

  * All interactions with the ResourceManager require an ApplicationAttemptId
    (there can be multiple attempts per application in case of failures). The
    ApplicationAttemptId can be obtained from the ApplicationMaster's
    containerId. There are helper apis to convert the value obtained from the
    environment into objects.
+---+
    Map<String, String> envs = System.getenv();
    String containerIdString =
        envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
    if (containerIdString == null) {
      // container id should always be set in the env by the framework
      throw new IllegalArgumentException(
          "ContainerId not set in the environment");
    }
    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+---+
  * After an ApplicationMaster has initialized itself completely, it needs to
    register with the ResourceManager via
    AMRMProtocol#registerApplicationMaster. The ApplicationMaster always
    communicates via the Scheduler interface of the ResourceManager.
+---+
    // Connect to the Scheduler of the ResourceManager.
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    InetSocketAddress rmAddress =
        NetUtils.createSocketAddr(yarnConf.get(
            YarnConfiguration.RM_SCHEDULER_ADDRESS,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    AMRMProtocol resourceManager =
        (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);

    // Register the AM with the RM
    // Set the required info into the registration request:
    // ApplicationAttemptId,
    // host on which the app master is running
    // rpc port on which the app master accepts requests from the client
    // tracking url for the client to track app master progress
    RegisterApplicationMasterRequest appMasterRequest =
        Records.newRecord(RegisterApplicationMasterRequest.class);
    appMasterRequest.setApplicationAttemptId(appAttemptID);
    appMasterRequest.setHost(appMasterHostname);
    appMasterRequest.setRpcPort(appMasterRpcPort);
    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);

    // The registration response is useful as it provides information about
    // the cluster.
    // Similar to the GetNewApplicationResponse in the client, it provides
    // information about the min/max resource capabilities of the cluster that
    // would be needed by the ApplicationMaster when requesting containers.
    RegisterApplicationMasterResponse response =
        resourceManager.registerApplicationMaster(appMasterRequest);
+---+
  * The ApplicationMaster has to emit heartbeats to the ResourceManager to
    keep it informed that the ApplicationMaster is alive and still running.
    The timeout expiry interval at the ResourceManager is defined by a config
    setting accessible via YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, with
    the default being defined by
    YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS. The
    AMRMProtocol#allocate calls to the ResourceManager count as heartbeats as
    they also support sending progress update information. Therefore, an
    allocate call with no containers requested and progress information
    updated, if any, is a valid way of making heartbeat calls to the
    ResourceManager, as sketched below.
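
    For instance, an ApplicationMaster that has nothing new to ask for can
    heartbeat with an otherwise empty allocate call. A minimal sketch, using
    the same <<<appAttemptID>>>, <<<rmRequestID>>> and <<<currentProgress>>>
    variables that appear in the AllocateRequest example further below:

+---+
    // A heartbeat: an allocate call with no new container asks and no
    // releases, carrying only the current progress.
    AllocateRequest heartbeat = Records.newRecord(AllocateRequest.class);
    heartbeat.setApplicationAttemptId(appAttemptID);
    heartbeat.setResponseId(rmRequestID);
    heartbeat.setProgress(currentProgress);
    AllocateResponse heartbeatResponse = resourceManager.allocate(heartbeat);
+---+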
  * Based on the task requirements, the ApplicationMaster can ask for a set of
    containers to run its tasks on. The ApplicationMaster has to use the
    ResourceRequest class to define the following container specifications:

    * Hostname: if containers are required to be hosted on a particular rack
      or a specific host. '*' is a special value that implies any host will
      do.

    * Resource capability: currently, YARN only supports memory based resource
      requirements so the request should define how much memory is needed.
      The value is defined in MB and has to be less than the max capability
      of the cluster and an exact multiple of the min capability. Memory
      resources correspond to physical memory limits imposed on the task
      containers.

    * Priority: when asking for sets of containers, an ApplicationMaster may
      define different priorities for each set. For example, the Map-Reduce
      ApplicationMaster may assign a higher priority to containers needed
      for the Map tasks and a lower priority for the Reduce tasks'
      containers.

    []
+---+
    // Resource Request
    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);

    // setup requirements for hosts
    // whether a particular rack/host is needed
    // useful for applications that are sensitive
    // to data locality
    rsrcRequest.setHostName("*");

    // set the priority for the request
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(requestPriority);
    rsrcRequest.setPriority(pri);

    // Set up resource type requirements
    // For now, only memory is supported so we set memory requirements
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(containerMemory);
    rsrcRequest.setCapability(capability);

    // set no. of containers needed
    // matching the specifications
    rsrcRequest.setNumContainers(numContainers);
+---+
  * After defining the container requirements, the ApplicationMaster has to
    construct an AllocateRequest to send to the ResourceManager. The
    AllocateRequest consists of:

    * Requested containers: the container specifications and the no. of
      containers being requested by the ApplicationMaster from the
      ResourceManager.

    * Released containers: there may be situations when the ApplicationMaster
      has requested more containers than it needs or, due to failures, has
      decided not to use some of the containers allocated to it. In all such
      situations, it is beneficial to the cluster if the ApplicationMaster
      releases these containers back to the ResourceManager so that they can
      be re-allocated to other applications.

    * ResponseId: the response id that will be sent back in the response from
      the allocate call.

    * Progress update information: the ApplicationMaster can send its progress
      update (a value between 0 and 1) to the ResourceManager.

    []
+---+
    List<ResourceRequest> requestedContainers;
    List<ContainerId> releasedContainers;
    AllocateRequest req = Records.newRecord(AllocateRequest.class);

    // The response id set in the request will be sent back in
    // the response so that the ApplicationMaster can
    // match it to its original ask and act appropriately.
    req.setResponseId(rmRequestID);

    // Set ApplicationAttemptId
    req.setApplicationAttemptId(appAttemptID);

    // Add the list of containers being asked for
    req.addAllAsks(requestedContainers);

    // If the ApplicationMaster has no need for certain
    // containers due to over-allocation or for any other
    // reason, it can release them back to the ResourceManager
    req.addAllReleases(releasedContainers);

    // Assuming the ApplicationMaster can track its progress
    req.setProgress(currentProgress);

    AllocateResponse allocateResponse = resourceManager.allocate(req);
+---+
  * The AllocateResponse sent back from the ResourceManager provides the
    following information via the AMResponse object:

    * Reboot flag: for scenarios when the ApplicationMaster may get out of
      sync with the ResourceManager.

    * Allocated containers: the containers that have been allocated to the
      ApplicationMaster.

    * Headroom: headroom for resources in the cluster. Based on this
      information and knowing its needs, an ApplicationMaster can make
      intelligent decisions such as re-prioritizing sub-tasks to take
      advantage of currently allocated containers, bailing out faster if
      resources are not becoming available etc.

    * Completed containers: once an ApplicationMaster triggers a launch of an
      allocated container, it will receive an update from the ResourceManager
      when the container completes. The ApplicationMaster can look into the
      status of the completed container and take appropriate actions such as
      re-trying a particular sub-task in case of a failure.

    []

    One thing to note is that containers will not be immediately allocated to
    the ApplicationMaster. This does not imply that the ApplicationMaster
    should keep asking for the pending count of required containers. Once an
    allocate request has been sent, the ApplicationMaster will eventually be
    allocated the containers based on cluster capacity, priorities and the
    scheduling policy in place. The ApplicationMaster should request
    containers again only if its original estimate has changed and it needs
    additional containers.
+---+
    // Get AMResponse from AllocateResponse
    AMResponse amResp = allocateResponse.getAMResponse();

    // Retrieve list of allocated containers from the response
    // and on each allocated container, let's assume we are launching
    // the same job.
    List<Container> allocatedContainers = amResp.getAllocatedContainers();
    for (Container allocatedContainer : allocatedContainers) {
      LOG.info("Launching shell command on a new container."
          + ", containerId=" + allocatedContainer.getId()
          + ", containerNode=" + allocatedContainer.getNodeId().getHost()
          + ":" + allocatedContainer.getNodeId().getPort()
          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
          + ", containerState=" + allocatedContainer.getState()
          + ", containerResourceMemory="
          + allocatedContainer.getResource().getMemory());

      // Launch and start the container on a separate thread to keep the main
      // thread unblocked as all containers may not be allocated at one go.
      LaunchContainerRunnable runnableLaunchContainer =
          new LaunchContainerRunnable(allocatedContainer);
      Thread launchThread = new Thread(runnableLaunchContainer);
      launchThreads.add(launchThread);
      launchThread.start();
    }

    // Check what the current available resources in the cluster are
    Resource availableResources = amResp.getAvailableResources();
    // Based on this information, an ApplicationMaster can make appropriate
    // decisions

    // Check the completed containers
    // Let's assume we are keeping a count of total completed containers,
    // containers that failed and ones that completed successfully.
    List<ContainerStatus> completedContainers =
        amResp.getCompletedContainersStatuses();
    for (ContainerStatus containerStatus : completedContainers) {
      LOG.info("Got container status for containerID= "
          + containerStatus.getContainerId()
          + ", state=" + containerStatus.getState()
          + ", exitStatus=" + containerStatus.getExitStatus()
          + ", diagnostics=" + containerStatus.getDiagnostics());

      int exitStatus = containerStatus.getExitStatus();
      if (0 != exitStatus) {
        // container failed
        // -100 is a special case where the container
        // was aborted/pre-empted for some reason
        if (-100 != exitStatus) {
          // application job on container returned a non-zero exit code
          // counts as completed
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
        }
        else {
          // something else bad happened
          // app job did not complete for some reason
          // we should re-try as the container was lost for some reason
          // decrementing the requested count so that we ask for an
          // additional one in the next allocate call.
          numRequestedContainers.decrementAndGet();
          // we do not need to release the container as that has already
          // been done by the ResourceManager/NodeManager.
        }
      }
      else {
        // nothing to do
        // container completed successfully
        numCompletedContainers.incrementAndGet();
        numSuccessfulContainers.incrementAndGet();
      }
    }
+---+
  * After a container has been allocated to the ApplicationMaster, it needs to
    follow a similar process to the one the client followed in setting up the
    ContainerLaunchContext for the eventual task that is going to be run on
    the allocated container. Once the ContainerLaunchContext is defined, the
    ApplicationMaster can then communicate with the ContainerManager to start
    its allocated container.
+---+
    // Assuming an allocated Container obtained from AMResponse
    Container container;
    // Connect to ContainerManager on the allocated container
    String cmIpPortStr = container.getNodeId().getHost() + ":"
        + container.getNodeId().getPort();
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
    ContainerManager cm =
        (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf);

    // Now we set up a ContainerLaunchContext
    ContainerLaunchContext ctx =
        Records.newRecord(ContainerLaunchContext.class);

    ctx.setContainerId(container.getId());
    ctx.setResource(container.getResource());

    try {
      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
    } catch (IOException e) {
      LOG.info(
          "Getting current user failed when trying to launch the container"
          + e.getMessage());
    }

    // Set the environment
    Map<String, String> unixEnv;
    // Set up the required env.
    // Please note that the launched container does not inherit
    // the environment of the ApplicationMaster so all the
    // necessary environment settings will need to be re-setup
    // for this allocated container.
    ctx.setEnvironment(unixEnv);

    // Set the local resources
    Map<String, LocalResource> localResources =
        new HashMap<String, LocalResource>();
    // Again, the local resources from the ApplicationMaster are not copied
    // over by default to the allocated container. Thus, it is the
    // responsibility of the ApplicationMaster to set up all the necessary
    // local resources needed by the job that will be executed on the
    // allocated container.

    // Assume that we are executing a shell script on the allocated container
    // and the shell script's location in the filesystem is known to us.
    Path shellScriptPath;
    LocalResource shellRsrc = Records.newRecord(LocalResource.class);
    shellRsrc.setType(LocalResourceType.FILE);
    shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    shellRsrc.setResource(
        ConverterUtils.getYarnUrlFromPath(shellScriptPath));
    shellRsrc.setTimestamp(shellScriptPathTimestamp);
    shellRsrc.setSize(shellScriptPathLen);
    localResources.put("MyExecShell.sh", shellRsrc);

    ctx.setLocalResources(localResources);

    // Set the necessary command to execute on the allocated container
    String command = "/bin/sh ./MyExecShell.sh"
        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";

    List<String> commands = new ArrayList<String>();
    commands.add(command);
    ctx.setCommands(commands);

    // Send the start request to the ContainerManager
    StartContainerRequest startReq =
        Records.newRecord(StartContainerRequest.class);
    startReq.setContainerLaunchContext(ctx);
    cm.startContainer(startReq);
+---+
  * The ApplicationMaster, as mentioned previously, will get updates on
    completed containers as part of the response from the
    AMRMProtocol#allocate calls. It can also monitor its launched containers
    pro-actively by querying the ContainerManager for the status.

+---+
    GetContainerStatusRequest statusReq =
        Records.newRecord(GetContainerStatusRequest.class);
    statusReq.setContainerId(container.getId());
    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
    LOG.info("Container Status"
        + ", id=" + container.getId()
        + ", status=" + statusResp.getStatus());
+---+
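
  * Once its work is done, the ApplicationMaster has to signal its completion
    back to the ResourceManager via AMRMProtocol#finishApplicationMaster, as
    mentioned in the Concepts and Flow section. A minimal sketch follows; the
    exact setter names on FinishApplicationMasterRequest are assumed from
    this API version, and the success criterion and diagnostics message are
    application specific.

+---+
    // Unregister from the ResourceManager once all the work is done.
    // Whether the application succeeded and what diagnostics to report
    // are application specific; here we reuse the failure counter from
    // the container-status handling above.
    FinishApplicationMasterRequest finishRequest =
        Records.newRecord(FinishApplicationMasterRequest.class);
    finishRequest.setAppAttemptId(appAttemptID);
    if (numFailedContainers.get() == 0) {
      finishRequest.setFinishApplicationStatus(
          FinalApplicationStatus.SUCCEEDED);
    } else {
      finishRequest.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
      finishRequest.setDiagnostics("Some containers failed, count="
          + numFailedContainers.get());
    }
    resourceManager.finishApplicationMaster(finishRequest);
+---+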
~~** Defining the context in which your code runs
~~*** Container Resource Requests
~~*** Local Resources
~~*** Environment
~~**** Managing the CLASSPATH
~~** Security
* FAQ

** How can I distribute my application's jars to all of the nodes in the YARN
   cluster that need it?

  You can use the LocalResource to add resources to your application request.
  This will cause YARN to distribute the resource to the ApplicationMaster
  node. If the resource is a tgz, zip, or jar - you can have YARN unzip it.
  Then, all you need to do is add the unzipped folder to your classpath. For
  example, when creating your application request:
+---+
    File packageFile = new File(packagePath);
    URL packageUrl = ConverterUtils.getYarnUrlFromPath(
        FileContext.getFileContext().makeQualified(new Path(packagePath)));

    LocalResource packageResource = Records.newRecord(LocalResource.class);
    packageResource.setResource(packageUrl);
    packageResource.setSize(packageFile.length());
    packageResource.setTimestamp(packageFile.lastModified());
    packageResource.setType(LocalResourceType.ARCHIVE);
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);

    Resource resource = Records.newRecord(Resource.class);
    resource.setMemory(memory);

    containerCtx.setResource(resource);
    containerCtx.setCommands(ImmutableList.of(
        "java -cp './package/*' some.class.to.Run "
        + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
        + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
    containerCtx.setLocalResources(
        Collections.singletonMap("package", packageResource));
    appCtx.setApplicationId(appId);
    appCtx.setUser(user.getShortUserName());
    appCtx.setAMContainerSpec(containerCtx);
    request.setApplicationSubmissionContext(appCtx);
    applicationsManager.submitApplication(request);
+---+
  As you can see, the setLocalResources command takes a map of names to
  resources. The name becomes a symlink in your application's cwd, so you can
  just refer to the artifacts inside by using ./package/*.

  Note: Java's classpath (cp) argument is VERY sensitive. Make sure you get
  the syntax EXACTLY correct.

  Once your package is distributed to your ApplicationMaster, you'll need to
  follow the same process whenever your ApplicationMaster starts a new
  container (assuming you want the resources to be sent to your container).
  The code for this is the same. You just need to make sure that you give
  your ApplicationMaster the package path (either HDFS or local), so that it
  can send the resource URL along with the container ctx.
** How do I get the ApplicationMaster's ApplicationAttemptId?

  The ApplicationAttemptId will be passed to the ApplicationMaster via the
  environment, and the value from the environment can be converted into an
  ApplicationAttemptId object via the ConverterUtils helper function, as in
  the brief sketch below.
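
  This is the same conversion used in the ApplicationMaster section above:

+---+
    String containerIdString =
        System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
    ApplicationAttemptId appAttemptId =
        containerId.getApplicationAttemptId();
+---+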
** My container is being killed by the NodeManager

  This is likely due to high memory usage exceeding your requested container
  memory size. There are a number of reasons that can cause this. First, look
  at the process tree that the NodeManager dumps when it kills your container.
  The two things you're interested in are physical memory and virtual memory.
  If you have exceeded physical memory limits, your app is using too much
  physical memory. If you're running a Java app, you can use hprof to look at
  what is taking up space in the heap. If you have exceeded virtual memory,
  you may need to increase the value of the cluster-wide configuration
  variable <<<yarn.nodemanager.vmem-pmem-ratio>>>.
** How do I include native libraries?

  Setting -Djava.library.path on the command line while launching a container
  can cause native libraries used by Hadoop to not be loaded correctly and can
  result in errors. It is cleaner to use LD_LIBRARY_PATH instead.
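
  For example, instead of passing -Djava.library.path in the launch command,
  the directory containing the native libraries can be added to
  LD_LIBRARY_PATH in the container's launch environment. A sketch, where
  <<<ctx>>> is the ContainerLaunchContext being set up for the container and
  the library directory shown is purely illustrative:

+---+
    Map<String, String> env = new HashMap<String, String>();
    // Prepend the directory holding the native libraries; the actual path
    // is application specific and only illustrative here.
    env.put("LD_LIBRARY_PATH", "/path/to/native/libs:$LD_LIBRARY_PATH");
    ctx.setEnvironment(env);
+---+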
* Useful Links

  * {{{https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf}Map Reduce Next Generation Architecture}}

  * {{{http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/}Map Reduce Next Generation Scheduler}}