|
@@ -188,13 +188,14 @@ public class ApplicationMaster {
|
|
|
private AMRMClientAsync amRMClient;
|
|
|
|
|
|
// In both secure and non-secure modes, this points to the job-submitter.
|
|
|
- private UserGroupInformation appSubmitterUgi;
|
|
|
+ @VisibleForTesting
|
|
|
+ UserGroupInformation appSubmitterUgi;
|
|
|
|
|
|
// Handle to communicate with the Node Manager
|
|
|
private NMClientAsync nmClientAsync;
|
|
|
// Listen to process the response from the Node Manager
|
|
|
private NMCallbackHandler containerListener;
|
|
|
-
|
|
|
+
|
|
|
// Application Attempt Id ( combination of attemptId and fail count )
|
|
|
@VisibleForTesting
|
|
|
protected ApplicationAttemptId appAttemptID;
|
|
@@ -270,7 +271,8 @@ public class ApplicationMaster {
|
|
|
private List<Thread> launchThreads = new ArrayList<Thread>();
|
|
|
|
|
|
// Timeline Client
|
|
|
- private TimelineClient timelineClient;
|
|
|
+ @VisibleForTesting
|
|
|
+ TimelineClient timelineClient;
|
|
|
|
|
|
private final String linux_bash_command = "bash";
|
|
|
private final String windows_command = "cmd /c";
|
|
@@ -496,18 +498,6 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
requestPriority = Integer.parseInt(cliParser
|
|
|
.getOptionValue("priority", "0"));
|
|
|
-
|
|
|
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
|
|
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
|
|
|
- // Creating the Timeline Client
|
|
|
- timelineClient = TimelineClient.createTimelineClient();
|
|
|
- timelineClient.init(conf);
|
|
|
- timelineClient.start();
|
|
|
- } else {
|
|
|
- timelineClient = null;
|
|
|
- LOG.warn("Timeline service is not enabled");
|
|
|
- }
|
|
|
-
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -527,7 +517,7 @@ public class ApplicationMaster {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
@SuppressWarnings({ "unchecked" })
|
|
|
- public void run() throws YarnException, IOException {
|
|
|
+ public void run() throws YarnException, IOException, InterruptedException {
|
|
|
LOG.info("Starting ApplicationMaster");
|
|
|
|
|
|
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
|
|
@@ -554,11 +544,7 @@ public class ApplicationMaster {
|
|
|
appSubmitterUgi =
|
|
|
UserGroupInformation.createRemoteUser(appSubmitterUserName);
|
|
|
appSubmitterUgi.addCredentials(credentials);
|
|
|
-
|
|
|
- if(timelineClient != null) {
|
|
|
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
- DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
|
|
|
- }
|
|
|
+
|
|
|
|
|
|
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
|
|
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
|
@@ -570,6 +556,12 @@ public class ApplicationMaster {
|
|
|
nmClientAsync.init(conf);
|
|
|
nmClientAsync.start();
|
|
|
|
|
|
+ startTimelineClient(conf);
|
|
|
+ if(timelineClient != null) {
|
|
|
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
+ DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
|
|
|
+ }
|
|
|
+
|
|
|
// Setup local RPC Server to accept status requests directly from clients
|
|
|
// TODO need to setup a protocol for client to be able to communicate to
|
|
|
// the RPC server
|
|
@@ -624,10 +616,30 @@ public class ApplicationMaster {
|
|
|
amRMClient.addContainerRequest(containerAsk);
|
|
|
}
|
|
|
numRequestedContainers.set(numTotalContainers);
|
|
|
+ }
|
|
|
|
|
|
- if(timelineClient != null) {
|
|
|
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
- DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
|
|
|
+ @VisibleForTesting
|
|
|
+ void startTimelineClient(final Configuration conf)
|
|
|
+ throws YarnException, IOException, InterruptedException {
|
|
|
+ try {
|
|
|
+ appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
+ @Override
|
|
|
+ public Void run() throws Exception {
|
|
|
+ if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
|
|
|
+ // Creating the Timeline Client
|
|
|
+ timelineClient = TimelineClient.createTimelineClient();
|
|
|
+ timelineClient.init(conf);
|
|
|
+ timelineClient.start();
|
|
|
+ } else {
|
|
|
+ timelineClient = null;
|
|
|
+ LOG.warn("Timeline service is not enabled");
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (UndeclaredThrowableException e) {
|
|
|
+ throw new YarnException(e.getCause());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -646,6 +658,11 @@ public class ApplicationMaster {
|
|
|
} catch (InterruptedException ex) {}
|
|
|
}
|
|
|
|
|
|
+ if(timelineClient != null) {
|
|
|
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
+ DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
|
|
|
+ }
|
|
|
+
|
|
|
// Join all launched threads
|
|
|
// needed for when we time out
|
|
|
// and we need to release containers
|
|
@@ -1104,18 +1121,11 @@ public class ApplicationMaster {
|
|
|
event.addEventInfo("State", container.getState().name());
|
|
|
event.addEventInfo("Exit Status", container.getExitStatus());
|
|
|
entity.addEvent(event);
|
|
|
-
|
|
|
try {
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
|
|
- @Override
|
|
|
- public TimelinePutResponse run() throws Exception {
|
|
|
- return timelineClient.putEntities(entity);
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (Exception e) {
|
|
|
+ timelineClient.putEntities(entity);
|
|
|
+ } catch (YarnException | IOException e) {
|
|
|
LOG.error("Container end event could not be published for "
|
|
|
- + container.getContainerId().toString(),
|
|
|
- e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
|
|
+ + container.getContainerId().toString(), e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1131,20 +1141,13 @@ public class ApplicationMaster {
|
|
|
event.setEventType(appEvent.toString());
|
|
|
event.setTimestamp(System.currentTimeMillis());
|
|
|
entity.addEvent(event);
|
|
|
-
|
|
|
try {
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
|
|
- @Override
|
|
|
- public TimelinePutResponse run() throws Exception {
|
|
|
- return timelineClient.putEntities(entity);
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (Exception e) {
|
|
|
+ timelineClient.putEntities(entity);
|
|
|
+ } catch (YarnException | IOException e) {
|
|
|
LOG.error("App Attempt "
|
|
|
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
|
|
|
+ " event could not be published for "
|
|
|
- + appAttemptId.toString(),
|
|
|
- e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
|
|
+ + appAttemptId.toString(), e);
|
|
|
}
|
|
|
}
|
|
|
}
|