瀏覽代碼

svn merge -c 1520156 FIXES: MAPREDUCE-5475. MRClientService does not verify ACLs properly. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1520157 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 11 年之前
父節點
當前提交
5ddee655e4

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -114,6 +114,8 @@ Release 2.1.1-beta - UNRELEASED
     commands to reboot, so that client can continue to track the overall job.
     commands to reboot, so that client can continue to track the overall job.
     (Jian He via vinodkv)
     (Jian He via vinodkv)
 
 
+    MAPREDUCE-5475. MRClientService does not verify ACLs properly (jlowe)
+
 Release 2.1.0-beta - 2013-08-22
 Release 2.1.0-beta - 2013-08-22
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES
@@ -1204,6 +1206,8 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
     MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
     failures (Sandy Ryza via jlowe)
     failures (Sandy Ryza via jlowe)
 
 
+    MAPREDUCE-5475. MRClientService does not verify ACLs properly (jlowe)
+
 Release 0.23.9 - 2013-07-08
 Release 0.23.9 - 2013-07-08
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 42 - 22
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java

@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
@@ -78,6 +79,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
 import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -175,16 +178,22 @@ public class MRClientService extends AbstractService
       return getBindAddress();
       return getBindAddress();
     }
     }
     
     
-    private Job verifyAndGetJob(JobId jobID, 
-        boolean modifyAccess) throws IOException {
+    private Job verifyAndGetJob(JobId jobID,
+        JobACL accessType) throws IOException {
       Job job = appContext.getJob(jobID);
       Job job = appContext.getJob(jobID);
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      if (!job.checkAccess(ugi, accessType)) {
+        throw new AccessControlException("User " + ugi.getShortUserName()
+            + " cannot perform operation " + accessType.name() + " on "
+            + jobID);
+      }
       return job;
       return job;
     }
     }
  
  
     private Task verifyAndGetTask(TaskId taskID, 
     private Task verifyAndGetTask(TaskId taskID, 
-        boolean modifyAccess) throws IOException {
+        JobACL accessType) throws IOException {
       Task task = verifyAndGetJob(taskID.getJobId(), 
       Task task = verifyAndGetJob(taskID.getJobId(), 
-          modifyAccess).getTask(taskID);
+          accessType).getTask(taskID);
       if (task == null) {
       if (task == null) {
         throw new IOException("Unknown Task " + taskID);
         throw new IOException("Unknown Task " + taskID);
       }
       }
@@ -192,9 +201,9 @@ public class MRClientService extends AbstractService
     }
     }
 
 
     private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID, 
     private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID, 
-        boolean modifyAccess) throws IOException {
+        JobACL accessType) throws IOException {
       TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(), 
       TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(), 
-          modifyAccess).getAttempt(attemptID);
+          accessType).getAttempt(attemptID);
       if (attempt == null) {
       if (attempt == null) {
         throw new IOException("Unknown TaskAttempt " + attemptID);
         throw new IOException("Unknown TaskAttempt " + attemptID);
       }
       }
@@ -205,7 +214,7 @@ public class MRClientService extends AbstractService
     public GetCountersResponse getCounters(GetCountersRequest request) 
     public GetCountersResponse getCounters(GetCountersRequest request) 
       throws IOException {
       throws IOException {
       JobId jobId = request.getJobId();
       JobId jobId = request.getJobId();
-      Job job = verifyAndGetJob(jobId, false);
+      Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
       GetCountersResponse response =
       GetCountersResponse response =
         recordFactory.newRecordInstance(GetCountersResponse.class);
         recordFactory.newRecordInstance(GetCountersResponse.class);
       response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
       response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
@@ -216,7 +225,7 @@ public class MRClientService extends AbstractService
     public GetJobReportResponse getJobReport(GetJobReportRequest request) 
     public GetJobReportResponse getJobReport(GetJobReportRequest request) 
       throws IOException {
       throws IOException {
       JobId jobId = request.getJobId();
       JobId jobId = request.getJobId();
-      Job job = verifyAndGetJob(jobId, false);
+      Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
       GetJobReportResponse response = 
       GetJobReportResponse response = 
         recordFactory.newRecordInstance(GetJobReportResponse.class);
         recordFactory.newRecordInstance(GetJobReportResponse.class);
       if (job != null) {
       if (job != null) {
@@ -235,7 +244,7 @@ public class MRClientService extends AbstractService
       GetTaskAttemptReportResponse response =
       GetTaskAttemptReportResponse response =
         recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
         recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
       response.setTaskAttemptReport(
       response.setTaskAttemptReport(
-          verifyAndGetAttempt(taskAttemptId, false).getReport());
+          verifyAndGetAttempt(taskAttemptId, JobACL.VIEW_JOB).getReport());
       return response;
       return response;
     }
     }
 
 
@@ -245,7 +254,8 @@ public class MRClientService extends AbstractService
       TaskId taskId = request.getTaskId();
       TaskId taskId = request.getTaskId();
       GetTaskReportResponse response = 
       GetTaskReportResponse response = 
         recordFactory.newRecordInstance(GetTaskReportResponse.class);
         recordFactory.newRecordInstance(GetTaskReportResponse.class);
-      response.setTaskReport(verifyAndGetTask(taskId, false).getReport());
+      response.setTaskReport(
+          verifyAndGetTask(taskId, JobACL.VIEW_JOB).getReport());
       return response;
       return response;
     }
     }
 
 
@@ -256,7 +266,7 @@ public class MRClientService extends AbstractService
       JobId jobId = request.getJobId();
       JobId jobId = request.getJobId();
       int fromEventId = request.getFromEventId();
       int fromEventId = request.getFromEventId();
       int maxEvents = request.getMaxEvents();
       int maxEvents = request.getMaxEvents();
-      Job job = verifyAndGetJob(jobId, false);
+      Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
       
       
       GetTaskAttemptCompletionEventsResponse response = 
       GetTaskAttemptCompletionEventsResponse response = 
         recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
         recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
@@ -270,9 +280,11 @@ public class MRClientService extends AbstractService
     public KillJobResponse killJob(KillJobRequest request) 
     public KillJobResponse killJob(KillJobRequest request) 
       throws IOException {
       throws IOException {
       JobId jobId = request.getJobId();
       JobId jobId = request.getJobId();
-      String message = "Kill Job received from client " + jobId;
+      UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
+      String message = "Kill job " + jobId + " received from " + callerUGI
+          + " at " + Server.getRemoteAddress();
       LOG.info(message);
       LOG.info(message);
-  	  verifyAndGetJob(jobId, true);
+      verifyAndGetJob(jobId, JobACL.MODIFY_JOB);
       appContext.getEventHandler().handle(
       appContext.getEventHandler().handle(
           new JobDiagnosticsUpdateEvent(jobId, message));
           new JobDiagnosticsUpdateEvent(jobId, message));
       appContext.getEventHandler().handle(
       appContext.getEventHandler().handle(
@@ -287,9 +299,11 @@ public class MRClientService extends AbstractService
     public KillTaskResponse killTask(KillTaskRequest request) 
     public KillTaskResponse killTask(KillTaskRequest request) 
       throws IOException {
       throws IOException {
       TaskId taskId = request.getTaskId();
       TaskId taskId = request.getTaskId();
-      String message = "Kill task received from client " + taskId;
+      UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
+      String message = "Kill task " + taskId + " received from " + callerUGI
+          + " at " + Server.getRemoteAddress();
       LOG.info(message);
       LOG.info(message);
-      verifyAndGetTask(taskId, true);
+      verifyAndGetTask(taskId, JobACL.MODIFY_JOB);
       appContext.getEventHandler().handle(
       appContext.getEventHandler().handle(
           new TaskEvent(taskId, TaskEventType.T_KILL));
           new TaskEvent(taskId, TaskEventType.T_KILL));
       KillTaskResponse response = 
       KillTaskResponse response = 
@@ -302,9 +316,12 @@ public class MRClientService extends AbstractService
     public KillTaskAttemptResponse killTaskAttempt(
     public KillTaskAttemptResponse killTaskAttempt(
         KillTaskAttemptRequest request) throws IOException {
         KillTaskAttemptRequest request) throws IOException {
       TaskAttemptId taskAttemptId = request.getTaskAttemptId();
       TaskAttemptId taskAttemptId = request.getTaskAttemptId();
-      String message = "Kill task attempt received from client " + taskAttemptId;
+      UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
+      String message = "Kill task attempt " + taskAttemptId
+          + " received from " + callerUGI + " at "
+          + Server.getRemoteAddress();
       LOG.info(message);
       LOG.info(message);
-      verifyAndGetAttempt(taskAttemptId, true);
+      verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
       appContext.getEventHandler().handle(
       appContext.getEventHandler().handle(
           new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
           new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
       appContext.getEventHandler().handle(
       appContext.getEventHandler().handle(
@@ -322,8 +339,8 @@ public class MRClientService extends AbstractService
       
       
       GetDiagnosticsResponse response = 
       GetDiagnosticsResponse response = 
         recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
         recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
-      response.addAllDiagnostics(
-          verifyAndGetAttempt(taskAttemptId, false).getDiagnostics());
+      response.addAllDiagnostics(verifyAndGetAttempt(taskAttemptId,
+          JobACL.VIEW_JOB).getDiagnostics());
       return response;
       return response;
     }
     }
 
 
@@ -332,9 +349,12 @@ public class MRClientService extends AbstractService
     public FailTaskAttemptResponse failTaskAttempt(
     public FailTaskAttemptResponse failTaskAttempt(
         FailTaskAttemptRequest request) throws IOException {
         FailTaskAttemptRequest request) throws IOException {
       TaskAttemptId taskAttemptId = request.getTaskAttemptId();
       TaskAttemptId taskAttemptId = request.getTaskAttemptId();
-      String message = "Fail task attempt received from client " + taskAttemptId;
+      UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
+      String message = "Fail task attempt " + taskAttemptId
+          + " received from " + callerUGI + " at "
+          + Server.getRemoteAddress();
       LOG.info(message);
       LOG.info(message);
-      verifyAndGetAttempt(taskAttemptId, true);
+      verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
       appContext.getEventHandler().handle(
       appContext.getEventHandler().handle(
           new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
           new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
       appContext.getEventHandler().handle(
       appContext.getEventHandler().handle(
@@ -356,7 +376,7 @@ public class MRClientService extends AbstractService
       GetTaskReportsResponse response = 
       GetTaskReportsResponse response = 
         recordFactory.newRecordInstance(GetTaskReportsResponse.class);
         recordFactory.newRecordInstance(GetTaskReportsResponse.class);
       
       
-      Job job = verifyAndGetJob(jobId, false);
+      Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
       Collection<Task> tasks = job.getTasks(taskType).values();
       Collection<Task> tasks = job.getTasks(taskType).values();
       LOG.info("Getting task report for " + taskType + "   " + jobId
       LOG.info("Getting task report for " + taskType + "   " + jobId
           + ". Report-size will be " + tasks.size());
           + ". Report-size will be " + tasks.size());

+ 85 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java

@@ -18,13 +18,20 @@
 
 
 package org.apache.hadoop.mapreduce.v2.app;
 package org.apache.hadoop.mapreduce.v2.app;
 
 
+import static org.junit.Assert.fail;
+
+import java.security.PrivilegedExceptionAction;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.List;
 
 
 import junit.framework.Assert;
 import junit.framework.Assert;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@@ -32,6 +39,9 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompleti
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -51,6 +61,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -169,6 +181,79 @@ public class TestMRClientService {
     app.waitForState(job, JobState.SUCCEEDED);
     app.waitForState(job, JobState.SUCCEEDED);
   }
   }
 
 
+  @Test
+  public void testViewAclOnlyCannotModify() throws Exception {
+    final MRAppWithClientService app = new MRAppWithClientService(1, 0, false);
+    final Configuration conf = new Configuration();
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+    conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "viewonlyuser");
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.RUNNING);
+
+    UserGroupInformation viewOnlyUser =
+        UserGroupInformation.createUserForTesting(
+            "viewonlyuser", new String[] {});
+    Assert.assertTrue("viewonlyuser cannot view job",
+        job.checkAccess(viewOnlyUser, JobACL.VIEW_JOB));
+    Assert.assertFalse("viewonlyuser can modify job",
+        job.checkAccess(viewOnlyUser, JobACL.MODIFY_JOB));
+    MRClientProtocol client = viewOnlyUser.doAs(
+        new PrivilegedExceptionAction<MRClientProtocol>() {
+          @Override
+          public MRClientProtocol run() throws Exception {
+            YarnRPC rpc = YarnRPC.create(conf);
+            return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+                app.clientService.getBindAddress(), conf);
+          }
+        });
+
+    KillJobRequest killJobRequest = recordFactory.newRecordInstance(
+        KillJobRequest.class);
+    killJobRequest.setJobId(app.getJobId());
+    try {
+      client.killJob(killJobRequest);
+      fail("viewonlyuser killed job");
+    } catch (AccessControlException e) {
+      // pass
+    }
+
+    KillTaskRequest killTaskRequest = recordFactory.newRecordInstance(
+        KillTaskRequest.class);
+    killTaskRequest.setTaskId(task.getID());
+    try {
+      client.killTask(killTaskRequest);
+      fail("viewonlyuser killed task");
+    } catch (AccessControlException e) {
+      // pass
+    }
+
+    KillTaskAttemptRequest killTaskAttemptRequest =
+        recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
+    killTaskAttemptRequest.setTaskAttemptId(attempt.getID());
+    try {
+      client.killTaskAttempt(killTaskAttemptRequest);
+      fail("viewonlyuser killed task attempt");
+    } catch (AccessControlException e) {
+      // pass
+    }
+
+    FailTaskAttemptRequest failTaskAttemptRequest =
+        recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
+    failTaskAttemptRequest.setTaskAttemptId(attempt.getID());
+    try {
+      client.failTaskAttempt(failTaskAttemptRequest);
+      fail("viewonlyuser killed task attempt");
+    } catch (AccessControlException e) {
+      // pass
+    }
+  }
+
   private void verifyJobReport(JobReport jr) {
   private void verifyJobReport(JobReport jr) {
     Assert.assertNotNull("JobReport is null", jr);
     Assert.assertNotNull("JobReport is null", jr);
     List<AMInfo> amInfos = jr.getAMInfos();
     List<AMInfo> amInfos = jr.getAMInfos();