|
@@ -78,6 +78,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSError;
|
|
import org.apache.hadoop.fs.FSError;
|
|
import org.apache.hadoop.fs.FileContext;
|
|
import org.apache.hadoop.fs.FileContext;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
@@ -1533,6 +1534,103 @@ public class TestResourceLocalizationService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
|
+ public void testPublicCacheDirPermission() throws Exception {
|
|
|
|
+
|
|
|
|
+ // Setup state to simulate restart NM with existing state meaning no
|
|
|
|
+ // directory creation during initialization
|
|
|
|
+ NMStateStoreService spyStateStore = spy(nmContext.getNMStateStore());
|
|
|
|
+ when(spyStateStore.canRecover()).thenReturn(true);
|
|
|
|
+ NMContext spyContext = spy(nmContext);
|
|
|
|
+ when(spyContext.getNMStateStore()).thenReturn(spyStateStore);
|
|
|
|
+
|
|
|
|
+ Path localDir = new Path("target", "testPublicCacheDirPermission");
|
|
|
|
+ String sDir = lfs.makeQualified(localDir).toString();
|
|
|
|
+
|
|
|
|
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDir);
|
|
|
|
+ conf.setInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, 38);
|
|
|
|
+
|
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
|
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
|
|
|
|
+ dispatcher.register(ApplicationEventType.class, applicationBus);
|
|
|
|
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
|
|
|
|
+ dispatcher.register(ContainerEventType.class, containerBus);
|
|
|
|
+
|
|
|
|
+ ContainerExecutor exec = mock(ContainerExecutor.class);
|
|
|
|
+ DeletionService delService = mock(DeletionService.class);
|
|
|
|
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
|
|
|
+ dirsHandler.init(conf);
|
|
|
|
+
|
|
|
|
+ dispatcher.init(conf);
|
|
|
|
+ dispatcher.start();
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ ResourceLocalizationService rawService = new ResourceLocalizationService(
|
|
|
|
+ dispatcher, exec, delService, dirsHandler, spyContext, null);
|
|
|
|
+ ResourceLocalizationService spyService = spy(rawService);
|
|
|
|
+ doReturn(mockServer).when(spyService).createServer();
|
|
|
|
+ doReturn(lfs).when(spyService)
|
|
|
|
+ .getLocalFileContext(isA(Configuration.class));
|
|
|
|
+
|
|
|
|
+ spyService.init(conf);
|
|
|
|
+ spyService.start();
|
|
|
|
+
|
|
|
|
+ final FsPermission expectedPerm = new FsPermission((short) 0755);
|
|
|
|
+ Path publicCache = new Path(localDir, ContainerLocalizer.FILECACHE);
|
|
|
|
+ FsPermission wrongPerm = new FsPermission((short) 0700);
|
|
|
|
+ Path overflowFolder = new Path(publicCache, "0");
|
|
|
|
+ lfs.mkdir(overflowFolder, wrongPerm, false);
|
|
|
|
+
|
|
|
|
+ spyService.lfs.setUMask(new FsPermission((short) 0777));
|
|
|
|
+
|
|
|
|
+ final String user = "user0";
|
|
|
|
+ // init application
|
|
|
|
+ final Application app = mock(Application.class);
|
|
|
|
+ final ApplicationId appId = BuilderUtils
|
|
|
|
+ .newApplicationId(314159265358979L, 3);
|
|
|
|
+ when(app.getUser()).thenReturn(user);
|
|
|
|
+ when(app.getAppId()).thenReturn(appId);
|
|
|
|
+ spyService.handle(new ApplicationLocalizationEvent(
|
|
|
|
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
|
|
|
|
+ dispatcher.await();
|
|
|
|
+
|
|
|
|
+ // init container.
|
|
|
|
+ final Container c = getMockContainer(appId, 42, user);
|
|
|
|
+
|
|
|
|
+ // init resources
|
|
|
|
+ Random r = new Random();
|
|
|
|
+ long seed = r.nextLong();
|
|
|
|
+ System.out.println("SEED: " + seed);
|
|
|
|
+ r.setSeed(seed);
|
|
|
|
+
|
|
|
|
+ Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
|
|
|
|
+ for (int i = 0; i < 3; i++) {
|
|
|
|
+ LocalResource pubResource = getPublicMockedResource(r, true, conf,
|
|
|
|
+ sDir);
|
|
|
|
+ LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
|
|
|
|
+ pubRsrcs.add(pubReq);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
|
|
|
|
+ new HashMap<LocalResourceVisibility,
|
|
|
|
+ Collection<LocalResourceRequest>>();
|
|
|
|
+ req.put(LocalResourceVisibility.PUBLIC, pubRsrcs);
|
|
|
|
+
|
|
|
|
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req));
|
|
|
|
+ dispatcher.await();
|
|
|
|
+
|
|
|
|
+ // verify directory creation
|
|
|
|
+
|
|
|
|
+ Assert.assertEquals(
|
|
|
|
+ "Cache directory permissions filecache/0 is incorrect", expectedPerm,
|
|
|
|
+ lfs.getFileStatus(overflowFolder).getPermission());
|
|
|
|
+
|
|
|
|
+ } finally {
|
|
|
|
+ dispatcher.stop();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test(timeout = 20000)
|
|
@Test(timeout = 20000)
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception {
|
|
public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception {
|
|
@@ -2488,10 +2586,37 @@ public class TestResourceLocalizationService {
|
|
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
|
|
r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
|
|
return rsrc;
|
|
return rsrc;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static LocalResource getMockedResource(Random r,
|
|
|
|
+ LocalResourceVisibility vis, boolean create, Configuration conf,
|
|
|
|
+ String path) {
|
|
|
|
+ String name = Long.toHexString(r.nextLong());
|
|
|
|
+ Path newpath = new Path(path + "/local", name);
|
|
|
|
+ File file = new File(
|
|
|
|
+ Path.getPathWithoutSchemeAndAuthority(newpath).toString());
|
|
|
|
+ try {
|
|
|
|
+ FileSystem.create(FileSystem.get(conf), newpath,
|
|
|
|
+ new FsPermission((short) 0755));
|
|
|
|
+ file.deleteOnExit();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ // Failed to create test resource
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ LocalResource mockedResource = BuilderUtils.newLocalResource(
|
|
|
|
+ URL.fromPath(newpath), LocalResourceType.FILE, vis,
|
|
|
|
+ file.getTotalSpace(), file.lastModified(), false);
|
|
|
|
+ return mockedResource;
|
|
|
|
+ }
|
|
|
|
|
|
private static LocalResource getAppMockedResource(Random r) {
|
|
private static LocalResource getAppMockedResource(Random r) {
|
|
return getMockedResource(r, LocalResourceVisibility.APPLICATION);
|
|
return getMockedResource(r, LocalResourceVisibility.APPLICATION);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static LocalResource getPublicMockedResource(Random r, boolean create,
|
|
|
|
+ Configuration conf, String path) {
|
|
|
|
+ return getMockedResource(r, LocalResourceVisibility.PUBLIC, create, conf,
|
|
|
|
+ path);
|
|
|
|
+ }
|
|
|
|
|
|
private static LocalResource getPublicMockedResource(Random r) {
|
|
private static LocalResource getPublicMockedResource(Random r) {
|
|
return getMockedResource(r, LocalResourceVisibility.PUBLIC);
|
|
return getMockedResource(r, LocalResourceVisibility.PUBLIC);
|