|
@@ -18,6 +18,7 @@
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
@@ -70,6 +71,7 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.http.HttpServer;
|
|
|
import org.apache.hadoop.io.IntWritable;
|
|
|
+import org.apache.hadoop.io.SecureIOUtils;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
@@ -2897,6 +2899,15 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
return task.getTaskID().hashCode();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void authorizeJVM(JobID jobId) throws IOException {
|
|
|
+ String currentJobId =
|
|
|
+ UserGroupInformation.getCurrentUser().getUserName();
|
|
|
+ if (!currentJobId.equals(jobId.toString())) {
|
|
|
+ throw new IOException ("JVM with " + currentJobId +
|
|
|
+ " is not authorized for " + jobId);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
|
|
|
// ///////////////////////////////////////////////////////////////
|
|
@@ -2908,6 +2919,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
*/
|
|
|
public synchronized JvmTask getTask(JvmContext context)
|
|
|
throws IOException {
|
|
|
+ authorizeJVM(context.jvmId.getJobId());
|
|
|
JVMId jvmId = context.jvmId;
|
|
|
LOG.debug("JVM with ID : " + jvmId + " asked for a task");
|
|
|
// save pid of task JVM sent by child
|
|
@@ -2948,6 +2960,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
public synchronized boolean statusUpdate(TaskAttemptID taskid,
|
|
|
TaskStatus taskStatus)
|
|
|
throws IOException {
|
|
|
+ authorizeJVM(taskid.getJobID());
|
|
|
TaskInProgress tip = tasks.get(taskid);
|
|
|
if (tip != null) {
|
|
|
tip.reportProgress(taskStatus);
|
|
@@ -2963,6 +2976,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
* diagnostic info
|
|
|
*/
|
|
|
public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
|
|
|
+ authorizeJVM(taskid.getJobID());
|
|
|
TaskInProgress tip = tasks.get(taskid);
|
|
|
if (tip != null) {
|
|
|
tip.reportDiagnosticInfo(info);
|
|
@@ -2973,6 +2987,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
public synchronized void reportNextRecordRange(TaskAttemptID taskid,
|
|
|
SortedRanges.Range range) throws IOException {
|
|
|
+ authorizeJVM(taskid.getJobID());
|
|
|
TaskInProgress tip = tasks.get(taskid);
|
|
|
if (tip != null) {
|
|
|
tip.reportNextRecordRange(range);
|
|
@@ -2984,6 +2999,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
/** Child checking to see if we're alive. Normally does nothing.*/
|
|
|
public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
|
|
|
+ authorizeJVM(taskid.getJobID());
|
|
|
return tasks.get(taskid) != null;
|
|
|
}
|
|
|
|
|
@@ -2994,6 +3010,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
public synchronized void commitPending(TaskAttemptID taskid,
|
|
|
TaskStatus taskStatus)
|
|
|
throws IOException {
|
|
|
+ authorizeJVM(taskid.getJobID());
|
|
|
LOG.info("Task " + taskid + " is in commit-pending," +"" +
|
|
|
" task state:" +taskStatus.getRunState());
|
|
|
statusUpdate(taskid, taskStatus);
|
|
@@ -3003,7 +3020,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
/**
|
|
|
* Child checking whether it can commit
|
|
|
*/
|
|
|
- public synchronized boolean canCommit(TaskAttemptID taskid) {
|
|
|
+ public synchronized boolean canCommit(TaskAttemptID taskid)
|
|
|
+ throws IOException {
|
|
|
+ authorizeJVM(taskid.getJobID());
|
|
|
return commitResponses.contains(taskid); //don't remove it now
|
|
|
}
|
|
|
|
|
@@ -3012,6 +3031,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
*/
|
|
|
public synchronized void done(TaskAttemptID taskid)
|
|
|
throws IOException {
|
|
|
+ authorizeJVM(taskid.getJobID());
|
|
|
TaskInProgress tip = tasks.get(taskid);
|
|
|
commitResponses.remove(taskid);
|
|
|
if (tip != null) {
|
|
@@ -3027,6 +3047,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
*/
|
|
|
public synchronized void shuffleError(TaskAttemptID taskId, String message)
|
|
|
throws IOException {
|
|
|
+ authorizeJVM(taskId.getJobID());
|
|
|
LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
|
|
|
TaskInProgress tip = runningTasks.get(taskId);
|
|
|
tip.reportDiagnosticInfo("Shuffle Error: " + message);
|
|
@@ -3038,6 +3059,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
*/
|
|
|
public synchronized void fsError(TaskAttemptID taskId, String message)
|
|
|
throws IOException {
|
|
|
+ authorizeJVM(taskId.getJobID());
|
|
|
LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
|
|
|
TaskInProgress tip = runningTasks.get(taskId);
|
|
|
tip.reportDiagnosticInfo("FSError: " + message);
|
|
@@ -3049,6 +3071,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
*/
|
|
|
public synchronized void fatalError(TaskAttemptID taskId, String msg)
|
|
|
throws IOException {
|
|
|
+ authorizeJVM(taskId.getJobID());
|
|
|
LOG.fatal("Task: " + taskId + " - Killed : " + msg);
|
|
|
TaskInProgress tip = runningTasks.get(taskId);
|
|
|
tip.reportDiagnosticInfo("Error: " + msg);
|
|
@@ -3058,6 +3081,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
|
|
|
JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id)
|
|
|
throws IOException {
|
|
|
+ authorizeJVM(jobId);
|
|
|
TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
|
|
|
synchronized (shouldReset) {
|
|
|
if (shouldReset.remove(id)) {
|
|
@@ -3316,7 +3340,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
// true iff IOException was caused by attempt to access input
|
|
|
boolean isInputException = true;
|
|
|
OutputStream outStream = null;
|
|
|
- FSDataInputStream mapOutputIn = null;
|
|
|
+ FileInputStream mapOutputIn = null;
|
|
|
|
|
|
long totalRead = 0;
|
|
|
ShuffleServerInstrumentation shuffleMetrics =
|
|
@@ -3339,12 +3363,14 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
context.getAttribute("local.file.system")).getRaw();
|
|
|
|
|
|
String userName = null;
|
|
|
+ String runAsUserName = null;
|
|
|
synchronized (tracker.runningJobs) {
|
|
|
RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
|
|
|
if (rjob == null) {
|
|
|
throw new IOException("Unknown job " + jobId + "!!");
|
|
|
}
|
|
|
userName = rjob.jobConf.getUser();
|
|
|
+ runAsUserName = tracker.getTaskController().getRunAsUser(rjob.jobConf);
|
|
|
}
|
|
|
// Index file
|
|
|
Path indexFileName =
|
|
@@ -3363,7 +3389,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
* the map-output for the given reducer is available.
|
|
|
*/
|
|
|
IndexRecord info =
|
|
|
- tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
|
|
|
+ tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName,
|
|
|
+ runAsUserName);
|
|
|
|
|
|
//set the custom "from-map-task" http header to the map task from which
|
|
|
//the map output data is being transferred
|
|
@@ -3391,10 +3418,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
* send it to the reducer.
|
|
|
*/
|
|
|
//open the map-output file
|
|
|
- mapOutputIn = rfs.open(mapOutputFileName);
|
|
|
+ mapOutputIn = SecureIOUtils.openForRead(
|
|
|
+ new File(mapOutputFileName.toUri().getPath()), runAsUserName, null);
|
|
|
|
|
|
//seek to the correct offset for the reduce
|
|
|
- mapOutputIn.seek(info.startOffset);
|
|
|
+ mapOutputIn.skip(info.startOffset);
|
|
|
long rem = info.partLength;
|
|
|
int len =
|
|
|
mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
|