
MAPREDUCE-2641. Fix the ExponentiallySmoothedTaskRuntimeEstimator and its unit test. (Josh Willis via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1153017 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar committed 14 years ago
commit 9e5fcd2e04

+ 3 - 0
mapreduce/CHANGES.txt

@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
+    
+    MAPREDUCE-2641. Fix the ExponentiallySmoothedTaskRuntimeEstimator
+    and its unit test. (Josh Willis via mahadev)
 
     Fix poms for adding dependency on sl4j-simple. (siddharth seth) 
  

+ 9 - 7
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java

@@ -23,16 +23,15 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 
-/*
- * This estimator exponentially smooths the rate of progress vrs. wallclock
- *  time.  Conceivably we could write an estimator that smooths time per
- *  unit progress, and get different results.
+/**
+ * This estimator exponentially smooths the rate of progress versus wallclock
+ * time.  Conceivably we could write an estimator that smooths time per
+ * unit progress, and get different results.
  */
 public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
 
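The rewritten Javadoc above describes what the class computes: an exponentially smoothed estimate of the rate of progress per unit of wallclock time. A minimal sketch of that recurrence, with hypothetical names and a hypothetical decay formula (the real class takes its smoothing parameters from the job configuration):

```java
// A minimal sketch of exponentially smoothing a progress rate.
// All names and the decay formula are illustrative, not the real class's.
class SmoothedRateSketch {
  private final double lambdaMillis; // smoothing time constant

  private double smoothedRate; // progress units per millisecond
  private float lastProgress;
  private long lastTime;

  SmoothedRateSketch(double lambdaMillis) {
    this.lambdaMillis = lambdaMillis;
  }

  void incorporateReading(float newProgress, long newTime) {
    long dt = newTime - lastTime;
    if (dt <= 0) {
      return; // out-of-order or duplicate reading; nothing to learn
    }
    double instantRate = (newProgress - lastProgress) / dt;
    // The old estimate's weight decays exponentially with the gap
    // since the last reading; recent readings dominate.
    double decay = Math.exp(-dt / lambdaMillis);
    smoothedRate = smoothedRate * decay + instantRate * (1.0 - decay);
    lastProgress = newProgress;
    lastTime = newTime;
  }

  long estimatedRuntimeRemaining(float progress) {
    return smoothedRate <= 0.0
        ? Long.MAX_VALUE // no usable signal yet
        : (long) ((1.0 - progress) / smoothedRate);
  }
}
```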
@@ -64,8 +63,7 @@ public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase
     final float basedOnProgress;
     final long atTime;
 
-    EstimateVector
-        (double value, float basedOnProgress, long atTime) {
+    EstimateVector(double value, float basedOnProgress, long atTime) {
       this.value = value;
       this.basedOnProgress = basedOnProgress;
       this.atTime = atTime;
@@ -94,11 +92,13 @@ public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase
 
   private void incorporateReading
       (TaskAttemptId attemptID, float newProgress, long newTime) {
+    //TODO: Refactor this method, it seems more complicated than necessary.
     AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
 
     if (vectorRef == null) {
       estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null));
       incorporateReading(attemptID, newProgress, newTime);
+      return;
     }
 
     EstimateVector oldVector = vectorRef.get();
@@ -110,6 +110,7 @@ public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase
       }
 
       incorporateReading(attemptID, newProgress, newTime);
+      return;
     }
 
     while (!vectorRef.compareAndSet
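The two added return statements are the substance of this fix. Previously, after the recursive retry the method fell through and kept executing on stale local state: in the first branch vectorRef is still null, so the subsequent dereference threw a NullPointerException, and in the second branch the same reading was folded in a second time. A self-contained sketch of the repaired publish-then-retry pattern, with a plain Double standing in for the real EstimateVector:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the publish-then-retry pattern the fix repairs. The value
// type is a stand-in; the control flow is what matters.
class ReadingTable {
  private final ConcurrentMap<String, AtomicReference<Double>> estimates =
      new ConcurrentHashMap<String, AtomicReference<Double>>();

  void incorporateReading(String attemptID, double reading) {
    AtomicReference<Double> vectorRef = estimates.get(attemptID);

    if (vectorRef == null) {
      // Publish an empty slot (first caller wins), then retry the lookup.
      estimates.putIfAbsent(attemptID, new AtomicReference<Double>(null));
      incorporateReading(attemptID, reading);
      return; // without this, vectorRef.get() below throws NullPointerException
    }

    Double oldValue = vectorRef.get();

    if (oldValue == null) {
      if (vectorRef.compareAndSet(null, reading)) {
        return; // we installed the very first reading
      }
      incorporateReading(attemptID, reading); // someone beat us; retry
      return; // without this, the same reading is folded in twice
    }

    // Fold the new reading into the old value until the CAS sticks.
    while (!vectorRef.compareAndSet(oldValue, (oldValue + reading) / 2.0)) {
      oldValue = vectorRef.get();
    }
  }
}
```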
@@ -184,6 +185,7 @@ public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase
 
   @Override
   public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+    super.updateAttempt(status, timestamp);
     TaskAttemptId attemptID = status.id;
 
     float progress = status.progress;
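The added call to super.updateAttempt(status, timestamp) is the other substantive fix: as the name StartEndTimesBase suggests, the base class keeps per-attempt bookkeeping (attempt start times and related state), and the override had been silently discarding those updates. A sketch of the pattern, with hypothetical names:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch (hypothetical names): an override that never delegates
// silently drops the base class's per-attempt bookkeeping.
abstract class BaseEstimatorSketch {
  protected final ConcurrentMap<String, Long> attemptStartTimes =
      new ConcurrentHashMap<String, Long>();

  void updateAttempt(String attemptID, long timestamp) {
    // Bookkeeping every subclass relies on for its runtime estimates.
    attemptStartTimes.putIfAbsent(attemptID, timestamp);
  }
}

class SmoothingEstimatorSketch extends BaseEstimatorSketch {
  @Override
  void updateAttempt(String attemptID, long timestamp) {
    super.updateAttempt(attemptID, timestamp); // the fix: delegate first
    // ...then fold the new progress reading into the smoothed estimate.
  }
}
```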

+ 12 - 41
mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -27,8 +27,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
@@ -64,15 +63,14 @@ import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
+
 public class TestRuntimeEstimators {
 
   private static int INITIAL_NUMBER_FREE_SLOTS = 600;
@@ -82,9 +80,7 @@ public class TestRuntimeEstimators {
   private static int MAP_TASKS = 200;
   private static int REDUCE_TASKS = 150;
 
-  private Queue<TaskEvent> taskEvents;
-
-  Clock clock;
+  MockClock clock;
 
   Job myJob;
 
@@ -94,7 +90,7 @@ public class TestRuntimeEstimators {
 
   private final AtomicInteger slotsInUse = new AtomicInteger(0);
 
-  Dispatcher dispatcher;
+  AsyncDispatcher dispatcher;
 
   DefaultSpeculator speculator;
 
@@ -114,7 +110,8 @@ public class TestRuntimeEstimators {
   private void coreTestEstimator
       (TaskRuntimeEstimator testedEstimator, int expectedSpeculations) {
     estimator = testedEstimator;
-    taskEvents = new ConcurrentLinkedQueue<TaskEvent>();
+    clock = new MockClock();
+    dispatcher = new AsyncDispatcher();
     myJob = null;
     slotsInUse.set(0);
     completedMaps.set(0);
@@ -122,7 +119,7 @@ public class TestRuntimeEstimators {
     successfulSpeculations.set(0);
     taskTimeSavedBySpeculation.set(0);
 
-    ((MockClock)clock).advanceTime(1000);
+    clock.advanceTime(1000);
 
     Configuration conf = new Configuration();
 
@@ -137,8 +134,8 @@ public class TestRuntimeEstimators {
 
     dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler());
 
-    ((AsyncDispatcher)dispatcher).init(conf);
-    ((AsyncDispatcher)dispatcher).start();
+    dispatcher.init(conf);
+    dispatcher.start();
 
 
 
@@ -208,7 +205,7 @@ public class TestRuntimeEstimators {
         }
       }
 
-      ((MockClock) clock).advanceTime(1000L);
+      clock.advanceTime(1000L);
 
       if (clock.getTime() % 10000L == 0L) {
         speculator.scanForSpeculations();
@@ -221,46 +218,21 @@ public class TestRuntimeEstimators {
 
   @Test
   public void testLegacyEstimator() throws Exception {
-    clock = new MockClock();
     TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
-    Configuration conf = new Configuration();
-    dispatcher = new AsyncDispatcher();
-    myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
-
     coreTestEstimator(specificEstimator, 3);
   }
 
-  @Ignore
   @Test
   public void testExponentialEstimator() throws Exception {
-    clock = new MockClock();
     TaskRuntimeEstimator specificEstimator
         = new ExponentiallySmoothedTaskRuntimeEstimator();
-    Configuration conf = new Configuration();
-    dispatcher = new AsyncDispatcher();
-    myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
-
-    coreTestEstimator(new LegacyTaskRuntimeEstimator(), 3);
+    coreTestEstimator(specificEstimator, 3);
   }
 
   int taskTypeSlots(TaskType type) {
     return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
   }
 
-  private boolean jobComplete() {
-    for (Task task : myJob.getTasks().values()) {
-      if (!task.isFinished()) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  private int slotsInUse(int mapSize, int reduceSize) {
-    return slotsInUse.get();
-  }
-
   class SpeculationRequestEventHandler implements EventHandler<TaskEvent> {
 
     @Override
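Two things were wrong on the test side: testExponentialEstimator was @Ignore'd, and its body constructed an ExponentiallySmoothedTaskRuntimeEstimator only to hand the driver a fresh LegacyTaskRuntimeEstimator, so the exponential estimator was never exercised. With the fixture (clock, dispatcher) now built inside coreTestEstimator, each test reduces to constructing an estimator and delegating, roughly:

```java
// The corrected shape, as in the diff above: construct the estimator
// under test and delegate to the shared driver.
@Test
public void testExponentialEstimator() throws Exception {
  TaskRuntimeEstimator specificEstimator
      = new ExponentiallySmoothedTaskRuntimeEstimator();
  coreTestEstimator(specificEstimator, 3); // was hard-coded to LegacyTaskRuntimeEstimator
}
```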
@@ -286,7 +258,7 @@ public class TestRuntimeEstimators {
   class MyTaskImpl implements Task {
     private final TaskId taskID;
     private final Map<TaskAttemptId, TaskAttempt> attempts
-        = new HashMap<TaskAttemptId, TaskAttempt>(4);
+        = new ConcurrentHashMap<TaskAttemptId, TaskAttempt>(4);
 
     MyTaskImpl(JobId jobID, int index, TaskType type) {
       taskID = recordFactory.newRecordInstance(TaskId.class);
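The attempts map switches from HashMap to ConcurrentHashMap, presumably because it is touched from more than one thread in this test: the AsyncDispatcher delivers events on its own thread while the test thread walks tasks and attempts. A small standalone illustration of why that matters (a hypothetical demo, not test code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Mutating a HashMap while iterating it fails fast; ConcurrentHashMap's
// weakly consistent iterators tolerate concurrent modification.
public class IterationDemo {
  public static void main(String[] args) {
    Map<Integer, String> plain = new HashMap<Integer, String>();
    Map<Integer, String> concurrent = new ConcurrentHashMap<Integer, String>();
    for (int i = 0; i < 4; i++) {
      plain.put(i, "attempt-" + i);
      concurrent.put(i, "attempt-" + i);
    }

    try {
      for (Integer key : plain.keySet()) {
        plain.put(99, "speculative"); // structural change mid-iteration
      }
    } catch (java.util.ConcurrentModificationException expected) {
      System.out.println("HashMap iterator failed fast, as expected");
    }

    for (Integer key : concurrent.keySet()) {
      concurrent.put(99, "speculative"); // fine: weakly consistent iteration
    }
    System.out.println("ConcurrentHashMap iteration completed");
  }
}
```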
@@ -629,7 +601,6 @@ public class TestRuntimeEstimators {
 
         // check for a spectacularly successful speculation
         TaskId taskID = myAttemptID.getTaskId();
-        Task undoneTask = null;
 
         Task task = myJob.getTask(taskID);