Explorar o código

YARN-6004. Refactor TestResourceLocalizationService#testDownloadingResourcesOnContainer so that it is less than 150 lines. (Chris Trezzo via mingma)

Ming Ma %!s(int64=8) %!d(string=hai) anos
pai
achega
7507ccd38a

+ 212 - 164
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -1124,7 +1124,6 @@ public class TestResourceLocalizationService {
   }
 
   @Test(timeout = 20000)
-  @SuppressWarnings("unchecked")
   public void testDownloadingResourcesOnContainerKill() throws Exception {
     List<Path> localDirs = new ArrayList<Path>();
     String[] sDirs = new String[1];
@@ -1132,13 +1131,6 @@ public class TestResourceLocalizationService {
     sDirs[0] = localDirs.get(0).toString();
 
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
-    DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(conf);
-    dispatcher.start();
-    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
-    dispatcher.register(ApplicationEventType.class, applicationBus);
-    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
-    dispatcher.register(ContainerEventType.class, containerBus);
 
     DummyExecutor exec = new DummyExecutor();
     LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
@@ -1149,6 +1141,7 @@ public class TestResourceLocalizationService {
     delService.init(new Configuration());
     delService.start();
 
+    DrainDispatcher dispatcher = getDispatcher(conf);
     ResourceLocalizationService rawService = new ResourceLocalizationService(
         dispatcher, exec, delService, dirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
@@ -1191,180 +1184,235 @@ public class TestResourceLocalizationService {
       spyService.init(conf);
       spyService.start();
 
-      final Application app = mock(Application.class);
-      final ApplicationId appId =
-          BuilderUtils.newApplicationId(314159265358979L, 3);
-      String user = "user0";
-      when(app.getUser()).thenReturn(user);
-      when(app.getAppId()).thenReturn(appId);
-      spyService.handle(new ApplicationLocalizationEvent(
-          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
-      ArgumentMatcher<ApplicationEvent> matchesAppInit =
+      doLocalization(spyService, dispatcher, exec, delService);
+
+    } finally {
+      spyService.stop();
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
+
+  private DrainDispatcher getDispatcher(Configuration config) {
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(config);
+    dispatcher.start();
+    return dispatcher;
+  }
+
+  @SuppressWarnings("unchecked")
+  private EventHandler<ApplicationEvent> getApplicationBus(
+      DrainDispatcher dispatcher) {
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    return applicationBus;
+  }
+
+  @SuppressWarnings("unchecked")
+  private EventHandler<ContainerEvent> getContainerBus(
+      DrainDispatcher dispatcher) {
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+    return containerBus;
+  }
+
+  private void initApp(ResourceLocalizationService spyService,
+      EventHandler<ApplicationEvent> applicationBus, Application app,
+      final ApplicationId appId, DrainDispatcher dispatcher) {
+    spyService.handle(new ApplicationLocalizationEvent(
+        LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+    ArgumentMatcher<ApplicationEvent> matchesAppInit =
         new ArgumentMatcher<ApplicationEvent>() {
           @Override
           public boolean matches(Object o) {
             ApplicationEvent evt = (ApplicationEvent) o;
             return evt.getType() == ApplicationEventType.APPLICATION_INITED
-              && appId == evt.getApplicationID();
+                && appId == evt.getApplicationID();
           }
         };
-      dispatcher.await();
-      verify(applicationBus).handle(argThat(matchesAppInit));
-
-      // Initialize localizer.
-      Random r = new Random();
-      long seed = r.nextLong();
-      System.out.println("SEED: " + seed);
-      r.setSeed(seed);
-      final Container c1 = getMockContainer(appId, 42, "user0");
-      final Container c2 = getMockContainer(appId, 43, "user0");
-      FSDataOutputStream out =
-        new FSDataOutputStream(new DataOutputBuffer(), null);
-      doReturn(out).when(spylfs).createInternal(isA(Path.class),
-          isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
-          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
-          anyBoolean());
-      final LocalResource resource1 = getPrivateMockedResource(r);
-      LocalResource resource2 = null;
-      do {
-        resource2 = getPrivateMockedResource(r);
-      } while (resource2 == null || resource2.equals(resource1));
-      LocalResource resource3 = null;
-      do {
-        resource3 = getPrivateMockedResource(r);
-      } while (resource3 == null || resource3.equals(resource1)
-          || resource3.equals(resource2));
+    dispatcher.await();
+    verify(applicationBus).handle(argThat(matchesAppInit));
+  }
 
-      // Send localization requests for container c1 and c2.
-      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
-      final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
-      final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
-      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+  private void doLocalization(ResourceLocalizationService spyService,
+      DrainDispatcher dispatcher, DummyExecutor exec,
+      DeletionService delService)
+      throws IOException, URISyntaxException, InterruptedException {
+    final Application app = mock(Application.class);
+    final ApplicationId appId =
+        BuilderUtils.newApplicationId(314159265358979L, 3);
+    String user = "user0";
+    when(app.getUser()).thenReturn(user);
+    when(app.getAppId()).thenReturn(appId);
+    List<LocalResource> resources = initializeLocalizer(appId);
+    LocalResource resource1 = resources.get(0);
+    LocalResource resource2 = resources.get(1);
+    LocalResource resource3 = resources.get(2);
+    final Container c1 = getMockContainer(appId, 42, "user0");
+    final Container c2 = getMockContainer(appId, 43, "user0");
+
+    EventHandler<ApplicationEvent> applicationBus =
+        getApplicationBus(dispatcher);
+    EventHandler<ContainerEvent> containerBus = getContainerBus(dispatcher);
+    initApp(spyService, applicationBus, app, appId, dispatcher);
+
+    // Send localization requests for container c1 and c2.
+    final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
+    final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
+    final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
         new HashMap<LocalResourceVisibility,
-                    Collection<LocalResourceRequest>>();
-      List<LocalResourceRequest> privateResourceList =
-          new ArrayList<LocalResourceRequest>();
-      privateResourceList.add(req1);
-      privateResourceList.add(req2);
-      privateResourceList.add(req3);
-      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
-      spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
-
-      final LocalResourceRequest req1_1 = new LocalResourceRequest(resource2);
-      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs1 =
+            Collection<LocalResourceRequest>>();
+    List<LocalResourceRequest> privateResourceList =
+        new ArrayList<LocalResourceRequest>();
+    privateResourceList.add(req1);
+    privateResourceList.add(req2);
+    privateResourceList.add(req3);
+    rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
+    spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
+
+    final LocalResourceRequest req11 = new LocalResourceRequest(resource2);
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs1 =
         new HashMap<LocalResourceVisibility,
-                    Collection<LocalResourceRequest>>();
-      List<LocalResourceRequest> privateResourceList1 =
-          new ArrayList<LocalResourceRequest>();
-      privateResourceList1.add(req1_1);
-      rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1);
-      spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
-
-      dispatcher.await();
-      // Wait for localizers of both container c1 and c2 to begin.
-      exec.waitForLocalizers(2);
-      LocalizerRunner locC1 =
-          spyService.getLocalizerRunner(c1.getContainerId().toString());
-      final String containerIdStr = c1.getContainerId().toString();
-      // Heartbeats from container localizer
-      LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
-      LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
-      LocalizerStatus stat = mock(LocalizerStatus.class);
-      when(stat.getLocalizerId()).thenReturn(containerIdStr);
-      when(rsrc1success.getResource()).thenReturn(resource1);
-      when(rsrc2pending.getResource()).thenReturn(resource2);
-      when(rsrc1success.getLocalSize()).thenReturn(4344L);
-      URL locPath = getPath("/some/path");
-      when(rsrc1success.getLocalPath()).thenReturn(locPath);
-      when(rsrc1success.getStatus()).
-          thenReturn(ResourceStatusType.FETCH_SUCCESS);
-      when(rsrc2pending.getStatus()).
-          thenReturn(ResourceStatusType.FETCH_PENDING);
-
-      when(stat.getResources())
-        .thenReturn(Collections.<LocalResourceStatus>emptyList())
-        .thenReturn(Collections.singletonList(rsrc1success))
-        .thenReturn(Collections.singletonList(rsrc2pending))
-        .thenReturn(Collections.singletonList(rsrc2pending))
-        .thenReturn(Collections.<LocalResourceStatus>emptyList());
-
-      // First heartbeat which schedules first resource.
-      LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
-      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-
-      // Second heartbeat which reports first resource as success.
-      // Second resource is scheduled.
-      response = spyService.heartbeat(stat);
-      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-      final String locPath1 = response.getResourceSpecs().get(0).
-          getDestinationDirectory().getFile();
-
-      // Third heartbeat which reports second resource as pending.
-      // Third resource is scheduled.
-      response = spyService.heartbeat(stat);
-      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
-      final String locPath2 = response.getResourceSpecs().get(0).
-          getDestinationDirectory().getFile();
-
-      // Container c1 is killed which leads to cleanup
-      spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
-
-      // This heartbeat will indicate to container localizer to die as localizer
-      // runner has stopped.
-      response = spyService.heartbeat(stat);
-      assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
-
-      exec.setStopLocalization();
-      dispatcher.await();
-      // verify container notification
-      ArgumentMatcher<ContainerEvent> successContainerLoc =
+            Collection<LocalResourceRequest>>();
+    List<LocalResourceRequest> privateResourceList1 =
+        new ArrayList<LocalResourceRequest>();
+    privateResourceList1.add(req11);
+    rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1);
+    spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));
+
+    dispatcher.await();
+    // Wait for localizers of both container c1 and c2 to begin.
+    exec.waitForLocalizers(2);
+    LocalizerRunner locC1 =
+        spyService.getLocalizerRunner(c1.getContainerId().toString());
+
+    LocalizerStatus stat = mockLocalizerStatus(c1, resource1, resource2);
+
+    // First heartbeat which schedules first resource.
+    LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
+    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+
+    // Second heartbeat which reports first resource as success.
+    // Second resource is scheduled.
+    response = spyService.heartbeat(stat);
+    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+    final String locPath1 =
+        response.getResourceSpecs().get(0).getDestinationDirectory().getFile();
+
+    // Third heartbeat which reports second resource as pending.
+    // Third resource is scheduled.
+    response = spyService.heartbeat(stat);
+    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+    final String locPath2 =
+        response.getResourceSpecs().get(0).getDestinationDirectory().getFile();
+
+    // Container c1 is killed which leads to cleanup
+    spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
+
+    // This heartbeat will indicate to container localizer to die as localizer
+    // runner has stopped.
+    response = spyService.heartbeat(stat);
+    assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+    exec.setStopLocalization();
+    dispatcher.await();
+    // verify container notification
+    ArgumentMatcher<ContainerEvent> successContainerLoc =
         new ArgumentMatcher<ContainerEvent>() {
           @Override
           public boolean matches(Object o) {
             ContainerEvent evt = (ContainerEvent) o;
             return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
-              && c1.getContainerId() == evt.getContainerID();
+                && c1.getContainerId() == evt.getContainerID();
           }
         };
-      // Only one resource gets localized for container c1.
-      verify(containerBus).handle(argThat(successContainerLoc));
-
-      Set<Path> paths =
-          Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
-              new Path(locPath2), new Path(locPath2 + "_tmp"));
-      // Wait for localizer runner thread for container c1 to finish.
-      while (locC1.getState() != Thread.State.TERMINATED) {
-        Thread.sleep(50);
-      }
-      // Verify if downloading resources were submitted for deletion.
-      verify(delService).delete(eq(user),
-          (Path) eq(null), argThat(new DownloadingPathsMatcher(paths)));
-
-      LocalResourcesTracker tracker = spyService.getLocalResourcesTracker(
-          LocalResourceVisibility.PRIVATE, "user0", appId);
-      // Container c1 was killed but this resource was localized before kill
-      // hence its not removed despite ref cnt being 0.
-      LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
-      assertNotNull(rsrc1);
-      assertEquals(rsrc1.getState(), ResourceState.LOCALIZED);
-      assertEquals(rsrc1.getRefCount(), 0);
-
-      // Container c1 was killed but this resource is referenced by container c2
-      // as well hence its ref cnt is 1.
-      LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
-      assertNotNull(rsrc2);
-      assertEquals(rsrc2.getState(), ResourceState.DOWNLOADING);
-      assertEquals(rsrc2.getRefCount(), 1);
-
-      // As container c1 was killed and this resource was not referenced by any
-      // other container, hence its removed.
-      LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
-      assertNull(rsrc3);
-    } finally {
-      spyService.stop();
-      dispatcher.stop();
-      delService.stop();
+    // Only one resource gets localized for container c1.
+    verify(containerBus).handle(argThat(successContainerLoc));
+
+    Set<Path> paths =
+        Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
+            new Path(locPath2), new Path(locPath2 + "_tmp"));
+    // Wait for localizer runner thread for container c1 to finish.
+    while (locC1.getState() != Thread.State.TERMINATED) {
+      Thread.sleep(50);
     }
+    // Verify if downloading resources were submitted for deletion.
+    verify(delService).delete(eq(user), (Path) eq(null),
+        argThat(new DownloadingPathsMatcher(paths)));
+
+    LocalResourcesTracker tracker = spyService.getLocalResourcesTracker(
+        LocalResourceVisibility.PRIVATE, "user0", appId);
+    // Container c1 was killed but this resource was localized before kill
+    // hence its not removed despite ref cnt being 0.
+    LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
+    assertNotNull(rsrc1);
+    assertEquals(rsrc1.getState(), ResourceState.LOCALIZED);
+    assertEquals(rsrc1.getRefCount(), 0);
+
+    // Container c1 was killed but this resource is referenced by container c2
+    // as well hence its ref cnt is 1.
+    LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
+    assertNotNull(rsrc2);
+    assertEquals(rsrc2.getState(), ResourceState.DOWNLOADING);
+    assertEquals(rsrc2.getRefCount(), 1);
+
+    // As container c1 was killed and this resource was not referenced by any
+    // other container, hence its removed.
+    LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
+    assertNull(rsrc3);
+  }
+
+  private LocalizerStatus mockLocalizerStatus(Container c1,
+      LocalResource resource1, LocalResource resource2) {
+    final String containerIdStr = c1.getContainerId().toString();
+    // Heartbeats from container localizer
+    LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
+    LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
+    LocalizerStatus stat = mock(LocalizerStatus.class);
+    when(stat.getLocalizerId()).thenReturn(containerIdStr);
+    when(rsrc1success.getResource()).thenReturn(resource1);
+    when(rsrc2pending.getResource()).thenReturn(resource2);
+    when(rsrc1success.getLocalSize()).thenReturn(4344L);
+    URL locPath = getPath("/some/path");
+    when(rsrc1success.getLocalPath()).thenReturn(locPath);
+    when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+    when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING);
+
+    when(stat.getResources())
+        .thenReturn(Collections.<LocalResourceStatus> emptyList())
+        .thenReturn(Collections.singletonList(rsrc1success))
+        .thenReturn(Collections.singletonList(rsrc2pending))
+        .thenReturn(Collections.singletonList(rsrc2pending))
+        .thenReturn(Collections.<LocalResourceStatus> emptyList());
+    return stat;
+  }
+
+  @SuppressWarnings("unchecked")
+  private List<LocalResource> initializeLocalizer(ApplicationId appId)
+      throws IOException {
+    // Initialize localizer.
+    Random r = new Random();
+    long seed = r.nextLong();
+    System.out.println("SEED: " + seed);
+    r.setSeed(seed);
+    FSDataOutputStream out =
+        new FSDataOutputStream(new DataOutputBuffer(), null);
+    doReturn(out).when(spylfs).createInternal(isA(Path.class),
+        isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
+        anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
+        anyBoolean());
+    final LocalResource resource1 = getPrivateMockedResource(r);
+    LocalResource resource2 = null;
+    do {
+      resource2 = getPrivateMockedResource(r);
+    } while (resource2 == null || resource2.equals(resource1));
+    LocalResource resource3 = null;
+    do {
+      resource3 = getPrivateMockedResource(r);
+    } while (resource3 == null || resource3.equals(resource1)
+        || resource3.equals(resource2));
+    return Arrays.asList(resource1, resource2, resource3);
   }
 
   @Test