|
@@ -64,7 +64,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
|
|
|
+import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
|
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
|
|
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
|
|
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
|
|
import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
|
|
@@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
|
|
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
|
|
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
|
|
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
|
|
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
|
|
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
|
|
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
|
|
|
|
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
|
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
|
@@ -523,15 +526,31 @@ public class TestDistributedShell {
|
|
"appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
|
|
"appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
|
|
+ "_000001"
|
|
+ "_000001"
|
|
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
|
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
|
- verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
|
|
|
|
- appTimestampFileName);
|
|
|
|
-
|
|
|
|
- // Verify DS_CONTAINER entities posted by the client
|
|
|
|
|
|
+ File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
|
|
|
|
+ "DS_APP_ATTEMPT", appTimestampFileName);
|
|
|
|
+ // Check if required events are published and same idprefix is sent for
|
|
|
|
+ // on each publish.
|
|
|
|
+ verifyEntityForTimelineV2(dsAppAttemptEntityFile,
|
|
|
|
+ DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
|
|
|
|
+ // to avoid race condition of testcase, atleast check 40 times with sleep
|
|
|
|
+ // of 50ms
|
|
|
|
+ verifyEntityForTimelineV2(dsAppAttemptEntityFile,
|
|
|
|
+ DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
|
|
|
|
+
|
|
|
|
+ // Verify DS_CONTAINER entities posted by the client.
|
|
String containerTimestampFileName =
|
|
String containerTimestampFileName =
|
|
"container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
|
|
"container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
|
|
+ "_01_000002.thist";
|
|
+ "_01_000002.thist";
|
|
- verifyEntityTypeFileExists(basePath, "DS_CONTAINER",
|
|
|
|
- containerTimestampFileName);
|
|
|
|
|
|
+ File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
|
|
|
|
+ "DS_CONTAINER", containerTimestampFileName);
|
|
|
|
+ // Check if required events are published and same idprefix is sent for
|
|
|
|
+ // on each publish.
|
|
|
|
+ verifyEntityForTimelineV2(dsContainerEntityFile,
|
|
|
|
+ DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
|
|
|
|
+ // to avoid race condition of testcase, atleast check 40 times with sleep
|
|
|
|
+ // of 50ms
|
|
|
|
+ verifyEntityForTimelineV2(dsContainerEntityFile,
|
|
|
|
+ DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
|
|
|
|
|
|
// Verify NM posting container metrics info.
|
|
// Verify NM posting container metrics info.
|
|
String containerMetricsTimestampFileName =
|
|
String containerMetricsTimestampFileName =
|
|
@@ -541,29 +560,13 @@ public class TestDistributedShell {
|
|
File containerEntityFile = verifyEntityTypeFileExists(basePath,
|
|
File containerEntityFile = verifyEntityTypeFileExists(basePath,
|
|
TimelineEntityType.YARN_CONTAINER.toString(),
|
|
TimelineEntityType.YARN_CONTAINER.toString(),
|
|
containerMetricsTimestampFileName);
|
|
containerMetricsTimestampFileName);
|
|
- Assert.assertEquals(
|
|
|
|
- "Container created event needs to be published atleast once",
|
|
|
|
- 1,
|
|
|
|
- getNumOfStringOccurrences(containerEntityFile,
|
|
|
|
- ContainerMetricsConstants.CREATED_EVENT_TYPE));
|
|
|
|
-
|
|
|
|
- // to avoid race condition of testcase, atleast check 4 times with sleep
|
|
|
|
- // of 500ms
|
|
|
|
- long numOfContainerFinishedOccurrences = 0;
|
|
|
|
- for (int i = 0; i < 4; i++) {
|
|
|
|
- numOfContainerFinishedOccurrences =
|
|
|
|
- getNumOfStringOccurrences(containerEntityFile,
|
|
|
|
- ContainerMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
|
- if (numOfContainerFinishedOccurrences > 0) {
|
|
|
|
- break;
|
|
|
|
- } else {
|
|
|
|
- Thread.sleep(500L);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- Assert.assertEquals(
|
|
|
|
- "Container finished event needs to be published atleast once",
|
|
|
|
- 1,
|
|
|
|
- numOfContainerFinishedOccurrences);
|
|
|
|
|
|
+ verifyEntityForTimelineV2(containerEntityFile,
|
|
|
|
+ ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
|
|
|
|
+
|
|
|
|
+ // to avoid race condition of testcase, atleast check 40 times with sleep
|
|
|
|
+ // of 50ms
|
|
|
|
+ verifyEntityForTimelineV2(containerEntityFile,
|
|
|
|
+ ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
|
|
|
|
|
|
// Verify RM posting Application life cycle Events are getting published
|
|
// Verify RM posting Application life cycle Events are getting published
|
|
String appMetricsTimestampFileName =
|
|
String appMetricsTimestampFileName =
|
|
@@ -573,29 +576,14 @@ public class TestDistributedShell {
|
|
verifyEntityTypeFileExists(basePath,
|
|
verifyEntityTypeFileExists(basePath,
|
|
TimelineEntityType.YARN_APPLICATION.toString(),
|
|
TimelineEntityType.YARN_APPLICATION.toString(),
|
|
appMetricsTimestampFileName);
|
|
appMetricsTimestampFileName);
|
|
- Assert.assertEquals(
|
|
|
|
- "Application created event should be published atleast once",
|
|
|
|
- 1,
|
|
|
|
- getNumOfStringOccurrences(appEntityFile,
|
|
|
|
- ApplicationMetricsConstants.CREATED_EVENT_TYPE));
|
|
|
|
-
|
|
|
|
- // to avoid race condition of testcase, atleast check 4 times with sleep
|
|
|
|
- // of 500ms
|
|
|
|
- long numOfStringOccurrences = 0;
|
|
|
|
- for (int i = 0; i < 4; i++) {
|
|
|
|
- numOfStringOccurrences =
|
|
|
|
- getNumOfStringOccurrences(appEntityFile,
|
|
|
|
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
|
- if (numOfStringOccurrences > 0) {
|
|
|
|
- break;
|
|
|
|
- } else {
|
|
|
|
- Thread.sleep(500L);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- Assert.assertEquals(
|
|
|
|
- "Application finished event should be published atleast once",
|
|
|
|
- 1,
|
|
|
|
- numOfStringOccurrences);
|
|
|
|
|
|
+ // No need to check idprefix for app.
|
|
|
|
+ verifyEntityForTimelineV2(appEntityFile,
|
|
|
|
+ ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
|
|
|
|
+
|
|
|
|
+ // to avoid race condition of testcase, atleast check 40 times with sleep
|
|
|
|
+ // of 50ms
|
|
|
|
+ verifyEntityForTimelineV2(appEntityFile,
|
|
|
|
+ ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
|
|
|
|
|
|
// Verify RM posting AppAttempt life cycle Events are getting published
|
|
// Verify RM posting AppAttempt life cycle Events are getting published
|
|
String appAttemptMetricsTimestampFileName =
|
|
String appAttemptMetricsTimestampFileName =
|
|
@@ -606,17 +594,10 @@ public class TestDistributedShell {
|
|
verifyEntityTypeFileExists(basePath,
|
|
verifyEntityTypeFileExists(basePath,
|
|
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
|
|
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
|
|
appAttemptMetricsTimestampFileName);
|
|
appAttemptMetricsTimestampFileName);
|
|
- Assert.assertEquals(
|
|
|
|
- "AppAttempt register event should be published atleast once",
|
|
|
|
- 1,
|
|
|
|
- getNumOfStringOccurrences(appAttemptEntityFile,
|
|
|
|
- AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE));
|
|
|
|
-
|
|
|
|
- Assert.assertEquals(
|
|
|
|
- "AppAttempt finished event should be published atleast once",
|
|
|
|
- 1,
|
|
|
|
- getNumOfStringOccurrences(appAttemptEntityFile,
|
|
|
|
- AppAttemptMetricsConstants.FINISHED_EVENT_TYPE));
|
|
|
|
|
|
+ verifyEntityForTimelineV2(appAttemptEntityFile,
|
|
|
|
+ AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
|
|
|
|
+ verifyEntityForTimelineV2(appAttemptEntityFile,
|
|
|
|
+ AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
|
|
} finally {
|
|
} finally {
|
|
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
|
|
FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
|
|
}
|
|
}
|
|
@@ -636,22 +617,64 @@ public class TestDistributedShell {
|
|
return entityFile;
|
|
return entityFile;
|
|
}
|
|
}
|
|
|
|
|
|
- private long getNumOfStringOccurrences(File entityFile, String searchString)
|
|
|
|
- throws IOException {
|
|
|
|
- BufferedReader reader = null;
|
|
|
|
- String strLine;
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Checks the events and idprefix published for an entity.
|
|
|
|
+ *
|
|
|
|
+ * @param entityFile Entity file.
|
|
|
|
+ * @param expectedEvent Expected event Id.
|
|
|
|
+ * @param numOfExpectedEvent Number of expected occurences of expected event
|
|
|
|
+ * id.
|
|
|
|
+ * @param checkTimes Number of times to check.
|
|
|
|
+ * @param sleepTime Sleep time for each iteration.
|
|
|
|
+ * @param checkIdPrefix Whether to check idprefix.
|
|
|
|
+ * @throws IOException if entity file reading fails.
|
|
|
|
+ * @throws InterruptedException if sleep is interrupted.
|
|
|
|
+ */
|
|
|
|
+ private void verifyEntityForTimelineV2(File entityFile, String expectedEvent,
|
|
|
|
+ long numOfExpectedEvent, int checkTimes, long sleepTime,
|
|
|
|
+ boolean checkIdPrefix) throws IOException, InterruptedException {
|
|
long actualCount = 0;
|
|
long actualCount = 0;
|
|
- try {
|
|
|
|
- reader = new BufferedReader(new FileReader(entityFile));
|
|
|
|
- while ((strLine = reader.readLine()) != null) {
|
|
|
|
- if (strLine.trim().contains(searchString)) {
|
|
|
|
- actualCount++;
|
|
|
|
|
|
+ for (int i = 0; i < checkTimes; i++) {
|
|
|
|
+ BufferedReader reader = null;
|
|
|
|
+ String strLine = null;
|
|
|
|
+ actualCount = 0;
|
|
|
|
+ try {
|
|
|
|
+ reader = new BufferedReader(new FileReader(entityFile));
|
|
|
|
+ long idPrefix = -1;
|
|
|
|
+ while ((strLine = reader.readLine()) != null) {
|
|
|
|
+ String entityLine = strLine.trim();
|
|
|
|
+ if (entityLine.isEmpty()) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ if (entityLine.contains(expectedEvent)) {
|
|
|
|
+ actualCount++;
|
|
|
|
+ }
|
|
|
|
+ if (checkIdPrefix) {
|
|
|
|
+ TimelineEntity entity = FileSystemTimelineReaderImpl.
|
|
|
|
+ getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
|
|
|
|
+ Assert.assertTrue("Entity ID prefix expected to be > 0",
|
|
|
|
+ entity.getIdPrefix() > 0);
|
|
|
|
+ if (idPrefix == -1) {
|
|
|
|
+ idPrefix = entity.getIdPrefix();
|
|
|
|
+ } else {
|
|
|
|
+ Assert.assertEquals("Entity ID prefix should be same across " +
|
|
|
|
+ "each publish of same entity",
|
|
|
|
+ idPrefix, entity.getIdPrefix());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ } finally {
|
|
|
|
+ reader.close();
|
|
|
|
+ }
|
|
|
|
+ if (numOfExpectedEvent == actualCount) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ if (sleepTime > 0 && i < checkTimes - 1) {
|
|
|
|
+ Thread.sleep(sleepTime);
|
|
}
|
|
}
|
|
- } finally {
|
|
|
|
- reader.close();
|
|
|
|
}
|
|
}
|
|
- return actualCount;
|
|
|
|
|
|
+ Assert.assertEquals("Unexpected number of " + expectedEvent +
|
|
|
|
+ " event published.", numOfExpectedEvent, actualCount);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|