Browse Source

MAPREDUCE-6439. AM may fail instead of retrying if RM shuts down during the allocate call. (Anubhav Dhoot via kasha)

Karthik Kambatla 9 years ago
parent
commit
8dfec7a197

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -573,6 +573,9 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6426. TestShuffleHandler#testGetMapOutputInfo is failing.
     (zhihai xu via devaraj)
 
+    MAPREDUCE-6439. AM may fail instead of retrying if RM shuts down during the
+    allocate call. (Anubhav Dhoot via kasha)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

+ 27 - 24
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -270,35 +270,38 @@ public abstract class RMCommunicator extends AbstractService
     super.serviceStop();
   }
 
-  protected void startAllocatorThread() {
-    allocatorThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+  @VisibleForTesting
+  public class AllocatorRunnable implements Runnable { // polling loop run by the allocator thread
+    @Override
+    public void run() {
+      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          Thread.sleep(rmPollInterval); // wait one poll interval before each heartbeat
           try {
-            Thread.sleep(rmPollInterval);
-            try {
-              heartbeat();
-            } catch (YarnRuntimeException e) {
-              LOG.error("Error communicating with RM: " + e.getMessage() , e);
-              return;
-            } catch (Exception e) {
-              LOG.error("ERROR IN CONTACTING RM. ", e);
-              continue;
-              // TODO: for other exceptions
-            }
-
-            lastHeartbeatTime = context.getClock().getTime();
-            executeHeartbeatCallbacks();
-          } catch (InterruptedException e) {
-            if (!stopped.get()) {
-              LOG.warn("Allocated thread interrupted. Returning.");
-            }
+            heartbeat();
+          } catch (RMContainerAllocationException e) { // fatal: logged below, then the loop exits
+            LOG.error("Error communicating with RM: " + e.getMessage() , e);
             return;
+          } catch (Exception e) { // anything else: logged, then retried on the next iteration
+            LOG.error("ERROR IN CONTACTING RM. ", e);
+            continue;
+            // TODO: refine handling for other exceptions (currently log and retry)
+          }
+
+          lastHeartbeatTime = context.getClock().getTime();
+          executeHeartbeatCallbacks();
+        } catch (InterruptedException e) {
+          if (!stopped.get()) {
+            LOG.warn("Allocated thread interrupted. Returning.");
           }
+          return; // interrupted: leave the loop whether or not we were stopped
         }
       }
-    });
+    }
+  }
+
+  protected void startAllocatorThread() { // creates, names and starts the allocator thread
+    allocatorThread = new Thread(new AllocatorRunnable());
     allocatorThread.setName("RMCommunicator Allocator");
     allocatorThread.start();
   }

+ 31 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocationException.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+/**
+ * Checked exception to denote fatal failure in allocating containers from RM.
+ */
+public class RMContainerAllocationException extends Exception {
+  private static final long serialVersionUID = 1L;
+  public RMContainerAllocationException(Throwable cause) { super(cause); } // cause only
+  public RMContainerAllocationException(String message) { super(message); } // message only
+  public RMContainerAllocationException(String message, Throwable cause) { // message and cause
+    super(message, cause);
+  }
+}

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -699,7 +699,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       // this application must clean itself up.
       eventHandler.handle(new JobEvent(this.getJob().getID(),
         JobEventType.JOB_AM_REBOOT));
-      throw new YarnRuntimeException(
+      throw new RMContainerAllocationException(
         "Resource Manager doesn't recognize AttemptId: "
             + this.getContext().getApplicationAttemptId(), e);
     } catch (ApplicationMasterNotRegisteredException e) {
@@ -717,7 +717,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
         eventHandler.handle(new JobEvent(this.getJob().getID(),
                                          JobEventType.JOB_AM_REBOOT));
-        throw new YarnRuntimeException("Could not contact RM after " +
+        throw new RMContainerAllocationException("Could not contact RM after " +
                                 retryInterval + " milliseconds.");
       }
       // Throw this up to the caller, which may decide to ignore it and

+ 99 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMCommunicator.java

@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator.AllocatorRunnable;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestRMCommunicator { // checks how AllocatorRunnable reacts to heartbeat() failures
+
+  class MockRMCommunicator extends RMCommunicator {
+    public MockRMCommunicator(ClientService clientService, AppContext context) {
+      super(clientService, context);
+    }
+
+    @Override
+    protected void heartbeat() throws Exception { // no-op; each test stubs it on a spy
+    }
+  }
+
+  @Test(timeout = 2000)
+  public void testRMContainerAllocatorExceptionIsHandled() throws Exception {
+    ClientService mockClientService = mock(ClientService.class);
+    AppContext mockContext = mock(AppContext.class);
+    MockRMCommunicator mockRMCommunicator =
+        new MockRMCommunicator(mockClientService, mockContext);
+    RMCommunicator communicator = spy(mockRMCommunicator);
+    Clock mockClock = mock(Clock.class);
+    when(mockContext.getClock()).thenReturn(mockClock);
+
+    doThrow(new RMContainerAllocationException("Test")).doNothing() // first heartbeat is fatal
+        .when(communicator).heartbeat();
+
+    when(mockClock.getTime()).thenReturn(1L).thenThrow(new AssertionError(
+        "GetClock called second time, when it should not have since the " +
+        "thread should have quit"));
+
+    AllocatorRunnable testRunnable = communicator.new AllocatorRunnable();
+    testRunnable.run(); // runs on the test thread; must return after the fatal exception
+  }
+
+  @Test(timeout = 2000)
+  public void testRMContainerAllocatorYarnRuntimeExceptionIsHandled()
+      throws Exception {
+    ClientService mockClientService = mock(ClientService.class);
+    AppContext mockContext = mock(AppContext.class);
+    MockRMCommunicator mockRMCommunicator =
+        new MockRMCommunicator(mockClientService, mockContext);
+    final RMCommunicator communicator = spy(mockRMCommunicator);
+    Clock mockClock = mock(Clock.class);
+    when(mockContext.getClock()).thenReturn(mockClock);
+
+    doThrow(new YarnRuntimeException("Test")).doNothing() // first heartbeat fails, later ones succeed
+        .when(communicator).heartbeat();
+
+    when(mockClock.getTime()).thenReturn(1L).thenAnswer(new Answer<Integer>() {
+      @Override
+      public Integer answer(InvocationOnMock invocation) throws Throwable {
+        communicator.stop(); // second clock read: stop the communicator; the loop is expected to end
+        return 2;
+      }
+    }).thenThrow(new AssertionError( // only reached on a third clock read
+        "GetClock called third time, when it should not have since the thread " +
+        "should have quit"));
+
+    AllocatorRunnable testRunnable = communicator.new AllocatorRunnable();
+    testRunnable.run();
+
+    verify(mockClock, times(2)).getTime(); // exactly two successful heartbeats recorded a time
+  }
+}

+ 40 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -1830,12 +1830,7 @@ public class TestRMContainerAllocator {
         }
       }, 100, 10000);
       // run the scheduler
-      try {
-        super.heartbeat();
-      } catch (Exception e) {
-        LOG.error("error in heartbeat ", e);
-        throw new YarnRuntimeException(e);
-      }
+      super.heartbeat();
 
       List<TaskAttemptContainerAssignedEvent> result
         = new ArrayList<TaskAttemptContainerAssignedEvent>(events);
@@ -1885,7 +1880,7 @@ public class TestRMContainerAllocator {
     @Override
     protected AllocateResponse makeRemoteRequest() throws IOException,
       YarnException {
-      throw new YarnRuntimeException("for testing");
+      throw new IOException("for testing");
     }
   }
 
@@ -2450,7 +2445,7 @@ public class TestRMContainerAllocator {
     try {
       allocator.schedule();
       Assert.fail("Should Have Exception");
-    } catch (YarnRuntimeException e) {
+    } catch (RMContainerAllocationException e) {
       Assert.assertTrue(e.getMessage().contains("Could not contact RM after"));
     }
     dispatcher.await();
@@ -2671,6 +2666,43 @@ public class TestRMContainerAllocator {
     allocator.close();
   }
 
+  @Test(expected = RMContainerAllocationException.class)
+  public void testAttemptNotFoundCausesRMCommunicatorException()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf); // NOTE(review): never stopped in this test
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application and wait for the dispatcher to drain
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // Now kill the application; the schedule() call below is expected to fail
+    rm.killApp(app.getApplicationId());
+
+    allocator.schedule(); // must throw RMContainerAllocationException (see @Test(expected))
+  }
+
   private static class MockScheduler implements ApplicationMasterProtocol {
     ApplicationAttemptId attemptId;
     long nextContainerId = 10;