|
@@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
+import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
|
|
|
|
+import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
import org.junit.After;
|
|
import org.junit.After;
|
|
import org.junit.AfterClass;
|
|
import org.junit.AfterClass;
|
|
@@ -58,7 +60,6 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
|
|
|
|
-import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertFalse;
|
|
import static org.junit.Assert.assertFalse;
|
|
import static org.junit.Assert.assertNotEquals;
|
|
import static org.junit.Assert.assertNotEquals;
|
|
@@ -91,6 +92,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|
private static ApplicationId mainTestAppId;
|
|
private static ApplicationId mainTestAppId;
|
|
private static Path mainTestAppDirPath;
|
|
private static Path mainTestAppDirPath;
|
|
private static Path testDoneDirPath;
|
|
private static Path testDoneDirPath;
|
|
|
|
+ private static Path testActiveDirPath;
|
|
private static String mainEntityLogFileName;
|
|
private static String mainEntityLogFileName;
|
|
|
|
|
|
private EntityGroupFSTimelineStore store;
|
|
private EntityGroupFSTimelineStore store;
|
|
@@ -125,23 +127,28 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|
+ i);
|
|
+ i);
|
|
sampleAppIds.add(appId);
|
|
sampleAppIds.add(appId);
|
|
}
|
|
}
|
|
|
|
+ testActiveDirPath = getTestRootPath("active");
|
|
// Among all sample applicationIds, choose the first one for most of the
|
|
// Among all sample applicationIds, choose the first one for most of the
|
|
// tests.
|
|
// tests.
|
|
mainTestAppId = sampleAppIds.get(0);
|
|
mainTestAppId = sampleAppIds.get(0);
|
|
- mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
|
|
|
|
|
|
+ mainTestAppDirPath = new Path(testActiveDirPath, mainTestAppId.toString());
|
|
mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
|
|
mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
|
|
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
|
|
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
|
|
|
|
|
|
testDoneDirPath = getTestRootPath("done");
|
|
testDoneDirPath = getTestRootPath("done");
|
|
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
|
|
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
|
|
testDoneDirPath.toString());
|
|
testDoneDirPath.toString());
|
|
|
|
+ config.set(
|
|
|
|
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
|
|
|
|
+ testActiveDirPath.toString());
|
|
}
|
|
}
|
|
|
|
|
|
@Before
|
|
@Before
|
|
public void setup() throws Exception {
|
|
public void setup() throws Exception {
|
|
for (ApplicationId appId : sampleAppIds) {
|
|
for (ApplicationId appId : sampleAppIds) {
|
|
- Path attemotDirPath = new Path(getTestRootPath(appId.toString()),
|
|
|
|
- getAttemptDirName(appId));
|
|
|
|
|
|
+ Path attemotDirPath =
|
|
|
|
+ new Path(new Path(testActiveDirPath, appId.toString()),
|
|
|
|
+ getAttemptDirName(appId));
|
|
createTestFiles(appId, attemotDirPath);
|
|
createTestFiles(appId, attemotDirPath);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -178,7 +185,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|
public void tearDown() throws Exception {
|
|
public void tearDown() throws Exception {
|
|
store.stop();
|
|
store.stop();
|
|
for (ApplicationId appId : sampleAppIds) {
|
|
for (ApplicationId appId : sampleAppIds) {
|
|
- fs.delete(getTestRootPath(appId.toString()), true);
|
|
|
|
|
|
+ fs.delete(new Path(testActiveDirPath,appId.toString()), true);
|
|
}
|
|
}
|
|
if (testJar != null) {
|
|
if (testJar != null) {
|
|
testJar.delete();
|
|
testJar.delete();
|
|
@@ -414,8 +421,88 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testGetEntityPluginRead() throws Exception {
|
|
|
|
+ EntityGroupFSTimelineStore store = null;
|
|
|
|
+ ApplicationId appId =
|
|
|
|
+ ApplicationId.fromString("application_1501509265053_0001");
|
|
|
|
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
|
+ Path userBase = new Path(testActiveDirPath, user);
|
|
|
|
+ Path userAppRoot = new Path(userBase, appId.toString());
|
|
|
|
+ Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId));
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ store = createAndStartTimelineStore(AppState.ACTIVE);
|
|
|
|
+ String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
|
|
|
|
+ + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
|
|
|
|
+ createTestFiles(appId, attemotDirPath, logFileName);
|
|
|
|
+ TimelineEntity entity = store.getEntity(entityNew.getEntityId(),
|
|
|
|
+ entityNew.getEntityType(), EnumSet.allOf(Field.class));
|
|
|
|
+ assertNotNull(entity);
|
|
|
|
+ assertEquals(entityNew.getEntityId(), entity.getEntityId());
|
|
|
|
+ assertEquals(entityNew.getEntityType(), entity.getEntityType());
|
|
|
|
+ } finally {
|
|
|
|
+ if (store != null) {
|
|
|
|
+ store.stop();
|
|
|
|
+ }
|
|
|
|
+ fs.delete(userBase, true);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testScanActiveLogsAndMoveToDonePluginRead() throws Exception {
|
|
|
|
+ EntityGroupFSTimelineStore store = null;
|
|
|
|
+ ApplicationId appId =
|
|
|
|
+ ApplicationId.fromString("application_1501509265053_0002");
|
|
|
|
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
|
+ Path userBase = new Path(testActiveDirPath, user);
|
|
|
|
+ Path userAppRoot = new Path(userBase, appId.toString());
|
|
|
|
+ Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId));
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ store = createAndStartTimelineStore(AppState.COMPLETED);
|
|
|
|
+ String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
|
|
|
|
+ + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
|
|
|
|
+ createTestFiles(appId, attemotDirPath, logFileName);
|
|
|
|
+ store.scanActiveLogs();
|
|
|
|
+
|
|
|
|
+ TimelineEntity entity = store.getEntity(entityNew.getEntityId(),
|
|
|
|
+ entityNew.getEntityType(), EnumSet.allOf(Field.class));
|
|
|
|
+ assertNotNull(entity);
|
|
|
|
+ assertEquals(entityNew.getEntityId(), entity.getEntityId());
|
|
|
|
+ assertEquals(entityNew.getEntityType(), entity.getEntityType());
|
|
|
|
+ } finally {
|
|
|
|
+ if (store != null) {
|
|
|
|
+ store.stop();
|
|
|
|
+ }
|
|
|
|
+ fs.delete(userBase, true);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private EntityGroupFSTimelineStore createAndStartTimelineStore(
|
|
|
|
+ final AppState appstate) {
|
|
|
|
+ // stop before creating new store to get the lock
|
|
|
|
+ store.stop();
|
|
|
|
+
|
|
|
|
+ EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() {
|
|
|
|
+ @Override
|
|
|
|
+ protected AppState getAppState(ApplicationId appId) throws IOException {
|
|
|
|
+ return appstate;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ newStore.init(config);
|
|
|
|
+ newStore.setFs(fs);
|
|
|
|
+ newStore.start();
|
|
|
|
+ return newStore;
|
|
|
|
+ }
|
|
|
|
+
|
|
private void createTestFiles(ApplicationId appId, Path attemptDirPath)
|
|
private void createTestFiles(ApplicationId appId, Path attemptDirPath)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ createTestFiles(appId, attemptDirPath, mainEntityLogFileName);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void createTestFiles(ApplicationId appId, Path attemptDirPath,
|
|
|
|
+ String logPath) throws IOException {
|
|
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
|
|
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
|
|
PluginStoreTestUtils.writeEntities(entities,
|
|
PluginStoreTestUtils.writeEntities(entities,
|
|
new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
|
|
new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
|
|
@@ -429,7 +516,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
|
|
TimelineEntities entityList = new TimelineEntities();
|
|
TimelineEntities entityList = new TimelineEntities();
|
|
entityList.addEntity(entityNew);
|
|
entityList.addEntity(entityNew);
|
|
PluginStoreTestUtils.writeEntities(entityList,
|
|
PluginStoreTestUtils.writeEntities(entityList,
|
|
- new Path(attemptDirPath, mainEntityLogFileName), fs);
|
|
|
|
|
|
+ new Path(attemptDirPath, logPath), fs);
|
|
|
|
|
|
FSDataOutputStream out = fs.create(
|
|
FSDataOutputStream out = fs.create(
|
|
new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
|
|
new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
|