소스 검색

MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the YARN resource model (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3@1562230 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 년 전
부모
커밋
ad6f4b9809

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -58,6 +58,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5725. Make explicit that TestNetworkedJob relies on the Capacity
     Scheduler (Sandy Ryza)
 
+    MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
+    YARN resource model (Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

+ 26 - 34
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1261,57 +1261,56 @@ public abstract class TaskAttemptImpl implements
       }
     }
   }
-
-  private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
+  
+  private static void updateMillisCounters(JobCounterUpdateEvent jce,
+      TaskAttemptImpl taskAttempt) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
-    int slotMemoryReq =
+    long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
+    int mbRequired =
         taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
+    int vcoresRequired = taskAttempt.getCpuRequired(taskAttempt.conf, taskType);
 
     int minSlotMemSize = taskAttempt.conf.getInt(
       YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
       YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
 
     int simSlotsRequired =
-        minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq
+        minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) mbRequired
             / minSlotMemSize);
 
-    long slotMillisIncrement =
-        simSlotsRequired
-            * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
-    return slotMillisIncrement;
+    if (taskType == TaskType.MAP) {
+      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, simSlotsRequired * duration);
+      jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbRequired);
+      jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS, duration * vcoresRequired);
+      jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration);
+    } else {
+      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, simSlotsRequired * duration);
+      jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES, duration * mbRequired);
+      jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES, duration * vcoresRequired);
+      jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration);
+    }
   }
 
   private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
       TaskAttemptImpl taskAttempt) {
-    long slotMillis = computeSlotMillis(taskAttempt);
     TaskId taskId = taskAttempt.attemptId.getTaskId();
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
-    jce.addCounterUpdate(
-      taskId.getTaskType() == TaskType.MAP ?
-        JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
-        slotMillis);
+    updateMillisCounters(jce, taskAttempt);
     return jce;
   }
-
+  
   private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
       TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
     
-    long slotMillisIncrement = computeSlotMillis(taskAttempt);
-    
     if (taskType == TaskType.MAP) {
       jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
-      if(!taskAlreadyCompleted) {
-        // dont double count the elapsed time
-        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
-      }
     } else {
       jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
-      if(!taskAlreadyCompleted) {
-        // dont double count the elapsed time
-        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
-      }
+    }
+    if (!taskAlreadyCompleted) {
+      updateMillisCounters(jce, taskAttempt);
     }
     return jce;
   }
@@ -1321,20 +1320,13 @@ public abstract class TaskAttemptImpl implements
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
     
-    long slotMillisIncrement = computeSlotMillis(taskAttempt);
-    
     if (taskType == TaskType.MAP) {
       jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
-      if(!taskAlreadyCompleted) {
-        // dont double count the elapsed time
-        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
-      }
     } else {
       jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
-      if(!taskAlreadyCompleted) {
-        // dont double count the elapsed time
-        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
-      }
+    }
+    if (!taskAlreadyCompleted) {
+      updateMillisCounters(jce, taskAttempt);
     }
     return jce;
   }  

+ 22 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -182,13 +183,13 @@ public class TestTaskAttempt{
   }
 
   @Test
-  public void testSlotMillisCounterUpdate() throws Exception {
-    verifySlotMillis(2048, 2048, 1024);
-    verifySlotMillis(2048, 1024, 1024);
-    verifySlotMillis(10240, 1024, 2048);
+  public void testMillisCountersUpdate() throws Exception {
+    verifyMillisCounters(2048, 2048, 1024);
+    verifyMillisCounters(2048, 1024, 1024);
+    verifyMillisCounters(10240, 1024, 2048);
   }
 
-  public void verifySlotMillis(int mapMemMb, int reduceMemMb,
+  public void verifyMillisCounters(int mapMemMb, int reduceMemMb,
       int minContainerSize) throws Exception {
     Clock actualClock = new SystemClock();
     ControlledClock clock = new ControlledClock(actualClock);
@@ -232,13 +233,23 @@ public class TestTaskAttempt{
     Assert.assertEquals(mta.getLaunchTime(), 10);
     Assert.assertEquals(rta.getFinishTime(), 11);
     Assert.assertEquals(rta.getLaunchTime(), 10);
+    Counters counters = job.getAllCounters();
     Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
-        job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS)
-            .getValue());
-    Assert.assertEquals(
-        (int) Math.ceil((float) reduceMemMb / minContainerSize), job
-            .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
-            .getValue());
+        counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
+    Assert.assertEquals((int) Math.ceil((float) reduceMemMb / minContainerSize),
+        counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue());
+    Assert.assertEquals(1,
+        counters.findCounter(JobCounter.MILLIS_MAPS).getValue());
+    Assert.assertEquals(1,
+        counters.findCounter(JobCounter.MILLIS_REDUCES).getValue());
+    Assert.assertEquals(mapMemMb,
+        counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue());
+    Assert.assertEquals(reduceMemMb,
+        counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue());
+    Assert.assertEquals(1,
+        counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue());
+    Assert.assertEquals(1,
+        counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue());
   }
 
   private TaskAttemptImpl createMapTaskAttemptImplForTest(

+ 7 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java

@@ -45,5 +45,11 @@ public enum JobCounter {
   TOTAL_LAUNCHED_UBERTASKS,
   NUM_UBER_SUBMAPS,
   NUM_UBER_SUBREDUCES,
-  NUM_FAILED_UBERTASKS
+  NUM_FAILED_UBERTASKS,
+  MILLIS_MAPS,
+  MILLIS_REDUCES,
+  VCORES_MILLIS_MAPS,
+  VCORES_MILLIS_REDUCES,
+  MB_MILLIS_MAPS,
+  MB_MILLIS_REDUCES
 }

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties

@@ -25,5 +25,11 @@ DATA_LOCAL_MAPS.name=              Data-local map tasks
 RACK_LOCAL_MAPS.name=              Rack-local map tasks
 SLOTS_MILLIS_MAPS.name=            Total time spent by all maps in occupied slots (ms)
 SLOTS_MILLIS_REDUCES.name=         Total time spent by all reduces in occupied slots (ms)
+MILLIS_MAPS.name=                  Total time spent by all map tasks (ms)
+MILLIS_REDUCES.name=               Total time spent by all reduce tasks (ms)
+MB_MILLIS_MAPS.name=               Total megabyte-milliseconds taken by all map tasks
+MB_MILLIS_REDUCES.name=            Total megabyte-milliseconds taken by all reduce tasks
+VCORES_MILLIS_MAPS.name=           Total vcore-milliseconds taken by all map tasks
+VCORES_MILLIS_REDUCES.name=        Total vcore-milliseconds taken by all reduce tasks
 FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
 FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)