Browse Source

MAPREDUCE-5533. Fixed MR speculation code to track any TaskAttempts that aren't heart-beating for a while, so that we can aggressively speculate instead of waiting for task-timeout. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1529229 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1529231 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 11 năm trước cách đây
mục cha
commit
e6e512dbec

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -90,6 +90,10 @@ Release 2.1.2 - UNRELEASED
     MAPREDUCE-5442. $HADOOP_MAPRED_HOME/$HADOOP_CONF_DIR setting not working on
     Windows. (Yingda Chen via cnauroth)
 
+    MAPREDUCE-5533. Fixed MR speculation code to track any TaskAttempts that
+    aren't heart-beating for a while, so that we can aggressively speculate
+    instead of waiting for task-timeout (Xuan Gong via vinodkv)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

+ 83 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java

@@ -78,6 +78,16 @@ public class DefaultSpeculator extends AbstractService implements
   private final Map<Task, AtomicBoolean> pendingSpeculations
       = new ConcurrentHashMap<Task, AtomicBoolean>();
 
+  // Used to track any TaskAttempts that aren't heart-beating for a while, so
+  // that we can aggressively speculate instead of waiting for task-timeout.
+  private final ConcurrentMap<TaskAttemptId, TaskAttemptHistoryStatistics>
+      runningTaskAttemptStatistics = new ConcurrentHashMap<TaskAttemptId,
+          TaskAttemptHistoryStatistics>();
+  // Regular heartbeat from tasks is every 3 secs. So if we don't get a
+  // heartbeat in 9 secs (3 heartbeats), we simulate a heartbeat with no change
+  // in progress.
+  private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9 * 1000;
+
   // These are the current needs, not the initial needs.  For each job, these
   //  record the number of attempts that exist and that are actively
   //  waiting for a container [as opposed to running or finished]
@@ -329,6 +339,9 @@ public class DefaultSpeculator extends AbstractService implements
       runningTasks.putIfAbsent(taskID, Boolean.TRUE);
     } else {
       runningTasks.remove(taskID, Boolean.TRUE);
+      if (!stateString.equals(TaskAttemptState.STARTING.name())) {
+        runningTaskAttemptStatistics.remove(attemptID);
+      }
     }
   }
 
@@ -389,6 +402,33 @@ public class DefaultSpeculator extends AbstractService implements
         long estimatedReplacementEndTime
             = now + estimator.estimatedNewAttemptRuntime(taskID);
 
+        float progress = taskAttempt.getProgress();
+        TaskAttemptHistoryStatistics data =
+            runningTaskAttemptStatistics.get(runningTaskAttemptID);
+        if (data == null) {
+          runningTaskAttemptStatistics.put(runningTaskAttemptID,
+            new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
+        } else {
+          if (estimatedRunTime == data.getEstimatedRunTime()
+              && progress == data.getProgress()) {
+            // Previous stats are same as same stats
+            if (data.notHeartbeatedInAWhile(now)) {
+              // Stats have stagnated for a while, simulate heart-beat.
+              TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+              taskAttemptStatus.id = runningTaskAttemptID;
+              taskAttemptStatus.progress = progress;
+              taskAttemptStatus.taskState = taskAttempt.getState();
+              // Now simulate the heart-beat
+              handleAttempt(taskAttemptStatus);
+            }
+          } else {
+            // Stats have changed - update our data structure
+            data.setEstimatedRunTime(estimatedRunTime);
+            data.setProgress(progress);
+            data.resetHeartBeatTime(now);
+          }
+        }
+
         if (estimatedEndTime < now) {
           return PROGRESS_IS_GOOD;
         }
@@ -511,4 +551,47 @@ public class DefaultSpeculator extends AbstractService implements
     // We'll try to issue one map and one reduce speculation per job per run
     return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
   }
+
+  static class TaskAttemptHistoryStatistics {
+
+    private long estimatedRunTime;
+    private float progress;
+    private long lastHeartBeatTime;
+
+    public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress,
+        long nonProgressStartTime) {
+      this.estimatedRunTime = estimatedRunTime;
+      this.progress = progress;
+      resetHeartBeatTime(nonProgressStartTime);
+    }
+
+    public long getEstimatedRunTime() {
+      return this.estimatedRunTime;
+    }
+
+    public float getProgress() {
+      return this.progress;
+    }
+
+    public void setEstimatedRunTime(long estimatedRunTime) {
+      this.estimatedRunTime = estimatedRunTime;
+    }
+
+    public void setProgress(float progress) {
+      this.progress = progress;
+    }
+
+    public boolean notHeartbeatedInAWhile(long now) {
+      if (now - lastHeartBeatTime <= MAX_WAITTING_TIME_FOR_HEARTBEAT) {
+        return false;
+      } else {
+        resetHeartBeatTime(now);
+        return true;
+      }
+    }
+
+    public void resetHeartBeatTime(long lastHeartBeatTime) {
+      this.lastHeartBeatTime = lastHeartBeatTime;
+    }
+  }
 }

+ 13 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -263,16 +263,22 @@ public class MRApp extends MRAppMaster {
   }
 
   public Job submit(Configuration conf) throws Exception {
+    //TODO: fix the bug where the speculator gets events with 
+    //not-fully-constructed objects. For now, disable speculative exec
+    return submit(conf, false, false);
+  }
+
+  public Job submit(Configuration conf, boolean mapSpeculative,
+      boolean reduceSpeculative) throws Exception {
     String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
-      .getCurrentUser().getShortUserName());
+        .getCurrentUser().getShortUserName());
     conf.set(MRJobConfig.USER_NAME, user);
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
     conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
-    //TODO: fix the bug where the speculator gets events with 
-    //not-fully-constructed objects. For now, disable speculative exec
-    LOG.info("****DISABLING SPECULATIVE EXECUTION*****");
-    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
-    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    // TODO: fix the bug where the speculator gets events with
+    // not-fully-constructed objects. For now, disable speculative exec
+    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, mapSpeculative);
+    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, reduceSpeculative);
 
     init(conf);
     start();
@@ -281,7 +287,7 @@ public class MRApp extends MRAppMaster {
 
     // Write job.xml
     String jobFile = MRApps.getJobFile(conf, user,
-      TypeConverter.fromYarn(job.getID()));
+        TypeConverter.fromYarn(job.getID()));
     LOG.info("Writing job conf to " + jobFile);
     new File(jobFile).getParentFile().mkdirs();
     conf.writeXml(new FileOutputStream(jobFile));

+ 220 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java

@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Test;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestSpeculativeExecutionWithMRApp {
+
+  private static final int NUM_MAPPERS = 5;
+  private static final int NUM_REDUCERS = 0;
+
+  @Test(timeout = 60000)
+  public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
+
+    Clock actualClock = new SystemClock();
+    ControlledClock clock = new ControlledClock(actualClock);
+    clock.setTime(System.currentTimeMillis());
+
+    MRApp app =
+        new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
+    Job job = app.submit(new Configuration(), true, true);
+    app.waitForState(job, JobState.RUNNING);
+
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", NUM_MAPPERS + NUM_REDUCERS,
+      tasks.size());
+    Iterator<Task> taskIter = tasks.values().iterator();
+    while (taskIter.hasNext()) {
+      app.waitForState(taskIter.next(), TaskState.RUNNING);
+    }
+
+    // Process the update events
+    clock.setTime(System.currentTimeMillis() + 2000);
+    EventHandler appEventHandler = app.getContext().getEventHandler();
+    for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
+      for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
+        .getValue().getAttempts().entrySet()) {
+        TaskAttemptStatus status =
+            createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8,
+              TaskAttemptState.RUNNING);
+        TaskAttemptStatusUpdateEvent event =
+            new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+        appEventHandler.handle(event);
+      }
+    }
+
+    Random generator = new Random();
+    Object[] taskValues = tasks.values().toArray();
+    Task taskToBeSpeculated =
+        (Task) taskValues[generator.nextInt(taskValues.length)];
+
+    // Other than one random task, finish every other task.
+    for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
+      for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
+        .getValue().getAttempts().entrySet()) {
+        if (mapTask.getKey() != taskToBeSpeculated.getID()) {
+          appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
+            TaskAttemptEventType.TA_DONE));
+          appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
+            TaskAttemptEventType.TA_CONTAINER_CLEANED));
+          app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
+        }
+      }
+    }
+
+    int maxTimeWait = 10;
+    boolean successfullySpeculated = false;
+    while (maxTimeWait > 0 && !successfullySpeculated) {
+      if (taskToBeSpeculated.getAttempts().size() != 2) {
+        Thread.sleep(1000);
+        clock.setTime(System.currentTimeMillis() + 20000);
+      } else {
+        successfullySpeculated = true;
+      }
+      maxTimeWait--;
+    }
+    Assert
+      .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+  }
+
+  @Test(timeout = 60000)
+  public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
+
+    Clock actualClock = new SystemClock();
+    ControlledClock clock = new ControlledClock(actualClock);
+    clock.setTime(System.currentTimeMillis());
+
+    MRApp app =
+        new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock);
+    Job job = app.submit(new Configuration(), true, true);
+    app.waitForState(job, JobState.RUNNING);
+
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", NUM_MAPPERS + NUM_REDUCERS,
+      tasks.size());
+    Iterator<Task> taskIter = tasks.values().iterator();
+    while (taskIter.hasNext()) {
+      app.waitForState(taskIter.next(), TaskState.RUNNING);
+    }
+
+    // Process the update events
+    clock.setTime(System.currentTimeMillis() + 1000);
+    EventHandler appEventHandler = app.getContext().getEventHandler();
+    for (Map.Entry<TaskId, Task> mapTask : tasks.entrySet()) {
+      for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : mapTask
+        .getValue().getAttempts().entrySet()) {
+        TaskAttemptStatus status =
+            createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5,
+              TaskAttemptState.RUNNING);
+        TaskAttemptStatusUpdateEvent event =
+            new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+        appEventHandler.handle(event);
+      }
+    }
+
+    Task speculatedTask = null;
+    int numTasksToFinish = NUM_MAPPERS + NUM_REDUCERS - 1;
+    clock.setTime(System.currentTimeMillis() + 1000);
+    for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
+      for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
+        .getAttempts().entrySet()) {
+        if (numTasksToFinish > 0) {
+          appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
+            TaskAttemptEventType.TA_DONE));
+          appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(),
+            TaskAttemptEventType.TA_CONTAINER_CLEANED));
+          numTasksToFinish--;
+          app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED);
+        } else {
+          // The last task is chosen for speculation
+          TaskAttemptStatus status =
+              createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
+                TaskAttemptState.RUNNING);
+          speculatedTask = task.getValue();
+          TaskAttemptStatusUpdateEvent event =
+              new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+          appEventHandler.handle(event);
+        }
+      }
+    }
+
+    clock.setTime(System.currentTimeMillis() + 15000);
+    for (Map.Entry<TaskId, Task> task : tasks.entrySet()) {
+      for (Map.Entry<TaskAttemptId, TaskAttempt> taskAttempt : task.getValue()
+        .getAttempts().entrySet()) {
+        if (taskAttempt.getValue().getState() != TaskAttemptState.SUCCEEDED) {
+          TaskAttemptStatus status =
+              createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75,
+                TaskAttemptState.RUNNING);
+          TaskAttemptStatusUpdateEvent event =
+              new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), status);
+          appEventHandler.handle(event);
+        }
+      }
+    }
+
+    int maxTimeWait = 5;
+    boolean successfullySpeculated = false;
+    while (maxTimeWait > 0 && !successfullySpeculated) {
+      if (speculatedTask.getAttempts().size() != 2) {
+        Thread.sleep(1000);
+      } else {
+        successfullySpeculated = true;
+      }
+      maxTimeWait--;
+    }
+    Assert
+      .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+  }
+
+  private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,
+      float progress, TaskAttemptState state) {
+    TaskAttemptStatus status = new TaskAttemptStatus();
+    status.id = id;
+    status.progress = progress;
+    status.taskState = state;
+    return status;
+  }
+}