|
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configurable;
|
|
import org.apache.hadoop.conf.Configurable;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.fs.ContentSummary;
|
|
import org.apache.hadoop.dfs.DistributedFileSystem;
|
|
import org.apache.hadoop.dfs.DistributedFileSystem;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -472,7 +473,29 @@ abstract class Task implements Writable, Configurable {
|
|
Thread.currentThread().interrupt(); // interrupt ourself
|
|
Thread.currentThread().interrupt(); // interrupt ourself
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- umbilical.done(taskId);
|
|
|
|
|
|
+ // Check whether there is any task output
|
|
|
|
+ boolean shouldBePromoted = false;
|
|
|
|
+ try {
|
|
|
|
+ if (taskOutputPath != null) {
|
|
|
|
+ // Get the file-system for the task output directory
|
|
|
|
+ FileSystem fs = taskOutputPath.getFileSystem(conf);
|
|
|
|
+ if (fs.exists(taskOutputPath)) {
|
|
|
|
+ // Get the summary for the folder
|
|
|
|
+ ContentSummary summary = fs.getContentSummary(taskOutputPath);
|
|
|
|
+ // Check if the directory contains data to be promoted
|
|
|
|
+ // i.e total-files + total-folders - 1(itself)
|
|
|
|
+ if (summary != null
|
|
|
|
+ && (summary.getFileCount() + summary.getDirectoryCount() - 1)
|
|
|
|
+ > 0) {
|
|
|
|
+ shouldBePromoted = true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ // To be safe in case of an exception
|
|
|
|
+ shouldBePromoted = true;
|
|
|
|
+ }
|
|
|
|
+ umbilical.done(taskId, shouldBePromoted);
|
|
LOG.info("Task '" + getTaskId() + "' done.");
|
|
LOG.info("Task '" + getTaskId() + "' done.");
|
|
return;
|
|
return;
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
@@ -576,22 +599,4 @@ abstract class Task implements Writable, Configurable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Discard the task's output on failure.
|
|
|
|
- *
|
|
|
|
- * @throws IOException
|
|
|
|
- */
|
|
|
|
- void discardTaskOutput() throws IOException {
|
|
|
|
- if (taskOutputPath != null) {
|
|
|
|
- FileSystem fs = taskOutputPath.getFileSystem(conf);
|
|
|
|
- if (fs.exists(taskOutputPath)) {
|
|
|
|
- // Delete the temporary task-specific output directory
|
|
|
|
- FileUtil.fullyDelete(fs, taskOutputPath);
|
|
|
|
- LOG.info("Discarded output of task '" + getTaskId() + "' - "
|
|
|
|
- + taskOutputPath);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
}
|
|
}
|