Browse Source

YARN-1115: Provide optional means for a scheduler to check real user ACLs. Contributed by Eric Payne (epayne)

Ahmed Hussein 3 years ago
parent
commit
de120b16ad
14 changed files with 283 additions and 19 deletions
  1. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  2. 27 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  3. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  4. 43 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
  5. 16 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java
  6. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  7. 21 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  8. 5 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  9. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto
  10. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
  11. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  12. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
  13. 147 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  14. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -572,10 +572,12 @@ public class ClientRMService extends AbstractService implements
     // checked here, those that are dependent on RM configuration are validated
     // checked here, those that are dependent on RM configuration are validated
     // in RMAppManager.
     // in RMAppManager.
 
 
+    UserGroupInformation userUgi = null;
     String user = null;
     String user = null;
     try {
     try {
       // Safety
       // Safety
-      user = UserGroupInformation.getCurrentUser().getShortUserName();
+      userUgi = UserGroupInformation.getCurrentUser();
+      user = userUgi.getShortUserName();
     } catch (IOException ie) {
     } catch (IOException ie) {
       LOG.warn("Unable to get the current user.", ie);
       LOG.warn("Unable to get the current user.", ie);
       RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
       RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
@@ -665,7 +667,7 @@ public class ClientRMService extends AbstractService implements
     try {
     try {
       // call RMAppManager to submit application directly
       // call RMAppManager to submit application directly
       rmAppManager.submitApplication(submissionContext,
       rmAppManager.submitApplication(submissionContext,
-          System.currentTimeMillis(), user);
+          System.currentTimeMillis(), userUgi);
 
 
       LOG.info("Application with id " + applicationId.getId() + 
       LOG.info("Application with id " + applicationId.getId() + 
           " submitted by user " + user);
           " submitted by user " + user);

+ 27 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -322,16 +322,26 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     }
     }
   }
   }
 
 
-  @SuppressWarnings("unchecked")
+  @VisibleForTesting
+  @Deprecated
   protected void submitApplication(
   protected void submitApplication(
       ApplicationSubmissionContext submissionContext, long submitTime,
       ApplicationSubmissionContext submissionContext, long submitTime,
       String user) throws YarnException {
       String user) throws YarnException {
+    submitApplication(submissionContext, submitTime,
+        UserGroupInformation.createRemoteUser(user));
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("unchecked")
+  protected void submitApplication(
+      ApplicationSubmissionContext submissionContext, long submitTime,
+      UserGroupInformation userUgi) throws YarnException {
     ApplicationId applicationId = submissionContext.getApplicationId();
     ApplicationId applicationId = submissionContext.getApplicationId();
 
 
     // Passing start time as -1. It will be eventually set in RMAppImpl
     // Passing start time as -1. It will be eventually set in RMAppImpl
     // constructor.
     // constructor.
     RMAppImpl application = createAndPopulateNewRMApp(
     RMAppImpl application = createAndPopulateNewRMApp(
-        submissionContext, submitTime, user, false, -1);
+        submissionContext, submitTime, userUgi, false, -1);
     try {
     try {
       if (UserGroupInformation.isSecurityEnabled()) {
       if (UserGroupInformation.isSecurityEnabled()) {
         this.rmContext.getDelegationTokenRenewer()
         this.rmContext.getDelegationTokenRenewer()
@@ -364,18 +374,30 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     ApplicationSubmissionContext appContext =
     ApplicationSubmissionContext appContext =
         appState.getApplicationSubmissionContext();
         appState.getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
     ApplicationId appId = appContext.getApplicationId();
+    UserGroupInformation userUgi = null;
+    if (appState.getRealUser() != null) {
+      UserGroupInformation realUserUgi = null;
+      realUserUgi =
+          UserGroupInformation.createRemoteUser(appState.getRealUser());
+      userUgi = UserGroupInformation.createProxyUser(appState.getUser(),
+          realUserUgi);
+    } else {
+      userUgi = UserGroupInformation.createRemoteUser(appState.getUser());
+    }
 
 
     // create and recover app.
     // create and recover app.
     RMAppImpl application =
     RMAppImpl application =
         createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
         createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
-            appState.getUser(), true, appState.getStartTime());
+            userUgi, true, appState.getStartTime());
 
 
     application.handle(new RMAppRecoverEvent(appId, rmState));
     application.handle(new RMAppRecoverEvent(appId, rmState));
   }
   }
 
 
   private RMAppImpl createAndPopulateNewRMApp(
   private RMAppImpl createAndPopulateNewRMApp(
       ApplicationSubmissionContext submissionContext, long submitTime,
       ApplicationSubmissionContext submissionContext, long submitTime,
-      String user, boolean isRecovery, long startTime) throws YarnException {
+      UserGroupInformation userUgi, boolean isRecovery, long startTime)
+      throws YarnException {
+    String user = userUgi.getShortUserName();
     // Do queue mapping
     // Do queue mapping
     if (!isRecovery) {
     if (!isRecovery) {
       // Do queue mapping
       // Do queue mapping
@@ -395,7 +417,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
 
 
     // Verify and get the update application priority and set back to
     // Verify and get the update application priority and set back to
     // submissionContext
     // submissionContext
-    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
 
 
     // Application priority needed to be validated only while submitting. During
     // Application priority needed to be validated only while submitting. During
     // recovery, validated priority could be recovered from submission context.
     // recovery, validated priority could be recovered from submission context.
@@ -437,7 +458,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     // Create RMApp
     // Create RMApp
     RMAppImpl application =
     RMAppImpl application =
         new RMAppImpl(applicationId, rmContext, this.conf,
         new RMAppImpl(applicationId, rmContext, this.conf,
-            submissionContext.getApplicationName(), user,
+            submissionContext.getApplicationName(), userUgi,
             submissionContext.getQueue(),
             submissionContext.getQueue(),
             submissionContext, this.scheduler, this.masterService,
             submissionContext, this.scheduler, this.masterService,
             submitTime, submissionContext.getApplicationType(),
             submitTime, submissionContext.getApplicationType(),

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -805,7 +805,8 @@ public abstract class RMStateStore extends AbstractService {
     assert context instanceof ApplicationSubmissionContextPBImpl;
     assert context instanceof ApplicationSubmissionContextPBImpl;
     ApplicationStateData appState =
     ApplicationStateData appState =
         ApplicationStateData.newInstance(app.getSubmitTime(),
         ApplicationStateData.newInstance(app.getSubmitTime(),
-            app.getStartTime(), context, app.getUser(), app.getCallerContext());
+            app.getStartTime(), context, app.getUser(), app.getRealUser(),
+            app.getCallerContext());
     appState.setApplicationTimeouts(app.getApplicationTimeouts());
     appState.setApplicationTimeouts(app.getApplicationTimeouts());
     getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
     getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
   }
   }
@@ -1038,7 +1039,7 @@ public abstract class RMStateStore extends AbstractService {
     ApplicationStateData appState =
     ApplicationStateData appState =
         ApplicationStateData.newInstance(app.getSubmitTime(),
         ApplicationStateData.newInstance(app.getSubmitTime(),
             app.getStartTime(), app.getApplicationSubmissionContext(),
             app.getStartTime(), app.getApplicationSubmissionContext(),
-            app.getUser(), app.getCallerContext());
+            app.getUser(), app.getRealUser(), app.getCallerContext());
     for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
     for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
       appState.attempts.put(appAttempt.getAppAttemptId(), null);
       appState.attempts.put(appAttempt.getAppAttemptId(), null);
     }
     }

+ 43 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java

@@ -92,9 +92,46 @@ public abstract class ApplicationStateData {
   
   
   public static ApplicationStateData newInstance(long submitTime,
   public static ApplicationStateData newInstance(long submitTime,
       long startTime, ApplicationSubmissionContext context, String user) {
       long startTime, ApplicationSubmissionContext context, String user) {
-    return newInstance(submitTime, startTime, context, user, null);
+    return newInstance(submitTime, startTime, context, user,
+        (CallerContext) null);
   }
   }
-  
+
+  public static ApplicationStateData newInstance(long submitTime,
+      long startTime, String user, String realUser,
+      ApplicationSubmissionContext submissionContext, RMAppState state,
+      String diagnostics, long launchTime, long finishTime,
+      CallerContext callerContext) {
+    ApplicationStateData appState =
+        newInstance(submitTime, startTime, user, submissionContext, state,
+            diagnostics, launchTime, finishTime, callerContext);
+    if (realUser != null) {
+      appState.setRealUser(realUser);
+    }
+    return appState;
+  }
+
+  public static ApplicationStateData newInstance(long submitTime,
+      long startTime, String user, String realUser,
+      ApplicationSubmissionContext submissionContext, RMAppState state,
+      String diagnostics, long launchTime, long finishTime,
+      CallerContext callerContext,
+      Map<ApplicationTimeoutType, Long> applicationTimeouts) {
+    ApplicationStateData appState =
+        newInstance(submitTime, startTime, user, submissionContext, state,
+            diagnostics, launchTime, finishTime, callerContext, applicationTimeouts);
+    if (realUser != null) {
+      appState.setRealUser(realUser);
+    }
+    return appState;
+  }
+
+  public static ApplicationStateData newInstance(long submitTime,
+      long startTime, ApplicationSubmissionContext context, String user,
+      String realUser, CallerContext callerContext) {
+    return newInstance(submitTime, startTime, user, realUser, context, null, "",
+        0, 0, callerContext);
+  }
+
   public int getAttemptCount() {
   public int getAttemptCount() {
     return attempts.size();
     return attempts.size();
   }
   }
@@ -213,4 +250,8 @@ public abstract class ApplicationStateData {
   @Public
   @Public
   public abstract void setApplicationTimeouts(
   public abstract void setApplicationTimeouts(
       Map<ApplicationTimeoutType, Long> applicationTimeouts);
       Map<ApplicationTimeoutType, Long> applicationTimeouts);
+
+  public abstract String getRealUser();
+
+  public abstract void setRealUser(String realUser);
 }
 }

+ 16 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java

@@ -148,7 +148,22 @@ public class ApplicationStateDataPBImpl extends ApplicationStateData {
     maybeInitBuilder();
     maybeInitBuilder();
     builder.setUser(user);
     builder.setUser(user);
   }
   }
-  
+
+  @Override
+  public String getRealUser() {
+    ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasRealUser()) {
+      return null;
+    }
+    return (p.getRealUser());
+  }
+
+  @Override
+  public void setRealUser(String realUser) {
+    maybeInitBuilder();
+    builder.setRealUser(realUser);
+  }
+
   @Override
   @Override
   public ApplicationSubmissionContext getApplicationSubmissionContext() {
   public ApplicationSubmissionContext getApplicationSubmissionContext() {
     ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
     ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -309,4 +309,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return True/False to confirm whether app is in final states
    * @return True/False to confirm whether app is in final states
    */
    */
   boolean isAppInCompletedStates();
   boolean isAppInCompletedStates();
+
+  String getRealUser();
 }
 }

+ 21 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -133,6 +133,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final RMContext rmContext;
   private final RMContext rmContext;
   private final Configuration conf;
   private final Configuration conf;
   private final String user;
   private final String user;
+  private final UserGroupInformation userUgi;
   private final String name;
   private final String name;
   private final ApplicationSubmissionContext submissionContext;
   private final ApplicationSubmissionContext submissionContext;
   private final Dispatcher dispatcher;
   private final Dispatcher dispatcher;
@@ -426,7 +427,18 @@ public class RMAppImpl implements RMApp, Recoverable {
       ApplicationMasterService masterService, long submitTime,
       ApplicationMasterService masterService, long submitTime,
       String applicationType, Set<String> applicationTags,
       String applicationType, Set<String> applicationTags,
       List<ResourceRequest> amReqs, long startTime) {
       List<ResourceRequest> amReqs, long startTime) {
+    this(applicationId, rmContext, config, name,
+       (user != null ? UserGroupInformation.createRemoteUser(user) : null),
+        queue, submissionContext, scheduler, masterService, submitTime,
+        applicationType, applicationTags, amReqs, startTime);
+  }
 
 
+  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
+      Configuration config, String name, UserGroupInformation userUgi, String queue,
+      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
+      ApplicationMasterService masterService, long submitTime,
+      String applicationType, Set<String> applicationTags,
+      List<ResourceRequest> amReqs, long startTime) {
     this.systemClock = SystemClock.getInstance();
     this.systemClock = SystemClock.getInstance();
 
 
     this.applicationId = applicationId;
     this.applicationId = applicationId;
@@ -435,7 +447,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.dispatcher = rmContext.getDispatcher();
     this.dispatcher = rmContext.getDispatcher();
     this.handler = dispatcher.getEventHandler();
     this.handler = dispatcher.getEventHandler();
     this.conf = config;
     this.conf = config;
-    this.user = user;
+    this.user = (userUgi != null) ? userUgi.getShortUserName() : null;
+    this.userUgi = userUgi;
     this.queue = queue;
     this.queue = queue;
     this.submissionContext = submissionContext;
     this.submissionContext = submissionContext;
     this.scheduler = scheduler;
     this.scheduler = scheduler;
@@ -1321,7 +1334,7 @@ public class RMAppImpl implements RMApp, Recoverable {
 
 
     ApplicationStateData appState =
     ApplicationStateData appState =
         ApplicationStateData.newInstance(this.submitTime, this.startTime,
         ApplicationStateData.newInstance(this.submitTime, this.startTime,
-            this.user, this.submissionContext,
+            this.getUser(), this.getRealUser(), this.submissionContext,
             stateToBeStored, diags, this.launchTime, this.storedFinishTime,
             stateToBeStored, diags, this.launchTime, this.storedFinishTime,
             this.callerContext);
             this.callerContext);
     appState.setApplicationTimeouts(this.applicationTimeouts);
     appState.setApplicationTimeouts(this.applicationTimeouts);
@@ -2084,4 +2097,10 @@ public class RMAppImpl implements RMApp, Recoverable {
               RMAppState state){
               RMAppState state){
       /* TODO fail the application on the failed transition */
       /* TODO fail the application on the failed transition */
   }
   }
+
+  @Override
+  public String getRealUser() {
+    UserGroupInformation realUserUgi = this.userUgi.getRealUser();
+    return (realUserUgi != null) ? realUserUgi.getShortUserName() : null;
+  }
 }
 }

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -723,6 +723,10 @@ public class CapacityScheduler extends
       } catch (AccessControlException ace) {
       } catch (AccessControlException ace) {
         // Ignore the exception for recovered app as the app was previously
         // Ignore the exception for recovered app as the app was previously
         // accepted.
         // accepted.
+        LOG.warn("AccessControlException received when trying to recover "
+            + applicationId + " in queue " + queueName  + " for user " + user
+            + ". Since the app was in the queue prior to recovery, the Capacity"
+            + " Scheduler will recover the app anyway.", ace);
       }
       }
       queue.getMetrics().submitApp(user);
       queue.getMetrics().submitApp(user);
       SchedulerApplication<FiCaSchedulerApp> application =
       SchedulerApplication<FiCaSchedulerApp> application =
@@ -2358,7 +2362,7 @@ public class CapacityScheduler extends
       ApplicationStateData appState = ApplicationStateData.newInstance(
       ApplicationStateData appState = ApplicationStateData.newInstance(
           rmApp.getSubmitTime(), rmApp.getStartTime(),
           rmApp.getSubmitTime(), rmApp.getStartTime(),
           rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
           rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
-          rmApp.getCallerContext());
+          rmApp.getRealUser(), rmApp.getCallerContext());
       appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
       appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
       appState.setLaunchTime(rmApp.getLaunchTime());
       appState.setLaunchTime(rmApp.getLaunchTime());
       rmContext.getStateStore().updateApplicationStateSynchronously(appState,
       rmContext.getStateStore().updateApplicationStateSynchronously(appState,

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto

@@ -71,6 +71,7 @@ message ApplicationStateDataProto {
     optional hadoop.common.RPCCallerContextProto caller_context = 8;
     optional hadoop.common.RPCCallerContextProto caller_context = 8;
     repeated ApplicationTimeoutMapProto application_timeouts = 9;
     repeated ApplicationTimeoutMapProto application_timeouts = 9;
     optional int64 launch_time = 10;
     optional int64 launch_time = 10;
+    optional string real_user = 11;
 }
 }
 
 
 message ApplicationAttemptStateDataProto {
 message ApplicationAttemptStateDataProto {

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -168,7 +169,8 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
     @Override
     @Override
     protected void submitApplication(
     protected void submitApplication(
         ApplicationSubmissionContext submissionContext, long submitTime,
         ApplicationSubmissionContext submissionContext, long submitTime,
-        String user) throws YarnException {
+        UserGroupInformation userUgi) throws YarnException {
+      String user = userUgi.getShortUserName();
       //Do nothing, just add the application to RMContext
       //Do nothing, just add the application to RMContext
       RMAppImpl application =
       RMAppImpl application =
           new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
           new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -66,6 +66,10 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
       throw new UnsupportedOperationException("Not supported yet.");
     }
     }
 
 
+    @Override
+    public String getRealUser() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
     @Override
     @Override
     public ApplicationSubmissionContext getApplicationSubmissionContext() {
     public ApplicationSubmissionContext getApplicationSubmissionContext() {
       throw new UnsupportedOperationException("Not supported yet.");
       throw new UnsupportedOperationException("Not supported yet.");

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -110,6 +110,11 @@ public class MockRMApp implements RMApp {
     return user;
     return user;
   }
   }
 
 
+  @Override
+  public String getRealUser() {
+    return null;
+  }
+
   public void setUser(String user) {
   public void setUser(String user) {
     this.user = user;
     this.user = user;
   }
   }

+ 147 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -44,10 +44,15 @@ import java.util.concurrent.CyclicBarrier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -59,9 +64,13 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AccessType;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -77,8 +86,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -4140,4 +4151,140 @@ public class TestLeafQueue {
       cs.stop();
       cs.stop();
     }
     }
   }
   }
+
+  private static class TestRMAppManager extends RMAppManager {
+    TestRMAppManager(RMContext context, YarnScheduler scheduler,
+        ApplicationMasterService masterService,
+        ApplicationACLsManager applicationACLsManager, Configuration conf) {
+      super(context, scheduler, masterService, applicationACLsManager, conf);
+    }
+
+    @Override
+    public void submitApplication(
+        ApplicationSubmissionContext submissionContext, long submitTime,
+        UserGroupInformation userUgi) throws YarnException {
+      super.submitApplication(submissionContext, submitTime, userUgi);
+    }
+  }
+
+  @Test
+  public void testSubmitUsingRealUserAcls() throws Exception {
+    final String realUser = "AdminUser";
+    final String user0 = "user0";
+    final String user1 = "user1";
+    final String queue = "default";
+
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM();
+    rm.init(conf);
+    rm.start();
+    rm.getConfig().setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
+
+    UserGroupInformation realUserUgi =
+        UserGroupInformation.createRemoteUser(realUser);
+    UserGroupInformation ugi0 =
+        UserGroupInformation.createProxyUserForTesting("user0", realUserUgi,
+            new String[] {"group1"});
+    UserGroupInformation ugi1 =
+        UserGroupInformation.createProxyUserForTesting("user1", realUserUgi,
+            new String[] {"group1"});
+    ApplicationId applicationId0 = TestUtils.getMockApplicationId(0);
+    ApplicationId applicationId1 = TestUtils.getMockApplicationId(1);
+    CapacityScheduler cSched = (CapacityScheduler) rm.getResourceScheduler();
+
+    ParentQueue rootQueue = (ParentQueue) cSched.getRootQueue();
+    Map<AccessType, AccessControlList> rootAcls = rootQueue.acls;
+    rootAcls.put(AccessType.SUBMIT_APP, new AccessControlList(realUser));
+    rootAcls.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
+
+    LeafQueue defaultQueue = (LeafQueue)cSched.getQueue(queue);
+    Map<AccessType, AccessControlList> a = defaultQueue.acls;
+    a.put(AccessType.SUBMIT_APP, new AccessControlList(realUser));
+    a.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
+
+    TestRMAppManager testRmAppManager =
+        new TestRMAppManager(spyRMContext, cSched,
+            rm.getApplicationMasterService(),
+            rm.getApplicationACLsManager(), rm.getConfig());
+    ContainerLaunchContext clc =
+        mock(ContainerLaunchContext.class);
+    ApplicationSubmissionContext asc =
+        ApplicationSubmissionContext.newInstance(
+            applicationId0, "myApp0", "default", Priority.newInstance(0),
+            clc, false, false, 1, Resource.newInstance(1024, 1));
+
+    // Each of the following test cases has a proxied user and a real user.
+    // The proxied users are user0 and user1, depending on the test. The real
+    // user is always AdminUser.
+
+    // Ensure that user0 is not allowed to submit to the default queue when only
+    // the admin user is in the submit ACL and the admin user does not have the
+    // USE_REAL_ACLS character prepended.
+    try {
+      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi0);
+      Assert.fail(user0 + " should not be allowed to submit to the "
+                  + queue + " queue when only admin user is in submit ACL.");
+    } catch (YarnException e) {
+      // This is the expected behavior.
+      assertTrue("Should have received an AccessControlException.",
+          e.getCause() instanceof AccessControlException);
+    }
+
+    // With only user0 in the list of users authorized to submit apps to the
+    // queue, ensure that user0 is allowed to submit to the default queue.
+    a.put(AccessType.SUBMIT_APP, new AccessControlList(user0));
+    a.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
+    try {
+      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi0);
+    } catch (YarnException e) {
+      Assert.fail(user0 + " should be allowed to submit to the "
+                  + queue + " queue.");
+    }
+
+    // With only user0 in the list of users authorized to submit apps to the
+    // queue, ensure that user1 is NOT allowed to submit to the default queue
+    try {
+      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
+      Assert.fail(user1 + " should not be allowed to submit to the "
+          + queue + " queue.");
+    } catch (YarnException e) {
+      // This is the expected behavior.
+      assertTrue("Should have received an AccessControlException.",
+          e.getCause() instanceof AccessControlException);
+    }
+
+    // Even though the admin user is in the list of users allowed to submit to
+    // the default queue and user1's real user is the admin user, user1 should
+    // not be allowed to submit to queue because the ACL entry does not have the
+    // special character prepended in the list.
+    a.put(AccessType.SUBMIT_APP,
+        new AccessControlList(user0 + "," + realUser));
+    try {
+      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
+      Assert.fail(user1 + " should not be allowed to submit to the "
+                  + queue + " queue.");
+    } catch (YarnException e) {
+      // This is the expected behavior.
+      assertTrue("Should have received an AccessControlException.",
+          e.getCause() instanceof AccessControlException);
+    }
+
+    // user1 should now be allowed to submit to the default queue because the
+    // admin user is in the ACL list and has the USE_REAL_ACLS character
+    // prepended.
+    a.put(AccessType.SUBMIT_APP,
+        new AccessControlList(user0 + ","
+            + AccessControlList.USE_REAL_ACLS + realUser));
+    asc.setApplicationId(applicationId1);
+    try {
+      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
+    } catch (YarnException e) {
+      LOG.error("failed to submit", e);
+      Assert.fail(user1 + " should be allowed to submit to the "
+                  + queue + " queue when real user is" + realUser + ".");
+    }
+
+    rm.stop();
+    rm.close();
+  }
 }
 }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md

@@ -142,8 +142,8 @@ Configuration
 | Property | Description |
 | Property | Description |
 |:---- |:---- |
 |:---- |:---- |
 | `yarn.scheduler.capacity.<queue-path>.state` | The *state* of the queue. Can be one of `RUNNING` or `STOPPED`. If a queue is in `STOPPED` state, new applications cannot be submitted to *itself* or *any of its child queues*. Thus, if the *root* queue is `STOPPED` no applications can be submitted to the entire cluster. Existing applications continue to completion, thus the queue can be *drained* gracefully. Value is specified as Enumeration. |
 | `yarn.scheduler.capacity.<queue-path>.state` | The *state* of the queue. Can be one of `RUNNING` or `STOPPED`. If a queue is in `STOPPED` state, new applications cannot be submitted to *itself* or *any of its child queues*. Thus, if the *root* queue is `STOPPED` no applications can be submitted to the entire cluster. Existing applications continue to completion, thus the queue can be *drained* gracefully. Value is specified as Enumeration. |
-| `yarn.scheduler.capacity.root.<queue-path>.acl_submit_applications` | The *ACL* which controls who can *submit* applications to the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can submit applications. *ACLs* for this property *are* inherited from the parent queue if not specified. |
-| `yarn.scheduler.capacity.root.<queue-path>.acl_administer_queue` | The *ACL* which controls who can *administer* applications on the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can administer applications. *ACLs* for this property *are* inherited from the parent queue if not specified. |
+| `yarn.scheduler.capacity.root.<queue-path>.acl_submit_applications` | The *ACL* which controls who can *submit* applications to the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can submit applications. *ACLs* for this property *are* inherited from the parent queue if not specified. If a tilde (~) is prepended to a user name in this list, the real user's ACLs will allow the proxied user to submit to the queue. |
+| `yarn.scheduler.capacity.root.<queue-path>.acl_administer_queue` | The *ACL* which controls who can *administer* applications on the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can administer applications. *ACLs* for this property *are* inherited from the parent queue if not specified. If a tilde (~) is prepended to a user name in this list, the real user's ACLs will allow the proxied user to administer apps the queue. |
 
 
 **Note:** An *ACL* is of the form *user1*,*user2* *space* *group1*,*group2*. The special value of * implies *anyone*. The special value of *space* implies *no one*. The default is * for the root queue if not specified.
 **Note:** An *ACL* is of the form *user1*,*user2* *space* *group1*,*group2*. The special value of * implies *anyone*. The special value of *space* implies *no one*. The default is * for the root queue if not specified.