|
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
@@ -50,17 +51,17 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
|
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.junit.Test;
|
|
|
-import org.mortbay.log.Log;
|
|
|
|
|
|
public class TestLocalResourcesTrackerImpl {
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout=10000)
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void test() {
|
|
|
String user = "testuser";
|
|
|
DrainDispatcher dispatcher = null;
|
|
|
try {
|
|
|
- dispatcher = createDispatcher(new Configuration());
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ dispatcher = createDispatcher(conf);
|
|
|
EventHandler<LocalizerEvent> localizerEventHandler =
|
|
|
mock(EventHandler.class);
|
|
|
EventHandler<LocalizerEvent> containerEventHandler =
|
|
@@ -86,7 +87,8 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
localrsrc.put(req1, lr1);
|
|
|
localrsrc.put(req2, lr2);
|
|
|
LocalResourcesTracker tracker =
|
|
|
- new LocalResourcesTrackerImpl(user, dispatcher, localrsrc);
|
|
|
+ new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
|
|
|
+ conf);
|
|
|
|
|
|
ResourceEvent req11Event =
|
|
|
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
|
|
@@ -152,13 +154,14 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout=10000)
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testConsistency() {
|
|
|
String user = "testuser";
|
|
|
DrainDispatcher dispatcher = null;
|
|
|
try {
|
|
|
- dispatcher = createDispatcher(new Configuration());
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ dispatcher = createDispatcher(conf);
|
|
|
EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
|
|
|
EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
|
|
|
dispatcher.register(LocalizerEventType.class, localizerEventHandler);
|
|
@@ -172,7 +175,7 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
|
|
localrsrc.put(req1, lr1);
|
|
|
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
|
|
- dispatcher, localrsrc);
|
|
|
+ dispatcher, localrsrc, false, conf);
|
|
|
|
|
|
ResourceEvent req11Event = new ResourceRequestEvent(req1,
|
|
|
LocalResourceVisibility.PUBLIC, lc1);
|
|
@@ -221,6 +224,113 @@ public class TestLocalResourcesTrackerImpl {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 100000)
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public void testHierarchicalLocalCacheDirectories() {
|
|
|
+ String user = "testuser";
|
|
|
+ DrainDispatcher dispatcher = null;
|
|
|
+ try {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ // setting per directory file limit to 1.
|
|
|
+ conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
|
|
|
+ dispatcher = createDispatcher(conf);
|
|
|
+
|
|
|
+ EventHandler<LocalizerEvent> localizerEventHandler =
|
|
|
+ mock(EventHandler.class);
|
|
|
+ EventHandler<LocalizerEvent> containerEventHandler =
|
|
|
+ mock(EventHandler.class);
|
|
|
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
|
|
|
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
|
|
|
+
|
|
|
+ DeletionService mockDelService = mock(DeletionService.class);
|
|
|
+
|
|
|
+ ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
|
|
|
+ new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
|
|
|
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
|
|
|
+ dispatcher, localrsrc, true, conf);
|
|
|
+
|
|
|
+ // This is a random path. NO File creation will take place at this place.
|
|
|
+ Path localDir = new Path("/tmp");
|
|
|
+
|
|
|
+ // Container 1 needs lr1 resource
|
|
|
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
|
|
+ LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
|
|
|
+ LocalResourceVisibility.PUBLIC);
|
|
|
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
|
|
|
+
|
|
|
+ // Container 1 requests lr1 to be localized
|
|
|
+ ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
|
|
|
+ LocalResourceVisibility.PUBLIC, lc1);
|
|
|
+ tracker.handle(reqEvent1);
|
|
|
+
|
|
|
+ // Simulate the process of localization of lr1
|
|
|
+ Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
|
|
|
+ // Simulate lr1 getting localized
|
|
|
+ ResourceLocalizedEvent rle =
|
|
|
+ new ResourceLocalizedEvent(lr1,
|
|
|
+ new Path(hierarchicalPath1.toUri().toString() +
|
|
|
+ Path.SEPARATOR + "file1"), 120);
|
|
|
+ tracker.handle(rle);
|
|
|
+ // Localization successful.
|
|
|
+ tracker.localizationCompleted(lr1, true);
|
|
|
+
|
|
|
+ LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3,
|
|
|
+ LocalResourceVisibility.PUBLIC);
|
|
|
+ Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
|
|
|
+ // localization failed.
|
|
|
+ tracker.localizationCompleted(lr2, false);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * The path returned for two localization should be different because we
|
|
|
+ * are limiting one file per sub-directory.
|
|
|
+ */
|
|
|
+ Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2);
|
|
|
+
|
|
|
+ LocalResourceRequest lr3 = createLocalResourceRequest(user, 2, 2,
|
|
|
+ LocalResourceVisibility.PUBLIC);
|
|
|
+ ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
|
|
|
+ LocalResourceVisibility.PUBLIC, lc1);
|
|
|
+ tracker.handle(reqEvent3);
|
|
|
+ Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
|
|
|
+ tracker.localizationCompleted(lr3, true);
|
|
|
+
|
|
|
+ // Verifying that path created is inside the subdirectory
|
|
|
+ Assert.assertEquals(hierarchicalPath3.toUri().toString(),
|
|
|
+ hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0");
|
|
|
+
|
|
|
+ // Container 1 releases resource lr1
|
|
|
+ ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1, cId1);
|
|
|
+ tracker.handle(relEvent1);
|
|
|
+
|
|
|
+ // Validate the file counts now
|
|
|
+ int resources = 0;
|
|
|
+ Iterator<LocalizedResource> iter = tracker.iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ iter.next();
|
|
|
+ resources++;
|
|
|
+ }
|
|
|
+ // There should be only two resources lr1 and lr3 now.
|
|
|
+ Assert.assertEquals(2, resources);
|
|
|
+
|
|
|
+ // Now simulate cache cleanup - removes unused resources.
|
|
|
+ iter = tracker.iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ LocalizedResource rsrc = iter.next();
|
|
|
+ if (rsrc.getRefCount() == 0) {
|
|
|
+ Assert.assertTrue(tracker.remove(rsrc, mockDelService));
|
|
|
+ resources--;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // lr1 is not used by anyone and will be removed, only lr3 will hang
|
|
|
+ // around
|
|
|
+ Assert.assertEquals(1, resources);
|
|
|
+ } finally {
|
|
|
+ if (dispatcher != null) {
|
|
|
+ dispatcher.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private boolean createdummylocalizefile(Path path) {
|
|
|
boolean ret = false;
|
|
|
File file = new File(path.toUri().getRawPath().toString());
|