Browse Source

YARN-4355. NPE while processing localizer heartbeat. Contributed by Varun Saxena & Jonathan Hung.

Naganarasimha 8 years ago
parent
commit
7ffb9943b8

+ 23 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -1036,7 +1036,6 @@ public class ResourceLocalizationService extends CompositeService
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
         recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
-
       String user = context.getUser();
       ApplicationId applicationId =
           context.getContainerId().getApplicationAttemptId().getApplicationId();
@@ -1059,14 +1058,19 @@ public class ResourceLocalizationService extends CompositeService
           LOG.error("Unknown resource reported: " + req);
           continue;
         }
+        LocalResourcesTracker tracker =
+            getLocalResourcesTracker(req.getVisibility(), user, applicationId);
+        if (tracker == null) {
+          // This is likely due to a race between heartbeat and
+          // app cleaning up.
+          continue;
+        }
         switch (stat.getStatus()) {
           case FETCH_SUCCESS:
             // notify resource
             try {
-            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
-              .handle(
-                new ResourceLocalizedEvent(req, stat.getLocalPath().toPath(),
-                    stat.getLocalSize()));
+              tracker.handle(new ResourceLocalizedEvent(req,
+                  stat.getLocalPath().toPath(), stat.getLocalSize()));
             } catch (URISyntaxException e) { }
 
             // unlocking the resource and removing it from scheduled resource
@@ -1080,9 +1084,8 @@ public class ResourceLocalizationService extends CompositeService
             final String diagnostics = stat.getException().toString();
             LOG.warn(req + " failed: " + diagnostics);
             fetchFailed = true;
-            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
-              .handle(new ResourceFailedLocalizationEvent(
-                  req, diagnostics));
+            tracker.handle(new ResourceFailedLocalizationEvent(req,
+                diagnostics));
 
             // unlocking the resource and removing it from scheduled resource
             // list
@@ -1092,9 +1095,8 @@ public class ResourceLocalizationService extends CompositeService
           default:
             LOG.info("Unknown status: " + stat.getStatus());
             fetchFailed = true;
-            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
-              .handle(new ResourceFailedLocalizationEvent(
-                  req, stat.getException().getMessage()));
+            tracker.handle(new ResourceFailedLocalizationEvent(req,
+                stat.getException().getMessage()));
             break;
         }
       }
@@ -1114,10 +1116,14 @@ public class ResourceLocalizationService extends CompositeService
       LocalResource next = findNextResource();
       if (next != null) {
         try {
-          ResourceLocalizationSpec resource =
-              NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
-                getPathForLocalization(next));
-          rsrcs.add(resource);
+          LocalResourcesTracker tracker = getLocalResourcesTracker(
+              next.getVisibility(), user, applicationId);
+          if (tracker != null) {
+            ResourceLocalizationSpec resource =
+                NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                getPathForLocalization(next, tracker));
+            rsrcs.add(resource);
+          }
         } catch (IOException e) {
           LOG.error("local path for PRIVATE localization could not be " +
             "found. Disks might have failed.", e);
@@ -1136,14 +1142,12 @@ public class ResourceLocalizationService extends CompositeService
       return response;
     }
 
-    private Path getPathForLocalization(LocalResource rsrc) throws IOException,
-        URISyntaxException {
+    private Path getPathForLocalization(LocalResource rsrc,
+        LocalResourcesTracker tracker) throws IOException, URISyntaxException {
       String user = context.getUser();
       ApplicationId appId =
           context.getContainerId().getApplicationAttemptId().getApplicationId();
       LocalResourceVisibility vis = rsrc.getVisibility();
-      LocalResourcesTracker tracker =
-          getLocalResourcesTracker(vis, user, appId);
       String cacheDirectory = null;
       if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
         cacheDirectory = getUserFileCachePath(user);

+ 109 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
@@ -147,7 +148,6 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -1482,6 +1482,114 @@ public class TestResourceLocalizationService {
     }
   }
 
+  @Test(timeout = 20000)
+  @SuppressWarnings("unchecked")
+  public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception {
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS,
+        lfs.makeQualified(new Path(basedir, 0 + "")).toString());
+    // Start dispatcher.
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    dispatcher.register(ApplicationEventType.class, mock(EventHandler.class));
+    dispatcher.register(ContainerEventType.class, mock(EventHandler.class));
+
+    DummyExecutor exec = new DummyExecutor();
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+    // Create the service; the spy stubs its server and local FileContext.
+    ResourceLocalizationService rawService = new ResourceLocalizationService(
+        dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(lfs).when(spyService).
+        getLocalFileContext(isA(Configuration.class));
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      // Init application resources.
+      final Application app = mock(Application.class);
+      final ApplicationId appId = BuilderUtils.newApplicationId(1234567890L, 3);
+      when(app.getUser()).thenReturn("user0");
+      when(app.getAppId()).thenReturn(appId);
+      when(app.toString()).thenReturn(appId.toString());
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+
+      // Prepare the container and the two resources to be localized.
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+      final Container c = getMockContainer(appId, 46, "user0");
+      FSDataOutputStream out =
+          new FSDataOutputStream(new DataOutputBuffer(), null);
+      doReturn(out).when(spylfs).createInternal(isA(Path.class),
+          isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
+          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
+          anyBoolean());
+      final LocalResource resource1 = getAppMockedResource(r);
+      final LocalResource resource2 = getAppMockedResource(r);
+
+      // Send localization requests for container.
+      // 2 resources generated with APPLICATION visibility.
+      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+      final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+          new HashMap<LocalResourceVisibility,
+              Collection<LocalResourceRequest>>();
+      List<LocalResourceRequest> appResourceList = Arrays.asList(req1, req2);
+      rsrcs.put(LocalResourceVisibility.APPLICATION, appResourceList);
+      spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
+      dispatcher.await();
+
+      // Wait for localization to begin.
+      exec.waitForLocalizers(1);
+      final String containerIdStr = c.getContainerId().toString();
+      LocalizerRunner locRunnerForContainer =
+          spyService.getLocalizerRunner(containerIdStr);
+      // Mock the statuses a container localizer reports in its heartbeats.
+      LocalResourceStatus rsrcSuccess = mock(LocalResourceStatus.class);
+      LocalizerStatus stat = mock(LocalizerStatus.class);
+      when(stat.getLocalizerId()).thenReturn(containerIdStr);
+      when(rsrcSuccess.getResource()).thenReturn(resource1);
+      when(rsrcSuccess.getLocalSize()).thenReturn(4344L);
+      when(rsrcSuccess.getLocalPath()).thenReturn(getPath("/some/path"));
+      when(rsrcSuccess.getStatus()).
+          thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(stat.getResources()).
+          thenReturn(Collections.<LocalResourceStatus>emptyList());
+
+      // First heartbeat carries no statuses; it schedules the first resource.
+      LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
+      assertEquals("NM should tell localizer to be LIVE in Heartbeat.",
+          LocalizerAction.LIVE, response.getLocalizerAction());
+
+      // Clean up the container's resources, then destroy the application's.
+      spyService.handle(new ContainerLocalizationCleanupEvent(c, rsrcs));
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+      try {
+        // Directly send heartbeat to introduce race as app is being cleaned up.
+        locRunnerForContainer.processHeartbeat(
+            Collections.singletonList(rsrcSuccess));
+      } catch (Exception e) {
+        fail("Exception should not have been thrown on processing heartbeat");
+      }
+      // A heartbeat after cleanup must be answered with DIE.
+      response = spyService.heartbeat(stat);
+      assertEquals("NM should tell localizer to DIE in Heartbeat.",
+          LocalizerAction.DIE, response.getLocalizerAction());
+      exec.setStopLocalization();
+    } finally {
+      spyService.stop();
+      dispatcher.stop();
+    }
+  }
+
   @Test(timeout=20000)
   @SuppressWarnings("unchecked") // mocked generics
   public void testFailedPublicResource() throws Exception {