|
@@ -19,16 +19,16 @@
|
|
|
package org.apache.hadoop.mapreduce.lib.output;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.net.URI;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.PathFilter;
|
|
|
import org.apache.hadoop.mapreduce.JobContext;
|
|
|
import org.apache.hadoop.mapreduce.JobStatus;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
@@ -37,41 +37,239 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
|
|
|
/** An {@link OutputCommitter} that commits files specified
|
|
|
- * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
|
|
|
+ * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
|
|
|
**/
|
|
|
@InterfaceAudience.Public
|
|
|
@InterfaceStability.Stable
|
|
|
public class FileOutputCommitter extends OutputCommitter {
|
|
|
-
|
|
|
private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
|
|
|
|
|
|
- /**
|
|
|
- * Temporary directory name
|
|
|
+ /**
|
|
|
+ * Name of directory where pending data is placed. Data that has not been
|
|
|
+ * committed yet.
|
|
|
*/
|
|
|
- protected static final String TEMP_DIR_NAME = "_temporary";
|
|
|
+ public static final String PENDING_DIR_NAME = "_temporary";
|
|
|
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
|
|
|
- static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
|
|
|
+ public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
|
|
|
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
|
|
|
- private FileSystem outputFileSystem = null;
|
|
|
private Path outputPath = null;
|
|
|
private Path workPath = null;
|
|
|
|
|
|
/**
|
|
|
* Create a file output committer
|
|
|
- * @param outputPath the job's output path
|
|
|
+ * @param outputPath the job's output path, or null if you want the output
|
|
|
+ * committer to act as a noop.
|
|
|
* @param context the task's context
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public FileOutputCommitter(Path outputPath,
|
|
|
TaskAttemptContext context) throws IOException {
|
|
|
+ this(outputPath, (JobContext)context);
|
|
|
if (outputPath != null) {
|
|
|
- this.outputPath = outputPath;
|
|
|
- outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
|
|
|
- workPath = new Path(outputPath,
|
|
|
- getTaskAttemptBaseDirName(context))
|
|
|
- .makeQualified(outputFileSystem);
|
|
|
+ workPath = getTaskAttemptPath(context, outputPath);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a file output committer
|
|
|
+ * @param outputPath the job's output path, or null if you want the output
|
|
|
+ * committer to act as a noop.
|
|
|
+ * @param context the job's context
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ @Private
|
|
|
+ public FileOutputCommitter(Path outputPath,
|
|
|
+ JobContext context) throws IOException {
|
|
|
+ if (outputPath != null) {
|
|
|
+ FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
|
|
|
+ this.outputPath = fs.makeQualified(outputPath);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return the path where final output of the job should be placed. This
|
|
|
+ * could also be considered the committed application attempt path.
|
|
|
+ */
|
|
|
+ private Path getOutputPath() {
|
|
|
+ return this.outputPath;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return true if we have an output path set, else false.
|
|
|
+ */
|
|
|
+ private boolean hasOutputPath() {
|
|
|
+ return this.outputPath != null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return the path where the output of pending job attempts is
|
|
|
+ * stored.
|
|
|
+ */
|
|
|
+ private Path getPendingJobAttemptsPath() {
|
|
|
+ return getPendingJobAttemptsPath(getOutputPath());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the location of pending job attempts.
|
|
|
+ * @param out the base output directory.
|
|
|
+ * @return the location of pending job attempts.
|
|
|
+ */
|
|
|
+ private static Path getPendingJobAttemptsPath(Path out) {
|
|
|
+ return new Path(out, PENDING_DIR_NAME);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the Application Attempt Id for this job
|
|
|
+ * @param context the context to look in
|
|
|
+ * @return the Application Attempt Id for a given job.
|
|
|
+ */
|
|
|
+ private static int getAppAttemptId(JobContext context) {
|
|
|
+ return context.getConfiguration().getInt(
|
|
|
+ MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of a given job attempt will be placed.
|
|
|
+ * @param context the context of the job. This is used to get the
|
|
|
+ * application attempt id.
|
|
|
+ * @return the path to store job attempt data.
|
|
|
+ */
|
|
|
+ public Path getJobAttemptPath(JobContext context) {
|
|
|
+ return getJobAttemptPath(context, getOutputPath());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of a given job attempt will be placed.
|
|
|
+ * @param context the context of the job. This is used to get the
|
|
|
+ * application attempt id.
|
|
|
+ * @param out the output path to place these in.
|
|
|
+ * @return the path to store job attempt data.
|
|
|
+ */
|
|
|
+ public static Path getJobAttemptPath(JobContext context, Path out) {
|
|
|
+ return getJobAttemptPath(getAppAttemptId(context), out);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of a given job attempt will be placed.
|
|
|
+ * @param appAttemptId the ID of the application attempt for this job.
|
|
|
+ * @return the path to store job attempt data.
|
|
|
+ */
|
|
|
+ private Path getJobAttemptPath(int appAttemptId) {
|
|
|
+ return getJobAttemptPath(appAttemptId, getOutputPath());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of a given job attempt will be placed.
|
|
|
+ * @param appAttemptId the ID of the application attempt for this job.
|
|
|
+ * @return the path to store job attempt data.
|
|
|
+ */
|
|
|
+ private static Path getJobAttemptPath(int appAttemptId, Path out) {
|
|
|
+ return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of pending task attempts is stored.
|
|
|
+ * @param context the context of the job with pending tasks.
|
|
|
+ * @return the path where the output of pending task attempts is stored.
|
|
|
+ */
|
|
|
+ private Path getPendingTaskAttemptsPath(JobContext context) {
|
|
|
+ return getPendingTaskAttemptsPath(context, getOutputPath());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of pending task attempts is stored.
|
|
|
+ * @param context the context of the job with pending tasks.
|
|
|
+ * @return the path where the output of pending task attempts is stored.
|
|
|
+ */
|
|
|
+ private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
|
|
|
+ return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of a task attempt is stored until
|
|
|
+ * that task is committed.
|
|
|
+ *
|
|
|
+ * @param context the context of the task attempt.
|
|
|
+ * @return the path where a task attempt should be stored.
|
|
|
+ */
|
|
|
+ public Path getTaskAttemptPath(TaskAttemptContext context) {
|
|
|
+ return new Path(getPendingTaskAttemptsPath(context),
|
|
|
+ String.valueOf(context.getTaskAttemptID()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of a task attempt is stored until
|
|
|
+ * that task is committed.
|
|
|
+ *
|
|
|
+ * @param context the context of the task attempt.
|
|
|
+ * @param out The output path to put things in.
|
|
|
+ * @return the path where a task attempt should be stored.
|
|
|
+ */
|
|
|
+ public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
|
|
|
+ return new Path(getPendingTaskAttemptsPath(context, out),
|
|
|
+ String.valueOf(context.getTaskAttemptID()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of a committed task is stored until
|
|
|
+ * the entire job is committed.
|
|
|
+ * @param context the context of the task attempt
|
|
|
+ * @return the path where the output of a committed task is stored until
|
|
|
+ * the entire job is committed.
|
|
|
+ */
|
|
|
+ public Path getCommittedTaskPath(TaskAttemptContext context) {
|
|
|
+ return getCommittedTaskPath(getAppAttemptId(context), context);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
|
|
|
+ return getCommittedTaskPath(getAppAttemptId(context), context, out);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compute the path where the output of a committed task is stored until the
|
|
|
+ * entire job is committed for a specific application attempt.
|
|
|
+ * @param appAttemptId the id of the application attempt to use
|
|
|
+ * @param context the context of any task.
|
|
|
+ * @return the path where the output of a committed task is stored.
|
|
|
+ */
|
|
|
+ private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
|
|
|
+ return new Path(getJobAttemptPath(appAttemptId),
|
|
|
+ String.valueOf(context.getTaskAttemptID().getTaskID()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
|
|
|
+ return new Path(getJobAttemptPath(appAttemptId, out),
|
|
|
+ String.valueOf(context.getTaskAttemptID().getTaskID()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class CommittedTaskFilter implements PathFilter {
|
|
|
+ @Override
|
|
|
+ public boolean accept(Path path) {
|
|
|
+ return !PENDING_DIR_NAME.equals(path.getName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get a list of all paths where output from committed tasks is stored.
|
|
|
+ * @param context the context of the current job
|
|
|
+ * @return the list of these Paths/FileStatuses.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private FileStatus[] getAllCommittedTaskPaths(JobContext context)
|
|
|
+ throws IOException {
|
|
|
+ Path jobAttemptPath = getJobAttemptPath(context);
|
|
|
+ FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
|
|
|
+ return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the directory that the task should write results into.
|
|
|
+ * @return the work directory
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public Path getWorkPath() throws IOException {
|
|
|
+ return workPath;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Create the temporary directory that is the root of all of the task
|
|
@@ -79,116 +277,103 @@ public class FileOutputCommitter extends OutputCommitter {
|
|
|
* @param context the job's context
|
|
|
*/
|
|
|
public void setupJob(JobContext context) throws IOException {
|
|
|
- if (outputPath != null) {
|
|
|
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
|
|
|
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
|
|
|
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
|
|
|
- if (!fileSys.mkdirs(tmpDir)) {
|
|
|
- LOG.error("Mkdirs failed to create " + tmpDir.toString());
|
|
|
+ if (hasOutputPath()) {
|
|
|
+ Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
|
|
|
+ FileSystem fs = pendingJobAttemptsPath.getFileSystem(
|
|
|
+ context.getConfiguration());
|
|
|
+ if (!fs.mkdirs(pendingJobAttemptsPath)) {
|
|
|
+ LOG.error("Mkdirs failed to create " + pendingJobAttemptsPath);
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // True if the job requires output.dir marked on successful job.
|
|
|
- // Note that by default it is set to true.
|
|
|
- private boolean shouldMarkOutputDir(Configuration conf) {
|
|
|
- return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
|
|
|
- }
|
|
|
-
|
|
|
- // Create a _success file in the job's output dir
|
|
|
- private void markOutputDirSuccessful(MRJobConfig context) throws IOException {
|
|
|
- if (outputPath != null) {
|
|
|
- // create a file in the output folder to mark the job completion
|
|
|
- Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
|
|
|
- outputFileSystem.create(filePath).close();
|
|
|
+ } else {
|
|
|
+ LOG.warn("Output Path is null in setupJob()");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Move all job output to the final place.
|
|
|
+ * The job has completed so move all committed tasks to the final output dir.
|
|
|
* Delete the temporary directory, including all of the work directories.
|
|
|
* Create a _SUCCESS file to make it as successful.
|
|
|
* @param context the job's context
|
|
|
*/
|
|
|
public void commitJob(JobContext context) throws IOException {
|
|
|
- if (outputPath != null) {
|
|
|
- //delete the task temp directory from the current jobtempdir
|
|
|
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
|
|
|
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
|
|
|
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
|
|
|
- if (fileSys.exists(tmpDir)) {
|
|
|
- fileSys.delete(tmpDir, true);
|
|
|
- } else {
|
|
|
- LOG.warn("Task temp dir could not be deleted " + tmpDir);
|
|
|
+ if (hasOutputPath()) {
|
|
|
+ Path finalOutput = getOutputPath();
|
|
|
+ FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
|
|
|
+ for(FileStatus stat: getAllCommittedTaskPaths(context)) {
|
|
|
+ mergePaths(fs, stat, finalOutput);
|
|
|
}
|
|
|
|
|
|
- //move the job output to final place
|
|
|
- Path jobOutputPath =
|
|
|
- new Path(outputPath, getJobAttemptBaseDirName(context));
|
|
|
- moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
|
|
|
-
|
|
|
// delete the _temporary folder and create a _done file in the o/p folder
|
|
|
cleanupJob(context);
|
|
|
- if (shouldMarkOutputDir(context.getConfiguration())) {
|
|
|
- markOutputDirSuccessful(context);
|
|
|
+ // True if the job requires output.dir marked on successful job.
|
|
|
+ // Note that by default it is set to true.
|
|
|
+ if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
|
|
|
+ Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
|
|
|
+ fs.create(markerPath).close();
|
|
|
}
|
|
|
+ } else {
|
|
|
+ LOG.warn("Output Path is null in commitJob()");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Move job output to final location
|
|
|
- * @param fs Filesystem handle
|
|
|
- * @param origJobOutputPath The original location of the job output
|
|
|
- * Required to generate the relative path for correct moving of data.
|
|
|
- * @param finalOutputDir The final output directory to which the job output
|
|
|
- * needs to be moved
|
|
|
- * @param jobOutput The current job output directory being moved
|
|
|
- * @throws IOException
|
|
|
+ * Merge two paths together. Anything in {@code from} is moved into {@code to}; if
|
|
|
+ * there are any name conflicts while merging, the entries in {@code from} win.
|
|
|
+ * @param fs the File System to use
|
|
|
+ * @param from the path data is coming from.
|
|
|
+ * @param to the path data is going to.
|
|
|
+ * @throws IOException on any error
|
|
|
*/
|
|
|
- private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
|
|
|
- Path finalOutputDir, Path jobOutput) throws IOException {
|
|
|
- LOG.debug("Told to move job output from " + jobOutput
|
|
|
- + " to " + finalOutputDir +
|
|
|
- " and orig job output path is " + origJobOutputPath);
|
|
|
- if (fs.isFile(jobOutput)) {
|
|
|
- Path finalOutputPath =
|
|
|
- getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
|
|
|
- if (!fs.rename(jobOutput, finalOutputPath)) {
|
|
|
- if (!fs.delete(finalOutputPath, true)) {
|
|
|
- throw new IOException("Failed to delete earlier output of job");
|
|
|
- }
|
|
|
- if (!fs.rename(jobOutput, finalOutputPath)) {
|
|
|
- throw new IOException("Failed to save output of job");
|
|
|
- }
|
|
|
- }
|
|
|
- LOG.debug("Moved job output file from " + jobOutput + " to " +
|
|
|
- finalOutputPath);
|
|
|
- } else if (fs.getFileStatus(jobOutput).isDirectory()) {
|
|
|
- LOG.debug("Job output file " + jobOutput + " is a dir");
|
|
|
- FileStatus[] paths = fs.listStatus(jobOutput);
|
|
|
- Path finalOutputPath =
|
|
|
- getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
|
|
|
- fs.mkdirs(finalOutputPath);
|
|
|
- LOG.debug("Creating dirs along job output path " + finalOutputPath);
|
|
|
- if (paths != null) {
|
|
|
- for (FileStatus path : paths) {
|
|
|
- moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ private static void mergePaths(FileSystem fs, final FileStatus from,
|
|
|
+ final Path to)
|
|
|
+ throws IOException {
|
|
|
+ LOG.debug("Merging data from "+from+" to "+to);
|
|
|
+ if(from.isFile()) {
|
|
|
+ if(fs.exists(to)) {
|
|
|
+ if(!fs.delete(to, true)) {
|
|
|
+ throw new IOException("Failed to delete "+to);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(!fs.rename(from.getPath(), to)) {
|
|
|
+ throw new IOException("Failed to rename "+from+" to "+to);
|
|
|
+ }
|
|
|
+ } else if(from.isDirectory()) {
|
|
|
+ if(fs.exists(to)) {
|
|
|
+ FileStatus toStat = fs.getFileStatus(to);
|
|
|
+ if(!toStat.isDirectory()) {
|
|
|
+ if(!fs.delete(to, true)) {
|
|
|
+ throw new IOException("Failed to delete "+to);
|
|
|
+ }
|
|
|
+ if(!fs.rename(from.getPath(), to)) {
|
|
|
+ throw new IOException("Failed to rename "+from+" to "+to);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //It is a directory so merge everything in the directories
|
|
|
+ for(FileStatus subFrom: fs.listStatus(from.getPath())) {
|
|
|
+ Path subTo = new Path(to, subFrom.getPath().getName());
|
|
|
+ mergePaths(fs, subFrom, subTo);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //it does not exist just rename
|
|
|
+ if(!fs.rename(from.getPath(), to)) {
|
|
|
+ throw new IOException("Failed to rename "+from+" to "+to);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@Deprecated
|
|
|
public void cleanupJob(JobContext context) throws IOException {
|
|
|
- if (outputPath != null) {
|
|
|
- Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
|
|
|
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
|
|
|
- if (fileSys.exists(tmpDir)) {
|
|
|
- fileSys.delete(tmpDir, true);
|
|
|
- }
|
|
|
+ if (hasOutputPath()) {
|
|
|
+ Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
|
|
|
+ FileSystem fs = pendingJobAttemptsPath
|
|
|
+ .getFileSystem(context.getConfiguration());
|
|
|
+ fs.delete(pendingJobAttemptsPath, true);
|
|
|
} else {
|
|
|
- LOG.warn("Output Path is null in cleanup");
|
|
|
+ LOG.warn("Output Path is null in cleanupJob()");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -217,69 +402,40 @@ public class FileOutputCommitter extends OutputCommitter {
|
|
|
* Move the files from the work directory to the job output directory
|
|
|
* @param context the task context
|
|
|
*/
|
|
|
+ @Override
|
|
|
public void commitTask(TaskAttemptContext context)
|
|
|
throws IOException {
|
|
|
- TaskAttemptID attemptId = context.getTaskAttemptID();
|
|
|
- if (workPath != null) {
|
|
|
- context.progress();
|
|
|
- if (outputFileSystem.exists(workPath)) {
|
|
|
- // Move the task outputs to the current job attempt output dir
|
|
|
- Path jobOutputPath =
|
|
|
- new Path(outputPath, getJobAttemptBaseDirName(context));
|
|
|
- moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
|
|
|
- // Delete the temporary task-specific output directory
|
|
|
- if (!outputFileSystem.delete(workPath, true)) {
|
|
|
- LOG.warn("Failed to delete the temporary output" +
|
|
|
- " directory of task: " + attemptId + " - " + workPath);
|
|
|
- }
|
|
|
- LOG.info("Saved output of task '" + attemptId + "' to " +
|
|
|
- jobOutputPath);
|
|
|
- }
|
|
|
- }
|
|
|
+ commitTask(context, null);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Move all of the files from the work directory to the final output
|
|
|
- * @param context the task context
|
|
|
- * @param fs the output file system
|
|
|
- * @param jobOutputDir the final output direcotry
|
|
|
- * @param taskOutput the work path
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private void moveTaskOutputs(TaskAttemptContext context,
|
|
|
- FileSystem fs,
|
|
|
- Path jobOutputDir,
|
|
|
- Path taskOutput)
|
|
|
+ @Private
|
|
|
+ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
|
|
|
throws IOException {
|
|
|
TaskAttemptID attemptId = context.getTaskAttemptID();
|
|
|
- context.progress();
|
|
|
- LOG.debug("Told to move taskoutput from " + taskOutput
|
|
|
- + " to " + jobOutputDir);
|
|
|
- if (fs.isFile(taskOutput)) {
|
|
|
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
|
|
|
- workPath);
|
|
|
- if (!fs.rename(taskOutput, finalOutputPath)) {
|
|
|
- if (!fs.delete(finalOutputPath, true)) {
|
|
|
- throw new IOException("Failed to delete earlier output of task: " +
|
|
|
- attemptId);
|
|
|
- }
|
|
|
- if (!fs.rename(taskOutput, finalOutputPath)) {
|
|
|
- throw new IOException("Failed to save output of task: " +
|
|
|
- attemptId);
|
|
|
- }
|
|
|
+ if (hasOutputPath()) {
|
|
|
+ context.progress();
|
|
|
+ if(taskAttemptPath == null) {
|
|
|
+ taskAttemptPath = getTaskAttemptPath(context);
|
|
|
}
|
|
|
- LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
|
|
|
- } else if(fs.getFileStatus(taskOutput).isDirectory()) {
|
|
|
- LOG.debug("Taskoutput " + taskOutput + " is a dir");
|
|
|
- FileStatus[] paths = fs.listStatus(taskOutput);
|
|
|
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
|
|
|
- fs.mkdirs(finalOutputPath);
|
|
|
- LOG.debug("Creating dirs along path " + finalOutputPath);
|
|
|
- if (paths != null) {
|
|
|
- for (FileStatus path : paths) {
|
|
|
- moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
|
|
|
+ Path committedTaskPath = getCommittedTaskPath(context);
|
|
|
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
|
|
|
+ if (fs.exists(taskAttemptPath)) {
|
|
|
+ if(fs.exists(committedTaskPath)) {
|
|
|
+ if(!fs.delete(committedTaskPath, true)) {
|
|
|
+ throw new IOException("Could not delete " + committedTaskPath);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(!fs.rename(taskAttemptPath, committedTaskPath)) {
|
|
|
+ throw new IOException("Could not rename " + taskAttemptPath + " to "
|
|
|
+ + committedTaskPath);
|
|
|
}
|
|
|
+ LOG.info("Saved output of task '" + attemptId + "' to " +
|
|
|
+ committedTaskPath);
|
|
|
+ } else {
|
|
|
+ LOG.warn("No Output found for " + attemptId);
|
|
|
}
|
|
|
+ } else {
|
|
|
+ LOG.warn("Output Path is null in commitTask()");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -289,38 +445,22 @@ public class FileOutputCommitter extends OutputCommitter {
|
|
|
*/
|
|
|
@Override
|
|
|
public void abortTask(TaskAttemptContext context) throws IOException {
|
|
|
- if (workPath != null) {
|
|
|
- context.progress();
|
|
|
- outputFileSystem.delete(workPath, true);
|
|
|
- }
|
|
|
+ abortTask(context, null);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Find the final name of a given output file, given the job output directory
|
|
|
- * and the work directory.
|
|
|
- * @param jobOutputDir the job's output directory
|
|
|
- * @param taskOutput the specific task output file
|
|
|
- * @param taskOutputPath the job's work directory
|
|
|
- * @return the final path for the specific output file
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- private Path getFinalPath(Path jobOutputDir, Path taskOutput,
|
|
|
- Path taskOutputPath) throws IOException {
|
|
|
- URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(),
|
|
|
- outputFileSystem.getWorkingDirectory()).toUri();
|
|
|
- URI taskOutputPathUri =
|
|
|
- taskOutputPath.makeQualified(
|
|
|
- outputFileSystem.getUri(),
|
|
|
- outputFileSystem.getWorkingDirectory()).toUri();
|
|
|
- URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
|
|
|
- if (taskOutputUri == relativePath) {
|
|
|
- throw new IOException("Can not get the relative path: base = " +
|
|
|
- taskOutputPathUri + " child = " + taskOutputUri);
|
|
|
- }
|
|
|
- if (relativePath.getPath().length() > 0) {
|
|
|
- return new Path(jobOutputDir, relativePath.getPath());
|
|
|
+ @Private
|
|
|
+ public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
|
|
|
+ if (hasOutputPath()) {
|
|
|
+ context.progress();
|
|
|
+ if(taskAttemptPath == null) {
|
|
|
+ taskAttemptPath = getTaskAttemptPath(context);
|
|
|
+ }
|
|
|
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
|
|
|
+ if(!fs.delete(taskAttemptPath, true)) {
|
|
|
+ LOG.warn("Could not delete "+taskAttemptPath);
|
|
|
+ }
|
|
|
} else {
|
|
|
- return jobOutputDir;
|
|
|
+ LOG.warn("Output Path is null in abortTask()");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -331,16 +471,20 @@ public class FileOutputCommitter extends OutputCommitter {
|
|
|
@Override
|
|
|
public boolean needsTaskCommit(TaskAttemptContext context
|
|
|
) throws IOException {
|
|
|
- return workPath != null && outputFileSystem.exists(workPath);
|
|
|
+ return needsTaskCommit(context, null);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Get the directory that the task should write results into
|
|
|
- * @return the work directory
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public Path getWorkPath() throws IOException {
|
|
|
- return workPath;
|
|
|
+ @Private
|
|
|
+ public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
|
|
|
+ ) throws IOException {
|
|
|
+ if(hasOutputPath()) {
|
|
|
+ if(taskAttemptPath == null) {
|
|
|
+ taskAttemptPath = getTaskAttemptPath(context);
|
|
|
+ }
|
|
|
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
|
|
|
+ return fs.exists(taskAttemptPath);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -352,43 +496,35 @@ public class FileOutputCommitter extends OutputCommitter {
|
|
|
public void recoverTask(TaskAttemptContext context)
|
|
|
throws IOException {
|
|
|
context.progress();
|
|
|
- Path jobOutputPath =
|
|
|
- new Path(outputPath, getJobAttemptBaseDirName(context));
|
|
|
- int previousAttempt =
|
|
|
- context.getConfiguration().getInt(
|
|
|
- MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
|
|
|
+ TaskAttemptID attemptId = context.getTaskAttemptID();
|
|
|
+ int previousAttempt = getAppAttemptId(context) - 1;
|
|
|
if (previousAttempt < 0) {
|
|
|
throw new IOException ("Cannot recover task output for first attempt...");
|
|
|
}
|
|
|
-
|
|
|
- Path pathToRecover =
|
|
|
- new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
|
|
|
- LOG.debug("Trying to recover task from " + pathToRecover
|
|
|
- + " into " + jobOutputPath);
|
|
|
- if (outputFileSystem.exists(pathToRecover)) {
|
|
|
- // Move the task outputs to their final place
|
|
|
- moveJobOutputs(outputFileSystem,
|
|
|
- pathToRecover, jobOutputPath, pathToRecover);
|
|
|
- LOG.info("Saved output of job to " + jobOutputPath);
|
|
|
+
|
|
|
+ Path committedTaskPath = getCommittedTaskPath(context);
|
|
|
+ Path previousCommittedTaskPath = getCommittedTaskPath(
|
|
|
+ previousAttempt, context);
|
|
|
+ FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
|
|
|
+
|
|
|
+ LOG.debug("Trying to recover task from " + previousCommittedTaskPath
|
|
|
+ + " into " + committedTaskPath);
|
|
|
+ if (fs.exists(previousCommittedTaskPath)) {
|
|
|
+ if(fs.exists(committedTaskPath)) {
|
|
|
+ if(!fs.delete(committedTaskPath, true)) {
|
|
|
+ throw new IOException("Could not delete "+committedTaskPath);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //Rename can fail if the parent directory does not yet exist.
|
|
|
+ Path committedParent = committedTaskPath.getParent();
|
|
|
+ fs.mkdirs(committedParent);
|
|
|
+ if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
|
|
|
+ throw new IOException("Could not rename " + previousCommittedTaskPath +
|
|
|
+ " to " + committedTaskPath);
|
|
|
+ }
|
|
|
+ LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
|
|
|
+ } else {
|
|
|
+ LOG.warn(attemptId+" had no output to recover.");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- protected static String getJobAttemptBaseDirName(JobContext context) {
|
|
|
- int appAttemptId =
|
|
|
- context.getConfiguration().getInt(
|
|
|
- MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
|
|
|
- return getJobAttemptBaseDirName(appAttemptId);
|
|
|
- }
|
|
|
-
|
|
|
- protected static String getJobAttemptBaseDirName(int appAttemptId) {
|
|
|
- return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
|
|
- + appAttemptId;
|
|
|
- }
|
|
|
-
|
|
|
- protected static String getTaskAttemptBaseDirName(
|
|
|
- TaskAttemptContext context) {
|
|
|
- return getJobAttemptBaseDirName(context) + Path.SEPARATOR +
|
|
|
- FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
|
|
|
- "_" + context.getTaskAttemptID().toString();
|
|
|
- }
|
|
|
}
|