|
@@ -190,6 +190,28 @@ public class TestNMLeveldbStateStoreService {
|
|
|
return containerTokens;
|
|
|
}
|
|
|
|
|
|
+ private List<LocalizedResourceProto> loadCompletedResources(
|
|
|
+ RecoveryIterator<LocalizedResourceProto> it) throws IOException {
|
|
|
+ List<LocalizedResourceProto> completedResources =
|
|
|
+ new ArrayList<LocalizedResourceProto>();
|
|
|
+ while (it != null && it.hasNext()) {
|
|
|
+ completedResources.add(it.next());
|
|
|
+ }
|
|
|
+ return completedResources;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<LocalResourceProto, Path> loadStartedResources(
|
|
|
+ RecoveryIterator <Map.Entry<LocalResourceProto, Path>> it)
|
|
|
+ throws IOException {
|
|
|
+ Map<LocalResourceProto, Path> startedResources =
|
|
|
+ new HashMap<LocalResourceProto, Path>();
|
|
|
+ while (it != null &&it.hasNext()) {
|
|
|
+ Map.Entry<LocalResourceProto, Path> entry = it.next();
|
|
|
+ startedResources.put(entry.getKey(), entry.getValue());
|
|
|
+ }
|
|
|
+ return startedResources;
|
|
|
+ }
|
|
|
+
|
|
|
private void restartStateStore() throws IOException {
|
|
|
// need to close so leveldb releases database lock
|
|
|
if (stateStore != null) {
|
|
@@ -205,8 +227,10 @@ public class TestNMLeveldbStateStoreService {
|
|
|
assertNotNull(state);
|
|
|
LocalResourceTrackerState pubts = state.getPublicTrackerState();
|
|
|
assertNotNull(pubts);
|
|
|
- assertTrue(pubts.getLocalizedResources().isEmpty());
|
|
|
- assertTrue(pubts.getInProgressResources().isEmpty());
|
|
|
+ assertTrue(loadCompletedResources(pubts.getCompletedResourcesIterator())
|
|
|
+ .isEmpty());
|
|
|
+ assertTrue(loadStartedResources(pubts.getStartedResourcesIterator())
|
|
|
+ .isEmpty());
|
|
|
assertTrue(loadUserResources(state.getIterator()).isEmpty());
|
|
|
}
|
|
|
|
|
@@ -517,6 +541,111 @@ public class TestNMLeveldbStateStoreService {
|
|
|
return StartContainerRequest.newInstance(clc, containerToken);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testLocalTrackerStateIterator() throws IOException {
|
|
|
+ String user1 = "somebody";
|
|
|
+ ApplicationId appId1 = ApplicationId.newInstance(1, 1);
|
|
|
+ ApplicationId appId2 = ApplicationId.newInstance(2, 2);
|
|
|
+
|
|
|
+ String user2 = "someone";
|
|
|
+ ApplicationId appId3 = ApplicationId.newInstance(3, 3);
|
|
|
+
|
|
|
+ // start and finish local resource for applications
|
|
|
+ Path appRsrcPath1 = new Path("hdfs://some/app/resource1");
|
|
|
+ LocalResourcePBImpl rsrcPb1 = (LocalResourcePBImpl)
|
|
|
+ LocalResource.newInstance(
|
|
|
+ URL.fromPath(appRsrcPath1),
|
|
|
+ LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
|
|
|
+ 123L, 456L);
|
|
|
+ LocalResourceProto appRsrcProto1 = rsrcPb1.getProto();
|
|
|
+ Path appRsrcLocalPath1 = new Path("/some/local/dir/for/apprsrc1");
|
|
|
+ Path appRsrcPath2 = new Path("hdfs://some/app/resource2");
|
|
|
+ LocalResourcePBImpl rsrcPb2 = (LocalResourcePBImpl)
|
|
|
+ LocalResource.newInstance(
|
|
|
+ URL.fromPath(appRsrcPath2),
|
|
|
+ LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
|
|
|
+ 123L, 456L);
|
|
|
+ LocalResourceProto appRsrcProto2 = rsrcPb2.getProto();
|
|
|
+ Path appRsrcLocalPath2 = new Path("/some/local/dir/for/apprsrc2");
|
|
|
+ Path appRsrcPath3 = new Path("hdfs://some/app/resource3");
|
|
|
+ LocalResourcePBImpl rsrcPb3 = (LocalResourcePBImpl)
|
|
|
+ LocalResource.newInstance(
|
|
|
+ URL.fromPath(appRsrcPath3),
|
|
|
+ LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
|
|
|
+ 123L, 456L);
|
|
|
+ LocalResourceProto appRsrcProto3 = rsrcPb3.getProto();
|
|
|
+ Path appRsrcLocalPath3 = new Path("/some/local/dir/for/apprsrc2");
|
|
|
+
|
|
|
+ stateStore.startResourceLocalization(user1, appId1, appRsrcProto1,
|
|
|
+ appRsrcLocalPath1);
|
|
|
+ stateStore.startResourceLocalization(user1, appId2, appRsrcProto2,
|
|
|
+ appRsrcLocalPath2);
|
|
|
+ stateStore.startResourceLocalization(user2, appId3, appRsrcProto3,
|
|
|
+ appRsrcLocalPath3);
|
|
|
+
|
|
|
+ LocalizedResourceProto appLocalizedProto1 =
|
|
|
+ LocalizedResourceProto.newBuilder()
|
|
|
+ .setResource(appRsrcProto1)
|
|
|
+ .setLocalPath(appRsrcLocalPath1.toString())
|
|
|
+ .setSize(1234567L)
|
|
|
+ .build();
|
|
|
+ LocalizedResourceProto appLocalizedProto2 =
|
|
|
+ LocalizedResourceProto.newBuilder()
|
|
|
+ .setResource(appRsrcProto2)
|
|
|
+ .setLocalPath(appRsrcLocalPath2.toString())
|
|
|
+ .setSize(1234567L)
|
|
|
+ .build();
|
|
|
+ LocalizedResourceProto appLocalizedProto3 =
|
|
|
+ LocalizedResourceProto.newBuilder()
|
|
|
+ .setResource(appRsrcProto3)
|
|
|
+ .setLocalPath(appRsrcLocalPath3.toString())
|
|
|
+ .setSize(1234567L)
|
|
|
+ .build();
|
|
|
+
|
|
|
+
|
|
|
+ stateStore.finishResourceLocalization(user1, appId1, appLocalizedProto1);
|
|
|
+ stateStore.finishResourceLocalization(user1, appId2, appLocalizedProto2);
|
|
|
+ stateStore.finishResourceLocalization(user2, appId3, appLocalizedProto3);
|
|
|
+
|
|
|
+
|
|
|
+ List<LocalizedResourceProto> completedResources =
|
|
|
+ new ArrayList<LocalizedResourceProto>();
|
|
|
+ Map<LocalResourceProto, Path> startedResources =
|
|
|
+ new HashMap<LocalResourceProto, Path>();
|
|
|
+
|
|
|
+ // restart and verify two users exist and two apps completed for user1.
|
|
|
+ restartStateStore();
|
|
|
+ RecoveredLocalizationState state = stateStore.loadLocalizationState();
|
|
|
+ Map<String, RecoveredUserResources> userResources =
|
|
|
+ loadUserResources(state.getIterator());
|
|
|
+ assertEquals(2, userResources.size());
|
|
|
+
|
|
|
+ RecoveredUserResources uResource = userResources.get(user1);
|
|
|
+ assertEquals(2, uResource.getAppTrackerStates().size());
|
|
|
+ LocalResourceTrackerState app1ts =
|
|
|
+ uResource.getAppTrackerStates().get(appId1);
|
|
|
+ assertNotNull(app1ts);
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ app1ts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ app1ts.getStartedResourcesIterator());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
+ assertEquals(1, completedResources.size());
|
|
|
+ assertEquals(appLocalizedProto1,
|
|
|
+ completedResources.iterator().next());
|
|
|
+ LocalResourceTrackerState app2ts =
|
|
|
+ uResource.getAppTrackerStates().get(appId2);
|
|
|
+ assertNotNull(app2ts);
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ app2ts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ app2ts.getStartedResourcesIterator());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
+ assertEquals(1, completedResources.size());
|
|
|
+ assertEquals(appLocalizedProto2,
|
|
|
+ completedResources.iterator().next());
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testStartResourceLocalization() throws IOException {
|
|
|
String user = "somebody";
|
|
@@ -534,27 +663,44 @@ public class TestNMLeveldbStateStoreService {
|
|
|
stateStore.startResourceLocalization(user, appId, appRsrcProto,
|
|
|
appRsrcLocalPath);
|
|
|
|
|
|
+ List<LocalizedResourceProto> completedResources =
|
|
|
+ new ArrayList<LocalizedResourceProto>();
|
|
|
+ Map<LocalResourceProto, Path> startedResources =
|
|
|
+ new HashMap<LocalResourceProto, Path>();
|
|
|
+
|
|
|
// restart and verify only app resource is marked in-progress
|
|
|
restartStateStore();
|
|
|
RecoveredLocalizationState state = stateStore.loadLocalizationState();
|
|
|
LocalResourceTrackerState pubts = state.getPublicTrackerState();
|
|
|
- assertTrue(pubts.getLocalizedResources().isEmpty());
|
|
|
- assertTrue(pubts.getInProgressResources().isEmpty());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ pubts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ pubts.getStartedResourcesIterator());
|
|
|
+ assertTrue(completedResources.isEmpty());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
Map<String, RecoveredUserResources> userResources =
|
|
|
loadUserResources(state.getIterator());
|
|
|
assertEquals(1, userResources.size());
|
|
|
RecoveredUserResources rur = userResources.get(user);
|
|
|
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
|
|
|
assertNotNull(privts);
|
|
|
- assertTrue(privts.getLocalizedResources().isEmpty());
|
|
|
- assertTrue(privts.getInProgressResources().isEmpty());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ privts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ privts.getStartedResourcesIterator());
|
|
|
+ assertTrue(completedResources.isEmpty());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
assertEquals(1, rur.getAppTrackerStates().size());
|
|
|
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
|
|
|
assertNotNull(appts);
|
|
|
- assertTrue(appts.getLocalizedResources().isEmpty());
|
|
|
- assertEquals(1, appts.getInProgressResources().size());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ appts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ appts.getStartedResourcesIterator());
|
|
|
+ assertTrue(completedResources.isEmpty());
|
|
|
+ assertEquals(1, startedResources.size());
|
|
|
assertEquals(appRsrcLocalPath,
|
|
|
- appts.getInProgressResources().get(appRsrcProto));
|
|
|
+ startedResources.get(appRsrcProto));
|
|
|
|
|
|
// start some public and private resources
|
|
|
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
|
|
@@ -589,28 +735,40 @@ public class TestNMLeveldbStateStoreService {
|
|
|
restartStateStore();
|
|
|
state = stateStore.loadLocalizationState();
|
|
|
pubts = state.getPublicTrackerState();
|
|
|
- assertTrue(pubts.getLocalizedResources().isEmpty());
|
|
|
- assertEquals(2, pubts.getInProgressResources().size());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ pubts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ pubts.getStartedResourcesIterator());
|
|
|
+ assertTrue(completedResources.isEmpty());
|
|
|
+ assertEquals(2, startedResources.size());
|
|
|
assertEquals(pubRsrcLocalPath1,
|
|
|
- pubts.getInProgressResources().get(pubRsrcProto1));
|
|
|
+ startedResources.get(pubRsrcProto1));
|
|
|
assertEquals(pubRsrcLocalPath2,
|
|
|
- pubts.getInProgressResources().get(pubRsrcProto2));
|
|
|
+ startedResources.get(pubRsrcProto2));
|
|
|
userResources = loadUserResources(state.getIterator());
|
|
|
assertEquals(1, userResources.size());
|
|
|
rur = userResources.get(user);
|
|
|
privts = rur.getPrivateTrackerState();
|
|
|
assertNotNull(privts);
|
|
|
- assertTrue(privts.getLocalizedResources().isEmpty());
|
|
|
- assertEquals(1, privts.getInProgressResources().size());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ privts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ privts.getStartedResourcesIterator());
|
|
|
+ assertTrue(completedResources.isEmpty());
|
|
|
+ assertEquals(1, startedResources.size());
|
|
|
assertEquals(privRsrcLocalPath,
|
|
|
- privts.getInProgressResources().get(privRsrcProto));
|
|
|
+ startedResources.get(privRsrcProto));
|
|
|
assertEquals(1, rur.getAppTrackerStates().size());
|
|
|
appts = rur.getAppTrackerStates().get(appId);
|
|
|
assertNotNull(appts);
|
|
|
- assertTrue(appts.getLocalizedResources().isEmpty());
|
|
|
- assertEquals(1, appts.getInProgressResources().size());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ appts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ appts.getStartedResourcesIterator());
|
|
|
+ assertTrue(completedResources.isEmpty());
|
|
|
+ assertEquals(1, startedResources.size());
|
|
|
assertEquals(appRsrcLocalPath,
|
|
|
- appts.getInProgressResources().get(appRsrcProto));
|
|
|
+ startedResources.get(appRsrcProto));
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -637,27 +795,44 @@ public class TestNMLeveldbStateStoreService {
|
|
|
.build();
|
|
|
stateStore.finishResourceLocalization(user, appId, appLocalizedProto);
|
|
|
|
|
|
+ List<LocalizedResourceProto> completedResources =
|
|
|
+ new ArrayList<LocalizedResourceProto>();
|
|
|
+ Map<LocalResourceProto, Path> startedResources =
|
|
|
+ new HashMap<LocalResourceProto, Path>();
|
|
|
+
|
|
|
// restart and verify only app resource is completed
|
|
|
restartStateStore();
|
|
|
RecoveredLocalizationState state = stateStore.loadLocalizationState();
|
|
|
LocalResourceTrackerState pubts = state.getPublicTrackerState();
|
|
|
- assertTrue(pubts.getLocalizedResources().isEmpty());
|
|
|
- assertTrue(pubts.getInProgressResources().isEmpty());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ pubts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ pubts.getStartedResourcesIterator());
|
|
|
+ assertTrue(completedResources.isEmpty());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
Map<String, RecoveredUserResources> userResources =
|
|
|
loadUserResources(state.getIterator());
|
|
|
assertEquals(1, userResources.size());
|
|
|
RecoveredUserResources rur = userResources.get(user);
|
|
|
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
|
|
|
assertNotNull(privts);
|
|
|
- assertTrue(privts.getLocalizedResources().isEmpty());
|
|
|
- assertTrue(privts.getInProgressResources().isEmpty());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ privts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ privts.getStartedResourcesIterator());
|
|
|
+ assertTrue(completedResources.isEmpty());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
assertEquals(1, rur.getAppTrackerStates().size());
|
|
|
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
|
|
|
assertNotNull(appts);
|
|
|
- assertTrue(appts.getInProgressResources().isEmpty());
|
|
|
- assertEquals(1, appts.getLocalizedResources().size());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ appts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ appts.getStartedResourcesIterator());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
+ assertEquals(1, completedResources.size());
|
|
|
assertEquals(appLocalizedProto,
|
|
|
- appts.getLocalizedResources().iterator().next());
|
|
|
+ completedResources.iterator().next());
|
|
|
|
|
|
// start some public and private resources
|
|
|
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
|
|
@@ -708,28 +883,40 @@ public class TestNMLeveldbStateStoreService {
|
|
|
restartStateStore();
|
|
|
state = stateStore.loadLocalizationState();
|
|
|
pubts = state.getPublicTrackerState();
|
|
|
- assertEquals(1, pubts.getLocalizedResources().size());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ pubts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ pubts.getStartedResourcesIterator());
|
|
|
+ assertEquals(1, completedResources.size());
|
|
|
assertEquals(pubLocalizedProto1,
|
|
|
- pubts.getLocalizedResources().iterator().next());
|
|
|
- assertEquals(1, pubts.getInProgressResources().size());
|
|
|
+ completedResources.iterator().next());
|
|
|
+ assertEquals(1, startedResources.size());
|
|
|
assertEquals(pubRsrcLocalPath2,
|
|
|
- pubts.getInProgressResources().get(pubRsrcProto2));
|
|
|
+ startedResources.get(pubRsrcProto2));
|
|
|
userResources = loadUserResources(state.getIterator());
|
|
|
assertEquals(1, userResources.size());
|
|
|
rur = userResources.get(user);
|
|
|
privts = rur.getPrivateTrackerState();
|
|
|
assertNotNull(privts);
|
|
|
- assertEquals(1, privts.getLocalizedResources().size());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ privts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ privts.getStartedResourcesIterator());
|
|
|
+ assertEquals(1, completedResources.size());
|
|
|
assertEquals(privLocalizedProto,
|
|
|
- privts.getLocalizedResources().iterator().next());
|
|
|
- assertTrue(privts.getInProgressResources().isEmpty());
|
|
|
+ completedResources.iterator().next());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
assertEquals(1, rur.getAppTrackerStates().size());
|
|
|
appts = rur.getAppTrackerStates().get(appId);
|
|
|
assertNotNull(appts);
|
|
|
- assertTrue(appts.getInProgressResources().isEmpty());
|
|
|
- assertEquals(1, appts.getLocalizedResources().size());
|
|
|
+ completedResources = loadCompletedResources(
|
|
|
+ appts.getCompletedResourcesIterator());
|
|
|
+ startedResources = loadStartedResources(
|
|
|
+ appts.getStartedResourcesIterator());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
+ assertEquals(1, completedResources.size());
|
|
|
assertEquals(appLocalizedProto,
|
|
|
- appts.getLocalizedResources().iterator().next());
|
|
|
+ completedResources.iterator().next());
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -817,10 +1004,14 @@ public class TestNMLeveldbStateStoreService {
|
|
|
restartStateStore();
|
|
|
RecoveredLocalizationState state = stateStore.loadLocalizationState();
|
|
|
LocalResourceTrackerState pubts = state.getPublicTrackerState();
|
|
|
- assertTrue(pubts.getInProgressResources().isEmpty());
|
|
|
- assertEquals(1, pubts.getLocalizedResources().size());
|
|
|
+ List<LocalizedResourceProto> completedResources =
|
|
|
+ loadCompletedResources(pubts.getCompletedResourcesIterator());
|
|
|
+ Map<LocalResourceProto, Path> startedResources =
|
|
|
+ loadStartedResources(pubts.getStartedResourcesIterator());
|
|
|
+ assertTrue(startedResources.isEmpty());
|
|
|
+ assertEquals(1, completedResources.size());
|
|
|
assertEquals(pubLocalizedProto1,
|
|
|
- pubts.getLocalizedResources().iterator().next());
|
|
|
+ completedResources.iterator().next());
|
|
|
Map<String, RecoveredUserResources> userResources =
|
|
|
loadUserResources(state.getIterator());
|
|
|
assertTrue(userResources.isEmpty());
|