|
@@ -192,6 +192,46 @@ public class TestSpeculativeExecution {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Mapper whose first attempt (attempt id 0) always fails, while any later
+   * attempt emits {@code (value, 1)} for each input record.
+   */
+  public static class FailOnceMapper extends
+      Mapper<Object, Text, Text, IntWritable> {
+
+    /**
+     * Pauses for two seconds, then either fails (first attempt) or writes
+     * the input value paired with a count of one.
+     *
+     * @param key input key; not used
+     * @param value input value; written out as the output key
+     * @param context task context supplying the attempt id and the output
+     * @throws IOException if writing the output record fails
+     * @throws InterruptedException if writing the output record is
+     *           interrupted
+     * @throws RuntimeException always, when the attempt id is 0
+     */
+    @Override
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      TaskAttemptID taid = context.getTaskAttemptID();
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ie) {
+        // The pause is best-effort, so carry on, but restore the interrupt
+        // status instead of silently discarding it.
+        Thread.currentThread().interrupt();
+      }
+      // Fail mapper only for first attempt
+      if (taid.getId() == 0) {
+        throw new RuntimeException("Failing this mapper");
+      }
+
+      context.write(value, new IntWritable(1));
+    }
+  }
|
|
|
+
|
|
|
+  /**
+   * Reducer whose first attempt (attempt id 0) always fails, while any later
+   * attempt emits {@code (key, 0)} for each key it receives.
+   */
+  public static class FailOnceReducer extends
+      Reducer<Text, IntWritable, Text, IntWritable> {
+
+    /**
+     * Pauses for two seconds, then either fails (first attempt) or writes
+     * the key paired with a constant zero; the incoming values are not read.
+     *
+     * @param key key being reduced; written out unchanged
+     * @param values values grouped under {@code key}; not used
+     * @param context task context supplying the attempt id and the output
+     * @throws IOException if writing the output record fails
+     * @throws InterruptedException if writing the output record is
+     *           interrupted
+     * @throws RuntimeException always, when the attempt id is 0
+     */
+    @Override
+    public void reduce(Text key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
+      TaskAttemptID taid = context.getTaskAttemptID();
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ie) {
+        // The pause is best-effort, so carry on, but restore the interrupt
+        // status instead of silently discarding it.
+        Thread.currentThread().interrupt();
+      }
+      // Fail reduce only for first attempt
+      if (taid.getId() == 0) {
+        throw new RuntimeException("Failing this reducer");
+      }
+      context.write(key, new IntWritable(0));
+    }
+  }
|
|
|
+
|
|
|
+
|
|
|
@Test
|
|
|
public void testSpeculativeExecution() throws Exception {
|
|
|
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
@@ -218,6 +258,30 @@ public class TestSpeculativeExecution {
|
|
|
Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
|
|
|
.getValue());
|
|
|
|
|
|
+
|
|
|
+ /*------------------------------------------------------------------
|
|
|
+ * Test that Map/Red does not speculate if MAP_SPECULATIVE and
|
|
|
+ * REDUCE_SPECULATIVE are both false. When map tasks fail once and time out,
|
|
|
+ * we shouldn't launch two simultaneous attempts. MAPREDUCE-7278
|
|
|
+ * -----------------------------------------------------------------
|
|
|
+ */
|
|
|
+ job = runNonSpecFailOnceTest();
|
|
|
+
|
|
|
+ succeeded = job.waitForCompletion(true);
|
|
|
+ Assert.assertTrue(succeeded);
|
|
|
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
|
|
|
+ counters = job.getCounters();
|
|
|
+    // Expect 4 launches of each kind: both map tasks and both reduce tasks
+    // fail their first attempt and are relaunched exactly once
|
|
|
+ Assert.assertEquals(4,
|
|
|
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue());
|
|
|
+ Assert.assertEquals(4,
|
|
|
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
|
|
|
+ // Ensure no maps or reduces killed due to accidental speculation
|
|
|
+ Assert.assertEquals(0,
|
|
|
+ counters.findCounter(JobCounter.NUM_KILLED_MAPS).getValue());
|
|
|
+ Assert.assertEquals(0,
|
|
|
+ counters.findCounter(JobCounter.NUM_KILLED_REDUCES).getValue());
|
|
|
+
|
|
|
/*----------------------------------------------------------------------
|
|
|
* Test that Mapper speculates if MAP_SPECULATIVE is true and
|
|
|
* REDUCE_SPECULATIVE is false.
|
|
@@ -295,7 +359,48 @@ public class TestSpeculativeExecution {
|
|
|
|
|
|
// Delete output directory if it exists.
|
|
|
try {
|
|
|
- localFs.delete(TEST_OUT_DIR,true);
|
|
|
+ localFs.delete(TEST_OUT_DIR, true);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // ignore
|
|
|
+ }
|
|
|
+
|
|
|
+ // Creates the Job Configuration
|
|
|
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
|
|
+ job.setMaxMapAttempts(2);
|
|
|
+
|
|
|
+ job.submit();
|
|
|
+
|
|
|
+ return job;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Job runNonSpecFailOnceTest()
|
|
|
+ throws IOException, ClassNotFoundException, InterruptedException {
|
|
|
+
|
|
|
+ Path first = createTempFile("specexec_map_input1", "a\nz");
|
|
|
+ Path secnd = createTempFile("specexec_map_input2", "a\nz");
|
|
|
+
|
|
|
+ Configuration conf = mrCluster.getConfig();
|
|
|
+ conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
|
|
|
+ conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
|
|
|
+ // Prevent blacklisting since tasks fail once
|
|
|
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, false);
|
|
|
+ // Setting small task exit timeout values reproduces MAPREDUCE-7278
|
|
|
+ conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 20);
|
|
|
+ conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
|
|
|
+ Job job = Job.getInstance(conf);
|
|
|
+ job.setJarByClass(TestSpeculativeExecution.class);
|
|
|
+ job.setMapperClass(FailOnceMapper.class);
|
|
|
+ job.setReducerClass(FailOnceReducer.class);
|
|
|
+ job.setOutputKeyClass(Text.class);
|
|
|
+ job.setOutputValueClass(IntWritable.class);
|
|
|
+ job.setNumReduceTasks(2);
|
|
|
+ FileInputFormat.setInputPaths(job, first);
|
|
|
+ FileInputFormat.addInputPath(job, secnd);
|
|
|
+ FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
|
|
|
+
|
|
|
+ // Delete output directory if it exists.
|
|
|
+ try {
|
|
|
+ localFs.delete(TEST_OUT_DIR, true);
|
|
|
} catch (IOException e) {
|
|
|
// ignore
|
|
|
}
|