Browse Source

YARN-1893. Mark AtMostOnce annotation to ApplicationMasterProtocol#allocate. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1583203 13f79535-47bb-0310-9956-ffa450edef68
Jian He 11 years ago
parent
commit
3532d96ff6

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -596,6 +596,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1873. Fixed TestDistributedShell failure when the test cases are out of
     order. (Mit Desai via zjshen)
 
+    YARN-1893. Mark AtMostOnce annotation to ApplicationMasterProtocol#allocate.
+    (Xuan Gong via jianhe)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationMasterProtocol.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -162,6 +163,7 @@ public interface ApplicationMasterProtocol {
    */
   @Public
   @Stable
+  @AtMostOnce
   public AllocateResponse allocate(AllocateRequest request) 
   throws YarnException, IOException;
 }

+ 54 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java

@@ -33,6 +33,8 @@ import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
@@ -67,14 +69,18 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -82,6 +88,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -96,6 +103,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
@@ -257,11 +265,13 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
   }
 
   protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
-      boolean overrideRTS) throws Exception {
+      boolean overrideRTS, boolean overrideApplicationMasterService)
+      throws Exception {
     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     cluster =
         new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,
-            numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS);
+            numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS,
+            overrideApplicationMasterService);
     cluster.resetStartFailoverFlag(false);
     cluster.init(conf);
     cluster.start();
@@ -285,17 +295,19 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
 
     private boolean overrideClientRMService;
     private boolean overrideRTS;
+    private boolean overrideApplicationMasterService;
     private final AtomicBoolean startFailover = new AtomicBoolean(false);
     private final AtomicBoolean failoverTriggered = new AtomicBoolean(false);
 
     public MiniYARNClusterForHATesting(String testName,
         int numResourceManagers, int numNodeManagers, int numLocalDirs,
         int numLogDirs, boolean enableAHS, boolean overrideClientRMService,
-        boolean overrideRTS) {
+        boolean overrideRTS, boolean overrideApplicationMasterService) {
       super(testName, numResourceManagers, numNodeManagers, numLocalDirs,
           numLogDirs, enableAHS);
       this.overrideClientRMService = overrideClientRMService;
       this.overrideRTS = overrideRTS;
+      this.overrideApplicationMasterService = overrideApplicationMasterService;
     }
 
     public boolean getStartFailoverFlag() {
@@ -324,6 +336,11 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
       if (count >= maximumWaittingTime) {
         return false;
       }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // DO NOTHING
+      }
       return true;
     }
 
@@ -354,6 +371,14 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
           }
           return super.createResourceTrackerService();
         }
+        @Override
+        protected ApplicationMasterService createApplicationMasterService() {
+          if (overrideApplicationMasterService) {
+            return new CustomedApplicationMasterService(this.rmContext,
+                this.scheduler);
+          }
+          return super.createApplicationMasterService();
+        }
       };
     }
 
@@ -717,5 +742,31 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
         return super.nodeHeartbeat(request);
       }
     }
+
+    private class CustomedApplicationMasterService extends
+        ApplicationMasterService {
+      public CustomedApplicationMasterService(RMContext rmContext,
+          YarnScheduler scheduler) {
+        super(rmContext, scheduler);
+      }
+
+      @Override
+      public AllocateResponse allocate(AllocateRequest request)
+          throws YarnException, IOException {
+        resetStartFailoverFlag(true);
+        // make sure failover has been triggered
+        Assert.assertTrue(waittingForFailOver());
+        return createFakeAllocateResponse();
+      }
+
+    }
+
+    public AllocateResponse createFakeAllocateResponse() {
+      return AllocateResponse.newInstance(-1,
+          new ArrayList<ContainerStatus>(),
+          new ArrayList<Container>(), new ArrayList<NodeReport>(),
+          Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1,
+          null, new ArrayList<NMToken>());
+    }
   }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationClientProtocolOnHA.java

@@ -51,7 +51,7 @@ public class TestApplicationClientProtocolOnHA extends ProtocolHATestBase {
 
   @Before
   public void initiate() throws Exception {
-    startHACluster(1, true, false);
+    startHACluster(1, true, false, false);
     Configuration conf = new YarnConfiguration(this.conf);
     client = createAndStartYarnClient(conf);
   }

+ 94 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java

@@ -0,0 +1,94 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{
+  private ApplicationMasterProtocol amClient;
+  private ApplicationAttemptId attemptId ;
+  RMAppAttempt appAttempt;
+
+  @Before
+  public void initiate() throws Exception {
+    startHACluster(0, false, false, true);
+    attemptId = this.cluster.createFakeApplicationAttemptId();
+    amClient = ClientRMProxy
+        .createRMProxy(this.conf, ApplicationMasterProtocol.class);
+
+    AMRMTokenIdentifier id =
+        new AMRMTokenIdentifier(attemptId);
+    Token<AMRMTokenIdentifier> appToken =
+        new Token<AMRMTokenIdentifier>(id, this.cluster.getResourceManager()
+            .getRMContext().getAMRMTokenSecretManager());
+    appToken.setService(new Text("appToken service"));
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser()
+            .getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appToken);
+    syncToken(appToken);
+  }
+
+  @After
+  public void shutDown() {
+    if(this.amClient != null) {
+      RPC.stopProxy(this.amClient);
+    }
+  }
+
+  @Test(timeout = 15000)
+  public void testAllocateOnHA() throws YarnException, IOException {
+    AllocateRequest request = AllocateRequest.newInstance(0, 50f,
+        new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>(),
+        ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
+            new ArrayList<String>()));
+    AllocateResponse response = amClient.allocate(request);
+    Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
+  }
+
+  private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
+    for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
+      this.cluster.getResourceManager(i).getRMContext()
+          .getAMRMTokenSecretManager().addPersistedPassword(token);
+    }
+  }
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java

@@ -41,7 +41,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
 
   @Before
   public void initiate() throws Exception {
-    startHACluster(0, false, true);
+    startHACluster(0, false, true, false);
     this.resourceTracker = getRMClient();
   }
 

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -739,4 +739,8 @@ public class MiniYARNCluster extends CompositeService {
       }
     };
   }
+
+  public int getNumOfResourceManager() {
+    return this.resourceManagers.length;
+  }
 }