浏览代码

YARN-8649. NPE in localizer hearbeat processing if a container is killed while localizing. Contributed by lujie

Jason Lowe 6 年之前
父节点
当前提交
585ebd873a

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java

@@ -500,6 +500,11 @@ class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
 
     Path localPath = new Path(rPath, req.getPath().getName());
     Path localPath = new Path(rPath, req.getPath().getName());
     LocalizedResource rsrc = localrsrc.get(req);
     LocalizedResource rsrc = localrsrc.get(req);
+    if (rsrc == null) {
+      LOG.warn("Resource " + req + " has been removed"
+          + " and will no longer be localized");
+      return null;
+    }
     rsrc.setLocalPath(localPath);
     rsrc.setLocalPath(localPath);
     LocalResource lr = LocalResource.newInstance(req.getResource(),
     LocalResource lr = LocalResource.newInstance(req.getResource(),
         req.getType(), req.getVisibility(), req.getSize(),
         req.getType(), req.getVisibility(), req.getSize(),

+ 8 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -885,6 +885,9 @@ public class ResourceLocalizationService extends CompositeService
             Path publicDirDestPath =
             Path publicDirDestPath =
                 publicRsrc.getPathForLocalization(key, publicRootPath,
                 publicRsrc.getPathForLocalization(key, publicRootPath,
                     delService);
                     delService);
+            if (publicDirDestPath == null) {
+              return;
+            }
             if (!publicDirDestPath.getParent().equals(publicRootPath)) {
             if (!publicDirDestPath.getParent().equals(publicRootPath)) {
               createParentDirs(publicDirDestPath, publicRootPath);
               createParentDirs(publicDirDestPath, publicRootPath);
               if (diskValidator != null) {
               if (diskValidator != null) {
@@ -1175,10 +1178,11 @@ public class ResourceLocalizationService extends CompositeService
           LocalResourcesTracker tracker = getLocalResourcesTracker(
           LocalResourcesTracker tracker = getLocalResourcesTracker(
               next.getVisibility(), user, applicationId);
               next.getVisibility(), user, applicationId);
           if (tracker != null) {
           if (tracker != null) {
-            ResourceLocalizationSpec resource =
-                NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
-                getPathForLocalization(next, tracker));
-            rsrcs.add(resource);
+            Path localPath = getPathForLocalization(next, tracker);
+            if (localPath != null) {
+              rsrcs.add(NodeManagerBuilderUtils.newResourceLocalizationSpec(
+                  next, localPath));
+            }
           }
           }
         } catch (IOException e) {
         } catch (IOException e) {
           LOG.error("local path for PRIVATE localization could not be " +
           LOG.error("local path for PRIVATE localization could not be " +

+ 11 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -1717,8 +1717,18 @@ public class TestResourceLocalizationService {
       assertEquals("NM should tell localizer to be LIVE in Heartbeat.",
       assertEquals("NM should tell localizer to be LIVE in Heartbeat.",
           LocalizerAction.LIVE, response.getLocalizerAction());
           LocalizerAction.LIVE, response.getLocalizerAction());
 
 
-      // Cleanup application.
+      // Cleanup container.
       spyService.handle(new ContainerLocalizationCleanupEvent(c, rsrcs));
       spyService.handle(new ContainerLocalizationCleanupEvent(c, rsrcs));
+      dispatcher.await();
+      try {
+        /*Directly send heartbeat to introduce race as container
+          is being cleaned up.*/
+        locRunnerForContainer.processHeartbeat(
+              Collections.singletonList(rsrcSuccess));
+      } catch (Exception e) {
+        fail("Exception should not have been thrown on processing heartbeat");
+      }
+      // Cleanup application.
       spyService.handle(new ApplicationLocalizationEvent(
       spyService.handle(new ApplicationLocalizationEvent(
           LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app));
           LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app));
       dispatcher.await();
       dispatcher.await();