Browse Source

Merging r1510071 from branch-2 to branch-2.1-beta for YARN-994. HeartBeat thread in AMRMClientAsync does not handle runtime exception correctly (Xuan Gong via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1510076 13f79535-47bb-0310-9956-ffa450edef68
Bikas Saha 11 years ago
parent
commit
9660d9554c

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -30,6 +30,9 @@ Release 2.1.1-beta - UNRELEASED
     YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at
     YARN-906. Fixed a bug in NodeManager where cancelling ContainerLaunch at
     KILLING state causes that the container to hang. (Zhijie Shen via vinodkv)
     KILLING state causes that the container to hang. (Zhijie Shen via vinodkv)
 
 
+    YARN-994. HeartBeat thread in AMRMClientAsync does not handle runtime
+    exception correctly (Xuan Gong via bikas)
+
 Release 2.1.0-beta - 2013-08-06
 Release 2.1.0-beta - 2013-08-06
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 3 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java

@@ -65,7 +65,7 @@ extends AMRMClientAsync<T> {
   private volatile boolean keepRunning;
   private volatile boolean keepRunning;
   private volatile float progress;
   private volatile float progress;
   
   
-  private volatile Exception savedException;
+  private volatile Throwable savedException;
   
   
   public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
   public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
     this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
     this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
@@ -222,18 +222,12 @@ extends AMRMClientAsync<T> {
             
             
           try {
           try {
             response = client.allocate(progress);
             response = client.allocate(progress);
-          } catch (YarnException ex) {
-            LOG.error("Yarn exception on heartbeat", ex);
+          } catch (Throwable ex) {
+            LOG.error("Exception on heartbeat", ex);
             savedException = ex;
             savedException = ex;
             // interrupt handler thread in case it waiting on the queue
             // interrupt handler thread in case it waiting on the queue
             handlerThread.interrupt();
             handlerThread.interrupt();
             return;
             return;
-          } catch (IOException e) {
-            LOG.error("IO exception on heartbeat", e);
-            savedException = e;
-            // interrupt handler thread in case it waiting on the queue
-            handlerThread.interrupt();
-            return;
           }
           }
         }
         }
         if (response != null) {
         if (response != null) {

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -277,6 +277,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
   public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
       String appMessage, String appTrackingUrl) throws YarnException,
       String appMessage, String appTrackingUrl) throws YarnException,
       IOException {
       IOException {
+    Preconditions.checkArgument(appStatus != null,
+        "AppStatus should not be null.");
     FinishApplicationMasterRequest request =
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
         FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
           appTrackingUrl);
           appTrackingUrl);

+ 19 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java

@@ -159,14 +159,26 @@ public class TestAMRMClientAsync {
 
 
   @Test(timeout=10000)
   @Test(timeout=10000)
   public void testAMRMClientAsyncException() throws Exception {
   public void testAMRMClientAsyncException() throws Exception {
+    String exStr = "TestException";
+    YarnException mockException = mock(YarnException.class);
+    when(mockException.getMessage()).thenReturn(exStr);
+    runHeartBeatThrowOutException(mockException);
+  }
+
+  @Test(timeout=10000)
+  public void testAMRMClientAsyncRunTimeException() throws Exception {
+    String exStr = "TestRunTimeException";
+    RuntimeException mockRunTimeException = mock(RuntimeException.class);
+    when(mockRunTimeException.getMessage()).thenReturn(exStr);
+    runHeartBeatThrowOutException(mockRunTimeException);
+  }
+
+  private void runHeartBeatThrowOutException(Exception ex) throws Exception{
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     TestCallbackHandler callbackHandler = new TestCallbackHandler();
     TestCallbackHandler callbackHandler = new TestCallbackHandler();
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
     AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
-    String exStr = "TestException";
-    YarnException mockException = mock(YarnException.class);
-    when(mockException.getMessage()).thenReturn(exStr);
-    when(client.allocate(anyFloat())).thenThrow(mockException);
+    when(client.allocate(anyFloat())).thenThrow(ex);
 
 
     AMRMClientAsync<ContainerRequest> asyncClient = 
     AMRMClientAsync<ContainerRequest> asyncClient = 
         AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
         AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
@@ -183,14 +195,14 @@ public class TestAMRMClientAsync {
         }
         }
       }
       }
     }
     }
-    
-    Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr));
+    Assert.assertTrue(callbackHandler.savedException.getMessage().contains(
+        ex.getMessage()));
     
     
     asyncClient.stop();
     asyncClient.stop();
     // stopping should have joined all threads and completed all callbacks
     // stopping should have joined all threads and completed all callbacks
     Assert.assertTrue(callbackHandler.callbackCount == 0);
     Assert.assertTrue(callbackHandler.callbackCount == 0);
   }
   }
-  
+
   @Test//(timeout=10000)
   @Test//(timeout=10000)
   public void testAMRMClientAsyncReboot() throws Exception {
   public void testAMRMClientAsyncReboot() throws Exception {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();