浏览代码

MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection of the best attempt for a task. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1445874 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 12 年之前
父节点
当前提交
ac8e7f17c1

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -65,6 +65,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
     speculative attempts (Robert Parker via jlowe)
 
+    MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection
+    of the best attempt for a task. (Jason Lowe via sseth)
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -532,6 +532,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
   //select the nextAttemptNumber with best progress
   // always called inside the Read Lock
   private TaskAttempt selectBestAttempt() {
+    if (successfulAttempt != null) {
+      return attempts.get(successfulAttempt);
+    }
+
     float progress = 0f;
     TaskAttempt result = null;
     for (TaskAttempt at : attempts.values()) {

+ 58 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

@@ -35,6 +35,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -138,6 +141,7 @@ public class TestTaskImpl {
 
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
+    private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
@@ -172,7 +176,15 @@ public class TestTaskImpl {
     public TaskAttemptState getState() {
       return state;
     }
-    
+
+    @Override
+    public Counters getCounters() {
+      return attemptCounters;
+    }
+
+    public void setCounters(Counters counters) {
+      attemptCounters = counters;
+    }
   }
   
   private class MockTask extends Task {
@@ -625,4 +637,49 @@ public class TestTaskImpl {
         TaskEventType.T_ATTEMPT_KILLED));
     assertEquals(TaskState.FAILED, mockTask.getState());
   }
+
+  @Test
+  public void testCountersWithSpeculation() {
+    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
+        credentials, clock,
+        completedTasksFromPreviousRun, startCount,
+        metrics, appContext) {
+          @Override
+          protected int getMaxAttempts() {
+            return 1;
+          }
+    };
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+    MockTaskAttemptImpl baseAttempt = getLastAttempt();
+
+    // add a speculative attempt
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+    MockTaskAttemptImpl specAttempt = getLastAttempt();
+    assertEquals(2, taskAttempts.size());
+
+    Counters specAttemptCounters = new Counters();
+    Counter cpuCounter = specAttemptCounters.findCounter(
+        TaskCounter.CPU_MILLISECONDS);
+    cpuCounter.setValue(1000);
+    specAttempt.setCounters(specAttemptCounters);
+
+    // spec attempt succeeds while the base attempt also reaches 1.0 progress
+    commitTaskAttempt(specAttempt.getAttemptId());
+    specAttempt.setProgress(1.0f);
+    specAttempt.setState(TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+    baseAttempt.setProgress(1.0f);
+
+    Counters taskCounters = mockTask.getCounters();
+    assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
+  }
 }