Prechádzať zdrojové kódy

MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable speculating either maps or reduces. Contributed by Eric Payne.
svn merge --ignore-ancestry -c 1231395 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1231397 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 13 rokov pred
rodič
commit
9febd5869c

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -433,6 +433,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3532. Modified NM to report correct http address when an ephemeral
     web port is configured. (Bhallamudi Venkata Siva Kamesh via vinodkv)
 
+    MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable
+    speculating either maps or reduces. (Eric Payne via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 26 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -258,7 +258,7 @@ public class MRAppMaster extends CompositeService {
     dispatcher.register(TaskAttemptEventType.class, 
         new TaskAttemptEventDispatcher());
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
-    
+   
     if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
         || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
       //optional service to speculate on task attempts' progress
@@ -881,9 +881,31 @@ public class MRAppMaster extends CompositeService {
     }
     @Override
     public void handle(SpeculatorEvent event) {
-      if (!disabled && 
-          (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
-          || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) {
+      if (disabled) {
+        return;
+      }
+
+      TaskId tId = event.getTaskID();
+      TaskType tType = null;
+      /* event's TaskId will be null if the event type is JOB_CREATE or
+       * ATTEMPT_STATUS_UPDATE
+       */
+      if (tId != null) {
+        tType = tId.getTaskType(); 
+      }
+      boolean shouldMapSpec =
+              conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+      boolean shouldReduceSpec =
+              conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+      /* The point of the following is to allow the MAP and REDUCE speculative
+       * config values to be independent:
+       * IF spec-exec is turned on for maps AND the task is a map task
+       * OR IF spec-exec is turned on for reduces AND the task is a reduce task
+       * THEN call the speculator to handle the event.
+       */
+      if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
+        || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
         // Speculator IS enabled, direct the event to there.
         speculator.handle(event);
       }

+ 309 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java

@@ -0,0 +1,309 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSpeculativeExecution {
+
+  /*
+   * This class is used to control when speculative execution happens.
+   */
+  public static class TestSpecEstimator extends LegacyTaskRuntimeEstimator {
+    private static final long SPECULATE_THIS = 999999L;
+
+    public TestSpecEstimator() {
+      super();
+    }
+
+    /*
+     * This will only be called if speculative execution is turned on.
+     * 
+     * If either mapper or reducer speculation is turned on, this will be
+     * called.
+     * 
+     * This will cause speculation to engage for the first mapper or first
+     * reducer (that is, attempt ID "*_m_000000_0" or "*_r_000000_0")
+     * 
+     * If this attempt is killed, the retry will have attempt id 1, so it
+     * will not engage speculation again.
+     */
+    @Override
+    public long estimatedRuntime(TaskAttemptId id) {
+      if ((id.getTaskId().getId() == 0) && (id.getId() == 0)) {
+        return SPECULATE_THIS;
+      }
+      return super.estimatedRuntime(id);
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
+
+  protected static MiniMRYarnCluster mrCluster;
+
+  private static Configuration initialConf = new Configuration();
+  private static FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(initialConf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static Path TEST_ROOT_DIR =
+          new Path("target",TestSpeculativeExecution.class.getName() + "-tmpDir")
+               .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+  static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+  private static Path TEST_OUT_DIR = new Path(TEST_ROOT_DIR, "test.out.dir");
+
+  @BeforeClass
+  public static void setup() throws IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(TestSpeculativeExecution.class.getName(), 4);
+      Configuration conf = new Configuration();
+      mrCluster.init(conf);
+      mrCluster.start();
+    }
+
+    // workaround the absent public distcache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (mrCluster != null) {
+      mrCluster.stop();
+      mrCluster = null;
+    }
+  }
+
+  public static class SpeculativeMapper extends
+    Mapper<Object, Text, Text, IntWritable> {
+
+    public void map(Object key, Text value, Context context)
+            throws IOException, InterruptedException {
+      // Make one mapper slower for speculative execution
+      TaskAttemptID taid = context.getTaskAttemptID();
+      long sleepTime = 100;
+      Configuration conf = context.getConfiguration();
+      boolean test_speculate_map =
+              conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+
+      // IF TESTING MAPPER SPECULATIVE EXECUTION:
+      //   Make the "*_m_000000_0" attempt take much longer than the others.
+      //   When speculative execution is enabled, this should cause the attempt
+      //   to be killed and restarted. At that point, the attempt ID will be
+      //   "*_m_000000_1", so sleepTime will still remain 100ms.
+      if ( (taid.getTaskType() == TaskType.MAP) && test_speculate_map
+            && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
+        sleepTime = 10000;
+      }
+      try{
+        Thread.sleep(sleepTime);
+      } catch(InterruptedException ie) {
+        // Ignore
+      }
+      context.write(value, new IntWritable(1));
+    }
+  }
+
+  public static class SpeculativeReducer extends
+    Reducer<Text,IntWritable,Text,IntWritable> {
+
+    public void reduce(Text key, Iterable<IntWritable> values, 
+                           Context context) throws IOException, InterruptedException {
+      // Make one reducer slower for speculative execution
+      TaskAttemptID taid = context.getTaskAttemptID();
+      long sleepTime = 100;
+      Configuration conf = context.getConfiguration();
+      boolean test_speculate_reduce =
+                conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+      // IF TESTING REDUCE SPECULATIVE EXECUTION:
+      //   Make the "*_r_000000_0" attempt take much longer than the others.
+      //   When speculative execution is enabled, this should cause the attempt
+      //   to be killed and restarted. At that point, the attempt ID will be
+      //   "*_r_000000_1", so sleepTime will still remain 100ms.
+      if ( (taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
+            && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
+        sleepTime = 10000;
+      }
+      try{
+        Thread.sleep(sleepTime);
+      } catch(InterruptedException ie) {
+        // Ignore
+      }
+      context.write(key,new IntWritable(0));
+    }
+  }
+
+  @Test
+  public void testSpeculativeExecution() throws Exception {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+           + " not found. Not running test.");
+      return;
+    }
+
+    /*------------------------------------------------------------------
+     * Test that Map/Red does not speculate if MAP_SPECULATIVE and 
+     * REDUCE_SPECULATIVE are both false.
+     * -----------------------------------------------------------------
+     */
+    Job job = runSpecTest(false, false);
+
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Counters counters = job.getCounters();
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+            .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+            .getValue());
+    Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+            .getValue());
+
+    /*----------------------------------------------------------------------
+     * Test that Mapper speculates if MAP_SPECULATIVE is true and
+     * REDUCE_SPECULATIVE is false.
+     * ---------------------------------------------------------------------
+     */
+    job = runSpecTest(true, false);
+
+    succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    counters = job.getCounters();
+
+    // The long-running map will be killed and a new one started.
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+            .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+            .getValue());
+    Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+            .getValue());
+
+    /*----------------------------------------------------------------------
+     * Test that Reducer speculates if REDUCE_SPECULATIVE is true and
+     * MAP_SPECULATIVE is false.
+     * ---------------------------------------------------------------------
+     */
+    job = runSpecTest(false, true);
+
+    succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    counters = job.getCounters();
+
+    // The long-running map will be killed and a new one started.
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+            .getValue());
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+            .getValue());
+  }
+
+  private Path createTempFile(String filename, String contents)
+      throws IOException {
+    Path path = new Path(TEST_ROOT_DIR, filename);
+    FSDataOutputStream os = localFs.create(path);
+    os.writeBytes(contents);
+    os.close();
+    localFs.setPermission(path, new FsPermission("700"));
+    return path;
+  }
+  
+  private Job runSpecTest(boolean mapspec, boolean redspec)
+      throws IOException, ClassNotFoundException, InterruptedException {
+
+    Path first = createTempFile("specexec_map_input1", "a\nz");
+    Path secnd = createTempFile("specexec_map_input2", "a\nz");
+
+    Configuration conf = mrCluster.getConfig();
+    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE,mapspec);
+    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE,redspec);
+    conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
+            TestSpecEstimator.class,
+            TaskRuntimeEstimator.class);
+
+    Job job = Job.getInstance(conf);
+    job.setJarByClass(TestSpeculativeExecution.class);
+    job.setMapperClass(SpeculativeMapper.class);
+    job.setReducerClass(SpeculativeReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setNumReduceTasks(2);
+    FileInputFormat.setInputPaths(job, first);
+    FileInputFormat.addInputPath(job, secnd);
+    FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
+
+    // Delete output directory if it exists.
+    try {
+      localFs.delete(TEST_OUT_DIR,true);
+    } catch (IOException e) {
+      // ignore
+    }
+
+    // Creates the Job Configuration
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.createSymlink();
+    job.setMaxMapAttempts(2);
+
+    job.submit();
+
+    return job;
+  }
+}