Browse Source

YARN-1879. Marked Idempotent/AtMostOnce annotations to ApplicationMasterProtocol for RM fail over. Contributed by Tsuyoshi OZAWA
(cherry picked from commit c3de2412eb7633ff16c67e71e73bbe27a982d984)

Jian He 10 years ago
parent
commit
78a57087bf

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -334,6 +334,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2621. Simplify the output when the user doesn't have the access for
     getDomain(s). (Zhijie Shen via jianhe)
 
+    YARN-1879. Marked Idempotent/AtMostOnce annotations to ApplicationMasterProtocol
+    for RM fail over. (Tsuyoshi OZAWA via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -80,6 +81,7 @@ public interface ApplicationMasterProtocol {
    */
   @Public
   @Stable
+  @Idempotent
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request) 
   throws YarnException, IOException;
@@ -104,6 +106,7 @@ public interface ApplicationMasterProtocol {
    */
   @Public
   @Stable
+  @AtMostOnce
   public FinishApplicationMasterResponse finishApplicationMaster(
       FinishApplicationMasterRequest request) 
   throws YarnException, IOException;

+ 63 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java

@@ -18,6 +18,14 @@
 
 package org.apache.hadoop.yarn.client;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -122,7 +130,23 @@ import org.junit.After;
 import org.junit.Before;
 
 
-public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
+/**
+ * Test Base for ResourceManager's Protocol on HA.
+ *
+ * Limited scope:
+ * For all the test cases, we only test whether the method will be re-entered
+ * when failover happens. Does not cover the entire logic of test.
+ *
+ * Test strategy:
+ * Create a separate failover thread with a trigger flag,
+ * override all APIs that are added trigger flag.
+ * When the APIs are called, we will set trigger flag as true to kick off
+ * the failover. So We can make sure the failover happens during process
+ * of the method. If this API is marked as @Idempotent or @AtMostOnce,
+ * the test cases will pass; otherwise, they will throw the exception.
+ *
+ */
+public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
   protected static final HAServiceProtocol.StateChangeRequestInfo req =
       new HAServiceProtocol.StateChangeRequestInfo(
           HAServiceProtocol.RequestSource.REQUEST_BY_USER);
@@ -760,6 +784,43 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
         return createFakeAllocateResponse();
       }
 
+      @Override
+      public RegisterApplicationMasterResponse registerApplicationMaster(
+          RegisterApplicationMasterRequest request) throws YarnException,
+          IOException {
+        resetStartFailoverFlag(true);
+        // make sure failover has been triggered
+        Assert.assertTrue(waittingForFailOver());
+        return createFakeRegisterApplicationMasterResponse();
+      }
+
+      @Override
+      public FinishApplicationMasterResponse finishApplicationMaster(
+          FinishApplicationMasterRequest request) throws YarnException,
+          IOException {
+        resetStartFailoverFlag(true);
+        // make sure failover has been triggered
+        Assert.assertTrue(waittingForFailOver());
+        return createFakeFinishApplicationMasterResponse();
+      }
+    }
+
+    public RegisterApplicationMasterResponse
+    createFakeRegisterApplicationMasterResponse() {
+      Resource minCapability = Resource.newInstance(2048, 2);
+      Resource maxCapability = Resource.newInstance(4096, 4);
+      Map<ApplicationAccessType, String> acls =
+          new HashMap<ApplicationAccessType, String>();
+      acls.put(ApplicationAccessType.MODIFY_APP, "*");
+      ByteBuffer key = ByteBuffer.wrap("fake_key".getBytes());
+      return RegisterApplicationMasterResponse.newInstance(minCapability,
+          maxCapability, acls, key, new ArrayList<Container>(), "root",
+          new ArrayList<NMToken>());
+    }
+
+    public FinishApplicationMasterResponse
+    createFakeFinishApplicationMasterResponse() {
+      return FinishApplicationMasterResponse.newInstance(true);
     }
 
     public AllocateResponse createFakeAllocateResponse() {
@@ -770,4 +831,5 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
           null, new ArrayList<NMToken>());
     }
   }
+
 }

+ 0 - 92
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java

@@ -1,92 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.junit.Assert;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{
-  private ApplicationMasterProtocol amClient;
-  private ApplicationAttemptId attemptId ;
-  RMAppAttempt appAttempt;
-
-  @Before
-  public void initiate() throws Exception {
-    startHACluster(0, false, false, true);
-    attemptId = this.cluster.createFakeApplicationAttemptId();
-    amClient = ClientRMProxy
-        .createRMProxy(this.conf, ApplicationMasterProtocol.class);
-
-    Token<AMRMTokenIdentifier> appToken =
-        this.cluster.getResourceManager().getRMContext()
-          .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
-    appToken.setService(ClientRMProxy.getAMRMTokenService(conf));
-    UserGroupInformation.setLoginUser(UserGroupInformation
-        .createRemoteUser(UserGroupInformation.getCurrentUser()
-            .getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appToken);
-    syncToken(appToken);
-  }
-
-  @After
-  public void shutDown() {
-    if(this.amClient != null) {
-      RPC.stopProxy(this.amClient);
-    }
-  }
-
-  @Test(timeout = 15000)
-  public void testAllocateOnHA() throws YarnException, IOException {
-    AllocateRequest request = AllocateRequest.newInstance(0, 50f,
-        new ArrayList<ResourceRequest>(),
-        new ArrayList<ContainerId>(),
-        ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
-            new ArrayList<String>()));
-    AllocateResponse response = amClient.allocate(request);
-    Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
-  }
-
-  private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
-    for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
-      this.cluster.getResourceManager(i).getRMContext()
-          .getAMRMTokenSecretManager().addPersistedPassword(token);
-    }
-  }
-}

+ 9 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -343,6 +343,15 @@ public class ApplicationMasterService extends AbstractService implements
         authorizeRequest().getApplicationAttemptId();
     ApplicationId appId = applicationAttemptId.getApplicationId();
 
+    RMApp rmApp =
+        rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
+    // checking whether the app exits in RMStateStore at first not to throw
+    // ApplicationDoesNotExistInCacheException before and after
+    // RM work-preserving restart.
+    if (rmApp.isAppFinalStateStored()) {
+      return FinishApplicationMasterResponse.newInstance(true);
+    }
+
     AllocateResponseLock lock = responseMap.get(applicationAttemptId);
     if (lock == null) {
       throwApplicationDoesNotExistInCacheException(applicationAttemptId);
@@ -366,13 +375,6 @@ public class ApplicationMasterService extends AbstractService implements
 
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
 
-      RMApp rmApp =
-          rmContext.getRMApps().get(appId);
-
-      if (rmApp.isAppFinalStateStored()) {
-        return FinishApplicationMasterResponse.newInstance(true);
-      }
-
       rmContext.getDispatcher().getEventHandler().handle(
           new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
               .getTrackingUrl(), request.getFinalApplicationStatus(), request

+ 38 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -51,6 +52,7 @@ public class MockAM {
   private final ApplicationAttemptId attemptId;
   private RMContext context;
   private ApplicationMasterProtocol amRMProtocol;
+  private UserGroupInformation ugi;
 
   private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
   private final List<ContainerId> releases = new ArrayList<ContainerId>();
@@ -101,15 +103,18 @@ public class MockAM {
     req.setHost("");
     req.setRpcPort(1);
     req.setTrackingUrl("");
-    UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser(attemptId.toString());
-    Token<AMRMTokenIdentifier> token =
-        context.getRMApps().get(attemptId.getApplicationId())
-          .getRMAppAttempt(attemptId).getAMRMToken();
-    ugi.addTokenIdentifier(token.decodeIdentifier());
+    if (ugi == null) {
+      ugi = UserGroupInformation.createRemoteUser(
+          attemptId.toString());
+      Token<AMRMTokenIdentifier> token =
+          context.getRMApps().get(attemptId.getApplicationId())
+              .getRMAppAttempt(attemptId).getAMRMToken();
+      ugi.addTokenIdentifier(token.decodeIdentifier());
+    }
     try {
       return ugi
-        .doAs(new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+        .doAs(
+            new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
           @Override
           public RegisterApplicationMasterResponse run() throws Exception {
             return amRMProtocol.registerApplicationMaster(req);
@@ -245,10 +250,15 @@ public class MockAM {
 
   public void unregisterAppAttempt() throws Exception {
     waitForState(RMAppAttemptState.RUNNING);
+    unregisterAppAttempt(true);
+  }
+
+  public void unregisterAppAttempt(boolean waitForStateRunning)
+      throws Exception {
     final FinishApplicationMasterRequest req =
         FinishApplicationMasterRequest.newInstance(
-          FinalApplicationStatus.SUCCEEDED, "", "");
-    unregisterAppAttempt(req,true);
+            FinalApplicationStatus.SUCCEEDED, "", "");
+    unregisterAppAttempt(req, waitForStateRunning);
   }
 
   public void unregisterAppAttempt(final FinishApplicationMasterRequest req,
@@ -256,19 +266,25 @@ public class MockAM {
     if (waitForStateRunning) {
       waitForState(RMAppAttemptState.RUNNING);
     }
-    UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser(attemptId.toString());
-    Token<AMRMTokenIdentifier> token =
-        context.getRMApps().get(attemptId.getApplicationId())
-            .getRMAppAttempt(attemptId).getAMRMToken();
-    ugi.addTokenIdentifier(token.decodeIdentifier());
-    ugi.doAs(new PrivilegedExceptionAction<Object>() {
-      @Override
-      public Object run() throws Exception {
-        amRMProtocol.finishApplicationMaster(req);
-        return null;
-      }
-    });
+    if (ugi == null) {
+      ugi =  UserGroupInformation.createRemoteUser(attemptId.toString());
+      Token<AMRMTokenIdentifier> token =
+          context.getRMApps()
+              .get(attemptId.getApplicationId())
+              .getRMAppAttempt(attemptId).getAMRMToken();
+      ugi.addTokenIdentifier(token.decodeIdentifier());
+    }
+    try {
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          amRMProtocol.finishApplicationMaster(req);
+          return null;
+        }
+      });
+    } catch (UndeclaredThrowableException e) {
+      throw (Exception) e.getCause();
+    }
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {

+ 16 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java

@@ -23,9 +23,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,15 +40,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.Assert;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static java.lang.Thread.sleep;
-import static org.mockito.Matchers.any;
 
 public class TestApplicationMasterService {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -240,25 +242,23 @@ public class TestApplicationMasterService {
       FinishApplicationMasterRequest req =
           FinishApplicationMasterRequest.newInstance(
               FinalApplicationStatus.FAILED, "", "");
-      Throwable cause = null;
       try {
         am1.unregisterAppAttempt(req, false);
+        Assert.fail("ApplicationMasterNotRegisteredException should be thrown");
+      } catch (ApplicationMasterNotRegisteredException e) {
+        Assert.assertNotNull(e);
+        Assert.assertNotNull(e.getMessage());
+        Assert.assertTrue(e.getMessage().contains(
+            "Application Master is trying to unregister before registering for:"
+        ));
       } catch (Exception e) {
-        cause = e.getCause();
+        Assert.fail("ApplicationMasterNotRegisteredException should be thrown");
       }
-      Assert.assertNotNull(cause);
-      Assert
-          .assertTrue(cause instanceof ApplicationMasterNotRegisteredException);
-      Assert.assertNotNull(cause.getMessage());
-      Assert
-          .assertTrue(cause
-              .getMessage()
-              .contains(
-                  "Application Master is trying to unregister before registering for:"));
 
       am1.registerAppAttempt();
 
       am1.unregisterAppAttempt(req, false);
+      am1.waitForState(RMAppAttemptState.FINISHING);
     } finally {
       if (rm != null) {
         rm.stop();

+ 46 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -668,6 +670,9 @@ public class TestWorkPreservingRMRestart {
     // create app and launch the AM
     RMApp app0 = rm1.submitApp(200);
     MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
+    // Issuing registerAppAttempt() before and after RM restart to confirm
+    // registerApplicationMaster() is idempotent.
+    am0.registerAppAttempt();
 
     // start new RM
     rm2 = new MockRM(conf, memStore);
@@ -676,6 +681,7 @@ public class TestWorkPreservingRMRestart {
     rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
 
     am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    // retry registerApplicationMaster() after RM restart.
     am0.registerAppAttempt(true);
 
     rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
@@ -897,4 +903,44 @@ public class TestWorkPreservingRMRestart {
       Thread.sleep(500);
     }
   }
+
+  /**
+   * Testing to confirm that retried finishApplicationMaster() doesn't throw
+   * InvalidApplicationMasterRequest before and after RM restart.
+   */
+  @Test (timeout = 20000)
+  public void testRetriedFinishApplicationMasterRequest()
+      throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
+
+    am0.registerAppAttempt();
+
+    // Emulating following a scenario:
+    // RM1 saves the app in RMStateStore and then crashes,
+    // FinishApplicationMasterResponse#isRegistered still return false,
+    // so AM still retry the 2nd RM
+    MockRM.finishAMAndVerifyAppState(app0, rm1, nm1, am0);
+
+
+    // start new RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+
+    am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am0.unregisterAppAttempt(false);
+  }
+
 }