@@ -22,20 +22,19 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.net.URI;
 import java.util.HashSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -47,13 +46,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
@@ -80,7 +78,10 @@ public class LocalContainerLauncher extends AbstractService implements
     super(LocalContainerLauncher.class.getName());
     this.context = context;
     this.umbilical = umbilical;
-    // umbilical: MRAppMaster creates (taskAttemptListener), passes to us (TODO/FIXME: pointless to use RPC to talk to self; should create LocalTaskAttemptListener or similar: implement umbilical protocol but skip RPC stuff)
+    // umbilical: MRAppMaster creates (taskAttemptListener), passes to us
+    // (TODO/FIXME: pointless to use RPC to talk to self; should create
+    // LocalTaskAttemptListener or similar: implement umbilical protocol
+    // but skip RPC stuff)
 
     try {
       curFC = FileContext.getFileContext(curDir.toURI());
@@ -152,7 +153,6 @@ public class LocalContainerLauncher extends AbstractService implements
    * ]]
    * - runs Task (runSubMap() or runSubReduce())
    * - TA can safely send TA_UPDATE since in RUNNING state
-   *   [modulo possible TA-state-machine race noted below: CHECK (TODO)]
    */
   private class SubtaskRunner implements Runnable {
 
@@ -162,6 +162,7 @@ public class LocalContainerLauncher extends AbstractService implements
     SubtaskRunner() {
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void run() {
       ContainerLauncherEvent event = null;
@@ -183,7 +184,7 @@ public class LocalContainerLauncher extends AbstractService implements
 
           ContainerRemoteLaunchEvent launchEv =
               (ContainerRemoteLaunchEvent)event;
-          TaskAttemptId attemptID = launchEv.getTaskAttemptID(); //FIXME: can attemptID ever be null? (only if retrieved over umbilical?)
+          TaskAttemptId attemptID = launchEv.getTaskAttemptID();
 
           Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
           int numMapTasks = job.getTotalMaps();
@@ -204,7 +205,6 @@ public class LocalContainerLauncher extends AbstractService implements
           // port number is set to -1 in this case.
           context.getEventHandler().handle(
               new TaskAttemptContainerLaunchedEvent(attemptID, -1));
-          //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter)
 
           if (numMapTasks == 0) {
             doneWithMaps = true;
@@ -259,6 +259,7 @@ public class LocalContainerLauncher extends AbstractService implements
       }
     }
 
+    @SuppressWarnings("deprecation")
     private void runSubtask(org.apache.hadoop.mapred.Task task,
                             final TaskType taskType,
                             TaskAttemptId attemptID,
@@ -270,6 +271,19 @@ public class LocalContainerLauncher extends AbstractService implements
 
       try {
         JobConf conf = new JobConf(getConfig());
+        conf.set(JobContext.TASK_ID, task.getTaskID().toString());
+        conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString());
+        conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP));
+        conf.setInt(JobContext.TASK_PARTITION, task.getPartition());
+        conf.set(JobContext.ID, task.getJobID().toString());
+
+        // Use the AM's local dir env to generate the intermediate step
+        // output files
+        String[] localSysDirs = StringUtils.getTrimmedStrings(
+            System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+        conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+        LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+            + conf.get(MRConfig.LOCAL_DIR));
 
         // mark this as an uberized subtask so it can set task counter
         // (longer-term/FIXME: could redefine as job counter and send
@@ -285,12 +299,12 @@ public class LocalContainerLauncher extends AbstractService implements
           if (doneWithMaps) {
             LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
                       + attemptID + "), but should be finished with maps");
-            // throw new RuntimeException() (FIXME: what's appropriate here?)
+            throw new RuntimeException();
           }
 
           MapTask map = (MapTask)task;
+          map.setConf(conf);
 
-          //CODE-REVIEWER QUESTION: why not task.getConf() or map.getConf() instead of conf? do we need Task's localizeConfiguration() run on this first?
           map.run(conf, umbilical);
 
           if (renameOutputs) {
@@ -305,19 +319,23 @@ public class LocalContainerLauncher extends AbstractService implements
         } else /* TaskType.REDUCE */ {
 
           if (!doneWithMaps) {
-            //check if event-queue empty? whole idea of counting maps vs. checking event queue is a tad wacky...but could enforce ordering (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?): doesn't send reduce event until maps all done]
+            // check if event-queue empty? whole idea of counting maps vs.
+            // checking event queue is a tad wacky...but could enforce ordering
+            // (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):
+            // doesn't send reduce event until maps all done]
             LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
                       + attemptID + "), but not yet finished with maps");
-            // throw new RuntimeException() (FIXME) // or push reduce event back onto end of queue? (probably former)
+            throw new RuntimeException();
           }
 
-          ReduceTask reduce = (ReduceTask)task;
-
           // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
           // set framework name to local to make task local
           conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
           conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle
 
+          ReduceTask reduce = (ReduceTask)task;
+          reduce.setConf(conf);
+
           reduce.run(conf, umbilical);
           //relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
         }
@@ -334,18 +352,7 @@ public class LocalContainerLauncher extends AbstractService implements
         try {
           if (task != null) {
             // do cleanup for the task
-//            if (childUGI == null) { // no need to job into doAs block
-              task.taskCleanup(umbilical);
-//            } else {
-//              final Task taskFinal = task;
-//              childUGI.doAs(new PrivilegedExceptionAction<Object>() {
-//                @Override
-//                public Object run() throws Exception {
-//                  taskFinal.taskCleanup(umbilical);
-//                  return null;
-//                }
-//              });
-//            }
+            task.taskCleanup(umbilical);
           }
         } catch (Exception e) {
           LOG.info("Exception cleaning up: "
@@ -354,51 +361,21 @@ public class LocalContainerLauncher extends AbstractService implements
         // Report back any failures, for diagnostic purposes
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         exception.printStackTrace(new PrintStream(baos));
-//        if (classicAttemptID != null) {
-          umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
-//        }
+        umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
         throw new RuntimeException();
 
       } catch (Throwable throwable) {
         LOG.fatal("Error running local (uberized) 'child' : "
             + StringUtils.stringifyException(throwable));
-//        if (classicAttemptID != null) {
-          Throwable tCause = throwable.getCause();
-          String cause = (tCause == null)
-              ? throwable.getMessage()
-              : StringUtils.stringifyException(tCause);
-          umbilical.fatalError(classicAttemptID, cause);
-//        }
+        Throwable tCause = throwable.getCause();
+        String cause = (tCause == null)
+            ? throwable.getMessage()
+            : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(classicAttemptID, cause);
         throw new RuntimeException();
-
-      } finally {
-/*
-FIXME: do we need to do any of this stuff? (guessing not since not in own JVM)
-        RPC.stopProxy(umbilical);
-        DefaultMetricsSystem.shutdown();
-        // Shutting down log4j of the child-vm...
-        // This assumes that on return from Task.run()
-        // there is no more logging done.
-        LogManager.shutdown();
- */
       }
     }
 
-
-/* FIXME: may not need renameMapOutputForReduce() anymore?  TEST!
-
-${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for containers;
-contains launch_container.sh script, which, when executed, creates symlinks and
-sets up env
- "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out
- "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?)
- "$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done
-
-  OHO! no further need for this at all? $taskId is unique per subtask
-  now => should work fine to leave alone. TODO: test with teragen or
-  similar
- */
-
     /**
      * Within the _local_ filesystem (not HDFS), all activity takes place within
      * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
@@ -409,14 +386,21 @@ sets up env
      * filenames instead of "file.out". (All of this is entirely internal,
      * so there are no particular compatibility issues.)
      */
+    @SuppressWarnings("deprecation")
     private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
                                           MapOutputFile subMapOutputFile)
     throws IOException {
       FileSystem localFs = FileSystem.getLocal(conf);
       // move map output to reduce input
       Path mapOut = subMapOutputFile.getOutputFile();
+      FileStatus mStatus = localFs.getFileStatus(mapOut);
       Path reduceIn = subMapOutputFile.getInputFileForWrite(
-          TypeConverter.fromYarn(mapId).getTaskID(), localFs.getLength(mapOut));
+          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming map output file for task attempt "
+            + mapId.toString() + " from original location " + mapOut.toString()
+            + " to destination " + reduceIn.toString());
+      }
       if (!localFs.mkdirs(reduceIn.getParent())) {
         throw new IOException("Mkdirs failed to create "
            + reduceIn.getParent().toString());
@@ -429,8 +413,7 @@ sets up env
      * Also within the local filesystem, we need to restore the initial state
      * of the directory as much as possible. Compare current contents against
      * the saved original state and nuke everything that doesn't belong, with
-     * the exception of the renamed map outputs (see above).
-FIXME: do we really need to worry about renamed map outputs, or already moved to output dir on commit? if latter, fix comment
+     * the exception of the renamed map outputs.
      *
      * Any jobs that go out of their way to rename or delete things from the
      * local directory are considered broken and deserve what they get...