
MAPREDUCE-3103. Implement Job ACLs for MRAppMaster. (mahadev) - Merging r1195761 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1195763 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar, 13 years ago
commit 17795f65ea
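
In outline, this change drops the dead, commented-out ACL check from the MRClientService RPC path and instead enforces JobACL.VIEW_JOB in the AM web controller: the HTTP caller's remote user is wrapped in a UserGroupInformation and tested against the job's ACLs. A minimal sketch of that pattern, assuming the real Hadoop Job, JobACL, and UserGroupInformation types from the diffs below (the helper class and method names here are illustrative, not part of the commit):

    import org.apache.hadoop.mapreduce.JobACL;
    import org.apache.hadoop.mapreduce.v2.app.job.Job;
    import org.apache.hadoop.security.UserGroupInformation;

    // Illustrative helper; AppController below inlines the same logic.
    final class JobViewAclCheck {
      private JobViewAclCheck() {}

      /** Returns true if remoteUser may view the given job. */
      static boolean canView(Job job, String remoteUser) {
        UserGroupInformation callerUgi =
            UserGroupInformation.createRemoteUser(remoteUser);
        return job.checkAccess(callerUgi, JobACL.VIEW_JOB);
      }
    }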

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1857,6 +1857,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-3220. Fixed TestCombineOutputCollector. (Devaraj K via acmurthy) 
 
+    MAPREDUCE-3103. Implement Job ACLs for MRAppMaster. 
+    (mahadev)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 4 - 28
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java

@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.mapreduce.v2.app.client;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.security.AccessControlException;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -32,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
@@ -196,13 +193,6 @@ public class MRClientService extends AbstractService
       if (job == null) {
         throw RPCUtil.getRemoteException("Unknown job " + jobID);
       }
-      //TODO fix job acls.
-      //JobACL operation = JobACL.VIEW_JOB;
-      //if (modifyAccess) {
-      //  operation = JobACL.MODIFY_JOB;
-      //}
-      //TO disable check access ofr now.
-      //checkAccess(job, operation);
       return job;
     }
 
@@ -226,24 +216,6 @@ public class MRClientService extends AbstractService
       return attempt;
     }
 
-    private void checkAccess(Job job, JobACL jobOperation) 
-      throws YarnRemoteException {
-      if (!UserGroupInformation.isSecurityEnabled()) {
-        return;
-      }
-      UserGroupInformation callerUGI;
-      try {
-        callerUGI = UserGroupInformation.getCurrentUser();
-      } catch (IOException e) {
-        throw RPCUtil.getRemoteException(e);
-      }
-      if(!job.checkAccess(callerUGI, jobOperation)) {
-        throw RPCUtil.getRemoteException(new AccessControlException("User "
-            + callerUGI.getShortUserName() + " cannot perform operation "
-            + jobOperation.name() + " on " + job.getID()));
-      }
-    }
-
     @Override
     public GetCountersResponse getCounters(GetCountersRequest request) 
       throws YarnRemoteException {
@@ -304,6 +276,7 @@ public class MRClientService extends AbstractService
       return response;
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public KillJobResponse killJob(KillJobRequest request) 
       throws YarnRemoteException {
@@ -320,6 +293,7 @@ public class MRClientService extends AbstractService
       return response;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public KillTaskResponse killTask(KillTaskRequest request) 
       throws YarnRemoteException {
@@ -334,6 +308,7 @@ public class MRClientService extends AbstractService
       return response;
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public KillTaskAttemptResponse killTaskAttempt(
         KillTaskAttemptRequest request) throws YarnRemoteException {
@@ -363,6 +338,7 @@ public class MRClientService extends AbstractService
       return response;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public FailTaskAttemptResponse failTaskAttempt(
         FailTaskAttemptRequest request) throws YarnRemoteException {
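
Two things happen in this file: the never-invoked checkAccess helper and its commented-out call site are deleted (the web-side check added in AppController below supersedes them), and the kill/fail RPC methods gain @SuppressWarnings("unchecked"). The latter is the usual companion of dispatching events through a generically typed handler held behind a wildcard; a hypothetical illustration of the warning being suppressed (these types are stand-ins, not the Hadoop ones):

    // Hypothetical types illustrating the unchecked cast such annotations cover.
    interface EventHandler<T> { void handle(T event); }

    class Dispatcher {
      private final EventHandler<?> handler; // wildcarded field

      Dispatcher(EventHandler<?> handler) { this.handler = handler; }

      @SuppressWarnings("unchecked")
      <T> void dispatch(T event) {
        // Unchecked cast: the compiler cannot prove the handler accepts T.
        ((EventHandler<T>) handler).handle(event);
      }
    }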

+ 32 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java

@@ -28,9 +28,12 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.StringHelper;
 import org.apache.hadoop.yarn.util.Times;
@@ -267,6 +270,29 @@ public class AppController extends Controller implements AMParams {
     setStatus(HttpServletResponse.SC_NOT_FOUND);
     setTitle(join("Not found: ", s));
   }
+  
+  /**
+   * Render an ACCESS_DENIED error.
+   * @param s the error message to include.
+   */
+  void accessDenied(String s) {
+    setStatus(HttpServletResponse.SC_FORBIDDEN);
+    setTitle(join("Access denied: ", s));
+    throw new RuntimeException("Access denied: " + s);
+  }
+
+  /**
+   * check for job access.
+   * @param job the job that is being accessed
+   */
+  void checkAccess(Job job) {
+    UserGroupInformation callerUgi = UserGroupInformation.createRemoteUser(
+        request().getRemoteUser());
+    if (!job.checkAccess(callerUgi, JobACL.VIEW_JOB)) {
+      accessDenied("User " + request().getRemoteUser() + " does not have " +
+          " permissions.");
+    }
+  }
 
   /**
    * Ensure that a JOB_ID was passed into the page.
@@ -281,6 +307,9 @@ public class AppController extends Controller implements AMParams {
       if (app.getJob() == null) {
         notFound($(JOB_ID));
       }
+      /* check for acl access */
+      Job job = app.context.getJob(jobID);
+      checkAccess(job);
     } catch (Exception e) {
       badRequest(e.getMessage() == null ? 
           e.getClass().getName() : e.getMessage());
@@ -296,7 +325,8 @@ public class AppController extends Controller implements AMParams {
         throw new RuntimeException("missing task ID");
       }
       TaskId taskID = MRApps.toTaskID($(TASK_ID));
-      app.setJob(app.context.getJob(taskID.getJobId()));
+      Job job = app.context.getJob(taskID.getJobId());
+      app.setJob(job);
       if (app.getJob() == null) {
         notFound(MRApps.toString(taskID.getJobId()));
       } else {
@@ -305,6 +335,7 @@ public class AppController extends Controller implements AMParams {
           notFound($(TASK_ID));
         }
       }
+      checkAccess(job);
     } catch (Exception e) {
       badRequest(e.getMessage());
     }
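
The flow added in this file: requireJob/requireTask resolve the job, checkAccess wraps the HTTP remote user via UserGroupInformation.createRemoteUser and tests JobACL.VIEW_JOB, and accessDenied sets 403 before throwing to abort page rendering. For context, a hedged sketch of how such an ACL decision resolves, using the JobACLsManager call shape visible in CompletedJob below (the configuration key, ACL string, and user names are made-up example values):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobACLsManager;
    import org.apache.hadoop.mapreduce.JobACL;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.AccessControlList;

    public class AclDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed key: the cluster-level ACL enforcement switch of this era.
        conf.setBoolean("mapreduce.cluster.acls.enabled", true);
        JobACLsManager aclsMgr = new JobACLsManager(conf);

        // "alice" plus anyone in group "analysts" may view the job.
        AccessControlList viewAcl = new AccessControlList("alice analysts");
        UserGroupInformation caller =
            UserGroupInformation.createRemoteUser("bob");

        // The job owner always passes; "bob" passes only via the ACL.
        boolean allowed = aclsMgr.checkAccess(caller, JobACL.VIEW_JOB,
            "jobowner", viewAcl);
        System.out.println("view allowed for bob: " + allowed);
      }
    }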

+ 4 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java

@@ -74,19 +74,20 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
   private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
   private final String user;
   private final Path confFile;
-  
+  private JobACLsManager aclsMgr;
   private List<TaskAttemptCompletionEvent> completionEvents = null;
   private JobInfo jobInfo;
 
   public CompletedJob(Configuration conf, JobId jobId, Path historyFile, 
-      boolean loadTasks, String userName, Path confFile) throws IOException {
+      boolean loadTasks, String userName, Path confFile, JobACLsManager aclsMgr) 
+          throws IOException {
     LOG.info("Loading job: " + jobId + " from file: " + historyFile);
     LOG.info("Loading job: " + jobId + " from file: " + historyFile);
     this.conf = conf;
     this.conf = conf;
     this.jobId = jobId;
     this.jobId = jobId;
     this.confFile = confFile;
     this.confFile = confFile;
+    this.aclsMgr = aclsMgr;
     
     
     loadFullHistoryData(loadTasks, historyFile);
     loadFullHistoryData(loadTasks, historyFile);
-
     user = userName;
     counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
     diagnostics.add(jobInfo.getErrorInfo());
@@ -314,7 +315,6 @@ public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job
     }
     Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
     AccessControlList jobACL = jobACLs.get(jobOperation);
-    JobACLsManager aclsMgr = new JobACLsManager(conf);
     return aclsMgr.checkAccess(callerUGI, jobOperation, 
         jobInfo.getUsername(), jobACL);
   }
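
Design note on this file: the per-call "new JobACLsManager(conf)" inside checkAccess is hoisted into a field injected through the constructor, so every CompletedJob loaded by the history server shares one manager. A minimal sketch of the same refactor shape, with hypothetical stand-in types rather than the Hadoop ones:

    // Before: the collaborator was built on every call.
    // After: it is created once by the owner and injected.
    class AclChecker {
      AclChecker(java.util.Properties conf) {}
      boolean check(String user) { return true; } // stand-in decision
    }

    class LoadedJob {
      private final AclChecker aclsMgr; // injected once, reused per check

      LoadedJob(AclChecker aclsMgr) { this.aclsMgr = aclsMgr; }

      boolean canView(String user) {
        // before: AclChecker aclsMgr = new AclChecker(conf); // per call
        return aclsMgr.check(user);
      }
    }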

+ 5 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
@@ -125,6 +126,8 @@ public class JobHistory extends AbstractService implements HistoryContext   {
   //The number of jobs to maintain in the job list cache.
   private int jobListCacheSize;
   
+  private JobACLsManager aclsMgr;
+  
   //The number of loaded jobs.
   private int loadedJobCacheSize;
   
@@ -203,7 +206,7 @@ public class JobHistory extends AbstractService implements HistoryContext   {
           + intermediateDoneDirPath + "]", e);
     }
     
-    
+    this.aclsMgr = new JobACLsManager(conf);
     
     jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
         DEFAULT_JOBLIST_CACHE_SIZE);
@@ -648,7 +651,7 @@ public class JobHistory extends AbstractService implements HistoryContext   {
       try {
         Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), 
             metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(),
-            metaInfo.getConfFile());
+            metaInfo.getConfFile(), this.aclsMgr);
         addToLoadedJobCache(job);
         return job;
       } catch (IOException e) {
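
Tying it together, JobHistory now constructs the single JobACLsManager during service init and threads it into each CompletedJob it materializes. A hedged miniature of that wiring (hypothetical classes mirroring the shape of the change, not the Hadoop API):

    // Hypothetical miniature of the wiring in this hunk.
    class AclManager {
      AclManager(java.util.Properties conf) {}
    }

    class LoadedJob {
      LoadedJob(String jobId, AclManager aclsMgr) {}
    }

    class HistoryService {
      private AclManager aclsMgr; // shared, created once at init

      void init(java.util.Properties conf) {
        this.aclsMgr = new AclManager(conf); // analogous to new JobACLsManager(conf)
      }

      LoadedJob loadJob(String jobId) {
        return new LoadedJob(jobId, aclsMgr); // the new constructor argument
      }
    }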