|
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
|
|
+import org.apache.hadoop.util.DurationInfo;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -454,43 +455,44 @@ public class FileOutputCommitter extends PathOutputCommitter {
|
|
|
*/
|
|
|
private void mergePaths(FileSystem fs, final FileStatus from,
|
|
|
final Path to, JobContext context) throws IOException {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Merging data from " + from + " to " + to);
|
|
|
- }
|
|
|
- reportProgress(context);
|
|
|
- FileStatus toStat;
|
|
|
- try {
|
|
|
- toStat = fs.getFileStatus(to);
|
|
|
- } catch (FileNotFoundException fnfe) {
|
|
|
- toStat = null;
|
|
|
- }
|
|
|
-
|
|
|
- if (from.isFile()) {
|
|
|
- if (toStat != null) {
|
|
|
- if (!fs.delete(to, true)) {
|
|
|
- throw new IOException("Failed to delete " + to);
|
|
|
- }
|
|
|
+ try (DurationInfo d = new DurationInfo(LOG,
|
|
|
+ false,
|
|
|
+ "Merging data from %s to %s", from, to)) {
|
|
|
+ reportProgress(context);
|
|
|
+ FileStatus toStat;
|
|
|
+ try {
|
|
|
+ toStat = fs.getFileStatus(to);
|
|
|
+ } catch (FileNotFoundException fnfe) {
|
|
|
+ toStat = null;
|
|
|
}
|
|
|
|
|
|
- if (!fs.rename(from.getPath(), to)) {
|
|
|
- throw new IOException("Failed to rename " + from + " to " + to);
|
|
|
- }
|
|
|
- } else if (from.isDirectory()) {
|
|
|
- if (toStat != null) {
|
|
|
- if (!toStat.isDirectory()) {
|
|
|
+ if (from.isFile()) {
|
|
|
+ if (toStat != null) {
|
|
|
if (!fs.delete(to, true)) {
|
|
|
throw new IOException("Failed to delete " + to);
|
|
|
}
|
|
|
- renameOrMerge(fs, from, to, context);
|
|
|
- } else {
|
|
|
- //It is a directory so merge everything in the directories
|
|
|
- for (FileStatus subFrom : fs.listStatus(from.getPath())) {
|
|
|
- Path subTo = new Path(to, subFrom.getPath().getName());
|
|
|
- mergePaths(fs, subFrom, subTo, context);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!fs.rename(from.getPath(), to)) {
|
|
|
+ throw new IOException("Failed to rename " + from + " to " + to);
|
|
|
+ }
|
|
|
+ } else if (from.isDirectory()) {
|
|
|
+ if (toStat != null) {
|
|
|
+ if (!toStat.isDirectory()) {
|
|
|
+ if (!fs.delete(to, true)) {
|
|
|
+ throw new IOException("Failed to delete " + to);
|
|
|
+ }
|
|
|
+ renameOrMerge(fs, from, to, context);
|
|
|
+ } else {
|
|
|
+ //It is a directory so merge everything in the directories
|
|
|
+ for (FileStatus subFrom : fs.listStatus(from.getPath())) {
|
|
|
+ Path subTo = new Path(to, subFrom.getPath().getName());
|
|
|
+ mergePaths(fs, subFrom, subTo, context);
|
|
|
+ }
|
|
|
}
|
|
|
+ } else {
|
|
|
+ renameOrMerge(fs, from, to, context);
|
|
|
}
|
|
|
- } else {
|
|
|
- renameOrMerge(fs, from, to, context);
|
|
|
}
|
|
|
}
|
|
|
}
|