|
@@ -217,14 +217,11 @@ public class ApplicationMaster {
|
|
|
// Tracking url to which app master publishes info for clients to monitor
|
|
|
private String appMasterTrackingUrl = "";
|
|
|
|
|
|
- private boolean newTimelineService = false;
|
|
|
+ private boolean timelineServiceV2 = false;
|
|
|
|
|
|
// For posting entities in new timeline service in a non-blocking way
|
|
|
// TODO replace with event loop in TimelineClient.
|
|
|
- private static ExecutorService threadPool =
|
|
|
- Executors.newCachedThreadPool(
|
|
|
- new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
|
|
|
- .build());
|
|
|
+ private ExecutorService threadPool;
|
|
|
|
|
|
// App Master configuration
|
|
|
// No. of containers to run shell command on
|
|
@@ -314,8 +311,10 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
appMaster.run();
|
|
|
result = appMaster.finish();
|
|
|
-
|
|
|
- shutdownAndAwaitTermination();
|
|
|
+
|
|
|
+ if (appMaster.threadPool != null) {
|
|
|
+ appMaster.shutdownAndAwaitTermination();
|
|
|
+ }
|
|
|
} catch (Throwable t) {
|
|
|
LOG.fatal("Error running ApplicationMaster", t);
|
|
|
LogManager.shutdown();
|
|
@@ -329,16 +328,22 @@ public class ApplicationMaster {
|
|
|
System.exit(2);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//TODO remove threadPool after adding non-blocking call in TimelineClient
|
|
|
- private static void shutdownAndAwaitTermination() {
|
|
|
+ private ExecutorService createThreadPool() {
|
|
|
+ return Executors.newCachedThreadPool(
|
|
|
+ new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
|
|
|
+ .build());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void shutdownAndAwaitTermination() {
|
|
|
threadPool.shutdown();
|
|
|
try {
|
|
|
// Wait a while for existing tasks to terminate
|
|
|
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
|
|
|
threadPool.shutdownNow();
|
|
|
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
|
|
|
- LOG.error("ThreadPool did not terminate");
|
|
|
+ LOG.error("ThreadPool did not terminate");
|
|
|
}
|
|
|
} catch (InterruptedException ie) {
|
|
|
threadPool.shutdownNow();
|
|
@@ -404,8 +409,7 @@ public class ApplicationMaster {
|
|
|
"No. of containers on which the shell command needs to be executed");
|
|
|
opts.addOption("priority", true, "Application Priority. Default 0");
|
|
|
opts.addOption("debug", false, "Dump out debug information");
|
|
|
- opts.addOption("timeline_service_version", true,
|
|
|
- "Version for timeline service");
|
|
|
+
|
|
|
opts.addOption("help", false, "Print usage");
|
|
|
CommandLine cliParser = new GnuParser().parse(opts, args);
|
|
|
|
|
@@ -542,27 +546,15 @@ public class ApplicationMaster {
|
|
|
requestPriority = Integer.parseInt(cliParser
|
|
|
.getOptionValue("priority", "0"));
|
|
|
|
|
|
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
|
|
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
|
|
|
- if (cliParser.hasOption("timeline_service_version")) {
|
|
|
- String timelineServiceVersion =
|
|
|
- cliParser.getOptionValue("timeline_service_version", "v1");
|
|
|
- if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
|
|
|
- newTimelineService = false;
|
|
|
- } else if (timelineServiceVersion.trim().equalsIgnoreCase("v2")) {
|
|
|
- newTimelineService = true;
|
|
|
- } else {
|
|
|
- throw new IllegalArgumentException(
|
|
|
- "timeline_service_version is not set properly, should be 'v1' or 'v2'");
|
|
|
- }
|
|
|
+ if (YarnConfiguration.timelineServiceEnabled(conf)) {
|
|
|
+ timelineServiceV2 =
|
|
|
+ YarnConfiguration.timelineServiceV2Enabled(conf);
|
|
|
+ if (timelineServiceV2) {
|
|
|
+ threadPool = createThreadPool();
|
|
|
}
|
|
|
} else {
|
|
|
timelineClient = null;
|
|
|
LOG.warn("Timeline service is not enabled");
|
|
|
- if (cliParser.hasOption("timeline_service_version")) {
|
|
|
- throw new IllegalArgumentException(
|
|
|
- "Timeline service is not enabled");
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
return true;
|
|
@@ -624,16 +616,16 @@ public class ApplicationMaster {
|
|
|
nmClientAsync.start();
|
|
|
|
|
|
startTimelineClient(conf);
|
|
|
- // need to bind timelineClient
|
|
|
- amRMClient.registerTimelineClient(timelineClient);
|
|
|
+ if (timelineServiceV2) {
|
|
|
+ // need to bind timelineClient
|
|
|
+ amRMClient.registerTimelineClient(timelineClient);
|
|
|
+ }
|
|
|
if(timelineClient != null) {
|
|
|
- if (newTimelineService) {
|
|
|
- publishApplicationAttemptEventOnNewTimelineService(timelineClient,
|
|
|
- appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId,
|
|
|
- appSubmitterUgi);
|
|
|
+ if (timelineServiceV2) {
|
|
|
+ publishApplicationAttemptEventOnTimelineServiceV2(
|
|
|
+ DSEvent.DS_APP_ATTEMPT_START);
|
|
|
} else {
|
|
|
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
- DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
|
|
|
+ publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_START);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -704,10 +696,9 @@ public class ApplicationMaster {
|
|
|
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
@Override
|
|
|
public Void run() throws Exception {
|
|
|
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
|
|
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
|
|
|
+ if (YarnConfiguration.timelineServiceEnabled(conf)) {
|
|
|
// Creating the Timeline Client
|
|
|
- if (newTimelineService) {
|
|
|
+ if (timelineServiceV2) {
|
|
|
timelineClient = TimelineClient.createTimelineClient(
|
|
|
appAttemptID.getApplicationId());
|
|
|
} else {
|
|
@@ -743,13 +734,11 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
|
|
|
if (timelineClient != null) {
|
|
|
- if (newTimelineService) {
|
|
|
- publishApplicationAttemptEventOnNewTimelineService(timelineClient,
|
|
|
- appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_END, domainId,
|
|
|
- appSubmitterUgi);
|
|
|
+ if (timelineServiceV2) {
|
|
|
+ publishApplicationAttemptEventOnTimelineServiceV2(
|
|
|
+ DSEvent.DS_APP_ATTEMPT_END);
|
|
|
} else {
|
|
|
- publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
|
|
- DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
|
|
|
+ publishApplicationAttemptEvent(DSEvent.DS_APP_ATTEMPT_END);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -856,12 +845,10 @@ public class ApplicationMaster {
|
|
|
+ containerStatus.getContainerId());
|
|
|
}
|
|
|
if(timelineClient != null) {
|
|
|
- if (newTimelineService) {
|
|
|
- publishContainerEndEventOnNewTimelineService(
|
|
|
- timelineClient, containerStatus, domainId, appSubmitterUgi);
|
|
|
+ if (timelineServiceV2) {
|
|
|
+ publishContainerEndEventOnTimelineServiceV2(containerStatus);
|
|
|
} else {
|
|
|
- publishContainerEndEvent(
|
|
|
- timelineClient, containerStatus, domainId, appSubmitterUgi);
|
|
|
+ publishContainerEndEvent(containerStatus);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -984,14 +971,11 @@ public class ApplicationMaster {
|
|
|
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
|
|
|
}
|
|
|
if(applicationMaster.timelineClient != null) {
|
|
|
- if (applicationMaster.newTimelineService) {
|
|
|
- ApplicationMaster.publishContainerStartEventOnNewTimelineService(
|
|
|
- applicationMaster.timelineClient, container,
|
|
|
- applicationMaster.domainId, applicationMaster.appSubmitterUgi);
|
|
|
+ if (applicationMaster.timelineServiceV2) {
|
|
|
+ applicationMaster.publishContainerStartEventOnTimelineServiceV2(
|
|
|
+ container);
|
|
|
} else {
|
|
|
- ApplicationMaster.publishContainerStartEvent(
|
|
|
- applicationMaster.timelineClient, container,
|
|
|
- applicationMaster.domainId, applicationMaster.appSubmitterUgi);
|
|
|
+ applicationMaster.publishContainerStartEvent(container);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1207,14 +1191,12 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void publishContainerStartEvent(
|
|
|
- final TimelineClient timelineClient, Container container, String domainId,
|
|
|
- UserGroupInformation ugi) {
|
|
|
+ private void publishContainerStartEvent(Container container) {
|
|
|
final TimelineEntity entity = new TimelineEntity();
|
|
|
entity.setEntityId(container.getId().toString());
|
|
|
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
|
|
|
entity.setDomainId(domainId);
|
|
|
- entity.addPrimaryFilter("user", ugi.getShortUserName());
|
|
|
+ entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
|
|
|
TimelineEvent event = new TimelineEvent();
|
|
|
event.setTimestamp(System.currentTimeMillis());
|
|
|
event.setEventType(DSEvent.DS_CONTAINER_START.toString());
|
|
@@ -1223,13 +1205,13 @@ public class ApplicationMaster {
|
|
|
entity.addEvent(event);
|
|
|
|
|
|
try {
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
|
|
|
- @Override
|
|
|
- public TimelinePutResponse run() throws Exception {
|
|
|
- return processTimelineResponseErrors(
|
|
|
- timelineClient.putEntities(entity));
|
|
|
- }
|
|
|
- });
|
|
|
+ appSubmitterUgi.doAs(
|
|
|
+ new PrivilegedExceptionAction<TimelinePutResponse>() {
|
|
|
+ @Override
|
|
|
+ public TimelinePutResponse run() throws Exception {
|
|
|
+ return timelineClient.putEntities(entity);
|
|
|
+ }
|
|
|
+ });
|
|
|
} catch (Exception e) {
|
|
|
LOG.error("Container start event could not be published for "
|
|
|
+ container.getId().toString(),
|
|
@@ -1237,14 +1219,12 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void publishContainerEndEvent(
|
|
|
- final TimelineClient timelineClient, ContainerStatus container,
|
|
|
- String domainId, UserGroupInformation ugi) {
|
|
|
+ private void publishContainerEndEvent(ContainerStatus container) {
|
|
|
final TimelineEntity entity = new TimelineEntity();
|
|
|
entity.setEntityId(container.getContainerId().toString());
|
|
|
entity.setEntityType(DSEntity.DS_CONTAINER.toString());
|
|
|
entity.setDomainId(domainId);
|
|
|
- entity.addPrimaryFilter("user", ugi.getShortUserName());
|
|
|
+ entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
|
|
|
TimelineEvent event = new TimelineEvent();
|
|
|
event.setTimestamp(System.currentTimeMillis());
|
|
|
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
|
|
@@ -1260,14 +1240,12 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void publishApplicationAttemptEvent(
|
|
|
- final TimelineClient timelineClient, String appAttemptId,
|
|
|
- DSEvent appEvent, String domainId, UserGroupInformation ugi) {
|
|
|
+ private void publishApplicationAttemptEvent(DSEvent appEvent) {
|
|
|
final TimelineEntity entity = new TimelineEntity();
|
|
|
- entity.setEntityId(appAttemptId);
|
|
|
+ entity.setEntityId(appAttemptID.toString());
|
|
|
entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
|
|
|
entity.setDomainId(domainId);
|
|
|
- entity.addPrimaryFilter("user", ugi.getShortUserName());
|
|
|
+ entity.addPrimaryFilter("user", appSubmitterUgi.getShortUserName());
|
|
|
TimelineEvent event = new TimelineEvent();
|
|
|
event.setEventType(appEvent.toString());
|
|
|
event.setTimestamp(System.currentTimeMillis());
|
|
@@ -1279,7 +1257,7 @@ public class ApplicationMaster {
|
|
|
LOG.error("App Attempt "
|
|
|
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
|
|
|
+ " event could not be published for "
|
|
|
- + appAttemptId.toString(), e);
|
|
|
+ + appAttemptID, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1327,27 +1305,24 @@ public class ApplicationMaster {
|
|
|
return new Thread(runnableLaunchContainer);
|
|
|
}
|
|
|
|
|
|
- private static void publishContainerStartEventOnNewTimelineService(
|
|
|
- final TimelineClient timelineClient, final Container container,
|
|
|
- final String domainId, final UserGroupInformation ugi) {
|
|
|
+ private void publishContainerStartEventOnTimelineServiceV2(
|
|
|
+ final Container container) {
|
|
|
Runnable publishWrapper = new Runnable() {
|
|
|
public void run() {
|
|
|
- publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
|
|
|
- container, domainId, ugi);
|
|
|
+ publishContainerStartEventOnTimelineServiceV2Base(container);
|
|
|
}
|
|
|
};
|
|
|
threadPool.execute(publishWrapper);
|
|
|
}
|
|
|
|
|
|
- private static void publishContainerStartEventOnNewTimelineServiceBase(
|
|
|
- final TimelineClient timelineClient, Container container, String domainId,
|
|
|
- UserGroupInformation ugi) {
|
|
|
+ private void publishContainerStartEventOnTimelineServiceV2Base(
|
|
|
+ Container container) {
|
|
|
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
|
|
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
|
|
|
entity.setId(container.getId().toString());
|
|
|
entity.setType(DSEntity.DS_CONTAINER.toString());
|
|
|
//entity.setDomainId(domainId);
|
|
|
- entity.addInfo("user", ugi.getShortUserName());
|
|
|
+ entity.addInfo("user", appSubmitterUgi.getShortUserName());
|
|
|
|
|
|
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
|
|
|
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
|
|
@@ -1358,7 +1333,7 @@ public class ApplicationMaster {
|
|
|
entity.addEvent(event);
|
|
|
|
|
|
try {
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
@Override
|
|
|
public TimelinePutResponse run() throws Exception {
|
|
|
timelineClient.putEntities(entity);
|
|
@@ -1372,27 +1347,24 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void publishContainerEndEventOnNewTimelineService(
|
|
|
- final TimelineClient timelineClient, final ContainerStatus container,
|
|
|
- final String domainId, final UserGroupInformation ugi) {
|
|
|
+ private void publishContainerEndEventOnTimelineServiceV2(
|
|
|
+ final ContainerStatus container) {
|
|
|
Runnable publishWrapper = new Runnable() {
|
|
|
public void run() {
|
|
|
- publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
|
|
|
- container, domainId, ugi);
|
|
|
+ publishContainerEndEventOnTimelineServiceV2Base(container);
|
|
|
}
|
|
|
};
|
|
|
threadPool.execute(publishWrapper);
|
|
|
}
|
|
|
|
|
|
- private static void publishContainerEndEventOnNewTimelineServiceBase(
|
|
|
- final TimelineClient timelineClient, final ContainerStatus container,
|
|
|
- final String domainId, final UserGroupInformation ugi) {
|
|
|
+ private void publishContainerEndEventOnTimelineServiceV2Base(
|
|
|
+ final ContainerStatus container) {
|
|
|
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
|
|
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
|
|
|
entity.setId(container.getContainerId().toString());
|
|
|
entity.setType(DSEntity.DS_CONTAINER.toString());
|
|
|
//entity.setDomainId(domainId);
|
|
|
- entity.addInfo("user", ugi.getShortUserName());
|
|
|
+ entity.addInfo("user", appSubmitterUgi.getShortUserName());
|
|
|
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
|
|
|
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
|
|
|
event.setTimestamp(System.currentTimeMillis());
|
|
@@ -1402,7 +1374,7 @@ public class ApplicationMaster {
|
|
|
entity.addEvent(event);
|
|
|
|
|
|
try {
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
@Override
|
|
|
public TimelinePutResponse run() throws Exception {
|
|
|
timelineClient.putEntities(entity);
|
|
@@ -1416,29 +1388,25 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static void publishApplicationAttemptEventOnNewTimelineService(
|
|
|
- final TimelineClient timelineClient, final String appAttemptId,
|
|
|
- final DSEvent appEvent, final String domainId,
|
|
|
- final UserGroupInformation ugi) {
|
|
|
+ private void publishApplicationAttemptEventOnTimelineServiceV2(
|
|
|
+ final DSEvent appEvent) {
|
|
|
|
|
|
Runnable publishWrapper = new Runnable() {
|
|
|
public void run() {
|
|
|
- publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
|
|
|
- appAttemptId, appEvent, domainId, ugi);
|
|
|
+ publishApplicationAttemptEventOnTimelineServiceV2Base(appEvent);
|
|
|
}
|
|
|
};
|
|
|
threadPool.execute(publishWrapper);
|
|
|
}
|
|
|
|
|
|
- private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
|
|
|
- final TimelineClient timelineClient, String appAttemptId,
|
|
|
- DSEvent appEvent, String domainId, UserGroupInformation ugi) {
|
|
|
+ private void publishApplicationAttemptEventOnTimelineServiceV2Base(
|
|
|
+ DSEvent appEvent) {
|
|
|
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
|
|
|
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
|
|
|
- entity.setId(appAttemptId);
|
|
|
+ entity.setId(appAttemptID.toString());
|
|
|
entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
|
|
|
//entity.setDomainId(domainId);
|
|
|
- entity.addInfo("user", ugi.getShortUserName());
|
|
|
+ entity.addInfo("user", appSubmitterUgi.getShortUserName());
|
|
|
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
|
|
|
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
|
|
|
event.setId(appEvent.toString());
|
|
@@ -1446,7 +1414,7 @@ public class ApplicationMaster {
|
|
|
entity.addEvent(event);
|
|
|
|
|
|
try {
|
|
|
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
+ appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
|
|
@Override
|
|
|
public TimelinePutResponse run() throws Exception {
|
|
|
timelineClient.putEntities(entity);
|
|
@@ -1457,7 +1425,7 @@ public class ApplicationMaster {
|
|
|
LOG.error("App Attempt "
|
|
|
+ (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
|
|
|
+ " event could not be published for "
|
|
|
- + appAttemptId.toString(),
|
|
|
+ + appAttemptID,
|
|
|
e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
|
|
}
|
|
|
}
|