瀏覽代碼

YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException in the public-localizer thread-pool. Contributed by Varun Vasudev.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576545 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 11 年之前
父節點
當前提交
8aab8533a1

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -446,6 +446,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1821. NPE on registerNodeManager if the request has containers for 
     UnmanagedAMs. (kasha)
 
+    YARN-1800. Fixed NodeManager to gracefully handle RejectedExecutionException
+    in the public-localizer thread-pool. (Varun Vasudev via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 9 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -683,9 +684,16 @@ public class ResourceLocalizationService extends CompositeService
             }
           } catch (IOException e) {
             rsrc.unlock();
-            // TODO Need to Fix IO Exceptions - Notifying resource
+            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
+              .getResource().getRequest(), e.getMessage()));
             LOG.error("Local path for public localization is not found. "
                 + " May be disks failed.", e);
+          } catch (RejectedExecutionException re) {
+            rsrc.unlock();
+            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
+              .getResource().getRequest(), re.getMessage()));
+            LOG.error("Failed to submit rsrc " + rsrc + " for download."
+                + " Either queue is full or threadpool is shutdown.", re);
           }
         } else {
           rsrc.unlock();

+ 120 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.api.records.SerializedException;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
@@ -126,6 +128,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -681,6 +684,121 @@ public class TestResourceLocalizationService {
       dispatcher.stop();
     }
   }
+  
+  /*
+   * Test case for handling RejectedExecutionException and IOException which can
+   * be thrown when adding public resources to the pending queue.
+   * RejectedExecutionException can be thrown either due to the incoming queue
+   * being full or if the ExecutorCompletionService threadpool is shutdown.
+   * Since it's hard to simulate the queue being full, this test just shuts down
+   * the threadpool and makes sure the exception is handled. If anything is
+   * messed up the async dispatcher thread will cause a system exit causing the
+   * test to fail.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testPublicResourceAddResourceExceptions() throws Exception {
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[4];
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+      sDirs[i] = localDirs.get(i).toString();
+    }
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    DeletionService delService = mock(DeletionService.class);
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler);
+    dirsHandlerSpy.init(conf);
+
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    try {
+      ResourceLocalizationService rawService =
+          new ResourceLocalizationService(dispatcher, exec, delService,
+            dirsHandlerSpy);
+      ResourceLocalizationService spyService = spy(rawService);
+      doReturn(mockServer).when(spyService).createServer();
+      doReturn(lfs).when(spyService).getLocalFileContext(
+        isA(Configuration.class));
+
+      spyService.init(conf);
+      spyService.start();
+
+      final String user = "user0";
+      // init application
+      final Application app = mock(Application.class);
+      final ApplicationId appId =
+          BuilderUtils.newApplicationId(314159265358979L, 3);
+      when(app.getUser()).thenReturn(user);
+      when(app.getAppId()).thenReturn(appId);
+      spyService.handle(new ApplicationLocalizationEvent(
+        LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+
+      // init resources
+      Random r = new Random();
+      r.setSeed(r.nextLong());
+
+      // Queue localization request for the public resource
+      final LocalResource pubResource = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+      req
+        .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
+
+      // init container.
+      final Container c = getMockContainer(appId, 42);
+
+      // first test ioexception
+      Mockito
+        .doThrow(new IOException())
+        .when(dirsHandlerSpy)
+        .getLocalPathForWrite(isA(String.class), Mockito.anyLong(),
+          Mockito.anyBoolean());
+      // send request
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+      dispatcher.await();
+      LocalResourcesTracker tracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+            user, appId);
+      Assert.assertNull(tracker.getLocalizedResource(pubReq));
+
+      // test RejectedExecutionException
+      Mockito
+        .doCallRealMethod()
+        .when(dirsHandlerSpy)
+        .getLocalPathForWrite(isA(String.class), Mockito.anyLong(),
+          Mockito.anyBoolean());
+
+      // shutdown the thread pool
+      PublicLocalizer publicLocalizer = spyService.getPublicLocalizer();
+      publicLocalizer.threadPool.shutdown();
+
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+      dispatcher.await();
+      tracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+            user, appId);
+      Assert.assertNull(tracker.getLocalizedResource(pubReq));
+
+    } finally {
+      // if we call stop with events in the queue, an InterruptedException gets
+      // thrown resulting in the dispatcher thread causing a system exit
+      dispatcher.await();
+      dispatcher.stop();
+    }
+  }
 
   @Test(timeout = 100000)
   @SuppressWarnings("unchecked")
@@ -829,6 +947,8 @@ public class TestResourceLocalizationService {
       }
     }
   }
+  
+  
 
   @Test(timeout = 10000)
   @SuppressWarnings("unchecked")