瀏覽代碼

YARN-599. Refactoring submitApplication in ClientRMService and RMAppManager to separate out various validation checks depending on whether they rely on RM configuration or not. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1477478 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 年之前
父節點
當前提交
6de09af244

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -197,6 +197,10 @@ Release 2.0.5-beta - UNRELEASED
     YARN-591. Moved RM recovery related records out of public API as they do not
     belong there. (vinodkv)
 
+    YARN-599. Refactoring submitApplication in ClientRMService and RMAppManager
+    to separate out various validation checks depending on whether they rely on
+    RM configuration or not. (Zhijie Shen via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 44 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@@ -72,7 +71,6 @@ import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -83,15 +81,11 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -266,48 +260,61 @@ public class ClientRMService extends AbstractService implements
     ApplicationSubmissionContext submissionContext = request
         .getApplicationSubmissionContext();
     ApplicationId applicationId = submissionContext.getApplicationId();
-    String user = submissionContext.getAMContainerSpec().getUser();
+
+    // ApplicationSubmissionContext needs to be validated for safety - only
+    // those fields that are independent of the RM's configuration will be
+    // checked here, those that are dependent on RM configuration are validated
+    // in RMAppManager.
+
+    String user = null;
     try {
+      // Safety
       user = UserGroupInformation.getCurrentUser().getShortUserName();
-      if (rmContext.getRMApps().get(applicationId) != null) {
-        throw new IOException("Application with id " + applicationId
-            + " is already present! Cannot add a duplicate!");
-      }
-
-      // Safety 
       submissionContext.getAMContainerSpec().setUser(user);
+    } catch (IOException ie) {
+      LOG.warn("Unable to get the current user.", ie);
+      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+          ie.getMessage(), "ClientRMService",
+          "Exception in submitting application", applicationId);
+      throw RPCUtil.getRemoteException(ie);
+    }
 
-      // Check whether AM resource requirements are within required limits
-      if (!submissionContext.getUnmanagedAM()) {
-        ResourceRequest amReq = BuilderUtils.newResourceRequest(
-            RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-            submissionContext.getResource(), 1);
-        try {
-          SchedulerUtils.validateResourceRequest(amReq,
-              scheduler.getMaximumResourceCapability());
-        } catch (InvalidResourceRequestException e) {
-          LOG.warn("RM app submission failed in validating AM resource request"
-              + " for application " + applicationId, e);
-          throw RPCUtil.getRemoteException(e);
-        }
-      }
+    // Though duplication will checked again when app is put into rmContext,
+    // but it is good to fail the invalid submission as early as possible.
+    if (rmContext.getRMApps().get(applicationId) != null) {
+      String message = "Application with id " + applicationId +
+          " is already present! Cannot add a duplicate!";
+      LOG.warn(message);
+      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+          message, "ClientRMService", "Exception in submitting application",
+          applicationId);
+      throw RPCUtil.getRemoteException(message);
+    }
+
+    if (submissionContext.getQueue() == null) {
+      submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    }
+    if (submissionContext.getApplicationName() == null) {
+      submissionContext.setApplicationName(
+          YarnConfiguration.DEFAULT_APPLICATION_NAME);
+    }
 
-      // This needs to be synchronous as the client can query 
-      // immediately following the submission to get the application status.
-      // So call handle directly and do not send an event.
-      rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
-          .currentTimeMillis()));
+    try {
+      // call RMAppManager to submit application directly
+      rmAppManager.submitApplication(submissionContext,
+          System.currentTimeMillis(), false);
 
       LOG.info("Application with id " + applicationId.getId() + 
           " submitted by user " + user);
       RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
           "ClientRMService", applicationId);
-    } catch (IOException ie) {
-      LOG.info("Exception in submitting application", ie);
-      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, 
-          ie.getMessage(), "ClientRMService",
+    } catch (YarnRemoteException e) {
+      LOG.info("Exception in submitting application with id " +
+          applicationId.getId(), e);
+      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+          e.getMessage(), "ClientRMService",
           "Exception in submitting application", applicationId);
-      throw RPCUtil.getRemoteException(ie);
+      throw e;
     }
 
     SubmitApplicationResponse response = recordFactory

+ 63 - 52
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -31,8 +31,10 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -45,8 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 /**
  * This class manages the list of applications for the resource manager. 
@@ -233,64 +239,77 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
   @SuppressWarnings("unchecked")
   protected void submitApplication(
       ApplicationSubmissionContext submissionContext, long submitTime,
-      boolean isRecovered) {
+      boolean isRecovered) throws YarnRemoteException {
     ApplicationId applicationId = submissionContext.getApplicationId();
-    RMApp application = null;
-    try {
 
-      // Sanity checks
-      if (submissionContext.getQueue() == null) {
-        submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
-      }
-      if (submissionContext.getApplicationName() == null) {
-        submissionContext.setApplicationName(
-            YarnConfiguration.DEFAULT_APPLICATION_NAME);
+    // Validation of the ApplicationSubmissionContext needs to be completed
+    // here. Only those fields that are dependent on RM's configuration are
+    // checked here as they have to be validated whether they are part of new
+    // submission or just being recovered.
+
+    // Check whether AM resource requirements are within required limits
+    if (!submissionContext.getUnmanagedAM()) {
+      ResourceRequest amReq = BuilderUtils.newResourceRequest(
+          RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+          submissionContext.getResource(), 1);
+      try {
+        SchedulerUtils.validateResourceRequest(amReq,
+            scheduler.getMaximumResourceCapability());
+      } catch (InvalidResourceRequestException e) {
+        LOG.warn("RM app submission failed in validating AM resource request"
+            + " for application " + applicationId, e);
+        throw RPCUtil.getRemoteException(e);
       }
+    }
 
-      // Create RMApp
-      application =
-          new RMAppImpl(applicationId, rmContext, this.conf,
-              submissionContext.getApplicationName(),
-              submissionContext.getAMContainerSpec().getUser(),
-              submissionContext.getQueue(),
-              submissionContext, this.scheduler, this.masterService,
-              submitTime);
-
-      // Sanity check - duplicate?
-      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != 
-          null) {
-        String message = "Application with id " + applicationId
-            + " is already present! Cannot add a duplicate!";
-        LOG.info(message);
-        throw RPCUtil.getRemoteException(message);
-      } 
+    // Create RMApp
+    RMApp application =
+        new RMAppImpl(applicationId, rmContext, this.conf,
+            submissionContext.getApplicationName(),
+            submissionContext.getAMContainerSpec().getUser(),
+            submissionContext.getQueue(),
+            submissionContext, this.scheduler, this.masterService,
+            submitTime);
 
-      // Inform the ACLs Manager
-      this.applicationACLsManager.addApplication(applicationId,
-          submissionContext.getAMContainerSpec().getApplicationACLs());
+    // Concurrent app submissions with same applicationId will fail here
+    // Concurrent app submissions with different applicationIds will not
+    // influence each other
+    if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
+        null) {
+      String message = "Application with id " + applicationId
+          + " is already present! Cannot add a duplicate!";
+      LOG.warn(message);
+      throw RPCUtil.getRemoteException(message);
+    }
+
+    // Inform the ACLs Manager
+    this.applicationACLsManager.addApplication(applicationId,
+        submissionContext.getAMContainerSpec().getApplicationACLs());
 
+    try {
       // Setup tokens for renewal
       if (UserGroupInformation.isSecurityEnabled()) {
         this.rmContext.getDelegationTokenRenewer().addApplication(
             applicationId,parseCredentials(submissionContext),
             submissionContext.getCancelTokensWhenComplete()
             );
-      }      
-      
-      // All done, start the RMApp
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
-            RMAppEventType.START));
+      }
     } catch (IOException ie) {
-        LOG.info("RMAppManager submit application exception", ie);
-        if (application != null) {
-          // Sending APP_REJECTED is fine, since we assume that the 
-          // RMApp is in NEW state and thus we havne't yet informed the 
-          // Scheduler about the existence of the application
-          this.rmContext.getDispatcher().getEventHandler().handle(
-              new RMAppRejectedEvent(applicationId, ie.getMessage()));
-        }
+      LOG.warn(
+          "Unable to add the application to the delegation token renewer.",
+          ie);
+      // Sending APP_REJECTED is fine, since we assume that the
+      // RMApp is in NEW state and thus we havne't yet informed the
+      // Scheduler about the existence of the application
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMAppRejectedEvent(applicationId, ie.getMessage()));
+      throw RPCUtil.getRemoteException(ie);
     }
+
+    // All done, start the RMApp
+    this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
+            RMAppEventType.START));
   }
   
   private Credentials parseCredentials(ApplicationSubmissionContext application) 
@@ -377,14 +396,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         checkAppNumCompletedLimit(); 
       } 
       break;
-      case APP_SUBMIT:
-      {
-        ApplicationSubmissionContext submissionContext = 
-            ((RMAppManagerSubmitEvent)event).getSubmissionContext();
-        long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
-        submitApplication(submissionContext, submitTime, false);
-      }
-      break;
       default:
         LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
       }

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java

@@ -19,6 +19,5 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 public enum RMAppManagerEventType {
-  APP_SUBMIT,
   APP_COMPLETED
 }

+ 0 - 43
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java

@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-
-public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
-
-  private final ApplicationSubmissionContext submissionContext;
-  private final long submitTime;
-
-  public RMAppManagerSubmitEvent(
-      ApplicationSubmissionContext submissionContext, long submitTime) {
-    super(submissionContext.getApplicationId(),
-        RMAppManagerEventType.APP_SUBMIT);
-    this.submissionContext = submissionContext;
-    this.submitTime = submitTime;
-  }
-
-  public ApplicationSubmissionContext getSubmissionContext() {
-    return this.submissionContext;
-  }
-  
-  public long getSubmitTime() {
-    return this.submitTime;
-  }
-}

+ 110 - 133
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java

@@ -19,6 +19,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
@@ -31,12 +34,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -46,11 +52,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -163,9 +169,10 @@ public class TestAppManager{
       super.setCompletedAppsMax(max);
     }
     public void submitApplication(
-        ApplicationSubmissionContext submissionContext) {
-      super.submitApplication(
-          submissionContext, System.currentTimeMillis(), false);
+        ApplicationSubmissionContext submissionContext)
+            throws YarnRemoteException {
+      super.submitApplication(submissionContext, System.currentTimeMillis(),
+          false);
     }
   }
 
@@ -179,6 +186,40 @@ public class TestAppManager{
     }
   }
 
+  private RMContext rmContext;
+  private TestRMAppManager appMonitor;
+  private ApplicationSubmissionContext asContext;
+  private ApplicationId appId;
+
+  @Before
+  public void setUp() {
+    long now = System.currentTimeMillis();
+
+    rmContext = mockRMContext(1, now - 10);
+    ResourceScheduler scheduler = mockResourceScheduler();
+    Configuration conf = new Configuration();
+    ApplicationMasterService masterService =
+        new ApplicationMasterService(rmContext, scheduler);
+    appMonitor = new TestRMAppManager(rmContext,
+        new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
+        new ApplicationACLsManager(conf), conf);
+
+    appId = MockApps.newAppID(1);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    asContext =
+        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    asContext.setApplicationId(appId);
+    asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+    asContext.setResource(mockResource());
+    setupDispatcher(rmContext, conf);
+  }
+
+  @After
+  public void tearDown() {
+    setAppEventType(RMAppEventType.KILL);
+    ((Service)rmContext.getDispatcher()).stop();
+  }
+
   @Test
   public void testRMAppRetireNone() throws Exception {
     long now = System.currentTimeMillis();
@@ -334,38 +375,10 @@ public class TestAppManager{
 
   @Test
   public void testRMAppSubmit() throws Exception {
-    long now = System.currentTimeMillis();
-
-    RMContext rmContext = mockRMContext(0, now - 10);
-    ResourceScheduler scheduler = new CapacityScheduler();
-    Configuration conf = new Configuration();
-    ApplicationMasterService masterService =
-        new ApplicationMasterService(rmContext, scheduler);
-    TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
-        new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
-        new ApplicationACLsManager(conf), conf);
-
-    ApplicationId appID = MockApps.newAppID(1);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-    ApplicationSubmissionContext context = 
-        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-    context.setApplicationId(appID);
-    ContainerLaunchContext amContainer = recordFactory
-        .newRecordInstance(ContainerLaunchContext.class);
-    amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
-    context.setAMContainerSpec(amContainer);
-    setupDispatcher(rmContext, conf);
-
-    appMonitor.submitApplication(context);
-    RMApp app = rmContext.getRMApps().get(appID);
+    appMonitor.submitApplication(asContext);
+    RMApp app = rmContext.getRMApps().get(appId);
     Assert.assertNotNull("app is null", app);
-    Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
-    Assert.assertEquals("app name doesn't match", 
-        YarnConfiguration.DEFAULT_APPLICATION_NAME, 
-        app.getName());
-    Assert.assertEquals("app queue doesn't match", 
-        YarnConfiguration.DEFAULT_QUEUE_NAME, 
-        app.getQueue());
+    Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
     Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
 
     // wait for event to be processed
@@ -374,9 +387,8 @@ public class TestAppManager{
         timeoutSecs++ < 20) {
       Thread.sleep(1000);
     }
-    Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
-    setAppEventType(RMAppEventType.KILL); 
-    ((Service)rmContext.getDispatcher()).stop();
+    Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
+        getAppEventType());
   }
 
   @Test (timeout = 30000)
@@ -390,10 +402,7 @@ public class TestAppManager{
         new int[]{ 1, 1, 1, 1 }};
     for (int i = 0; i < globalMaxAppAttempts.length; ++i) {
       for (int j = 0; j < individualMaxAppAttempts.length; ++j) {
-        long now = System.currentTimeMillis();
-
-        RMContext rmContext = mockRMContext(0, now - 10);
-        ResourceScheduler scheduler = new CapacityScheduler();
+        ResourceScheduler scheduler = mockResourceScheduler();
         Configuration conf = new Configuration();
         conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]);
         ApplicationMasterService masterService =
@@ -402,21 +411,12 @@ public class TestAppManager{
             new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
             new ApplicationACLsManager(conf), conf);
 
-        RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-        ApplicationSubmissionContext context =
-            recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-        ContainerLaunchContext amContainer = recordFactory
-            .newRecordInstance(ContainerLaunchContext.class);
-        amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
-        context.setAMContainerSpec(amContainer);
-        setupDispatcher(rmContext, conf);
-
-        ApplicationId appID = MockApps.newAppID(1);
-        context.setApplicationId(appID);
+        ApplicationId appID = MockApps.newAppID(i * 4 + j + 1);
+        asContext.setApplicationId(appID);
         if (individualMaxAppAttempts[i][j] != 0) {
-          context.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
+          asContext.setMaxAppAttempts(individualMaxAppAttempts[i][j]);
         }
-        appMonitor.submitApplication(context);
+        appMonitor.submitApplication(asContext);
         RMApp app = rmContext.getRMApps().get(appID);
         Assert.assertEquals("max application attempts doesn't match",
             expectedNums[i][j], app.getMaxAppAttempts());
@@ -428,96 +428,73 @@ public class TestAppManager{
           Thread.sleep(1000);
         }
         setAppEventType(RMAppEventType.KILL);
-        ((Service)rmContext.getDispatcher()).stop();
       }
     }
   }
 
-  @Test (timeout = 3000)
-  public void testRMAppSubmitWithQueueAndName() throws Exception {
-    long now = System.currentTimeMillis();
-
-    RMContext rmContext = mockRMContext(1, now - 10);
-    ResourceScheduler scheduler = new CapacityScheduler();
-    Configuration conf = new Configuration();
-    ApplicationMasterService masterService =
-        new ApplicationMasterService(rmContext, scheduler);
-    TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
-        new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
-        new ApplicationACLsManager(conf), conf);
-
-    ApplicationId appID = MockApps.newAppID(10);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-    context.setApplicationId(appID);
-    context.setApplicationName("testApp1");
-    context.setQueue("testQueue");
-    ContainerLaunchContext amContainer = recordFactory
-        .newRecordInstance(ContainerLaunchContext.class);
-    amContainer
-        .setApplicationACLs(new HashMap<ApplicationAccessType, String>());
-    context.setAMContainerSpec(amContainer);
+  @Test (timeout = 30000)
+  public void testRMAppSubmitDuplicateApplicationId() throws Exception {
+    ApplicationId appId = MockApps.newAppID(0);
+    asContext.setApplicationId(appId);
+    RMApp appOrig = rmContext.getRMApps().get(appId);
+    Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
 
-    setupDispatcher(rmContext, conf);
+    // our testApp1 should be rejected and original app with same id should be left in place
+    try {
+      appMonitor.submitApplication(asContext);
+      Assert.fail("Exception is expected when applicationId is duplicate.");
+    } catch (YarnRemoteException e) {
+      Assert.assertTrue("The thrown exception is not the expectd one.",
+          e.getMessage().contains("Cannot add a duplicate!"));
+    }
 
-    appMonitor.submitApplication(context);
-    RMApp app = rmContext.getRMApps().get(appID);
+    // make sure original app didn't get removed
+    RMApp app = rmContext.getRMApps().get(appId);
     Assert.assertNotNull("app is null", app);
-    Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
-    Assert.assertEquals("app name doesn't match", "testApp1", app.getName());
-    Assert.assertEquals("app queue doesn't match", "testQueue", app.getQueue());
-    Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
+    Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
+    Assert.assertEquals("app state doesn't match", RMAppState.FINISHED, app.getState());
+  }
 
-    // wait for event to be processed
-    int timeoutSecs = 0;
-    while ((getAppEventType() == RMAppEventType.KILL) && 
-        timeoutSecs++ < 20) {
-      Thread.sleep(1000);
+  @Test (timeout = 30000)
+  public void testRMAppSubmitInvalidResourceRequest() throws Exception {
+    asContext.setResource(Resources.createResource(
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1));
+
+    // submit an app
+    try {
+      appMonitor.submitApplication(asContext);
+      Assert.fail("Application submission should fail because resource" +
+          " request is invalid.");
+    } catch (YarnRemoteException e) {
+      // Exception is expected
+      Assert.assertTrue("The thrown exception is not" +
+          " InvalidResourceRequestException",
+          e.getMessage().startsWith("Invalid resource request"));
     }
-    Assert.assertEquals("app event type sent is wrong", RMAppEventType.START, getAppEventType());
-    setAppEventType(RMAppEventType.KILL); 
-    ((Service)rmContext.getDispatcher()).stop();
   }
 
-  @Test
-  public void testRMAppSubmitError() throws Exception {
-    long now = System.currentTimeMillis();
-
-    // specify 1 here and use same appId below so it gets duplicate entry
-    RMContext rmContext = mockRMContext(1, now - 10);
-    ResourceScheduler scheduler = new CapacityScheduler();
-    Configuration conf = new Configuration();
-    ApplicationMasterService masterService =
-        new ApplicationMasterService(rmContext, scheduler);
-    TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
-        new ClientToAMTokenSecretManagerInRM(), scheduler, masterService,
-        new ApplicationACLsManager(conf), conf);
-
-    ApplicationId appID = MockApps.newAppID(0);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-    ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
-    context.setApplicationId(appID);
-    context.setApplicationName("testApp1");
-    context.setQueue("testQueue");
-
-    setupDispatcher(rmContext, conf);
-
-    RMApp appOrig = rmContext.getRMApps().get(appID);
-    Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
+  private static ResourceScheduler mockResourceScheduler() {
+    ResourceScheduler scheduler = mock(ResourceScheduler.class);
+    when(scheduler.getMinimumResourceCapability()).thenReturn(
+        Resources.createResource(
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+    when(scheduler.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+    return scheduler;
+  }
 
-    ContainerLaunchContext clc =
-        BuilderUtils.newContainerLaunchContext(null, null, null, null, null,
-            null, null);
-    context.setAMContainerSpec(clc);
-    // our testApp1 should be rejected and original app with same id should be left in place
-    appMonitor.submitApplication(context);
+  private static ContainerLaunchContext mockContainerLaunchContext(
+      RecordFactory recordFactory) {
+    ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
+        ContainerLaunchContext.class);
+    amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());;
+    return amContainer;
+  }
 
-    // make sure original app didn't get removed
-    RMApp app = rmContext.getRMApps().get(appID);
-    Assert.assertNotNull("app is null", app);
-    Assert.assertEquals("app id doesn't match", appID, app.getApplicationId());
-    Assert.assertEquals("app name doesn't matches", appOrig.getName(), app.getName());
-    ((Service)rmContext.getDispatcher()).stop();
+  private static Resource mockResource() {
+    return Resources.createResource(
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
   }
 
 }

+ 85 - 57
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -250,17 +250,70 @@ public class TestClientRMService {
         rmContext, null, null, null, dtsm);
     rmService.renewDelegationToken(request);
   }
+
+  @Test (timeout = 30000)
+  @SuppressWarnings ("rawtypes")
+  public void testAppSubmit() throws Exception {
+    YarnScheduler yarnScheduler = mockYarnScheduler();
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(yarnScheduler, rmContext);
+    RMStateStore stateStore = mock(RMStateStore.class);
+    when(rmContext.getStateStore()).thenReturn(stateStore);
+    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+        null, mock(ApplicationACLsManager.class), new Configuration());
+    when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+        new EventHandler<Event>() {
+          public void handle(Event event) {}
+        });
+    ClientRMService rmService =
+        new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
+
+    // without name and queue
+    ApplicationId appId1 = getApplicationId(100);
+    SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+        appId1, null, null);
+    try {
+      rmService.submitApplication(submitRequest1);
+    } catch (YarnRemoteException e) {
+      Assert.fail("Exception is not expected.");
+    }
+    RMApp app1 = rmContext.getRMApps().get(appId1);
+    Assert.assertNotNull("app doesn't exist", app1);
+    Assert.assertEquals("app name doesn't match",
+        YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
+    Assert.assertEquals("app queue doesn't match",
+        YarnConfiguration.DEFAULT_QUEUE_NAME, app1.getQueue());
+
+    // with name and queue
+    String name = MockApps.newAppName();
+    String queue = MockApps.newQueue();
+    ApplicationId appId2 = getApplicationId(101);
+    SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+        appId2, name, queue);
+    try {
+      rmService.submitApplication(submitRequest2);
+    } catch (YarnRemoteException e) {
+      Assert.fail("Exception is not expected.");
+    }
+    RMApp app2 = rmContext.getRMApps().get(appId2);
+    Assert.assertNotNull("app doesn't exist", app2);
+    Assert.assertEquals("app name doesn't match", name, app2.getName());
+    Assert.assertEquals("app queue doesn't match", queue, app2.getQueue());
+
+    // duplicate appId
+    try {
+      rmService.submitApplication(submitRequest2);
+      Assert.fail("Exception is expected.");
+    } catch (YarnRemoteException e) {
+      Assert.assertTrue("The thrown exception is not expected.",
+          e.getMessage().contains("Cannot add a duplicate!"));
+    }
+  }
   
   @Test(timeout=4000)
   public void testConcurrentAppSubmit()
       throws IOException, InterruptedException, BrokenBarrierException {
-    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
-    when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
-        Resources.createResource(
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-    when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
-        Resources.createResource(
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+    YarnScheduler yarnScheduler = mockYarnScheduler();
     RMContext rmContext = mock(RMContext.class);
     mockRMContext(yarnScheduler, rmContext);
     RMStateStore stateStore = mock(RMStateStore.class);
@@ -270,8 +323,10 @@ public class TestClientRMService {
 
     final ApplicationId appId1 = getApplicationId(100);
     final ApplicationId appId2 = getApplicationId(101);
-    final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(appId1);
-    final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(appId2);
+    final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+        appId1, null, null);
+    final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+        appId2, null, null);
     
     final CyclicBarrier startBarrier = new CyclicBarrier(2);
     final CyclicBarrier endBarrier = new CyclicBarrier(2);
@@ -319,61 +374,23 @@ public class TestClientRMService {
     t.join();
   }
 
-  @Test (timeout = 30000)
-  public void testInvalidResourceRequestWhenSubmittingApplication()
-      throws IOException, InterruptedException, BrokenBarrierException {
-    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
-    when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
-        Resources.createResource(
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-    when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
-        Resources.createResource(
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
-    RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
-    RMStateStore stateStore = mock(RMStateStore.class);
-    when(rmContext.getStateStore()).thenReturn(stateStore);
-    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
-        null, mock(ApplicationACLsManager.class), new Configuration());
-
-    final ApplicationId appId = getApplicationId(100);
-    final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId);
-    Resource resource = Resources.createResource(
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1);
-    when(submitRequest.getApplicationSubmissionContext()
-        .getResource()).thenReturn(resource);
-
-    final ClientRMService rmService =
-        new ClientRMService(rmContext, yarnScheduler, appManager, null, null);
-
-    // submit an app
-    try {
-      rmService.submitApplication(submitRequest);
-      Assert.fail("Application submission should fail because resource" +
-          " request is invalid.");
-    } catch (YarnRemoteException e) {
-      // Exception is expected
-      Assert.assertTrue("The thrown exception is not" +
-          " InvalidResourceRequestException",
-          e.getMessage().startsWith("Invalid resource request"));
-    }
-  }
- 
-  private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
+  private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
+      String name, String queue) {
     String user = MockApps.newUserName();
-    String queue = MockApps.newQueue();
 
     ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
 
     Resource resource = Resources.createResource(
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
 
-    ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
-    when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
-    when(submissionContext.getAMContainerSpec().getUser()).thenReturn(user);
-    when(submissionContext.getQueue()).thenReturn(queue);
-    when(submissionContext.getApplicationId()).thenReturn(appId);
-    when(submissionContext.getResource()).thenReturn(resource);
+    ApplicationSubmissionContext submissionContext =
+        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    submissionContext.setAMContainerSpec(amContainerSpec);
+    submissionContext.getAMContainerSpec().setUser(user);
+    submissionContext.setApplicationName(name);
+    submissionContext.setQueue(queue);
+    submissionContext.setApplicationId(appId);
+    submissionContext.setResource(resource);
 
    SubmitApplicationRequest submitRequest =
        recordFactory.newRecordInstance(SubmitApplicationRequest.class);
@@ -429,4 +446,15 @@ public class TestClientRMService {
         queueName, asContext, yarnScheduler, null , System
             .currentTimeMillis());
   }
+
+  private static YarnScheduler mockYarnScheduler() {
+    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+    when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
+        Resources.createResource(
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+    when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+    return yarnScheduler;
+  }
 }