|
@@ -24,6 +24,7 @@ import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.StringReader;
|
|
|
+import java.lang.reflect.UndeclaredThrowableException;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.nio.ByteBuffer;
|
|
@@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.URL;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
|
|
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
|
|
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
|
|
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
|
@@ -246,6 +248,9 @@ public class ApplicationMaster {
|
|
|
// File length needed for local resource
|
|
|
private long shellScriptPathLen = 0;
|
|
|
|
|
|
+ // Timeline domain ID
|
|
|
+ private String domainId = null;
|
|
|
+
|
|
|
// Hardcoded path to shell script in launch container's local env
|
|
|
private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
|
|
|
private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
|
|
@@ -465,7 +470,9 @@ public class ApplicationMaster {
|
|
|
shellScriptPathLen = Long.valueOf(envs
|
|
|
.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
|
|
|
}
|
|
|
-
|
|
|
+ if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
|
|
|
+ domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
|
|
|
+ }
|
|
|
if (!scriptPath.isEmpty()
|
|
|
&& (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
|
|
|
LOG.error("Illegal values in env for shell script path" + ", path="
|
|
@@ -515,13 +522,6 @@ public class ApplicationMaster {
|
|
|
@SuppressWarnings({ "unchecked" })
|
|
|
public void run() throws YarnException, IOException {
|
|
|
LOG.info("Starting ApplicationMaster");
|
|
|
- try {
|
|
|
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
- DSEvent.DS_APP_ATTEMPT_START);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("App Attempt start event could not be published for "
|
|
|
- + appAttemptID.toString(), e);
|
|
|
- }
|
|
|
|
|
|
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
|
|
|
// are marked as LimitedPrivate
|
|
@@ -548,6 +548,9 @@ public class ApplicationMaster {
|
|
|
UserGroupInformation.createRemoteUser(appSubmitterUserName);
|
|
|
appSubmitterUgi.addCredentials(credentials);
|
|
|
|
|
|
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
+ DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
|
|
|
+
|
|
|
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
|
|
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
|
|
amRMClient.init(conf);
|
|
@@ -612,13 +615,9 @@ public class ApplicationMaster {
|
|
|
amRMClient.addContainerRequest(containerAsk);
|
|
|
}
|
|
|
numRequestedContainers.set(numTotalContainers);
|
|
|
- try {
|
|
|
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
- DSEvent.DS_APP_ATTEMPT_END);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("App Attempt start event could not be published for "
|
|
|
- + appAttemptID.toString(), e);
|
|
|
- }
|
|
|
+
|
|
|
+ publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
+ DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@@ -724,12 +723,8 @@ public class ApplicationMaster {
|
|
|
LOG.info("Container completed successfully." + ", containerId="
|
|
|
+ containerStatus.getContainerId());
|
|
|
}
|
|
|
- try {
|
|
|
- publishContainerEndEvent(timelineClient, containerStatus);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Container start event could not be published for "
|
|
|
- + containerStatus.getContainerId().toString(), e);
|
|
|
- }
|
|
|
+ publishContainerEndEvent(
|
|
|
+ timelineClient, containerStatus, domainId, appSubmitterUgi);
|
|
|
}
|
|
|
|
|
|
// ask for more containers if any failed
|
|
@@ -844,13 +839,9 @@ public class ApplicationMaster {
|
|
|
if (container != null) {
|
|
|
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
|
|
|
}
|
|
|
- try {
|
|
|
- ApplicationMaster.publishContainerStartEvent(
|
|
|
- applicationMaster.timelineClient, container);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Container start event could not be published for "
|
|
|
- + container.getId().toString(), e);
|
|
|
- }
|
|
|
+ ApplicationMaster.publishContainerStartEvent(
|
|
|
+ applicationMaster.timelineClient, container,
|
|
|
+ applicationMaster.domainId, applicationMaster.appSubmitterUgi);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1050,13 +1041,14 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void publishContainerStartEvent(TimelineClient timelineClient,
|
|
|
- Container container) throws IOException, YarnException {
|
|
|
- TimelineEntity entity = new TimelineEntity();
|
|
|
+ private static void publishContainerStartEvent(
|
|
|
+ final TimelineClient timelineClient, Container container, String domainId,
|
|
|
+ UserGroupInformation ugi) {
|
|
|
+ final TimelineEntity entity = new TimelineEntity();
|
|
|
entity.setEntityId(container.getId().toString());
|
|
|
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
|
|
|
- entity.addPrimaryFilter("user",
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
+ entity.setDomainId(domainId);
|
|
|
+ entity.addPrimaryFilter("user", ugi.getShortUserName());
|
|
|
TimelineEvent event = new TimelineEvent();
|
|
|
event.setTimestamp(System.currentTimeMillis());
|
|
|
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
|
|
@@ -1064,16 +1056,28 @@ public class ApplicationMaster {
|
|
|
event.addEventInfo("Resources", container.getResource().toString());
|
|
|
entity.addEvent(event);
|
|
|
|
|
|
- timelineClient.putEntities(entity);
|
|
|
+ try {
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
|
|
+ @Override
|
|
|
+ public TimelinePutResponse run() throws Exception {
|
|
|
+ return timelineClient.putEntities(entity);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Container start event could not be published for "
|
|
|
+ + container.getId().toString(),
|
|
|
+ e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- private static void publishContainerEndEvent(TimelineClient timelineClient,
|
|
|
- ContainerStatus container) throws IOException, YarnException {
|
|
|
- TimelineEntity entity = new TimelineEntity();
|
|
|
+ private static void publishContainerEndEvent(
|
|
|
+ final TimelineClient timelineClient, ContainerStatus container,
|
|
|
+ String domainId, UserGroupInformation ugi) {
|
|
|
+ final TimelineEntity entity = new TimelineEntity();
|
|
|
entity.setEntityId(container.getContainerId().toString());
|
|
|
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
|
|
|
- entity.addPrimaryFilter("user",
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
+ entity.setDomainId(domainId);
|
|
|
+ entity.addPrimaryFilter("user", ugi.getShortUserName());
|
|
|
TimelineEvent event = new TimelineEvent();
|
|
|
event.setTimestamp(System.currentTimeMillis());
|
|
|
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
|
|
@@ -1081,22 +1085,46 @@ public class ApplicationMaster {
|
|
|
event.addEventInfo("Exit Status", container.getExitStatus());
|
|
|
entity.addEvent(event);
|
|
|
|
|
|
- timelineClient.putEntities(entity);
|
|
|
+ try {
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
|
|
+ @Override
|
|
|
+ public TimelinePutResponse run() throws Exception {
|
|
|
+ return timelineClient.putEntities(entity);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Container end event could not be published for "
|
|
|
+ + container.getContainerId().toString(),
|
|
|
+ e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static void publishApplicationAttemptEvent(
|
|
|
- TimelineClient timelineClient, String appAttemptId, DSEvent appEvent)
|
|
|
- throws IOException, YarnException {
|
|
|
- TimelineEntity entity = new TimelineEntity();
|
|
|
+ final TimelineClient timelineClient, String appAttemptId,
|
|
|
+ DSEvent appEvent, String domainId, UserGroupInformation ugi) {
|
|
|
+ final TimelineEntity entity = new TimelineEntity();
|
|
|
entity.setEntityId(appAttemptId);
|
|
|
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
|
|
|
- entity.addPrimaryFilter("user",
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
+ entity.setDomainId(domainId);
|
|
|
+ entity.addPrimaryFilter("user", ugi.getShortUserName());
|
|
|
TimelineEvent event = new TimelineEvent();
|
|
|
event.setEventType(appEvent.toString());
|
|
|
event.setTimestamp(System.currentTimeMillis());
|
|
|
entity.addEvent(event);
|
|
|
|
|
|
- timelineClient.putEntities(entity);
|
|
|
+ try {
|
|
|
+ ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
|
|
+ @Override
|
|
|
+ public TimelinePutResponse run() throws Exception {
|
|
|
+ return timelineClient.putEntities(entity);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("App Attempt "
|
|
|
+ + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
|
|
|
+ + " event could not be published for "
|
|
|
+ + appAttemptId.toString(),
|
|
|
+ e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
|
|
+ }
|
|
|
}
|
|
|
}
|