|
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
@@ -427,7 +428,7 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
// Simulate the process of localization of lr1
|
|
|
// NOTE: Localization path from tracker has resource ID at end
|
|
|
Path hierarchicalPath1 =
|
|
|
- tracker.getPathForLocalization(lr1, localDir).getParent();
|
|
|
+ tracker.getPathForLocalization(lr1, localDir, null).getParent();
|
|
|
// Simulate lr1 getting localized
|
|
|
ResourceLocalizedEvent rle1 =
|
|
|
new ResourceLocalizedEvent(lr1,
|
|
@@ -444,7 +445,7 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
tracker.handle(reqEvent2);
|
|
|
|
|
|
Path hierarchicalPath2 =
|
|
|
- tracker.getPathForLocalization(lr2, localDir).getParent();
|
|
|
+ tracker.getPathForLocalization(lr2, localDir, null).getParent();
|
|
|
// localization failed.
|
|
|
ResourceFailedLocalizationEvent rfe2 =
|
|
|
new ResourceFailedLocalizationEvent(
|
|
@@ -463,7 +464,7 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
LocalResourceVisibility.PUBLIC, lc1);
|
|
|
tracker.handle(reqEvent3);
|
|
|
Path hierarchicalPath3 =
|
|
|
- tracker.getPathForLocalization(lr3, localDir).getParent();
|
|
|
+ tracker.getPathForLocalization(lr3, localDir, null).getParent();
|
|
|
// localization successful
|
|
|
ResourceLocalizedEvent rle3 =
|
|
|
new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
|
|
@@ -542,7 +543,8 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
dispatcher.await();
|
|
|
|
|
|
// Simulate the process of localization of lr1
|
|
|
- Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
|
|
|
+ Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
|
|
|
+ null);
|
|
|
|
|
|
ArgumentCaptor<LocalResourceProto> localResourceCaptor =
|
|
|
ArgumentCaptor.forClass(LocalResourceProto.class);
|
|
@@ -622,7 +624,8 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
dispatcher.await();
|
|
|
|
|
|
// Simulate the process of localization of lr1
|
|
|
- Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
|
|
|
+ Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir,
|
|
|
+ null);
|
|
|
|
|
|
ArgumentCaptor<LocalResourceProto> localResourceCaptor =
|
|
|
ArgumentCaptor.forClass(LocalResourceProto.class);
|
|
@@ -691,7 +694,8 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
LocalResourceVisibility.APPLICATION, lc2);
|
|
|
tracker.handle(reqEvent2);
|
|
|
dispatcher.await();
|
|
|
- Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
|
|
|
+ Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir,
|
|
|
+ null);
|
|
|
long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
|
|
|
Assert.assertEquals(localizedId1 + 1, localizedId2);
|
|
|
} finally {
|
|
@@ -785,6 +789,49 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public void testGetPathForLocalization() throws Exception {
|
|
|
+ FileContext lfs = FileContext.getLocalFSFileContext();
|
|
|
+ Path base_path = new Path("target",
|
|
|
+ TestLocalResourcesTrackerImpl.class.getSimpleName());
|
|
|
+ final String user = "someuser";
|
|
|
+ final ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
|
+ Configuration conf = new YarnConfiguration();
|
|
|
+ DrainDispatcher dispatcher = null;
|
|
|
+ dispatcher = createDispatcher(conf);
|
|
|
+ EventHandler<LocalizerEvent> localizerEventHandler =
|
|
|
+ mock(EventHandler.class);
|
|
|
+ EventHandler<LocalizerEvent> containerEventHandler =
|
|
|
+ mock(EventHandler.class);
|
|
|
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
|
|
|
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
|
|
|
+ NMStateStoreService stateStore = mock(NMStateStoreService.class);
|
|
|
+ DeletionService delService = mock(DeletionService.class);
|
|
|
+ try {
|
|
|
+ LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
|
|
|
+ LocalResourceVisibility.PUBLIC);
|
|
|
+ LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
|
|
|
+ ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
|
|
|
+ new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
|
|
+ localrsrc.put(req1, lr1);
|
|
|
+ LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
|
|
|
+ appId, dispatcher, localrsrc, true, conf, stateStore, null);
|
|
|
+ Path conflictPath = new Path(base_path, "10");
|
|
|
+ Path qualifiedConflictPath = lfs.makeQualified(conflictPath);
|
|
|
+ lfs.mkdir(qualifiedConflictPath, null, true);
|
|
|
+ Path rPath = tracker.getPathForLocalization(req1, base_path,
|
|
|
+ delService);
|
|
|
+ Assert.assertFalse(lfs.util().exists(rPath));
|
|
|
+ verify(delService, times(1)).delete(eq(user), eq(conflictPath));
|
|
|
+ } finally {
|
|
|
+ lfs.delete(base_path, true);
|
|
|
+ if (dispatcher != null) {
|
|
|
+ dispatcher.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("unchecked")
|
|
|
@Test
|
|
|
public void testResourcePresentInGoodDir() throws IOException {
|
|
@@ -832,8 +879,10 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
tracker.handle(req21Event);
|
|
|
dispatcher.await();
|
|
|
// Localize resource1
|
|
|
- Path p1 = tracker.getPathForLocalization(req1, new Path("/tmp/somedir1"));
|
|
|
- Path p2 = tracker.getPathForLocalization(req2, new Path("/tmp/somedir2"));
|
|
|
+ Path p1 = tracker.getPathForLocalization(req1,
|
|
|
+ new Path("/tmp/somedir1"), null);
|
|
|
+ Path p2 = tracker.getPathForLocalization(req2,
|
|
|
+ new Path("/tmp/somedir2"), null);
|
|
|
ResourceLocalizedEvent rle1 = new ResourceLocalizedEvent(req1, p1, 1);
|
|
|
tracker.handle(rle1);
|
|
|
ResourceLocalizedEvent rle2 = new ResourceLocalizedEvent(req2, p2, 1);
|