|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.v2.app.client;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
@@ -28,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
+import org.apache.hadoop.mapreduce.JobACL;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
|
@@ -188,15 +190,26 @@ public class MRClientService extends AbstractService
|
|
|
}
|
|
|
|
|
|
private Job verifyAndGetJob(JobId jobID,
|
|
|
- boolean modifyAccess) throws YarnRemoteException {
|
|
|
+ JobACL accessType) throws YarnRemoteException {
|
|
|
Job job = appContext.getJob(jobID);
|
|
|
+ UserGroupInformation ugi;
|
|
|
+ try {
|
|
|
+ ugi = UserGroupInformation.getCurrentUser();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw RPCUtil.getRemoteException(e);
|
|
|
+ }
|
|
|
+ if (!job.checkAccess(ugi, accessType)) {
|
|
|
+ throw RPCUtil.getRemoteException("User " + ugi.getShortUserName()
|
|
|
+ + " cannot perform operation " + accessType.name() + " on "
|
|
|
+ + jobID);
|
|
|
+ }
|
|
|
return job;
|
|
|
}
|
|
|
|
|
|
private Task verifyAndGetTask(TaskId taskID,
|
|
|
- boolean modifyAccess) throws YarnRemoteException {
|
|
|
+ JobACL accessType) throws YarnRemoteException {
|
|
|
Task task = verifyAndGetJob(taskID.getJobId(),
|
|
|
- modifyAccess).getTask(taskID);
|
|
|
+ accessType).getTask(taskID);
|
|
|
if (task == null) {
|
|
|
throw RPCUtil.getRemoteException("Unknown Task " + taskID);
|
|
|
}
|
|
@@ -204,9 +217,9 @@ public class MRClientService extends AbstractService
|
|
|
}
|
|
|
|
|
|
private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
|
|
|
- boolean modifyAccess) throws YarnRemoteException {
|
|
|
+ JobACL accessType) throws YarnRemoteException {
|
|
|
TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(),
|
|
|
- modifyAccess).getAttempt(attemptID);
|
|
|
+ accessType).getAttempt(attemptID);
|
|
|
if (attempt == null) {
|
|
|
throw RPCUtil.getRemoteException("Unknown TaskAttempt " + attemptID);
|
|
|
}
|
|
@@ -217,7 +230,7 @@ public class MRClientService extends AbstractService
|
|
|
public GetCountersResponse getCounters(GetCountersRequest request)
|
|
|
throws YarnRemoteException {
|
|
|
JobId jobId = request.getJobId();
|
|
|
- Job job = verifyAndGetJob(jobId, false);
|
|
|
+ Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
|
|
GetCountersResponse response =
|
|
|
recordFactory.newRecordInstance(GetCountersResponse.class);
|
|
|
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
|
|
@@ -228,7 +241,7 @@ public class MRClientService extends AbstractService
|
|
|
public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
|
|
throws YarnRemoteException {
|
|
|
JobId jobId = request.getJobId();
|
|
|
- Job job = verifyAndGetJob(jobId, false);
|
|
|
+ Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
|
|
GetJobReportResponse response =
|
|
|
recordFactory.newRecordInstance(GetJobReportResponse.class);
|
|
|
if (job != null) {
|
|
@@ -247,7 +260,7 @@ public class MRClientService extends AbstractService
|
|
|
GetTaskAttemptReportResponse response =
|
|
|
recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
|
|
|
response.setTaskAttemptReport(
|
|
|
- verifyAndGetAttempt(taskAttemptId, false).getReport());
|
|
|
+ verifyAndGetAttempt(taskAttemptId, JobACL.VIEW_JOB).getReport());
|
|
|
return response;
|
|
|
}
|
|
|
|
|
@@ -257,7 +270,8 @@ public class MRClientService extends AbstractService
|
|
|
TaskId taskId = request.getTaskId();
|
|
|
GetTaskReportResponse response =
|
|
|
recordFactory.newRecordInstance(GetTaskReportResponse.class);
|
|
|
- response.setTaskReport(verifyAndGetTask(taskId, false).getReport());
|
|
|
+ response.setTaskReport(
|
|
|
+ verifyAndGetTask(taskId, JobACL.VIEW_JOB).getReport());
|
|
|
return response;
|
|
|
}
|
|
|
|
|
@@ -268,7 +282,7 @@ public class MRClientService extends AbstractService
|
|
|
JobId jobId = request.getJobId();
|
|
|
int fromEventId = request.getFromEventId();
|
|
|
int maxEvents = request.getMaxEvents();
|
|
|
- Job job = verifyAndGetJob(jobId, false);
|
|
|
+ Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
|
|
|
|
|
GetTaskAttemptCompletionEventsResponse response =
|
|
|
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
|
|
@@ -282,9 +296,16 @@ public class MRClientService extends AbstractService
|
|
|
public KillJobResponse killJob(KillJobRequest request)
|
|
|
throws YarnRemoteException {
|
|
|
JobId jobId = request.getJobId();
|
|
|
- String message = "Kill Job received from client " + jobId;
|
|
|
+ UserGroupInformation callerUGI;
|
|
|
+ try {
|
|
|
+ callerUGI = UserGroupInformation.getCurrentUser();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw RPCUtil.getRemoteException(e);
|
|
|
+ }
|
|
|
+ String message = "Kill job " + jobId + " received from " + callerUGI
|
|
|
+ + " at " + Server.getRemoteAddress();
|
|
|
LOG.info(message);
|
|
|
- verifyAndGetJob(jobId, true);
|
|
|
+ verifyAndGetJob(jobId, JobACL.MODIFY_JOB);
|
|
|
appContext.getEventHandler().handle(
|
|
|
new JobDiagnosticsUpdateEvent(jobId, message));
|
|
|
appContext.getEventHandler().handle(
|
|
@@ -299,9 +320,16 @@ public class MRClientService extends AbstractService
|
|
|
public KillTaskResponse killTask(KillTaskRequest request)
|
|
|
throws YarnRemoteException {
|
|
|
TaskId taskId = request.getTaskId();
|
|
|
- String message = "Kill task received from client " + taskId;
|
|
|
+ UserGroupInformation callerUGI;
|
|
|
+ try {
|
|
|
+ callerUGI = UserGroupInformation.getCurrentUser();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw RPCUtil.getRemoteException(e);
|
|
|
+ }
|
|
|
+ String message = "Kill task " + taskId + " received from " + callerUGI
|
|
|
+ + " at " + Server.getRemoteAddress();
|
|
|
LOG.info(message);
|
|
|
- verifyAndGetTask(taskId, true);
|
|
|
+ verifyAndGetTask(taskId, JobACL.MODIFY_JOB);
|
|
|
appContext.getEventHandler().handle(
|
|
|
new TaskEvent(taskId, TaskEventType.T_KILL));
|
|
|
KillTaskResponse response =
|
|
@@ -314,9 +342,17 @@ public class MRClientService extends AbstractService
|
|
|
public KillTaskAttemptResponse killTaskAttempt(
|
|
|
KillTaskAttemptRequest request) throws YarnRemoteException {
|
|
|
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
|
|
|
- String message = "Kill task attempt received from client " + taskAttemptId;
|
|
|
+ UserGroupInformation callerUGI;
|
|
|
+ try {
|
|
|
+ callerUGI = UserGroupInformation.getCurrentUser();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw RPCUtil.getRemoteException(e);
|
|
|
+ }
|
|
|
+ String message = "Kill task attempt " + taskAttemptId
|
|
|
+ + " received from " + callerUGI + " at "
|
|
|
+ + Server.getRemoteAddress();
|
|
|
LOG.info(message);
|
|
|
- verifyAndGetAttempt(taskAttemptId, true);
|
|
|
+ verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
|
|
|
appContext.getEventHandler().handle(
|
|
|
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
|
|
|
appContext.getEventHandler().handle(
|
|
@@ -334,8 +370,8 @@ public class MRClientService extends AbstractService
|
|
|
|
|
|
GetDiagnosticsResponse response =
|
|
|
recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
|
|
|
- response.addAllDiagnostics(
|
|
|
- verifyAndGetAttempt(taskAttemptId, false).getDiagnostics());
|
|
|
+ response.addAllDiagnostics(verifyAndGetAttempt(taskAttemptId,
|
|
|
+ JobACL.VIEW_JOB).getDiagnostics());
|
|
|
return response;
|
|
|
}
|
|
|
|
|
@@ -344,9 +380,17 @@ public class MRClientService extends AbstractService
|
|
|
public FailTaskAttemptResponse failTaskAttempt(
|
|
|
FailTaskAttemptRequest request) throws YarnRemoteException {
|
|
|
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
|
|
|
- String message = "Fail task attempt received from client " + taskAttemptId;
|
|
|
+ UserGroupInformation callerUGI;
|
|
|
+ try {
|
|
|
+ callerUGI = UserGroupInformation.getCurrentUser();
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw RPCUtil.getRemoteException(e);
|
|
|
+ }
|
|
|
+ String message = "Fail task attempt " + taskAttemptId
|
|
|
+ + " received from " + callerUGI + " at "
|
|
|
+ + Server.getRemoteAddress();
|
|
|
LOG.info(message);
|
|
|
- verifyAndGetAttempt(taskAttemptId, true);
|
|
|
+ verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
|
|
|
appContext.getEventHandler().handle(
|
|
|
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
|
|
|
appContext.getEventHandler().handle(
|
|
@@ -368,7 +412,7 @@ public class MRClientService extends AbstractService
|
|
|
GetTaskReportsResponse response =
|
|
|
recordFactory.newRecordInstance(GetTaskReportsResponse.class);
|
|
|
|
|
|
- Job job = verifyAndGetJob(jobId, false);
|
|
|
+ Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
|
|
Collection<Task> tasks = job.getTasks(taskType).values();
|
|
|
LOG.info("Getting task report for " + taskType + " " + jobId
|
|
|
+ ". Report-size will be " + tasks.size());
|