@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -136,6 +137,20 @@ import org.apache.hadoop.util.ToolRunner;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class JobClient extends CLI {
+
+  @InterfaceAudience.Private
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "mapreduce.jobclient.retry.policy.enabled";
+  @InterfaceAudience.Private
+  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
+      false;
+  @InterfaceAudience.Private
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
+      "mapreduce.jobclient.retry.policy.spec";
+  @InterfaceAudience.Private
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "10000,6,60000,10"; // t1,n1,t2,n2,...
+
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
 
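
The retry policy spec above follows the "t1,n1,t2,n2,..." convention used by
Hadoop's RetryPolicies: each (ti, ni) pair means sleep roughly ti milliseconds
between attempts, for up to ni retries, before falling through to the next
pair. A minimal sketch of how a client might opt in, assuming these keys are
honored when the JobClient's RPC connection is set up (that wiring is not part
of this hunk):

    JobConf conf = new JobConf();
    // retries are off by default (MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT)
    conf.setBoolean(JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
    // 6 retries ~10s apart, then 10 retries ~60s apart, then give up
    conf.set(JobClient.MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY, "10000,6,60000,10");
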
@@ -525,6 +540,12 @@ public class JobClient extends CLI {
    */
   public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                   IOException {
+    return submitJobInternal(conf);
+  }
+
+  @InterfaceAudience.Private
+  public RunningJob submitJobInternal(final JobConf conf)
+      throws FileNotFoundException, IOException {
     try {
       conf.setBooleanIfUnset("mapred.mapper.new-api", false);
       conf.setBooleanIfUnset("mapred.reducer.new-api", false);
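
Note that the public submitJob entry point now simply delegates, so existing
callers are unaffected; submitJobInternal exposes the same code path for
framework use and is marked @InterfaceAudience.Private rather than becoming
part of the stable API. For illustration only:

    // user code keeps calling the stable public entry point,
    // which now forwards to submitJobInternal(conf) internally
    RunningJob job = client.submitJob(conf);
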
@@ -956,6 +977,50 @@ public class JobClient extends CLI {
     }
   }
 
+  /**
+   * Checks if the job directory is clean and has all the required components
+   * for (re)starting the job.
+   */
+  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
+      throws IOException {
+    FileStatus[] contents = fs.listStatus(jobDirPath);
+    int matchCount = 0;
+    if (contents != null && contents.length >= 2) {
+      for (FileStatus status : contents) {
+        if ("job.xml".equals(status.getPath().getName())) {
+          ++matchCount;
+        }
+        if ("job.split".equals(status.getPath().getName())) {
+          ++matchCount;
+        }
+      }
+      if (matchCount == 2) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Fetch the staging area directory for the application.
+   *
+   * @return path to staging area directory
+   * @throws IOException
+   */
+  public Path getStagingAreaDir() throws IOException {
+    try {
+      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
+        @Override
+        public Path run() throws IOException, InterruptedException {
+          return cluster.getStagingAreaDir();
+        }
+      });
+    } catch (InterruptedException ie) {
+      // throw RuntimeException instead for compatibility reasons
+      throw new RuntimeException(ie);
+    }
+  }
+
   private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
     JobQueueInfo ret = new JobQueueInfo(queue);
     // make sure to convert any children
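
A minimal end-to-end sketch of the two new public helpers; the job id used for
the directory name is hypothetical, and the layout it assumes (job.xml plus
job.split under a per-job directory in the staging area) is exactly what
isJobDirValid checks for:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class JobDirCheck {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        JobClient client = new JobClient(conf);
        // per-user staging directory, resolved via the cluster under the caller's UGI
        Path staging = client.getStagingAreaDir();
        FileSystem fs = staging.getFileSystem(conf);
        Path jobDir = new Path(staging, "job_201209010000_0001"); // hypothetical id
        // true only when both job.xml and job.split exist in the directory
        if (JobClient.isJobDirValid(jobDir, fs)) {
          System.out.println("job dir is complete; the job can be (re)started");
        }
      }
    }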