Parcourir la source

Merge r1506750 from trunk to branch-2 for YARN-875. Application can hang if AMRMClientAsync callback thread has exception (Xuan Gong via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1506752 13f79535-47bb-0310-9956-ffa450edef68
Bikas Saha il y a 12 ans
Parent
commit
5f64166ba3

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -726,6 +726,9 @@ Release 2.1.0-beta - 2013-07-02
     YARN-873. YARNClient.getApplicationReport(unknownAppId) returns a null
     report (Xuan Gong via bikas)
 
+    YARN-875. Application can hang if AMRMClientAsync callback thread has
+    exception (Xuan Gong via bikas)
+
   BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
 
     YARN-158. Yarn creating package-info.java must not depend on sh.

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -649,8 +649,9 @@ public class ApplicationMaster {
     }
 
     @Override
-    public void onError(Exception e) {
+    public void onError(Throwable e) {
       done = true;
+      resourceManager.stop();
     }
   }
 

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java

@@ -220,6 +220,13 @@ extends AbstractService {
     
     public float getProgress();
     
-    public void onError(Exception e);
+    /**
+     * Called when error comes from RM communications as well as from errors in
+     * the callback itself from the app. Calling
+     * stop() is the recommended action.
+     *
+     * @param e
+     */
+    public void onError(Throwable e);
   }
 }

+ 48 - 39
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java

@@ -217,7 +217,7 @@ extends AMRMClientAsync<T> {
         // synchronization ensures we don't send heartbeats after unregistering
         synchronized (unregisterHeartbeatLock) {
           if (!keepRunning) {
-            break;
+            return;
           }
             
           try {
@@ -227,13 +227,13 @@ extends AMRMClientAsync<T> {
             savedException = ex;
             // interrupt handler thread in case it waiting on the queue
             handlerThread.interrupt();
-            break;
+            return;
           } catch (IOException e) {
             LOG.error("IO exception on heartbeat", e);
             savedException = e;
             // interrupt handler thread in case it waiting on the queue
             handlerThread.interrupt();
-            break;
+            return;
           }
         }
         if (response != null) {
@@ -266,51 +266,60 @@ extends AMRMClientAsync<T> {
     }
     
     public void run() {
-      while (keepRunning) {
-        AllocateResponse response;
+      while (true) {
+        if (!keepRunning) {
+          return;
+        }
         try {
+          AllocateResponse response;
           if(savedException != null) {
             LOG.error("Stopping callback due to: ", savedException);
             handler.onError(savedException);
-            break;
+            return;
+          }
+          try {
+            response = responseQueue.take();
+          } catch (InterruptedException ex) {
+            LOG.info("Interrupted while waiting for queue", ex);
+            continue;
           }
-          response = responseQueue.take();
-        } catch (InterruptedException ex) {
-          LOG.info("Interrupted while waiting for queue", ex);
-          continue;
-        }
 
-        if (response.getAMCommand() != null) {
-          switch(response.getAMCommand()) {
-          case AM_RESYNC:
-          case AM_SHUTDOWN:
-            handler.onShutdownRequest();
-            LOG.info("Shutdown requested. Stopping callback.");
-            return;
-          default:
-            String msg =
-                  "Unhandled value of AMCommand: " + response.getAMCommand();
-            LOG.error(msg);
-            throw new YarnRuntimeException(msg);
+          if (response.getAMCommand() != null) {
+            switch(response.getAMCommand()) {
+            case AM_RESYNC:
+            case AM_SHUTDOWN:
+              handler.onShutdownRequest();
+              LOG.info("Shutdown requested. Stopping callback.");
+              return;
+            default:
+              String msg =
+                    "Unhandled value of RM AMCommand: " + response.getAMCommand();
+              LOG.error(msg);
+              throw new YarnRuntimeException(msg);
+            }
+          }
+          List<NodeReport> updatedNodes = response.getUpdatedNodes();
+          if (!updatedNodes.isEmpty()) {
+            handler.onNodesUpdated(updatedNodes);
           }
-        }
-        List<NodeReport> updatedNodes = response.getUpdatedNodes();
-        if (!updatedNodes.isEmpty()) {
-          handler.onNodesUpdated(updatedNodes);
-        }
-        
-        List<ContainerStatus> completed =
-            response.getCompletedContainersStatuses();
-        if (!completed.isEmpty()) {
-          handler.onContainersCompleted(completed);
-        }
 
-        List<Container> allocated = response.getAllocatedContainers();
-        if (!allocated.isEmpty()) {
-          handler.onContainersAllocated(allocated);
+          List<ContainerStatus> completed =
+              response.getCompletedContainersStatuses();
+          if (!completed.isEmpty()) {
+            handler.onContainersCompleted(completed);
+          }
+
+          List<Container> allocated = response.getAllocatedContainers();
+          if (!allocated.isEmpty()) {
+            handler.onContainersAllocated(allocated);
+          }
+
+          progress = handler.getProgress();
+        } catch (Throwable ex) {
+          handler.onError(ex);
+          // re-throw exception to end the thread
+          throw new YarnRuntimeException(ex);
         }
-        
-        progress = handler.getProgress();
       }
     }
   }

+ 87 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java

@@ -21,10 +21,12 @@ package org.apache.hadoop.yarn.client.api.async.impl;
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -264,13 +267,13 @@ public class TestAMRMClientAsync {
 
     AMRMClientAsync<ContainerRequest> asyncClient =
         AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
-    callbackHandler.registerAsyncClient(asyncClient);
+    callbackHandler.asynClient = asyncClient;
     asyncClient.init(conf);
     asyncClient.start();
 
     synchronized (callbackHandler.notifier) {
       asyncClient.registerApplicationMaster("localhost", 1234, null);
-      while(callbackHandler.stop == false) {
+      while(callbackHandler.notify == false) {
         try {
           callbackHandler.notifier.wait();
         } catch (InterruptedException e) {
@@ -280,6 +283,65 @@ public class TestAMRMClientAsync {
     }
   }
 
+  void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws
+        InterruptedException, YarnException, IOException {
+    Configuration conf = new Configuration();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+
+    List<ContainerStatus> completed = Arrays.asList(
+        ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
+            ContainerState.COMPLETE, "", 0));
+    final AllocateResponse response = createAllocateResponse(completed,
+        new ArrayList<Container>(), null);
+
+    when(client.allocate(anyFloat())).thenReturn(response);
+    AMRMClientAsync<ContainerRequest> asyncClient =
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
+    callbackHandler.asynClient = asyncClient;
+    callbackHandler.throwOutException = true;
+    asyncClient.init(conf);
+    asyncClient.start();
+
+    // call register and wait for error callback and stop
+    synchronized (callbackHandler.notifier) {
+      asyncClient.registerApplicationMaster("localhost", 1234, null);
+      while(callbackHandler.notify == false) {
+        try {
+          callbackHandler.notifier.wait();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    // verify error invoked
+    verify(callbackHandler, times(0)).getProgress();
+    verify(callbackHandler, times(1)).onError(any(Exception.class));
+    // sleep to wait for a few heartbeat calls that can trigger callbacks
+    Thread.sleep(50);
+    // verify no more invocations after the first one.
+    // ie. callback thread has stopped
+    verify(callbackHandler, times(0)).getProgress();
+    verify(callbackHandler, times(1)).onError(any(Exception.class));
+  }
+
+  @Test (timeout = 5000)
+  public void testCallBackThrowOutException() throws YarnException,
+      IOException, InterruptedException {
+    // test exception in callback with app calling stop() on app.onError()
+    TestCallbackHandler2 callbackHandler = spy(new TestCallbackHandler2());
+    runCallBackThrowOutException(callbackHandler);
+  }
+
+  @Test (timeout = 5000)
+  public void testCallBackThrowOutExceptionNoStop() throws YarnException,
+      IOException, InterruptedException {
+    // test exception in callback with app not calling stop() on app.onError()
+    TestCallbackHandler2 callbackHandler = spy(new TestCallbackHandler2());
+    callbackHandler.stop = false;
+    runCallBackThrowOutException(callbackHandler);
+  }
+
   private AllocateResponse createAllocateResponse(
       List<ContainerStatus> completed, List<Container> allocated,
       List<NMToken> nmTokens) {
@@ -378,8 +440,8 @@ public class TestAMRMClientAsync {
     }
 
     @Override
-    public void onError(Exception e) {
-      savedException = e;
+    public void onError(Throwable e) {
+      savedException = new Exception(e.getMessage());
       synchronized (notifier) {
         notifier.notifyAll();        
       }
@@ -390,10 +452,16 @@ public class TestAMRMClientAsync {
     Object notifier = new Object();
     @SuppressWarnings("rawtypes")
     AMRMClientAsync asynClient;
-    boolean stop = false;
+    boolean stop = true;
+    boolean notify = false;
+    boolean throwOutException = false;
 
     @Override
-    public void onContainersCompleted(List<ContainerStatus> statuses) {}
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+      if (throwOutException) {
+        throw new YarnRuntimeException("Exception from callback handler");
+      }
+    }
 
     @Override
     public void onContainersAllocated(List<Container> containers) {}
@@ -406,20 +474,24 @@ public class TestAMRMClientAsync {
 
     @Override
     public float getProgress() {
-      asynClient.stop();
-      stop = true;
-      synchronized (notifier) {
-        notifier.notifyAll();
-      }
+      callStopAndNotify();
       return 0;
     }
 
     @Override
-    public void onError(Exception e) {}
+    public void onError(Throwable e) {
+      Assert.assertEquals(e.getMessage(), "Exception from callback handler");
+      callStopAndNotify();
+    }
 
-    public void registerAsyncClient(
-        AMRMClientAsync<ContainerRequest> asyncClient) {
-      this.asynClient = asyncClient;
+    void callStopAndNotify() {
+      if(stop) {
+        asynClient.stop();
+      }
+      notify = true;
+      synchronized (notifier) {
+        notifier.notifyAll();
+      }
     }
   }
 }