|
@@ -46,6 +46,7 @@ import org.apache.hadoop.util.JarFinder;
|
|
|
import org.apache.hadoop.util.Shell;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
@@ -86,28 +87,33 @@ public class TestDistributedShell {
|
|
|
|
|
|
@Before
|
|
|
public void setup() throws Exception {
|
|
|
- setupInternal(NUM_NMS);
|
|
|
+ setupInternal(NUM_NMS, currTestName);
|
|
|
}
|
|
|
|
|
|
- protected void setupInternal(int numNodeManager) throws Exception {
|
|
|
-
|
|
|
+ protected void setupInternal(int numNodeManager, TestName testName)
|
|
|
+ throws Exception {
|
|
|
LOG.info("Starting up YARN cluster");
|
|
|
|
|
|
conf = new YarnConfiguration();
|
|
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
|
|
|
conf.set("yarn.log.dir", "target");
|
|
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
|
|
- // mark if we need to launch the v1 timeline server
|
|
|
- boolean enableATSV1 = false;
|
|
|
- if (!currTestName.getMethodName().toLowerCase().contains("v2")) {
|
|
|
+
|
|
|
+ if (!testName.getMethodName().toLowerCase().contains("v2")) {
|
|
|
// disable aux-service based timeline collectors
|
|
|
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
|
|
|
- enableATSV1 = true;
|
|
|
+ conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
|
|
|
+ true);
|
|
|
+ conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, false);
|
|
|
} else {
|
|
|
// enable aux-service based timeline collectors
|
|
|
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
|
|
|
- conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
|
|
|
- + ".class", PerNodeTimelineCollectorsAuxService.class.getName());
|
|
|
+ conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
|
|
|
+ + TIMELINE_AUX_SERVICE_NAME + ".class",
|
|
|
+ PerNodeTimelineCollectorsAuxService.class.getName());
|
|
|
+ conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
|
|
|
+ conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
|
|
|
+ false);
|
|
|
}
|
|
|
conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
|
|
|
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
|
|
@@ -123,12 +129,11 @@ public class TestDistributedShell {
|
|
|
conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
|
|
|
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
|
|
|
true);
|
|
|
- conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
|
|
|
|
|
|
if (yarnCluster == null) {
|
|
|
yarnCluster =
|
|
|
new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
|
|
|
- numNodeManager, 1, 1, enableATSV1);
|
|
|
+ numNodeManager, 1, 1);
|
|
|
yarnCluster.init(conf);
|
|
|
|
|
|
yarnCluster.start();
|
|
@@ -303,13 +308,15 @@ public class TestDistributedShell {
|
|
|
if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) {
|
|
|
verified = true;
|
|
|
}
|
|
|
- if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) {
|
|
|
+
|
|
|
+ if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
|
|
|
+ && appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
Assert.assertTrue(errorMessage, verified);
|
|
|
t.join();
|
|
|
- LOG.info("Client run completed. Result=" + result);
|
|
|
+ LOG.info("Client run completed for testDSShell. Result=" + result);
|
|
|
Assert.assertTrue(result.get());
|
|
|
|
|
|
if (!isTestingTimelineV2) {
|
|
@@ -364,9 +371,9 @@ public class TestDistributedShell {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void checkTimelineV2(
|
|
|
- boolean haveDomain, ApplicationId appId, boolean defaultFlow)
|
|
|
- throws Exception {
|
|
|
+ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
|
|
|
+ boolean defaultFlow) throws Exception {
|
|
|
+ LOG.info("Started checkTimelineV2 ");
|
|
|
// For PoC check in /tmp/timeline_service_data YARN-3264
|
|
|
String tmpRoot =
|
|
|
FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
|
@@ -417,12 +424,29 @@ public class TestDistributedShell {
|
|
|
verifyEntityTypeFileExists(basePath,
|
|
|
TimelineEntityType.YARN_APPLICATION.toString(),
|
|
|
appMetricsTimestampFileName);
|
|
|
- verifyStringExistsSpecifiedTimes(appEntityFile,
|
|
|
- ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1,
|
|
|
- "Application created event should be published atleast once");
|
|
|
- verifyStringExistsSpecifiedTimes(appEntityFile,
|
|
|
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1,
|
|
|
- "Application finished event should be published atleast once");
|
|
|
+ Assert.assertEquals(
|
|
|
+ "Application created event should be published atleast once",
|
|
|
+ 1,
|
|
|
+ getNumOfStringOccurences(appEntityFile,
|
|
|
+ ApplicationMetricsConstants.CREATED_EVENT_TYPE));
|
|
|
+
|
|
|
+ // to avoid a race condition in the test, check up to 4 times with sleep
|
|
|
+ // of 500ms
|
|
|
+ long numOfStringOccurences = 0;
|
|
|
+ for (int i = 0; i < 4; i++) {
|
|
|
+ numOfStringOccurences =
|
|
|
+ getNumOfStringOccurences(appEntityFile,
|
|
|
+ ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
+ if (numOfStringOccurences > 0) {
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ Thread.sleep(500l);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertEquals(
|
|
|
+ "Application finished event should be published atleast once",
|
|
|
+ 1,
|
|
|
+ numOfStringOccurences);
|
|
|
|
|
|
// Verify RM posting AppAttempt life cycle Events are getting published
|
|
|
String appAttemptMetricsTimestampFileName =
|
|
@@ -433,12 +457,17 @@ public class TestDistributedShell {
|
|
|
verifyEntityTypeFileExists(basePath,
|
|
|
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
|
|
|
appAttemptMetricsTimestampFileName);
|
|
|
- verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
|
|
|
- AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1,
|
|
|
- "AppAttempt register event should be published atleast once");
|
|
|
- verifyStringExistsSpecifiedTimes(appAttemptEntityFile,
|
|
|
- AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1,
|
|
|
- "AppAttempt finished event should be published atleast once");
|
|
|
+ Assert.assertEquals(
|
|
|
+ "AppAttempt register event should be published atleast once",
|
|
|
+ 1,
|
|
|
+ getNumOfStringOccurences(appAttemptEntityFile,
|
|
|
+ AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE));
|
|
|
+
|
|
|
+ Assert.assertEquals(
|
|
|
+ "AppAttempt finished event should be published atleast once",
|
|
|
+ 1,
|
|
|
+ getNumOfStringOccurences(appAttemptEntityFile,
|
|
|
+ AppAttemptMetricsConstants.FINISHED_EVENT_TYPE));
|
|
|
} finally {
|
|
|
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
|
|
|
}
|
|
@@ -457,8 +486,7 @@ public class TestDistributedShell {
|
|
|
return entityFile;
|
|
|
}
|
|
|
|
|
|
- private void verifyStringExistsSpecifiedTimes(File entityFile,
|
|
|
- String searchString, long expectedNumOfTimes, String errorMsg)
|
|
|
+ private long getNumOfStringOccurences(File entityFile, String searchString)
|
|
|
throws IOException {
|
|
|
BufferedReader reader = null;
|
|
|
String strLine;
|
|
@@ -472,7 +500,7 @@ public class TestDistributedShell {
|
|
|
} finally {
|
|
|
reader.close();
|
|
|
}
|
|
|
- Assert.assertEquals(errorMsg, expectedNumOfTimes, actualCount);
|
|
|
+ return actualCount;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1117,4 +1145,3 @@ public class TestDistributedShell {
|
|
|
return numOfWords;
|
|
|
}
|
|
|
}
|
|
|
-
|