Explorar el Código

YARN-6641. Non-public resource localization on a bad disk causes subsequent container failures. Contributed by Kuhu Shukla

Jason Lowe hace 8 años
padre
commit
b89d59e21e

+ 5 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java

@@ -92,14 +92,6 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
   private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
   private NMStateStoreService stateStore;
 
-  public LocalResourcesTrackerImpl(String user, ApplicationId appId,
-      Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
-      Configuration conf, NMStateStoreService stateStore) {
-    this(user, appId, dispatcher,
-        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
-        useLocalCacheDirectoryManager, conf, stateStore, null);
-  }
-
   public LocalResourcesTrackerImpl(String user, ApplicationId appId,
       Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
       Configuration conf, NMStateStoreService stateStore,
@@ -528,4 +520,9 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
     }
     return mgr;
   }
+
+  @VisibleForTesting
+  LocalDirsHandlerService getDirsHandler() {
+    return dirsHandler;
+  }
 }

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -293,7 +293,7 @@ public class ResourceLocalizationService extends CompositeService
       trackerState = userResources.getPrivateTrackerState();
       if (!trackerState.isEmpty()) {
         LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-            null, dispatcher, true, super.getConfig(), stateStore);
+            null, dispatcher, true, super.getConfig(), stateStore, dirsHandler);
         LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
             tracker);
         if (oldTracker != null) {
@@ -309,7 +309,8 @@ public class ResourceLocalizationService extends CompositeService
           ApplicationId appId = appEntry.getKey();
           String appIdStr = appId.toString();
           LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-              appId, dispatcher, false, super.getConfig(), stateStore);
+              appId, dispatcher, false, super.getConfig(), stateStore,
+              dirsHandler);
           LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
               tracker);
           if (oldTracker != null) {
@@ -447,10 +448,11 @@ public class ResourceLocalizationService extends CompositeService
     // 0) Create application tracking structs
     String userName = app.getUser();
     privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
-        null, dispatcher, true, super.getConfig(), stateStore));
+        null, dispatcher, true, super.getConfig(), stateStore, dirsHandler));
     String appIdStr = app.getAppId().toString();
     appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
-        app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
+        app.getAppId(), dispatcher, false, super.getConfig(), stateStore,
+        dirsHandler));
     // 1) Signal container init
     //
     // This is handled by the ApplicationImpl state machine and allows

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java

@@ -529,7 +529,7 @@ public class TestLocalResourcesTrackerImpl {
 
     try {
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          appId, dispatcher, false, conf, stateStore);
+          appId, dispatcher, false, conf, stateStore, null);
       // Container 1 needs lr1 resource
       ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
       LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
@@ -610,7 +610,7 @@ public class TestLocalResourcesTrackerImpl {
 
     try {
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          appId, dispatcher, false, conf, stateStore);
+          appId, dispatcher, false, conf, stateStore, null);
       // Container 1 needs lr1 resource
       ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
       LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
@@ -672,7 +672,7 @@ public class TestLocalResourcesTrackerImpl {
 
     try {
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          appId, dispatcher, false, conf, stateStore);
+          appId, dispatcher, false, conf, stateStore, null);
       // Container 1 needs lr1 resource
       ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
       LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
@@ -725,7 +725,7 @@ public class TestLocalResourcesTrackerImpl {
 
     try {
       LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
-          appId, dispatcher, true, conf, stateStore);
+          appId, dispatcher, true, conf, stateStore, null);
       LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
           LocalResourceVisibility.PUBLIC);
       Assert.assertNull(tracker.getLocalizedResource(lr1));

+ 71 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -2766,4 +2766,75 @@ public class TestResourceLocalizationService {
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testDirHandler() throws Exception {
+    File f = new File(basedir.toString());
+    String[] sDirs = new String[4];
+    List<Path> localDirs = new ArrayList<Path>(sDirs.length);
+    for (int i = 0; i < 4; ++i) {
+      sDirs[i] = f.getAbsolutePath() + i;
+      localDirs.add(new Path(sDirs[i]));
+    }
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerBus);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    LocalDirsHandlerService mockDirsHandler =
+        mock(LocalDirsHandlerService.class);
+    doReturn(new ArrayList<String>(Arrays.asList(sDirs))).when(
+        mockDirsHandler).getLocalDirsForCleanup();
+    // setup mocks
+    DeletionService delService = mock(DeletionService.class);
+    ResourceLocalizationService rawService =
+        new ResourceLocalizationService(dispatcher, exec, delService,
+            mockDirsHandler, nmContext);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
+        isA(Configuration.class));
+
+    final String user = "user0";
+    // init application
+    final Application app = mock(Application.class);
+    final ApplicationId appId =
+        BuilderUtils.newApplicationId(314159265358979L, 3);
+    when(app.getUser()).thenReturn(user);
+    when(app.getAppId()).thenReturn(appId);
+    when(app.toString()).thenReturn(appId.toString());
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+
+      LocalResourcesTracker appTracker =
+          spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user, appId);
+      LocalResourcesTracker privTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+              user, appId);
+      LocalResourcesTracker pubTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+              user, appId);
+      Assert.assertNotNull("dirHandler for appTracker is null!",
+          ((LocalResourcesTrackerImpl)appTracker).getDirsHandler());
+      Assert.assertNotNull("dirHandler for privTracker is null!",
+          ((LocalResourcesTrackerImpl)privTracker).getDirsHandler());
+      Assert.assertNotNull("dirHandler for pubTracker is null!",
+          ((LocalResourcesTrackerImpl)pubTracker).getDirsHandler());
+    } finally {
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
 }