|
@@ -49,6 +49,18 @@ public class TestTaskFail extends TestCase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ static class CommitterWithFailTaskCleanup extends FileOutputCommitter {
|
|
|
|
+ public void abortTask(TaskAttemptContext context) throws IOException {
|
|
|
|
+ System.exit(-1);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static class CommitterWithFailTaskCleanup2 extends FileOutputCommitter {
|
|
|
|
+ public void abortTask(TaskAttemptContext context) throws IOException {
|
|
|
|
+ throw new IOException();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public RunningJob launchJob(JobConf conf,
|
|
public RunningJob launchJob(JobConf conf,
|
|
Path inDir,
|
|
Path inDir,
|
|
Path outDir,
|
|
Path outDir,
|
|
@@ -79,7 +91,34 @@ public class TestTaskFail extends TestCase {
|
|
// return the RunningJob handle.
|
|
// return the RunningJob handle.
|
|
return new JobClient(conf).submitJob(conf);
|
|
return new JobClient(conf).submitJob(conf);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private void validateJob(RunningJob job, MiniMRCluster mr)
|
|
|
|
+ throws IOException {
|
|
|
|
+ assertEquals(JobStatus.SUCCEEDED, job.getJobState());
|
|
|
|
+
|
|
|
|
+ JobID jobId = job.getID();
|
|
|
|
+ // construct the task id of first map task
|
|
|
|
+ TaskAttemptID attemptId =
|
|
|
|
+ new TaskAttemptID(new TaskID(jobId, true, 0), 0);
|
|
|
|
+ TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
|
|
|
|
+ getTip(attemptId.getTaskID());
|
|
|
|
+ // this should not be cleanup attempt since the first attempt
|
|
|
|
+ // fails with an exception
|
|
|
|
+ assertTrue(!tip.isCleanupAttempt(attemptId));
|
|
|
|
+ TaskStatus ts =
|
|
|
|
+ mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
|
|
|
|
+ assertTrue(ts != null);
|
|
|
|
+ assertEquals(TaskStatus.State.FAILED, ts.getRunState());
|
|
|
|
+
|
|
|
|
+ attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
|
|
|
|
+ // this should be cleanup attempt since the second attempt fails
|
|
|
|
+ // with System.exit
|
|
|
|
+ assertTrue(tip.isCleanupAttempt(attemptId));
|
|
|
|
+ ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
|
|
|
|
+ assertTrue(ts != null);
|
|
|
|
+ assertEquals(TaskStatus.State.FAILED, ts.getRunState());
|
|
|
|
+ }
|
|
|
|
+
|
|
public void testWithDFS() throws IOException {
|
|
public void testWithDFS() throws IOException {
|
|
MiniDFSCluster dfs = null;
|
|
MiniDFSCluster dfs = null;
|
|
MiniMRCluster mr = null;
|
|
MiniMRCluster mr = null;
|
|
@@ -91,39 +130,25 @@ public class TestTaskFail extends TestCase {
|
|
dfs = new MiniDFSCluster(conf, 4, true, null);
|
|
dfs = new MiniDFSCluster(conf, 4, true, null);
|
|
fileSys = dfs.getFileSystem();
|
|
fileSys = dfs.getFileSystem();
|
|
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
|
|
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
|
|
- JobConf jobConf = mr.createJobConf();
|
|
|
|
final Path inDir = new Path("./input");
|
|
final Path inDir = new Path("./input");
|
|
final Path outDir = new Path("./output");
|
|
final Path outDir = new Path("./output");
|
|
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
|
|
String input = "The quick brown fox\nhas many silly\nred fox sox\n";
|
|
- RunningJob job = null;
|
|
|
|
-
|
|
|
|
- job = launchJob(jobConf, inDir, outDir, input);
|
|
|
|
- // wait for the job to finish.
|
|
|
|
- while (!job.isComplete());
|
|
|
|
- assertEquals(JobStatus.SUCCEEDED, job.getJobState());
|
|
|
|
-
|
|
|
|
- JobID jobId = job.getID();
|
|
|
|
- // construct the task id of first map task
|
|
|
|
- TaskAttemptID attemptId =
|
|
|
|
- new TaskAttemptID(new TaskID(jobId, true, 0), 0);
|
|
|
|
- TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
|
|
|
|
- getTip(attemptId.getTaskID());
|
|
|
|
- // this should not be cleanup attempt since the first attempt
|
|
|
|
- // fails with an exception
|
|
|
|
- assertTrue(!tip.isCleanupAttempt(attemptId));
|
|
|
|
- TaskStatus ts =
|
|
|
|
- mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
|
|
|
|
- assertTrue(ts != null);
|
|
|
|
- assertEquals(TaskStatus.State.FAILED, ts.getRunState());
|
|
|
|
-
|
|
|
|
- attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
|
|
|
|
- // this should be cleanup attempt since the second attempt fails
|
|
|
|
- // with System.exit
|
|
|
|
- assertTrue(tip.isCleanupAttempt(attemptId));
|
|
|
|
- ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
|
|
|
|
- assertTrue(ts != null);
|
|
|
|
- assertEquals(TaskStatus.State.FAILED, ts.getRunState());
|
|
|
|
-
|
|
|
|
|
|
+ // launch job with fail tasks
|
|
|
|
+ RunningJob rJob = launchJob(mr.createJobConf(), inDir, outDir, input);
|
|
|
|
+ rJob.waitForCompletion();
|
|
|
|
+ validateJob(rJob, mr);
|
|
|
|
+ // launch job with fail tasks and fail-cleanups
|
|
|
|
+ JobConf jobConf = mr.createJobConf();
|
|
|
|
+ fileSys.delete(outDir, true);
|
|
|
|
+ jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
|
|
|
|
+ rJob = launchJob(jobConf, inDir, outDir, input);
|
|
|
|
+ rJob.waitForCompletion();
|
|
|
|
+ validateJob(rJob, mr);
|
|
|
|
+ fileSys.delete(outDir, true);
|
|
|
|
+ jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
|
|
|
|
+ rJob = launchJob(jobConf, inDir, outDir, input);
|
|
|
|
+ rJob.waitForCompletion();
|
|
|
|
+ validateJob(rJob, mr);
|
|
} finally {
|
|
} finally {
|
|
if (dfs != null) { dfs.shutdown(); }
|
|
if (dfs != null) { dfs.shutdown(); }
|
|
if (mr != null) { mr.shutdown(); }
|
|
if (mr != null) { mr.shutdown(); }
|