
YARN-5090. Add End-to-End test-cases for DistributedScheduling using MiniYarnCluster. (asuresh)

(cherry picked from commit 8a9ecb7584b529be1de3a6e8850041e211c88ebc)
Arun Suresh, 9 years ago
Parent
Commit a555a320e8

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java

@@ -291,7 +291,7 @@ public class TestAMRMProxy {
     }
   }
 
-  private ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
+  protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
       ApplicationId appId, MiniYARNCluster cluster,
       final Configuration yarnConf)
           throws IOException, InterruptedException, YarnException {
@@ -331,7 +331,7 @@ public class TestAMRMProxy {
         });
   }
 
-  private AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
+  protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
     // The test needs AMRMClient to create a real allocate request
     AMRMClientImpl<ContainerRequest> amClient =
         new AMRMClientImpl<ContainerRequest>();
@@ -361,7 +361,7 @@ public class TestAMRMProxy {
         new ArrayList<ContainerId>(), resourceBlacklistRequest);
   }
 
-  private ApplicationId createApp(YarnClient yarnClient,
+  protected ApplicationId createApp(YarnClient yarnClient,
       MiniYARNCluster yarnCluster) throws Exception {
 
     ApplicationSubmissionContext appContext =

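The only change in this file is widening three test helpers from private to protected. That is what lets the new TestDistributedScheduling class added below extend TestAMRMProxy and reuse its cluster, application and protocol plumbing instead of duplicating it; a minimal sketch of the pattern (hypothetical class name, shown only for illustration):

package org.apache.hadoop.yarn.client.api.impl;

// Hypothetical subclass: because createApp(), createAMRMProtocol() and
// createAllocateRequest() are now protected, a derived test in the same
// package can call them directly and add its own allocate-path assertions.
// The real subclass introduced by this patch is TestDistributedScheduling.
public class ExampleAmrmProxyReuse extends TestAMRMProxy {
  // intentionally empty; the inherited protected helpers are visible here
}
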
+ 307 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java

@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Validates End2End Distributed Scheduling flow which includes the AM
+ * specifying OPPORTUNISTIC containers in its resource requests,
+ * the AMRMProxyService on the NM, the LocalScheduler RequestInterceptor on
+ * the NM and the DistributedSchedulingProtocol used by the framework to talk
+ * to the DistributedSchedulingService running on the RM.
+ */
+public class TestDistributedScheduling extends TestAMRMProxy {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestDistributedScheduling.class);
+
+  /**
+   * Validates if Allocate Requests containing only OPPORTUNISTIC container
+   * requests are satisfied instantly.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
+    MiniYARNCluster cluster =
+        new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
+    YarnClient rmClient = null;
+    ApplicationMasterProtocol client;
+
+    try {
+      Configuration conf = new YarnConfiguration();
+      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+      cluster.init(conf);
+      cluster.start();
+      final Configuration yarnConf = cluster.getConfig();
+
+      // the client has to connect to AMRMProxy
+
+      yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+      rmClient = YarnClient.createYarnClient();
+      rmClient.init(yarnConf);
+      rmClient.start();
+
+      // Submit application
+
+      ApplicationId appId = createApp(rmClient, cluster);
+
+      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+
+      LOG.info("testDistributedSchedulingE2E - Register");
+
+      RegisterApplicationMasterResponse responseRegister =
+          client.registerApplicationMaster(RegisterApplicationMasterRequest
+              .newInstance(NetUtils.getHostname(), 1024, ""));
+
+      Assert.assertNotNull(responseRegister);
+      Assert.assertNotNull(responseRegister.getQueue());
+      Assert.assertNotNull(responseRegister.getApplicationACLs());
+      Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+      Assert
+          .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+      Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+      Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+      RMApp rmApp =
+          cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+      Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+      LOG.info("testDistributedSchedulingE2E - Allocate");
+
+      AllocateRequest request =
+          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+      // Replace 'ANY' requests with OPPORTUNISTIC asks and remove
+      // everything else
+      List<ResourceRequest> newAskList = new ArrayList<>();
+      for (ResourceRequest rr : request.getAskList()) {
+        if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+          ResourceRequest newRR = ResourceRequest.newInstance(rr
+                  .getPriority(), rr.getResourceName(),
+              rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+              rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
+          newAskList.add(newRR);
+        }
+      }
+      request.setAskList(newAskList);
+
+      AllocateResponse allocResponse = client.allocate(request);
+      Assert.assertNotNull(allocResponse);
+
+      // Ensure that all the requests are satisfied immediately
+      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+      // Verify that the allocated containers are OPPORTUNISTIC
+      for (Container allocatedContainer : allocResponse
+          .getAllocatedContainers()) {
+        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+            .newContainerTokenIdentifier(
+                allocatedContainer.getContainerToken());
+        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+            containerTokenIdentifier.getExecutionType());
+      }
+
+      LOG.info("testDistributedSchedulingE2E - Finish");
+
+      FinishApplicationMasterResponse responseFinish =
+          client.finishApplicationMaster(FinishApplicationMasterRequest
+              .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+
+      Assert.assertNotNull(responseFinish);
+
+    } finally {
+      if (rmClient != null) {
+        rmClient.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  /**
+   * Validates if Allocate Requests containing both GUARANTEED and
+   * OPPORTUNISTIC container requests work as expected.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testMixedExecutionTypeRequestE2E() throws Exception {
+    MiniYARNCluster cluster =
+        new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
+    YarnClient rmClient = null;
+    ApplicationMasterProtocol client;
+
+    try {
+      Configuration conf = new YarnConfiguration();
+      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+      cluster.init(conf);
+      cluster.start();
+      final Configuration yarnConf = cluster.getConfig();
+
+      // the client has to connect to AMRMProxy
+
+      yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+      rmClient = YarnClient.createYarnClient();
+      rmClient.init(yarnConf);
+      rmClient.start();
+
+      // Submit application
+
+      ApplicationId appId = createApp(rmClient, cluster);
+
+      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+
+      LOG.info("testDistributedSchedulingE2E - Register");
+
+      RegisterApplicationMasterResponse responseRegister =
+          client.registerApplicationMaster(RegisterApplicationMasterRequest
+              .newInstance(NetUtils.getHostname(), 1024, ""));
+
+      Assert.assertNotNull(responseRegister);
+      Assert.assertNotNull(responseRegister.getQueue());
+      Assert.assertNotNull(responseRegister.getApplicationACLs());
+      Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+      Assert
+          .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+      Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+      Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+      RMApp rmApp =
+          cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+      Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+      LOG.info("testDistributedSchedulingE2E - Allocate");
+
+      AllocateRequest request =
+          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+      List<ResourceRequest> askList = request.getAskList();
+      List<ResourceRequest> newAskList = new ArrayList<>(askList);
+
+      // Duplicate all ANY requests marking them as opportunistic
+      for (ResourceRequest rr : askList) {
+        if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+          ResourceRequest newRR = ResourceRequest.newInstance(rr
+              .getPriority(), rr.getResourceName(),
+              rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+              rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
+          newAskList.add(newRR);
+        }
+      }
+      request.setAskList(newAskList);
+
+      AllocateResponse allocResponse = client.allocate(request);
+      Assert.assertNotNull(allocResponse);
+
+      // Ensure that all the requests are satisfied immediately
+      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+      // Verify that the allocated containers are OPPORTUNISTIC
+      for (Container allocatedContainer : allocResponse
+          .getAllocatedContainers()) {
+        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+            .newContainerTokenIdentifier(
+            allocatedContainer.getContainerToken());
+        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+            containerTokenIdentifier.getExecutionType());
+      }
+
+      request.setAskList(new ArrayList<ResourceRequest>());
+      request.setResponseId(request.getResponseId() + 1);
+
+      Thread.sleep(1000);
+
+      // RM should allocate GUARANTEED containers within 2 calls to allocate()
+      allocResponse = client.allocate(request);
+      Assert.assertNotNull(allocResponse);
+      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+      // Verify that the allocated containers are GUARANTEED
+      for (Container allocatedContainer : allocResponse
+          .getAllocatedContainers()) {
+        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+            .newContainerTokenIdentifier(
+                allocatedContainer.getContainerToken());
+        Assert.assertEquals(ExecutionType.GUARANTEED,
+            containerTokenIdentifier.getExecutionType());
+      }
+
+      LOG.info("testDistributedSchedulingE2E - Finish");
+
+      FinishApplicationMasterResponse responseFinish =
+          client.finishApplicationMaster(FinishApplicationMasterRequest
+              .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+
+      Assert.assertNotNull(responseFinish);
+
+    } finally {
+      if (rmClient != null) {
+        rmClient.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  @Ignore
+  @Override
+  public void testAMRMProxyE2E() throws Exception { }
+
+  @Ignore
+  @Override
+  public void testE2ETokenRenewal() throws Exception { }
+
+  @Ignore
+  @Override
+  public void testE2ETokenSwap() throws Exception { }
+}
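
The class Javadoc and the ask-rewriting loops above capture the whole AM-side contract for this flow: flip three configuration switches so the NM runs the AMRMProxy with the distributed-scheduling interceptor, and mark the ANY asks as OPPORTUNISTIC so they can be satisfied on the NM side without waiting on the RM scheduler. A condensed sketch of just those two steps, pulled out of the test bodies into a hypothetical helper class:

package org.apache.hadoop.yarn.client.api.impl;

import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch only; both snippets are lifted verbatim from the tests above.
public final class DistSchedUsageSketch {

  // Flags the tests enable so the NM can queue and launch OPPORTUNISTIC
  // containers locally instead of going through the RM scheduler.
  public static YarnConfiguration distSchedConf() {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
    conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
    return conf;
  }

  // Same transformation the tests apply to each ANY request: keep every
  // field and only change the execution type to OPPORTUNISTIC.
  public static ResourceRequest asOpportunistic(ResourceRequest rr) {
    return ResourceRequest.newInstance(rr.getPriority(),
        rr.getResourceName(), rr.getCapability(), rr.getNumContainers(),
        rr.getRelaxLocality(), rr.getNodeLabelExpression(),
        ExecutionType.OPPORTUNISTIC);
  }

  private DistSchedUsageSketch() {
  }
}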

+ 49 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java

@@ -23,9 +23,18 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+    .RegisterApplicationMasterResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -43,6 +52,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
+    .DistSchedAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
+    .DistSchedRegisterResponsePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
     .AMLivelinessMonitor;
 
@@ -139,18 +152,30 @@ public class TestDistributedSchedulingService {
     // ApplicationMasterProtocol clients
     RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
         ProtobufRpcEngine.class);
-    ApplicationMasterProtocol ampProxy =
-        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
-            .class, NetUtils.getConnectAddress(server), conf);
-    RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster(
-            factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+    ApplicationMasterProtocolPB ampProxy =
+        RPC.getProxy(ApplicationMasterProtocolPB
+        .class, 1, NetUtils.getConnectAddress(server), conf);
+    RegisterApplicationMasterResponse regResp =
+        new RegisterApplicationMasterResponsePBImpl(
+            ampProxy.registerApplicationMaster(null,
+                ((RegisterApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(
+                        RegisterApplicationMasterRequest.class)).getProto()));
     Assert.assertEquals("dummyQueue", regResp.getQueue());
-    FinishApplicationMasterResponse finishResp = ampProxy
-        .finishApplicationMaster(factory.newRecordInstance(
-            FinishApplicationMasterRequest.class));
+    FinishApplicationMasterResponse finishResp =
+        new FinishApplicationMasterResponsePBImpl(
+            ampProxy.finishApplicationMaster(null,
+                ((FinishApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(
+                        FinishApplicationMasterRequest.class)).getProto()
+            ));
     Assert.assertEquals(false, finishResp.getIsUnregistered());
-    AllocateResponse allocResp = ampProxy
-        .allocate(factory.newRecordInstance(AllocateRequest.class));
+    AllocateResponse allocResp =
+        new AllocateResponsePBImpl(
+            ampProxy.allocate(null,
+                ((AllocateRequestPBImpl)factory
+                    .newRecordInstance(AllocateRequest.class)).getProto())
+        );
     Assert.assertEquals(12345, allocResp.getNumClusterNodes());
 
 
@@ -158,17 +183,22 @@ public class TestDistributedSchedulingService {
     // DistributedSchedulerProtocol clients as well
     RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
         ProtobufRpcEngine.class);
-    DistributedSchedulerProtocol dsProxy =
-        (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol
-            .class, NetUtils.getConnectAddress(server), conf);
+    DistributedSchedulerProtocolPB dsProxy =
+        RPC.getProxy(DistributedSchedulerProtocolPB
+            .class, 1, NetUtils.getConnectAddress(server), conf);
 
     DistSchedRegisterResponse dsRegResp =
-        dsProxy.registerApplicationMasterForDistributedScheduling(
-        factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+        new DistSchedRegisterResponsePBImpl(
+            dsProxy.registerApplicationMasterForDistributedScheduling(null,
+                ((RegisterApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(RegisterApplicationMasterRequest.class))
+                    .getProto()));
     Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
     DistSchedAllocateResponse dsAllocResp =
-        dsProxy.allocateForDistributedScheduling(
-            factory.newRecordInstance(AllocateRequest.class));
+        new DistSchedAllocateResponsePBImpl(
+            dsProxy.allocateForDistributedScheduling(null,
+                ((AllocateRequestPBImpl)factory
+                    .newRecordInstance(AllocateRequest.class)).getProto()));
     Assert.assertEquals(
         "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
   }
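
Note the pattern introduced throughout this test: the proxies are now the raw *ProtocolPB interfaces rather than the typed ApplicationMasterProtocol / DistributedSchedulerProtocol, so the marshalling that the typed client stubs normally hide is done by hand: each request record is unwrapped with getProto() before the call, and each returned proto is wrapped back into its *PBImpl record before asserting on it. A minimal sketch of that round trip for the register call (hypothetical helper, mirroring the code above):

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.factories.RecordFactory;

// Hypothetical helper condensing the record <-> proto round trip used above.
final class PbRoundTripSketch {

  static RegisterApplicationMasterResponse registerViaPb(
      ApplicationMasterProtocolPB ampProxy, RecordFactory factory)
      throws Exception {
    // Records built by the default RecordFactory are PB-backed, so the
    // cast to the *PBImpl type exposes getProto().
    RegisterApplicationMasterRequestPBImpl request =
        (RegisterApplicationMasterRequestPBImpl) factory
            .newRecordInstance(RegisterApplicationMasterRequest.class);
    // The PB interface speaks protos; wrap the response back into a record.
    return new RegisterApplicationMasterResponsePBImpl(
        ampProxy.registerApplicationMaster(null, request.getProto()));
  }

  private PbRoundTripSketch() {
  }
}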

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -861,6 +861,11 @@ public class MiniYARNCluster extends CompositeService {
           localToken);
       RequestInterceptor rt = getPipelines()
           .get(applicationAttemptId.getApplicationId()).getRootInterceptor();
+      // The DefaultRequestInterceptor will generally be the last
+      // interceptor
+      while (rt.getNextInterceptor() != null) {
+        rt = rt.getNextInterceptor();
+      }
       if (rt instanceof DefaultRequestInterceptor) {
         ((DefaultRequestInterceptor) rt)
             .setRMClient(getResourceManager().getApplicationMasterService());
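
With distributed scheduling enabled, the AMRMProxy pipeline no longer roots at the DefaultRequestInterceptor; the LocalScheduler interceptor mentioned in the new test's class Javadoc sits in front of it. The added loop therefore walks the chain to its tail before rebinding the RM client. A small sketch of the same traversal as a reusable helper (hypothetical class; the import path is assumed from the AMRMProxy package layout):

package org.apache.hadoop.yarn.server;

import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;

// Hypothetical helper equivalent to the loop added above: follow
// getNextInterceptor() until the last element of the chain is reached.
final class InterceptorChainSketch {

  static RequestInterceptor tail(RequestInterceptor root) {
    RequestInterceptor current = root;
    while (current.getNextInterceptor() != null) {
      current = current.getNextInterceptor();
    }
    return current;
  }

  private InterceptorChainSketch() {
  }
}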