Преглед изворни кода

YARN-3024. LocalizerRunner should give DIE action when all resources are
localized. Contributed by Chengbing Liu

(cherry picked from commit 0d6bd62102f94c55d59f7a0a86a684e99d746127)
(cherry picked from commit a7696b3fbfacd98a892bbb3678663658c7b9d2bd)
(cherry picked from commit 9e30232004ab7c3c3bfde3b8b27c37fa7065f6be)

Xuan пре 10 година
родитељ
комит
9af5b1dcd0

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -129,6 +129,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3487. CapacityScheduler scheduler lock obtained unnecessarily when 
     calling getQueue (Jason Lowe via wangda)
 
+    YARN-3024. LocalizerRunner should give DIE action when all resources are
+    localized. (Chengbing Liu via xgong)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 41 - 64
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -763,7 +763,7 @@ public class ResourceLocalizationService extends CompositeService
        */
 
       if (rsrc.tryAcquire()) {
-        if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
+        if (rsrc.getState() == ResourceState.DOWNLOADING) {
           LocalResource resource = request.getResource().getRequest();
           try {
             Path publicRootPath =
@@ -896,7 +896,7 @@ public class ResourceLocalizationService extends CompositeService
          LocalizedResource nRsrc = evt.getResource();
          // Resource download should take place ONLY if resource is in
          // Downloading state
-         if (!ResourceState.DOWNLOADING.equals(nRsrc.getState())) {
+         if (nRsrc.getState() != ResourceState.DOWNLOADING) {
            i.remove();
            continue;
          }
@@ -907,7 +907,7 @@ public class ResourceLocalizationService extends CompositeService
           * 2) Resource is still in DOWNLOADING state
           */
          if (nRsrc.tryAcquire()) {
-           if (nRsrc.getState().equals(ResourceState.DOWNLOADING)) {
+           if (nRsrc.getState() == ResourceState.DOWNLOADING) {
              LocalResourceRequest nextRsrc = nRsrc.getRequest();
              LocalResource next =
                  recordFactory.newRecordInstance(LocalResource.class);
@@ -937,41 +937,9 @@ public class ResourceLocalizationService extends CompositeService
       String user = context.getUser();
       ApplicationId applicationId =
           context.getContainerId().getApplicationAttemptId().getApplicationId();
-      // The localizer has just spawned. Start giving it resources for
-      // remote-fetching.
-      if (remoteResourceStatuses.isEmpty()) {
-        LocalResource next = findNextResource();
-        if (next != null) {
-          response.setLocalizerAction(LocalizerAction.LIVE);
-          try {
-            ArrayList<ResourceLocalizationSpec> rsrcs =
-                new ArrayList<ResourceLocalizationSpec>();
-            ResourceLocalizationSpec rsrc =
-                NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
-                  getPathForLocalization(next));
-            rsrcs.add(rsrc);
-            response.setResourceSpecs(rsrcs);
-          } catch (IOException e) {
-            LOG.error("local path for PRIVATE localization could not be found."
-                + "Disks might have failed.", e);
-          } catch (URISyntaxException e) {
-            // TODO fail? Already translated several times...
-          }
-        } else if (pending.isEmpty()) {
-          // TODO: Synchronization
-          response.setLocalizerAction(LocalizerAction.DIE);
-        } else {
-          response.setLocalizerAction(LocalizerAction.LIVE);
-        }
-        return response;
-      }
-      ArrayList<ResourceLocalizationSpec> rsrcs =
-          new ArrayList<ResourceLocalizationSpec>();
-       /*
-        * TODO : It doesn't support multiple downloads per ContainerLocalizer
-        * at the same time. We need to think whether we should support this.
-        */
 
+      LocalizerAction action = LocalizerAction.LIVE;
+      // Update resource statuses.
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
         LocalResourceRequest req = null;
@@ -1000,33 +968,8 @@ public class ResourceLocalizationService extends CompositeService
             // list
             assoc.getResource().unlock();
             scheduled.remove(req);
-            
-            if (pending.isEmpty()) {
-              // TODO: Synchronization
-              response.setLocalizerAction(LocalizerAction.DIE);
-              break;
-            }
-            response.setLocalizerAction(LocalizerAction.LIVE);
-            LocalResource next = findNextResource();
-            if (next != null) {
-              try {
-                ResourceLocalizationSpec resource =
-                    NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
-                      getPathForLocalization(next));
-                rsrcs.add(resource);
-              } catch (IOException e) {
-                LOG.error("local path for PRIVATE localization could not be " +
-                  "found. Disks might have failed.", e);
-              } catch (IllegalArgumentException e) {
-                LOG.error("Inorrect path for PRIVATE localization."
-                    + next.getResource().getFile(), e);
-              } catch (URISyntaxException e) {
-                  //TODO fail? Already translated several times...
-              }
-            }
             break;
           case FETCH_PENDING:
-            response.setLocalizerAction(LocalizerAction.LIVE);
             break;
           case FETCH_FAILURE:
             final String diagnostics = stat.getException().toString();
@@ -1040,17 +983,51 @@ public class ResourceLocalizationService extends CompositeService
             // list
             assoc.getResource().unlock();
             scheduled.remove(req);
-            
             break;
           default:
             LOG.info("Unknown status: " + stat.getStatus());
-            response.setLocalizerAction(LocalizerAction.DIE);
+            action = LocalizerAction.DIE;
             getLocalResourcesTracker(req.getVisibility(), user, applicationId)
               .handle(new ResourceFailedLocalizationEvent(
                   req, stat.getException().getMessage()));
             break;
         }
       }
+      if (action == LocalizerAction.DIE) {
+        response.setLocalizerAction(action);
+        return response;
+      }
+
+      // Give the localizer resources for remote-fetching.
+      List<ResourceLocalizationSpec> rsrcs =
+          new ArrayList<ResourceLocalizationSpec>();
+
+      /*
+       * TODO : It doesn't support multiple downloads per ContainerLocalizer
+       * at the same time. We need to think whether we should support this.
+       */
+      LocalResource next = findNextResource();
+      if (next != null) {
+        try {
+          ResourceLocalizationSpec resource =
+              NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                getPathForLocalization(next));
+          rsrcs.add(resource);
+        } catch (IOException e) {
+          LOG.error("local path for PRIVATE localization could not be " +
+            "found. Disks might have failed.", e);
+        } catch (IllegalArgumentException e) {
+          LOG.error("Inorrect path for PRIVATE localization."
+              + next.getResource().getFile(), e);
+        } catch (URISyntaxException e) {
+            //TODO fail? Already translated several times...
+        }
+      } else if (pending.isEmpty()) {
+        // TODO: Synchronization
+        action = LocalizerAction.DIE;
+      }
+
+      response.setLocalizerAction(action);
       response.setResourceSpecs(rsrcs);
       return response;
     }

+ 50 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -832,10 +832,16 @@ public class TestResourceLocalizationService {
       do {
         resource2 = getPrivateMockedResource(r);
       } while (resource2 == null || resource2.equals(resource1));
+      LocalResource resource3 = null;
+      do {
+        resource3 = getPrivateMockedResource(r);
+      } while (resource3 == null || resource3.equals(resource1)
+          || resource3.equals(resource2));
       // above call to make sure we don't get identical resources.
       
       final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
       final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+      final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
       Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
         new HashMap<LocalResourceVisibility, 
                     Collection<LocalResourceRequest>>();
@@ -843,6 +849,7 @@ public class TestResourceLocalizationService {
           new ArrayList<LocalResourceRequest>();
       privateResourceList.add(req1);
       privateResourceList.add(req2);
+      privateResourceList.add(req3);
       rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
       spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
       // Sigh. Thread init of private localizer not accessible
@@ -857,30 +864,47 @@ public class TestResourceLocalizationService {
       Path localizationTokenPath = tokenPathCaptor.getValue();
 
       // heartbeat from localizer
-      LocalResourceStatus rsrcStat1 = mock(LocalResourceStatus.class);
-      LocalResourceStatus rsrcStat2 = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc2success = mock(LocalResourceStatus.class);
+      LocalResourceStatus rsrc3success = mock(LocalResourceStatus.class);
       LocalizerStatus stat = mock(LocalizerStatus.class);
       when(stat.getLocalizerId()).thenReturn(ctnrStr);
-      when(rsrcStat1.getResource()).thenReturn(resource1);
-      when(rsrcStat2.getResource()).thenReturn(resource2);
-      when(rsrcStat1.getLocalSize()).thenReturn(4344L);
-      when(rsrcStat2.getLocalSize()).thenReturn(2342L);
+      when(rsrc1success.getResource()).thenReturn(resource1);
+      when(rsrc2pending.getResource()).thenReturn(resource2);
+      when(rsrc2success.getResource()).thenReturn(resource2);
+      when(rsrc3success.getResource()).thenReturn(resource3);
+      when(rsrc1success.getLocalSize()).thenReturn(4344L);
+      when(rsrc2success.getLocalSize()).thenReturn(2342L);
+      when(rsrc3success.getLocalSize()).thenReturn(5345L);
       URL locPath = getPath("/cache/private/blah");
-      when(rsrcStat1.getLocalPath()).thenReturn(locPath);
-      when(rsrcStat2.getLocalPath()).thenReturn(locPath);
-      when(rsrcStat1.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
-      when(rsrcStat2.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrc1success.getLocalPath()).thenReturn(locPath);
+      when(rsrc2success.getLocalPath()).thenReturn(locPath);
+      when(rsrc3success.getLocalPath()).thenReturn(locPath);
+      when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING);
+      when(rsrc2success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      when(rsrc3success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+
+      // Four heartbeats with sending:
+      // 1 - empty
+      // 2 - resource1 FETCH_SUCCESS
+      // 3 - resource2 FETCH_PENDING
+      // 4 - resource2 FETCH_SUCCESS, resource3 FETCH_SUCCESS
+      List<LocalResourceStatus> rsrcs4 = new ArrayList<LocalResourceStatus>();
+      rsrcs4.add(rsrc2success);
+      rsrcs4.add(rsrc3success);
       when(stat.getResources())
         .thenReturn(Collections.<LocalResourceStatus>emptyList())
-        .thenReturn(Collections.singletonList(rsrcStat1))
-        .thenReturn(Collections.singletonList(rsrcStat2))
-        .thenReturn(Collections.<LocalResourceStatus>emptyList());
+        .thenReturn(Collections.singletonList(rsrc1success))
+        .thenReturn(Collections.singletonList(rsrc2pending))
+        .thenReturn(rsrcs4);
 
       String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
           Path.SEPARATOR + "user0" + Path.SEPARATOR +
           ContainerLocalizer.FILECACHE;
-      
-      // get first resource
+
+      // First heartbeat
       LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
       assertEquals(1, response.getResourceSpecs().size());
@@ -893,7 +917,7 @@ public class TestResourceLocalizationService {
       assertTrue(localizedPath.getFile().endsWith(
         localPath + Path.SEPARATOR + "10"));
 
-      // get second resource
+      // Second heartbeat
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
       assertEquals(1, response.getResourceSpecs().size());
@@ -907,16 +931,21 @@ public class TestResourceLocalizationService {
       assertTrue(localizedPath.getFile().endsWith(
         localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));
 
-      // empty rsrc
+      // Third heartbeat
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-      assertEquals(0, response.getResourceSpecs().size());
+      assertEquals(1, response.getResourceSpecs().size());
+      assertEquals(req3, new LocalResourceRequest(response.getResourceSpecs()
+          .get(0).getResource()));
+      localizedPath =
+          response.getResourceSpecs().get(0).getDestinationDirectory();
+      assertTrue(localizedPath.getFile().endsWith(
+          localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));
 
       // get shutdown
       response = spyService.heartbeat(stat);
       assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
 
-
       dispatcher.await();
       // verify container notification
       ArgumentMatcher<ContainerEvent> matchesContainerLoc =
@@ -928,8 +957,8 @@ public class TestResourceLocalizationService {
               && c.getContainerId() == evt.getContainerID();
           }
         };
-      // total 2 resource localzation calls. one for each resource.
-      verify(containerBus, times(2)).handle(argThat(matchesContainerLoc));
+      // total 3 resource localzation calls. one for each resource.
+      verify(containerBus, times(3)).handle(argThat(matchesContainerLoc));
         
       // Verify deletion of localization token.
       verify(delService).delete((String)isNull(), eq(localizationTokenPath));