
MAPREDUCE-2037. Capture intermediate progress, CPU and memory usage for tasks. Contributed by Dick King.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1157253 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 14 years ago
parent
commit
989c5e90a5
27 changed files with 1333 additions and 146 deletions
  1. 3 0
      mapreduce/CHANGES.txt
  2. 23 0
      mapreduce/src/java/mapred-default.xml
  3. 17 12
      mapreduce/src/java/org/apache/hadoop/mapred/Counters.java
  4. 59 0
      mapreduce/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
  5. 18 10
      mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java
  6. 205 0
      mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
  7. 86 0
      mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
  8. 57 0
      mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
  9. 156 69
      mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java
  10. 60 0
      mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
  11. 15 3
      mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
  12. 64 4
      mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
  13. 63 5
      mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
  14. 61 4
      mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
  15. 3 0
      mapreduce/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
  16. 71 0
      mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
  17. 14 1
      mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java
  18. 34 10
      mapreduce/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
  19. 14 1
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
  20. 207 0
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
  21. 7 5
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
  22. 26 2
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
  23. 11 6
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
  24. 26 3
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
  25. 4 3
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
  26. 16 1
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
  27. 13 7
      mapreduce/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

+ 3 - 0
mapreduce/CHANGES.txt

@@ -38,6 +38,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-2323. Add metrics to the fair scheduler. (todd)
 
+    MAPREDUCE-2037. Capture intermediate progress, CPU and memory usage for
+    tasks. (Dick King via acmurthy) 
+
   IMPROVEMENTS
 
     MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

+ 23 - 0
mapreduce/src/java/mapred-default.xml

@@ -32,6 +32,29 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.jobtracker.jobhistory.task.numberprogresssplits</name>
+  <value>12</value>
+  <description> Every task attempt progresses from 0.0 to 1.0 [unless
+  it fails or is killed].  We record, for each task attempt, certain 
+  statistics over each twelfth of the progress range.  You can change
+  the number of intervals we divide the entire range of progress into
+  by setting this property.  Higher values give more precision to the
+  recorded data, but cost more memory in the job tracker at runtime.
+  Each increment in this attribute costs 16 bytes per running task.
+  </description>
+</property>
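
For illustration, a minimal sketch of how a job might raise the split count for finer-grained history data (assumes the old-style JobConf API; the key string matches the property above):

    JobConf conf = new JobConf();
    // Ask for 24 buckets instead of the default 12; each extra bucket
    // costs 16 bytes per running task in the job tracker.
    conf.setInt("mapreduce.jobtracker.jobhistory.task.numberprogresssplits", 24);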
+
+<property>
+  <name>mapreduce.job.userhistorylocation</name>
+  <value></value>
+  <description> The user can specify a location to store the history files of 
+  a particular job. If nothing is specified, the logs are stored in the 
+  output directory, under "_logs/history/". The user can stop logging by 
+  giving the value "none". 
+  </description>
+</property>
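
Similarly, a sketch of disabling the per-job user history copy via the property above:

    JobConf conf = new JobConf();
    // "none" turns off the user-level history copy entirely; any other
    // value is treated as a destination path.
    conf.set("mapreduce.job.userhistorylocation", "none");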
+
 <property>
   <name>mapreduce.jobtracker.jobhistory.completed.location</name>
   <value></value>

+ 17 - 12
mapreduce/src/java/org/apache/hadoop/mapred/Counters.java

@@ -413,23 +413,28 @@ public class Counters implements Writable, Iterable<Counters.Group> {
    * with the specified name.
    */
   public synchronized Group getGroup(String groupName) {
-    // To provide support for deprecated group names  
-    if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
-      groupName = "org.apache.hadoop.mapreduce.TaskCounter";
-      LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
-               " Use org.apache.hadoop.mapreduce.TaskCounter instead");
-    } else if (groupName.equals(
-                 "org.apache.hadoop.mapred.JobInProgress$Counter")) {
-      groupName = "org.apache.hadoop.mapreduce.JobCounter";
-      LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
-               "is deprecated. Use " +
-               "org.apache.hadoop.mapreduce.JobCounter instead");
-    }
     Group result = counters.get(groupName);
+
     if (result == null) {
+      // To provide support for deprecated group names  
+      if (groupName.equals("org.apache.hadoop.mapred.Task$Counter")) {
+        LOG.warn("Group org.apache.hadoop.mapred.Task$Counter is deprecated." +
+                 " Use org.apache.hadoop.mapreduce.TaskCounter instead");
+        return getGroup("org.apache.hadoop.mapreduce.TaskCounter");
+      } 
+
+      if (groupName.equals
+          ("org.apache.hadoop.mapred.JobInProgress$Counter")) {
+        LOG.warn("Group org.apache.hadoop.mapred.JobInProgress$Counter " +
+                 "is deprecated. Use " +
+                 "org.apache.hadoop.mapreduce.JobCounter instead");
+        return getGroup("org.apache.hadoop.mapreduce.JobCounter");
+      }
+
       result = new Group(groupName);
       counters.put(groupName, result);
     }
+
     return result;
   }
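
The effect of the reordering above, as a usage sketch (behavior read off the patched code):

    Counters counters = new Counters();
    // A deprecated group name is remapped only when no group by that name
    // exists yet; a warning is logged and the group registered under the
    // modern name is returned instead.
    Counters.Group group =
        counters.getGroup("org.apache.hadoop.mapred.Task$Counter");
    // group is the "org.apache.hadoop.mapreduce.TaskCounter" group.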
 

+ 59 - 0
mapreduce/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ *  measurements where the raw data are a measurement of an
+ *  accumulation.  The result in each bucket is the estimate 
+ *  of the progress-weighted change in that quantity over the
+ *  progress range covered by the bucket.
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ *  a distance traveled.  It makes sense to consider that portion of
+ *  the total travel that can be apportioned to each bucket.
+ *
+ */
+class CumulativePeriodicStats extends PeriodicStatsAccumulator {
+  // int's are acceptable here, even though times are normally
+  // long's, because these are a difference and an int won't
+  // overflow for 24 days.  Tasks can't run for more than about a
+  // week for other reasons.
+  int previousValue = 0;
+
+  CumulativePeriodicStats(int count) {
+    super(count);
+  }
+
+  /**
+   *
+   * accumulates a new reading by keeping a running account of the
+   *  value distance from the beginning of the bucket to the end of
+   *  this reading
+   */
+  @Override
+  protected void extendInternal(double newProgress, int newValue) {
+    if (state == null) {
+      return;
+    }
+
+    state.currentAccumulation += (double)(newValue - previousValue);
+    previousValue = newValue;
+  }
+}
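
To make the bucketing concrete, a small usage sketch (package-private access assumed, as in the unit test added by this patch):

    CumulativePeriodicStats cpu = new CumulativePeriodicStats(4);
    cpu.extend(0.25D, 100);   // 100 units accumulated over the first quarter
    cpu.extend(0.50D, 300);   // 200 further units over the second quarter
    // getValues() now returns {100, 200, -1, -1}; untouched buckets stay -1.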

+ 18 - 10
mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -2673,25 +2673,29 @@ public class JobInProgress {
         status.getTaskTracker(),  ttStatus.getHttpPort());
     
     jobHistory.logEvent(tse, status.getTaskID().getJobID());
-    
+    TaskAttemptID statusAttemptID = status.getTaskID();
 
     if (status.getIsMap()){
       MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
-          status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+          statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
           status.getMapFinishTime(),
           status.getFinishTime(),  trackerHostname,
           status.getStateString(), 
-          new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+          new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
+          tip.getSplits(statusAttemptID).burst()
+          );
       
       jobHistory.logEvent(mfe,  status.getTaskID().getJobID());
       
     }else{
       ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
-          status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(), 
+          statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(), 
           status.getShuffleFinishTime(),
           status.getSortFinishTime(), status.getFinishTime(),
           trackerHostname, status.getStateString(),
-          new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+          new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
+          tip.getSplits(statusAttemptID).burst()
+          );
       
       jobHistory.logEvent(rfe,  status.getTaskID().getJobID());
       
@@ -3171,12 +3175,16 @@ public class JobInProgress {
         taskid, taskType, startTime, taskTrackerName, taskTrackerPort);
     
     jobHistory.logEvent(tse, taskid.getJobID());
+
+    ProgressSplitsBlock splits = tip.getSplits(taskStatus.getTaskID());
    
-    TaskAttemptUnsuccessfulCompletionEvent tue = 
-      new TaskAttemptUnsuccessfulCompletionEvent(taskid, 
-          taskType, taskStatus.getRunState().toString(),
-          finishTime, 
-          taskTrackerHostName, diagInfo);
+    TaskAttemptUnsuccessfulCompletionEvent tue =
+      new TaskAttemptUnsuccessfulCompletionEvent
+            (taskid, 
+             taskType, taskStatus.getRunState().toString(),
+             finishTime, 
+             taskTrackerHostName, diagInfo,
+             splits.burst());
     jobHistory.logEvent(tue, taskid.getJobID());
         
     // After this, try to assign tasks with the one after this, so that

+ 205 - 0
mapreduce/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java

@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This abstract class represents a bucketed series of
+ *  measurements of a quantity being measured in a running task
+ *  attempt. 
+ *
+ * <p>The sole constructor is called with a count, which is the
+ *  number of buckets into which we evenly divide the spectrum of
+ *  progress from 0.0D to 1.0D .  In the future we may provide for
+ *  custom split points that don't have to be uniform.
+ *
+ * <p>A subclass determines how we fold readings for portions of a
+ *  bucket and how we interpret the readings by overriding
+ *  {@code extendInternal(...)} and {@code initializeInterval()}
+ */
+public abstract class PeriodicStatsAccumulator {
+  // The range of progress from 0.0D through 1.0D is divided into
+  //  count "progress segments".  This object accumulates an
+  //  estimate of the effective value of a time-varying value during
+  //  the zero-based i'th progress segment, ranging from i/count
+  //  through (i+1)/count . 
+  // This is an abstract class.  We have two implementations: one
+  //  for monotonically increasing time-dependent variables
+  //  [currently, CPU time in milliseconds and wallclock time in
+  //  milliseconds] and one for quantities that can vary arbitrarily
+  //  over time, currently virtual and physical memory used, in
+  //  kilobytes. 
+  // We carry int's here.  This saves a lot of JVM heap space in the
+  //  job tracker per running task attempt [200 bytes per] but it
+  //  has a small downside.
+  // No task attempt can run for more than 57 days nor occupy more
+  //  than two terabytes of virtual memory. 
+  protected final int count;
+  protected final int[] values;
+    
+  static class StatsetState {
+    int oldValue = 0;
+    double oldProgress = 0.0D;
+
+    double currentAccumulation = 0.0D;
+  }
+
+  // We provide this level of indirection to reduce the memory
+  //  footprint of done task attempts.  When a task's progress
+  //  reaches 1.0D, we delete this object's StatsetState.
+  StatsetState state = new StatsetState();
+
+  PeriodicStatsAccumulator(int count) {
+    this.count = count;
+    this.values = new int[count];
+    for (int i = 0; i < count; ++i) {
+      values[i] = -1;
+    }
+  }
+
+  protected int[] getValues() {
+    return values;
+  }
+
+  // The concrete implementation of this abstract function
+  //  accumulates more data into the current progress segment.
+  //  newProgress [from the call] and oldProgress [from the object]
+  //  must be in [or at the border of] a single progress segment.
+  /**
+   *
+   * adds a new reading to the current bucket.
+   *
+   * @param newProgress the endpoint of the interval this new
+   *                      reading covers
+   * @param newValue the value of the reading at {@code newProgress} 
+   *
+   * The class has three instance variables: {@code oldProgress},
+   *  {@code oldValue}, and {@code currentAccumulation}. 
+   *
+   * {@code extendInternal} can count on three things: 
+   *
+   *   1: The first time it's called in a particular instance, both
+   *      oldXXX's will be zero.
+   *
+   *   2: oldXXX for a later call is the value of newXXX of the
+   *      previous call.  This ensures continuity in accumulation from
+   *      one call to the next.
+   *
+   *   3: {@code currentAccumulation} is owned by 
+   *      {@code initializeInterval} and {@code extendInternal}.
+   */
+  protected abstract void extendInternal(double newProgress, int newValue);
+
+  // What has to be done when you open a new interval
+  /**
+   * initializes the state variables to be ready for a new interval
+   */
+  protected void initializeInterval() {
+    state.currentAccumulation = 0.0D;
+  }
+
+  // called for each new reading
+  /**
+   * This method calls {@code extendInternal} at least once.  It
+   *  divides the current progress interval [from the last call's
+   *  {@code newProgress}  to this call's {@code newProgress} ]
+   *  into one or more subintervals by splitting at any point which
+   *  is an interval boundary if there are any such points.  It
+   *  then calls {@code extendInternal} for each subinterval, or the
+   *  whole interval if there are no splitting points.
+   * 
+   *  <p>For example, if the value was {@code 300} last time with
+   *  {@code 0.3}  progress, and count is {@code 5}, and you get a
+   *  new reading with the variable at {@code 700} and progress at
+   *  {@code 0.7}, you get three calls to {@code extendInternal}:
+   *  one extending from progress {@code 0.3} to {@code 0.4} [the
+   *  next boundary] with a value of {@code 400}, the next one
+   *  through {@code 0.6} with a  value of {@code 600}, and finally
+   *  one at {@code 700} with a progress of {@code 0.7} . 
+   *
+   * @param newProgress the endpoint of the progress range this new
+   *                      reading covers
+   * @param newValue the value of the reading at {@code newProgress} 
+   */    
+  protected void extend(double newProgress, int newValue) {
+    if (state == null || newProgress < state.oldProgress) {
+      return;
+    }
+
+    // The correctness of this code depends on 100% * count = count.
+    int oldIndex = (int)(state.oldProgress * count);
+    int newIndex = (int)(newProgress * count);
+    int originalOldValue = state.oldValue;
+
+    double fullValueDistance = (double)newValue - state.oldValue;
+    double fullProgressDistance = newProgress - state.oldProgress;
+    double originalOldProgress = state.oldProgress;
+
+    // In this loop we detect each subinterval boundary within the
+    //  range from the old progress to the new one.  Then we
+    //  interpolate the value from the old value to the new one to
+    //  infer what its value might have been at each such boundary.
+    //  Lastly we make the necessary calls to extendInternal to fold
+    //  in the data for each trapezoid, where no such trapezoid
+    //  crosses a boundary.
+    for (int closee = oldIndex; closee < newIndex; ++closee) {
+      double interpolationProgress = (double)(closee + 1) / count;
+      // In floats, x * y / y might not equal x.
+      interpolationProgress = Math.min(interpolationProgress, newProgress);
+
+      double progressLength = (interpolationProgress - originalOldProgress);
+      double interpolationProportion = progressLength / fullProgressDistance;
+
+      double interpolationValueDistance
+        = fullValueDistance * interpolationProportion;
+
+      // estimates the value at the next [interpolated] subsegment boundary
+      int interpolationValue
+        = (int)interpolationValueDistance + originalOldValue;
+
+      extendInternal(interpolationProgress, interpolationValue);
+
+      advanceState(interpolationProgress, interpolationValue);
+
+      values[closee] = (int)state.currentAccumulation;
+      initializeInterval();
+
+    }
+
+    extendInternal(newProgress, newValue);
+    advanceState(newProgress, newValue);
+
+    if (newIndex == count) {
+      state = null;
+    }
+  }
+
+  protected void advanceState(double newProgress, int newValue) {
+    state.oldValue = newValue;
+    state.oldProgress = newProgress;
+  }    
+
+  int getCount() {
+    return count;
+  }
+
+  int get(int index) {
+    return values[index];
+  }
+}
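
A worked version of the javadoc example above, using the concrete StatePeriodicStats subclass added by this patch (package access assumed):

    StatePeriodicStats stats = new StatePeriodicStats(5);
    stats.extend(0.3D, 300);   // establishes the prior reading
    // This call crosses the bucket boundaries at 0.4 and 0.6, so extend()
    // issues extendInternal(0.4, 400), extendInternal(0.6, 600), and
    // extendInternal(0.7, 700), interpolating linearly between readings.
    stats.extend(0.7D, 700);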

+ 86 - 0
mapreduce/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+
+/*
+ * This object gathers the [currently four] PeriodicStatsAccumulators
+ * that we maintain for a particular task attempt, for packaging and
+ * handling as a single object.
+ */
+public class ProgressSplitsBlock {
+  final PeriodicStatsAccumulator progressWallclockTime;
+  final PeriodicStatsAccumulator progressCPUTime;
+  final PeriodicStatsAccumulator progressVirtualMemoryKbytes;
+  final PeriodicStatsAccumulator progressPhysicalMemoryKbytes;
+
+  static final int[] NULL_ARRAY = new int[0];
+
+  static final int WALLCLOCK_TIME_INDEX = 0;
+  static final int CPU_TIME_INDEX = 1;
+  static final int VIRTUAL_MEMORY_KBYTES_INDEX = 2;
+  static final int PHYSICAL_MEMORY_KBYTES_INDEX = 3;
+
+  static final int DEFAULT_NUMBER_PROGRESS_SPLITS = 12;
+
+  ProgressSplitsBlock(int numberSplits) {
+    progressWallclockTime
+      = new CumulativePeriodicStats(numberSplits);
+    progressCPUTime
+      = new CumulativePeriodicStats(numberSplits);
+    progressVirtualMemoryKbytes
+      = new StatePeriodicStats(numberSplits);
+    progressPhysicalMemoryKbytes
+      = new StatePeriodicStats(numberSplits);
+  }
+
+  // this coordinates with LoggedTaskAttempt.SplitVectorKind
+  int[][] burst() {
+    int[][] result = new int[4][];
+
+    result[WALLCLOCK_TIME_INDEX] = progressWallclockTime.getValues();
+    result[CPU_TIME_INDEX] = progressCPUTime.getValues();
+    result[VIRTUAL_MEMORY_KBYTES_INDEX] = progressVirtualMemoryKbytes.getValues();
+    result[PHYSICAL_MEMORY_KBYTES_INDEX] = progressPhysicalMemoryKbytes.getValues();
+
+    return result;
+  }
+
+  static public int[] arrayGet(int[][] burstedBlock, int index) {
+    return burstedBlock == null ? NULL_ARRAY : burstedBlock[index];
+  }
+
+  static public int[] arrayGetWallclockTime(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, WALLCLOCK_TIME_INDEX);
+  }
+
+  static public int[] arrayGetCPUTime(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, CPU_TIME_INDEX);
+  }
+
+  static public int[] arrayGetVMemKbytes(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, VIRTUAL_MEMORY_KBYTES_INDEX);
+  }
+
+  static public int[] arrayGetPhysMemKbytes(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, PHYSICAL_MEMORY_KBYTES_INDEX);
+  }
+}
+    
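
A sketch of a consumer of the bursted representation (package access assumed; twelve buckets as in the default):

    ProgressSplitsBlock block = new ProgressSplitsBlock(12);
    int[][] burst = block.burst();
    int[] cpuPerBucket = ProgressSplitsBlock.arrayGetCPUTime(burst);
    // The arrayGet* helpers also tolerate a null bursted block,
    // returning the shared empty NULL_ARRAY.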

+ 57 - 0
mapreduce/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ *  measurements where the raw data are a measurement of a
+ *  time-varying quantity.  The result in each bucket is the estimate
+ *  of the progress-weighted mean value of that quantity over the
+ *  progress range covered by the bucket.
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ *  a temperature.  It makes sense to consider the mean temperature
+ *  over a progress range.
+ *
+ */
+class StatePeriodicStats extends PeriodicStatsAccumulator {
+  StatePeriodicStats(int count) {
+    super(count);
+  }
+
+  /**
+   *
+   * accumulates a new reading by keeping a running account of the
+   *  area under the piecewise linear curve marked by pairs of
+   *  {@code newProgress, newValue} .
+   */
+  @Override
+  protected void extendInternal(double newProgress, int newValue) {
+    if (state == null) {
+      return;
+    }
+
+    // the effective height of this trapezoid if rectangularized
+    double mean = ((double)newValue + (double)state.oldValue)/2.0D;
+
+    // conceptually mean *  (newProgress - state.oldProgress) / (1 / count)
+    state.currentAccumulation += mean * (newProgress - state.oldProgress) * count;
+  }
+}
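
A tiny numeric check of the trapezoid rule above (package access assumed):

    StatePeriodicStats temperature = new StatePeriodicStats(2);
    // One reading at mid-progress: bucket 0's estimate is the mean of the
    // endpoint values, scaled by count over the progress covered:
    // (0 + 100)/2 * (0.5 - 0.0) * 2 = 50.
    temperature.extend(0.5D, 100);
    // getValues()[0] == 50; bucket 1 remains -1.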

+ 156 - 69
mapreduce/src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -31,25 +31,32 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
 import org.apache.hadoop.net.Node;
 
 
+
 /*************************************************************
  * TaskInProgress maintains all the info needed for a
  * Task in the lifetime of its owning Job.  A given Task
  * might be speculatively executed or reexecuted, so we
  * need a level of indirection above the running-id itself.
  * <br>
- * A given TaskInProgress contains multiple taskids,
+ * A given TaskInProgress contains multiple task attempt ids,
  * 0 or more of which might be executing at any one time.
- * (That's what allows speculative execution.)  A taskid
- * is now *never* recycled.  A TIP allocates enough taskids
+ * (That's what allows speculative execution.)  A task attempt id
+ * is now *never* recycled.  A TIP allocates enough task attempt ids
  * to account for all the speculation and failures it will
  * ever have to handle.  Once those are up, the TIP is dead.
  * **************************************************************
@@ -60,6 +67,10 @@ class TaskInProgress {
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
 
+  private static final long MEMORY_SPLITS_RESOLUTION = 1024;
+
+  static final int DEFAULT_STATISTICS_INTERVALS = 12;
+
   public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
 
   // Defines the TIP
@@ -91,6 +102,10 @@ class TaskInProgress {
   private volatile boolean skipping = false;
   private boolean jobCleanup = false; 
   private boolean jobSetup = false;
+
+  private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS;
+  private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES;
+  private static Enum PHYSICAL_BYTES_KEY = TaskCounter.PHYSICAL_MEMORY_BYTES;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -109,12 +124,20 @@ class TaskInProgress {
   private JobConf conf;
   private Map<TaskAttemptID,List<String>> taskDiagnosticData =
     new TreeMap<TaskAttemptID,List<String>>();
+
   /**
-   * Map from taskId -> TaskStatus
+   * Map from task attempt Id -> TaskStatus
    */
   TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
     new TreeMap<TaskAttemptID,TaskStatus>();
 
+  
+  /**
+   * Map from task attempt Id -> splits block
+   */
+  private Map<TaskAttemptID, ProgressSplitsBlock> splitsBlocks
+    = new TreeMap<TaskAttemptID, ProgressSplitsBlock>();
+
   // Map from taskId -> TaskTracker Id, 
   // contains cleanup attempts and where they ran, if any
   private TreeMap<TaskAttemptID, String> cleanupTasks =
@@ -183,6 +206,65 @@ class TaskInProgress {
     }
     this.user = job.getUser();
   }
+
+  synchronized ProgressSplitsBlock getSplits(TaskAttemptID statusAttemptID) {
+    ProgressSplitsBlock result = splitsBlocks.get(statusAttemptID);
+
+    if (result == null) {
+      result
+        = new ProgressSplitsBlock
+            (conf.getInt(JTConfig.JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS,
+                         ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
+      splitsBlocks.put(statusAttemptID, result);
+    }
+
+    return result;
+  }
+
+  private void updateProgressSplits(TaskStatus taskStatus) {
+    if (!taskStatus.getIncludeCounters()) {
+      return;
+    }
+
+    double newProgress = taskStatus.getProgress();
+
+    Counters counters = taskStatus.getCounters();
+
+    TaskAttemptID statusAttemptID = taskStatus.getTaskID();
+    ProgressSplitsBlock splitsBlock = getSplits(statusAttemptID);
+
+    if (splitsBlock != null) {
+
+      long now = JobTracker.getClock().getTime();
+      Long start = getDispatchTime(statusAttemptID);
+
+      if (start != null && now - start <= Integer.MAX_VALUE) {
+        splitsBlock.progressWallclockTime.extend
+          (newProgress, (int)(now - start));
+      }
+
+      Counters.Counter cpuCounter = counters.findCounter(CPU_COUNTER_KEY);
+      if (cpuCounter != null
+          && cpuCounter.getCounter() <= Integer.MAX_VALUE) {
+        splitsBlock.progressCPUTime.extend
+          (newProgress, (int)(cpuCounter.getCounter()));
+      }
+
+      Counters.Counter virtualBytes = counters.findCounter(VM_BYTES_KEY);
+      if (virtualBytes != null) {
+        splitsBlock.progressVirtualMemoryKbytes.extend
+          (newProgress,
+           (int)(virtualBytes.getCounter() / (MEMORY_SPLITS_RESOLUTION)));
+      }
+
+      Counters.Counter physicalBytes = counters.findCounter(PHYSICAL_BYTES_KEY);
+      if (physicalBytes != null) {
+        splitsBlock.progressPhysicalMemoryKbytes.extend
+          (newProgress,
+           (int)(physicalBytes.getCounter() / (MEMORY_SPLITS_RESOLUTION)));
+      }
+    }
+  }
   
   /**
    * Set the max number of attempts before we declare a TIP as "failed"
@@ -294,6 +376,7 @@ class TaskInProgress {
     return execFinishTime;
   }
 
+
   /**
    * Set the exec finish time
    */
@@ -582,23 +665,24 @@ class TaskInProgress {
    * @return has the task changed its state noticeably?
    */
   synchronized boolean updateStatus(TaskStatus status) {
-    TaskAttemptID taskid = status.getTaskID();
-    String tracker = status.getTaskTracker();
-    String diagInfo = status.getDiagnosticInfo();
-    TaskStatus oldStatus = taskStatuses.get(taskid);
-    boolean changed = true;
-    if (diagInfo != null && diagInfo.length() > 0) {
-      LOG.info("Error from " + taskid + " on " +  tracker + ": "+ diagInfo);
-      addDiagnosticInfo(taskid, diagInfo);
-    }
+    try {
+      TaskAttemptID taskid = status.getTaskID();
+      String tracker = status.getTaskTracker();
+      String diagInfo = status.getDiagnosticInfo();
+      TaskStatus oldStatus = taskStatuses.get(taskid);
+      boolean changed = true;
+      if (diagInfo != null && diagInfo.length() > 0) {
+        LOG.info("Error from " + taskid + " on " +  tracker + ": "+ diagInfo);
+        addDiagnosticInfo(taskid, diagInfo);
+      }
     
-    if(skipping) {
-      failedRanges.updateState(status);
-    }
+      if(skipping) {
+        failedRanges.updateState(status);
+      }
     
-    if (oldStatus != null) {
-      TaskStatus.State oldState = oldStatus.getRunState();
-      TaskStatus.State newState = status.getRunState();
+      if (oldStatus != null) {
+        TaskStatus.State oldState = oldStatus.getRunState();
+        TaskStatus.State newState = status.getRunState();
           
       // We should never receive a duplicate success/failure/killed
       // status update for the same taskid! This is a safety check, 
@@ -617,60 +701,63 @@ class TaskInProgress {
         return false;
       }
 
-      // The task is not allowed to move from completed back to running.
-      // We have seen out of order status messagesmoving tasks from complete
-      // to running. This is a spot fix, but it should be addressed more
-      // globally.
-      if ((newState == TaskStatus.State.RUNNING || 
-          newState == TaskStatus.State.UNASSIGNED) &&
-          (oldState == TaskStatus.State.FAILED || 
-           oldState == TaskStatus.State.KILLED || 
-           oldState == TaskStatus.State.FAILED_UNCLEAN || 
-           oldState == TaskStatus.State.KILLED_UNCLEAN || 
-           oldState == TaskStatus.State.SUCCEEDED ||
-           oldState == TaskStatus.State.COMMIT_PENDING)) {
-        return false;
-      }
+        // The task is not allowed to move from completed back to running.
+        // We have seen out of order status messages moving tasks from complete
+        // to running. This is a spot fix, but it should be addressed more
+        // globally.
+        if ((newState == TaskStatus.State.RUNNING || 
+             newState == TaskStatus.State.UNASSIGNED) &&
+            (oldState == TaskStatus.State.FAILED || 
+             oldState == TaskStatus.State.KILLED || 
+             oldState == TaskStatus.State.FAILED_UNCLEAN || 
+             oldState == TaskStatus.State.KILLED_UNCLEAN || 
+             oldState == TaskStatus.State.SUCCEEDED ||
+             oldState == TaskStatus.State.COMMIT_PENDING)) {
+          return false;
+        }
       
-      //Do not accept any status once the task is marked FAILED/KILLED
-      //This is to handle the case of the JobTracker timing out a task
-      //due to launch delay, but the TT comes back with any state or 
-      //TT got expired
-      if (oldState == TaskStatus.State.FAILED ||
-          oldState == TaskStatus.State.KILLED) {
-        tasksToKill.put(taskid, true);
-        return false;	  
-      }
+        //Do not accept any status once the task is marked FAILED/KILLED
+        //This is to handle the case of the JobTracker timing out a task
+        //due to launch delay, but the TT comes back with any state or 
+        //TT got expired
+        if (oldState == TaskStatus.State.FAILED ||
+            oldState == TaskStatus.State.KILLED) {
+          tasksToKill.put(taskid, true);
+          return false;	  
+        }
           
-      changed = oldState != newState;
-    }
-    // if task is a cleanup attempt, do not replace the complete status,
-    // update only specific fields.
-    // For example, startTime should not be updated, 
-    // but finishTime has to be updated.
-    if (!isCleanupAttempt(taskid)) {
-      taskStatuses.put(taskid, status);
-      //we don't want to include setup tasks in the task execution stats
-      if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
-          (!isMapTask() && job.hasSpeculativeReduces()))) {
-        long now = JobTracker.getClock().getTime();
-        double oldProgRate = getOldProgressRate();
-        double currProgRate = getCurrentProgressRate(now);
-        job.updateStatistics(oldProgRate, currProgRate, isMapTask());
-        //we need to store the current progress rate, so that we can
-        //update statistics accurately the next time we invoke
-        //updateStatistics
-        setProgressRate(currProgRate);
+        changed = oldState != newState;
+      }
+      // if task is a cleanup attempt, do not replace the complete status,
+      // update only specific fields.
+      // For example, startTime should not be updated, 
+      // but finishTime has to be updated.
+      if (!isCleanupAttempt(taskid)) {
+        taskStatuses.put(taskid, status);
+        //we don't want to include setup tasks in the task execution stats
+        if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) || 
+                                  (!isMapTask() && job.hasSpeculativeReduces()))) {
+          long now = JobTracker.getClock().getTime();
+          double oldProgRate = getOldProgressRate();
+          double currProgRate = getCurrentProgressRate(now);
+          job.updateStatistics(oldProgRate, currProgRate, isMapTask());
+          //we need to store the current progress rate, so that we can
+          //update statistics accurately the next time we invoke
+          //updateStatistics
+          setProgressRate(currProgRate);
+        }
+      } else {
+        taskStatuses.get(taskid).statusUpdate(status.getRunState(),
+                                              status.getProgress(), status.getStateString(), status.getPhase(),
+                                              status.getFinishTime());
       }
-    } else {
-      taskStatuses.get(taskid).statusUpdate(status.getRunState(),
-        status.getProgress(), status.getStateString(), status.getPhase(),
-        status.getFinishTime());
-    }
 
-    // Recompute progress
-    recomputeProgress();
-    return changed;
+      // Recompute progress
+      recomputeProgress();
+      return changed;
+    } finally {
+      updateProgressSplits(status);
+    }
   }
 
   /**
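
A note on the division by MEMORY_SPLITS_RESOLUTION (1024) in updateProgressSplits above, sketched with illustrative numbers:

    // Counters report memory in bytes as longs; the splits store ints.
    long virtualBytes = 3L * 1024 * 1024 * 1024;   // a 3 GB reading
    int kbytes = (int)(virtualBytes / 1024L);      // 3145728 KB, fits an int
    // Integer.MAX_VALUE kilobytes is roughly two terabytes, matching the
    // bound noted in PeriodicStatsAccumulator's comments.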

+ 60 - 0
mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.lang.Integer;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+
+public class AvroArrayUtils {
+
+  private static final Schema ARRAY_INT
+      = Schema.createArray(Schema.create(Schema.Type.INT));
+
+  static public GenericArray<Integer> NULL_PROGRESS_SPLITS_ARRAY
+    = new GenericData.Array<Integer>(0, ARRAY_INT);
+
+  public static GenericArray<Integer>
+    toAvro(int values[]) {
+    GenericData.Array<Integer> result
+      = new GenericData.Array<Integer>(values.length, ARRAY_INT);
+
+    for (int i = 0; i < values.length; ++i) {
+      result.add(values[i]);
+    }
+
+    return result;
+  }
+
+  public static int[] fromAvro(GenericArray<Integer> avro) {
+    int[] result = new int[(int)avro.size()];
+
+    int i = 0;
+      
+    for (Iterator<Integer> iter = avro.iterator(); iter.hasNext(); ++i) {
+      result[i] = iter.next();
+    }
+
+    return result;
+  }
+}
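
A round-trip sketch of these helpers:

    int[] original = {100, 200, 300};
    GenericArray<Integer> asAvro = AvroArrayUtils.toAvro(original);
    int[] andBack = AvroArrayUtils.fromAvro(asAvro);
    // andBack is {100, 200, 300}; a zero-length array round-trips the
    // same way, matching NULL_PROGRESS_SPLITS_ARRAY.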

+ 15 - 3
mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr

@@ -125,7 +125,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": "JhCounters"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 
@@ -140,7 +144,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "state", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": "JhCounters"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 
@@ -176,7 +184,11 @@
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
           {"name": "status", "type": "string"},
-          {"name": "error", "type": "string"}
+          {"name": "error", "type": "string"},
+          {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
+          {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
+          {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
+          {"name": "physMemKbytes", "type": { "type": "array", "items": "int"}}
       ]
      },
 

+ 64 - 4
mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
 
 import org.apache.avro.util.Utf8;
 
@@ -48,11 +49,19 @@ public class MapAttemptFinishedEvent  implements HistoryEvent {
    * @param hostname Name of the host where the map executed
    * @param state State string for the attempt
    * @param counters Counters for the attempt
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory. 
+   *
+   *        If you have no splits data, code {@code null} for this
+   *        parameter. 
    */
-  public MapAttemptFinishedEvent(TaskAttemptID id, 
-      TaskType taskType, String taskStatus, 
-      long mapFinishTime, long finishTime,
-      String hostname, String state, Counters counters) {
+  public MapAttemptFinishedEvent
+      (TaskAttemptID id, TaskType taskType, String taskStatus, 
+       long mapFinishTime, long finishTime, String hostname,
+       String state, Counters counters,
+       int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
     datum.taskType = new Utf8(taskType.name());
@@ -62,8 +71,46 @@ public class MapAttemptFinishedEvent  implements HistoryEvent {
     datum.hostname = new Utf8(hostname);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
+
+    datum.clockSplits
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /** 
+   * @deprecated please use the constructor with an additional
+   *              argument, an array of splits arrays instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   *
+   * Create an event for successful completion of map attempts
+   * @param id Task Attempt ID
+   * @param taskType Type of the task
+   * @param taskStatus Status of the task
+   * @param mapFinishTime Finish time of the map phase
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the map executed
+   * @param state State string for the attempt
+   * @param counters Counters for the attempt
+   */
+  @Deprecated
+  public MapAttemptFinishedEvent
+      (TaskAttemptID id, TaskType taskType, String taskStatus, 
+       long mapFinishTime, long finishTime, String hostname,
+       String state, Counters counters) {
+    this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, state, counters, null);
   }
   
+  
   MapAttemptFinishedEvent() {}
 
   public Object getDatum() { return datum; }
@@ -97,5 +144,18 @@ public class MapAttemptFinishedEvent  implements HistoryEvent {
    public EventType getEventType() {
     return EventType.MAP_ATTEMPT_FINISHED;
   }
+
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
   
 }
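
A sketch of reading the new per-bucket graphs back off an event; passing null for allSplits, as the deprecated constructor does, yields empty arrays rather than a failure (identifier values are illustrative only):

    MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
        TaskAttemptID.forName("attempt_200904211745_0003_m_000004_0"),
        TaskType.MAP, "SUCCEEDED", 1234L, 5678L, "hostname", "state",
        new Counters(), null);
    int[] clock = mfe.getClockSplits();   // empty here; otherwise one
                                          // wallclock entry per bucket
    // getCpuUsages(), getVMemKbytes() and getPhysMemKbytes() behave alike,
    // and the same four getters appear on the reduce and
    // unsuccessful-completion events below.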

+ 63 - 5
mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+
 import org.apache.avro.util.Utf8;
 
 /**
@@ -50,12 +52,16 @@ public class ReduceAttemptFinishedEvent  implements HistoryEvent {
    * @param hostname Name of the host where the attempt executed
    * @param state State of the attempt
    * @param counters Counters for the attempt
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory.  
    */
-  public ReduceAttemptFinishedEvent(TaskAttemptID id, 
-      TaskType taskType, String taskStatus, 
-      long shuffleFinishTime, long sortFinishTime, 
-      long finishTime,
-      String hostname, String state, Counters counters) {
+  public ReduceAttemptFinishedEvent
+    (TaskAttemptID id, TaskType taskType, String taskStatus, 
+     long shuffleFinishTime, long sortFinishTime, long finishTime,
+     String hostname, String state, Counters counters,
+     int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
     datum.taskType = new Utf8(taskType.name());
@@ -66,6 +72,45 @@ public class ReduceAttemptFinishedEvent  implements HistoryEvent {
     datum.hostname = new Utf8(hostname);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
+
+    datum.clockSplits 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /**
+   * @deprecated please use the constructor with an additional
+   *              argument, an array of splits arrays instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   *
+   * Create an event to record completion of a reduce attempt
+   * @param id Attempt Id
+   * @param taskType Type of task
+   * @param taskStatus Status of the task
+   * @param shuffleFinishTime Finish time of the shuffle phase
+   * @param sortFinishTime Finish time of the sort phase
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param state State of the attempt
+   * @param counters Counters for the attempt
+   */
+  public ReduceAttemptFinishedEvent
+    (TaskAttemptID id, TaskType taskType, String taskStatus, 
+     long shuffleFinishTime, long sortFinishTime, long finishTime,
+     String hostname, String state, Counters counters) {
+    this(id, taskType, taskStatus,
+         shuffleFinishTime, sortFinishTime, finishTime,
+         hostname, state, counters, null);
   }
 
   ReduceAttemptFinishedEvent() {}
@@ -105,4 +150,17 @@ public class ReduceAttemptFinishedEvent  implements HistoryEvent {
   }
 
 
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
+
 }

+ 61 - 4
mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java

@@ -27,6 +27,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
+import org.apache.hadoop.mapred.TaskStatus;
+
 import org.apache.avro.util.Utf8;
 
 /**
@@ -47,11 +50,16 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
    * @param finishTime Finish time of the attempt
    * @param hostname Name of the host where the attempt executed
    * @param error Error string
+   * @param allSplits the "splits", or a pixelated graph of various
+   *        measurable worker node state variables against progress.
+   *        Currently there are four: wallclock time, CPU time,
+   *        virtual memory and physical memory.  
    */
-  public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id, 
-      TaskType taskType,
-      String status, long finishTime, 
-      String hostname, String error) {
+  public TaskAttemptUnsuccessfulCompletionEvent
+       (TaskAttemptID id, TaskType taskType,
+        String status, long finishTime, 
+        String hostname, String error,
+        int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.taskType = new Utf8(taskType.name());
     datum.attemptId = new Utf8(id.toString());
@@ -59,6 +67,40 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
     datum.hostname = new Utf8(hostname);
     datum.error = new Utf8(error);
     datum.status = new Utf8(status);
+
+    datum.clockSplits 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
+    datum.cpuUsages 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
+    datum.vMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
+    datum.physMemKbytes 
+      = AvroArrayUtils.toAvro
+           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+  }
+
+  /** 
+   * @deprecated please use the constructor with an additional
+   *              argument, an array of splits arrays instead.  See
+   *              {@link org.apache.hadoop.mapred.ProgressSplitsBlock}
+   *              for an explanation of the meaning of that parameter.
+   *
+   * Create an event to record the unsuccessful completion of attempts
+   * @param id Attempt ID
+   * @param taskType Type of the task
+   * @param status Status of the attempt
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param error Error string
+   */
+  public TaskAttemptUnsuccessfulCompletionEvent
+       (TaskAttemptID id, TaskType taskType,
+        String status, long finishTime, 
+        String hostname, String error) {
+    this(id, taskType, status, finishTime, hostname, error, null);
   }
 
   TaskAttemptUnsuccessfulCompletionEvent() {}
@@ -101,4 +143,19 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
               : EventType.REDUCE_ATTEMPT_KILLED);
   }
 
+
+
+  public int[] getClockSplits() {
+    return AvroArrayUtils.fromAvro(datum.clockSplits);
+  }
+  public int[] getCpuUsages() {
+    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+  }
+  public int[] getVMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+  }
+  public int[] getPhysMemKbytes() {
+    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+  }
+
 }

+ 3 - 0
mapreduce/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java

@@ -89,6 +89,9 @@ public interface JTConfig extends MRConfig {
     "mapreduce.jobtracker.jobhistory.completed.location";
   public static final String JT_JOBHISTORY_LOCATION = 
     "mapreduce.jobtracker.jobhistory.location";
+  // number of partial task progress reports we retain in job history
+  public static final String JT_JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS =
+    "mapreduce.jobtracker.jobhistory.task.numberprogresssplits";
   public static final String JT_AVG_BLACKLIST_THRESHOLD = 
     "mapreduce.jobtracker.blacklist.average.threshold";
   public static final String JT_SYSTEM_DIR = "mapreduce.jobtracker.system.dir";

+ 71 - 0
mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java

@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskPerformanceSplits {
+  @Test
+  public void testPeriodStatsets() {
+    PeriodicStatsAccumulator cumulative = new CumulativePeriodicStats(8);
+    PeriodicStatsAccumulator status = new StatePeriodicStats(8);
+
+    cumulative.extend(0.0D, 0);
+    cumulative.extend(0.4375D, 700); // 200 per octant
+    cumulative.extend(0.5625D, 1100); // 0.5 = 900
+    cumulative.extend(0.625D, 1300);
+    cumulative.extend(1.0D, 7901);
+
+    int total = 0;
+    int[] results = cumulative.getValues();
+
+    for (int i = 0; i < 8; ++i) {
+      System.err.println("segment i = " + results[i]);
+    }
+
+    assertEquals("Bad interpolation in cumulative segment 0", 200, results[0]);
+    assertEquals("Bad interpolation in cumulative segment 1", 200, results[1]);
+    assertEquals("Bad interpolation in cumulative segment 2", 200, results[2]);
+    assertEquals("Bad interpolation in cumulative segment 3", 300, results[3]);
+    assertEquals("Bad interpolation in cumulative segment 4", 400, results[4]);
+    assertEquals("Bad interpolation in cumulative segment 5", 2200, results[5]);
+    // these are rounded down
+    assertEquals("Bad interpolation in cumulative segment 6", 2200, results[6]);
+    assertEquals("Bad interpolation in cumulative segment 7", 2201, results[7]);
+
+    status.extend(0.0D, 0);
+    status.extend(1.0D/16.0D, 300); // + 75 for bucket 0
+    status.extend(3.0D/16.0D, 700); // + 200 for 0, +300 for 1
+    status.extend(7.0D/16.0D, 2300); // + 450 for 1, + 1500 for 2, + 1050 for 3
+    status.extend(1.0D, 1400);  // +1125 for 3, +2100 for 4, +1900 for 5,
+                                // +1700 for 6, +1500 for 7
+
+    results = status.getValues();
+
+    assertEquals("Bad interpolation in status segment 0", 275, results[0]);
+    assertEquals("Bad interpolation in status segment 1", 750, results[1]);
+    assertEquals("Bad interpolation in status segment 2", 1500, results[2]);
+    assertEquals("Bad interpolation in status segment 3", 2175, results[3]);
+    assertEquals("Bad interpolation in status segment 4", 2100, results[4]);
+    assertEquals("Bad interpolation in status segment 5", 1900, results[5]);
+    assertEquals("Bad interpolation in status segment 6", 1700, results[6]);
+    assertEquals("Bad interpolation in status segment 7", 1500, results[7]);
+  }
+}

+ 14 - 1
mapreduce/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.mapreduce.jobhistory;
 
+import java.util.List;
+import java.util.ArrayList;
+
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -28,6 +31,15 @@ import junit.framework.TestCase;
  * Test various jobhistory events
  */
 public class TestJobHistoryEvents extends TestCase {
+  static final int[][] NULL_SPLITS_ARRAY
+    = new int[org.apache.hadoop.tools.rumen.LoggedTaskAttempt.SplitVectorKind.values().length][];
+
+  static {
+    for (int i = 0; i < NULL_SPLITS_ARRAY.length; ++i) {
+      NULL_SPLITS_ARRAY[i] = new int[0];
+    }
+  }
+ 
   /**
    * Test {@link TaskAttemptStartedEvent} for various task types.
    */
@@ -73,7 +85,8 @@ public class TestJobHistoryEvents extends TestCase {
                                                      String state) {
     for (TaskType t : types) {
       TaskAttemptUnsuccessfulCompletionEvent tauce = 
-        new TaskAttemptUnsuccessfulCompletionEvent(id, t, state, 0L, "", "");
+        new TaskAttemptUnsuccessfulCompletionEvent
+           (id, t, state, 0L, "", "", NULL_SPLITS_ARRAY);
       assertEquals(expected, tauce.getEventType());
     }
   }

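For callers that need the same "no data" payload without depending on the Rumen enum, a minimal framework-free sketch (the row count of 4 is an assumption tied to the four split kinds this patch introduces; the test above derives the count from SplitVectorKind instead):

    // All-empty splits: one zero-length row per split-vector kind.
    int[][] nullSplits = new int[4][];
    for (int i = 0; i < nullSplits.length; ++i) {
      nullSplits[i] = new int[0];  // empty, not null: consumers may iterate
    }
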
+ 34 - 10
mapreduce/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java

@@ -852,6 +852,30 @@ public class TestRumenJobTraces {
   public void testTopologyBuilder() throws Exception {
     final TopologyBuilder subject = new TopologyBuilder();
 
+    // This 4 comes from
+    //   TaskInProgress.ProgressibleSplitsBlock.burst().size, which
+    //   is invisible here.
+
+    int[][] splits = new int[4][];
+
+    splits[0] = new int[12];
+    splits[1] = new int[12];
+    splits[2] = new int[12];
+    splits[3] = new int[12];
+
+    for (int j = 0; j < 4; ++j) {
+      for (int i = 0; i < 12; ++i) {
+        splits[j][i] = -1;
+      }
+    }
+
+    for (int i = 0; i < 6; ++i) {
+      splits[0][i] = 500000 * i;
+      splits[1][i] = 300000 * i;
+      splits[2][i] = 500000;
+      splits[3][i] = 700000;
+    }
+
     // currently we extract no host names from the Properties
     subject.process(new Properties());
 
@@ -860,16 +884,16 @@ public class TestRumenJobTraces {
         .valueOf("MAP"), "STATUS", 1234567890L,
         "/194\\.6\\.134\\.64/cluster50261\\.secondleveldomain\\.com",
         "SUCCESS", null));
-    subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
-        .forName("attempt_200904211745_0003_m_000004_1"), TaskType
-        .valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
-        "MACHINE_EXPLODED"));
-    subject.process(new TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID
-        .forName("attempt_200904211745_0003_m_000004_2"), TaskType
-        .valueOf("MAP"), "STATUS", 1234567890L,
-        "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
-        "MACHINE_EXPLODED"));
+    subject.process(new TaskAttemptUnsuccessfulCompletionEvent
+                    (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_1"),
+                     TaskType.valueOf("MAP"), "STATUS", 1234567890L,
+                     "/194\\.6\\.134\\.80/cluster50262\\.secondleveldomain\\.com",
+                     "MACHINE_EXPLODED", splits));
+    subject.process(new TaskAttemptUnsuccessfulCompletionEvent
+                    (TaskAttemptID.forName("attempt_200904211745_0003_m_000004_2"),
+                     TaskType.valueOf("MAP"), "STATUS", 1234567890L,
+                     "/194\\.6\\.134\\.80/cluster50263\\.secondleveldomain\\.com",
+                     "MACHINE_EXPLODED", splits));
     subject.process(new TaskStartedEvent(TaskID
         .forName("task_200904211745_0003_m_000004"), 1234567890L, TaskType
         .valueOf("MAP"),

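The four rows built above appear to line up, by position, with the split-vector kinds this patch adds; assuming that ordering matches LoggedTaskAttempt.SplitVectorKind ordinals (an assumption — the test never names the rows), a reader can recover a row by kind instead of by magic index:

    // Assumption: row order follows SplitVectorKind ordinals.
    int[] wallclock =
        splits[LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME.ordinal()];         // 0, 500000, 1000000, ...
    int[] physMem =
        splits[LoggedTaskAttempt.SplitVectorKind.PHYSICAL_MEMORY_KBYTES.ordinal()]; // 700000 six times, then -1
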
+ 14 - 1
mapreduce/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java

@@ -476,6 +476,11 @@ public class JobBuilder {
     }
 
     attempt.setFinishTime(event.getFinishTime());
+
+    attempt.arraySetClockSplits(event.getClockSplits());
+    attempt.arraySetCpuUsages(event.getCpuUsages());
+    attempt.arraySetVMemKbytes(event.getVMemKbytes());
+    attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
   }
 
   private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -521,6 +526,10 @@ public class JobBuilder {
     attempt.setSortFinished(event.getSortFinishTime());
     attempt
         .incorporateCounters(((ReduceAttemptFinished) event.getDatum()).counters);
+    attempt.arraySetClockSplits(event.getClockSplits());
+    attempt.arraySetCpuUsages(event.getCpuUsages());
+    attempt.arraySetVMemKbytes(event.getVMemKbytes());
+    attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
   }
 
   private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -537,7 +546,11 @@ public class JobBuilder {
     // is redundant, but making this will add future-proofing.
     attempt.setFinishTime(event.getFinishTime());
     attempt
-        .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters);
+      .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters);
+    attempt.arraySetClockSplits(event.getClockSplits());
+    attempt.arraySetCpuUsages(event.getCpuUsages());
+    attempt.arraySetVMemKbytes(event.getVMemKbytes());
+    attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
   }
 
   private void processJobUnsuccessfulCompletionEvent(

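The same four-setter sequence appears in all three finished-event handlers above; a hypothetical helper (not in the patch) that would collapse the repetition:

    // Sketch: funnel one event's four per-progress arrays into the attempt.
    private static void copySplits(LoggedTaskAttempt attempt,
                                   int[] clock, int[] cpu,
                                   int[] vmem, int[] pmem) {
      attempt.arraySetClockSplits(clock);   // wallclock time per bucket
      attempt.arraySetCpuUsages(cpu);       // CPU usage per bucket
      attempt.arraySetVMemKbytes(vmem);     // virtual memory, KB
      attempt.arraySetPhysMemKbytes(pmem);  // physical memory, KB
    }

The arraySet* methods are package-private, so a helper like this would have to live in org.apache.hadoop.tools.rumen.
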
+ 207 - 0
mapreduce/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.tools.rumen;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -71,10 +73,118 @@ public class LoggedTaskAttempt implements DeepCompare {
   // Initialize to default object for backward compatibility
   ResourceUsageMetrics metrics = new ResourceUsageMetrics();
   
+  List<Integer> clockSplits = new ArrayList<Integer>();
+  List<Integer> cpuUsages = new ArrayList<Integer>();
+  List<Integer> vMemKbytes = new ArrayList<Integer>();
+  List<Integer> physMemKbytes = new ArrayList<Integer>();
+
   LoggedTaskAttempt() {
     super();
   }
 
+  // Carries the kinds of split vectors a LoggedTaskAttempt holds.
+  //
+  // Each enum constant has the following methods:
+  //   get(LoggedTaskAttempt attempt)
+  //     returns the List<Integer> holding the corresponding value field
+  //   set(LoggedTaskAttempt attempt, List<Integer> newValue)
+  //     sets that field
+  // There is also a pair of methods, get(List<List<Integer>>) and
+  //   set(List<List<Integer>>, List<Integer>), which get or set the
+  //   appropriate element of a List<List<Integer>>.
+  // This makes it easier to add another kind in the future.
+  public enum SplitVectorKind {
+
+    WALLCLOCK_TIME {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getClockSplits();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setClockSplits(newValue);
+      }
+    },
+
+    CPU_USAGE {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getCpuUsages();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setCpuUsages(newValue);
+      }
+    },
+
+    VIRTUAL_MEMORY_KBYTES {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getVMemKbytes();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setVMemKbytes(newValue);
+      }
+    },
+
+    PHYSICAL_MEMORY_KBYTES {
+      @Override
+      public List<Integer> get(LoggedTaskAttempt attempt) {
+        return attempt.getPhysMemKbytes();
+      }
+      @Override
+      public void set(LoggedTaskAttempt attempt, List<Integer> newValue) {
+        attempt.setPhysMemKbytes(newValue);
+      }
+    };
+
+    static private final List<List<Integer>> NULL_SPLITS_VECTOR
+      = new ArrayList<List<Integer>>();
+
+    static {
+      for (SplitVectorKind kind : SplitVectorKind.values() ) {
+        NULL_SPLITS_VECTOR.add(new ArrayList<Integer>());
+      }
+    }
+
+    abstract public List<Integer> get(LoggedTaskAttempt attempt);
+
+    abstract public void set(LoggedTaskAttempt attempt, List<Integer> newValue);
+
+    public List<Integer> get(List<List<Integer>> listSplits) {
+      return listSplits.get(this.ordinal());
+    }
+
+    public void set(List<List<Integer>> listSplits, List<Integer> newValue) {
+      listSplits.set(this.ordinal(), newValue);
+    }
+
+    static public List<List<Integer>> getNullSplitsVector() {
+      return NULL_SPLITS_VECTOR;
+    }
+  }
+
+  /**
+   * @return a list of all split vectors, ordered by the enum constants'
+   *         ordinals within {@link SplitVectorKind}.  Do NOT index into
+   *         the returned list with hard-coded indices to get individual
+   *         values; use {@code SplitVectorKind.get(List)} instead.
+   */
+  public List<List<Integer>> allSplitVectors() {
+    List<List<Integer>> result
+      = new ArrayList<List<Integer>>(SplitVectorKind.values().length);
+
+    for (SplitVectorKind kind : SplitVectorKind.values() ) {
+      result.add(kind.get(this));
+    }
+
+    return result;
+  }
+
   static private Set<String> alreadySeenAnySetterAttributes =
       new TreeSet<String>();
 
@@ -89,6 +199,78 @@ public class LoggedTaskAttempt implements DeepCompare {
     }
   }
 
+  public List<Integer> getClockSplits() {
+    return clockSplits;
+  }
+
+  void setClockSplits(List<Integer> clockSplits) {
+    this.clockSplits = clockSplits;
+  }
+
+  void arraySetClockSplits(int[] clockSplits) {
+    List<Integer> result = new ArrayList<Integer>();
+
+    for (int i = 0; i < clockSplits.length; ++i) {
+      result.add(clockSplits[i]);
+    }
+                 
+    this.clockSplits = result;
+  }
+
+  public List<Integer> getCpuUsages() {
+    return cpuUsages;
+  }
+
+  void setCpuUsages(List<Integer> cpuUsages) {
+    this.cpuUsages = cpuUsages;
+  }
+
+  void arraySetCpuUsages(int[] cpuUsages) {
+    List<Integer> result = new ArrayList<Integer>();
+
+    for (int i = 0; i < cpuUsages.length; ++i) {
+      result.add(cpuUsages[i]);
+    }
+                 
+    this.cpuUsages = result;
+  }
+
+  public List<Integer> getVMemKbytes() {
+    return vMemKbytes;
+  }
+
+  void setVMemKbytes(List<Integer> vMemKbytes) {
+    this.vMemKbytes = vMemKbytes;
+  }
+
+  void arraySetVMemKbytes(int[] vMemKbytes) {
+    List<Integer> result = new ArrayList<Integer>();
+
+    for (int i = 0; i < vMemKbytes.length; ++i) {
+      result.add(vMemKbytes[i]);
+    }
+                 
+    this.vMemKbytes = result;
+  }
+
+  public List<Integer> getPhysMemKbytes() {
+    return physMemKbytes;
+  }
+
+  void setPhysMemKbytes(List<Integer> physMemKbytes) {
+    this.physMemKbytes = physMemKbytes;
+  }
+
+  void arraySetPhysMemKbytes(int[] physMemKbytes) {
+    List<Integer> result = new ArrayList<Integer>();
+
+    for (int i = 0; i < physMemKbytes.length; ++i) {
+      result.add(physMemKbytes[i]);
+    }
+                 
+    this.physMemKbytes = result;
+  }
+
   void adjustTimes(long adjustment) {
     startTime += adjustment;
     finishTime += adjustment;
@@ -480,6 +662,26 @@ public class LoggedTaskAttempt implements DeepCompare {
     c1.deepCompare(c2, recurse);
   }
 
+  private void compare1(List<Integer> c1, List<Integer> c2, TreePath loc,
+                        String eltname)
+        throws DeepInequalityException {
+    if (c1 == null && c2 == null) {
+      return;
+    }
+
+    if (c1 == null || c2 == null || c1.size() != c2.size()) {
+      throw new DeepInequalityException
+              (eltname + " miscompared", new TreePath(loc, eltname));
+    }
+
+    for (int i = 0; i < c1.size(); ++i) {
+      if (!c1.get(i).equals(c2.get(i))) {
+        throw new DeepInequalityException("" + c1.get(i) + " != " + c2.get(i),
+                                          new TreePath(loc, eltname, i));
+      }
+    }
+  }
+
   public void deepCompare(DeepCompare comparand, TreePath loc)
       throws DeepInequalityException {
     if (!(comparand instanceof LoggedTaskAttempt)) {
@@ -518,5 +720,10 @@ public class LoggedTaskAttempt implements DeepCompare {
     compare1(sortFinished, other.sortFinished, loc, "sortFinished");
 
     compare1(location, other.location, loc, "location");
+
+    compare1(clockSplits, other.clockSplits, loc, "clockSplits");
+    compare1(cpuUsages, other.cpuUsages, loc, "cpuUsages");
+    compare1(vMemKbytes, other.vMemKbytes, loc, "vMemKbytes");
+    compare1(physMemKbytes, other.physMemKbytes, loc, "physMemKbytes");
   }
 }

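A minimal usage sketch of the enum accessors, assuming same-package access (the constructor and the arraySet* setters are package-private):

    LoggedTaskAttempt attempt = new LoggedTaskAttempt();
    attempt.arraySetCpuUsages(new int[] {10, 20, 30});

    // Per-attempt access keyed by kind, never by a bare index:
    List<Integer> cpu = LoggedTaskAttempt.SplitVectorKind.CPU_USAGE.get(attempt);

    // The same kind also indexes the aggregate view:
    List<List<Integer>> all = attempt.allSplitVectors();
    assert cpu.equals(LoggedTaskAttempt.SplitVectorKind.CPU_USAGE.get(all));
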
+ 7 - 5
mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java

@@ -68,10 +68,13 @@ public class MapAttempt20LineHistoryEventEmitter extends
             (MapAttempt20LineHistoryEventEmitter) thatg;
 
         if (finishTime != null && "success".equalsIgnoreCase(status)) {
-          return new MapAttemptFinishedEvent(taskAttemptID,
-              that.originalTaskType, status, Long.parseLong(finishTime), Long
-                  .parseLong(finishTime), hostName, state,
-              maybeParseCounters(counters));
+          return new MapAttemptFinishedEvent
+            (taskAttemptID,
+              that.originalTaskType, status,
+             Long.parseLong(finishTime),
+             Long.parseLong(finishTime),
+             hostName, state, maybeParseCounters(counters),
+             null);
         }
       }
 
@@ -88,5 +91,4 @@ public class MapAttempt20LineHistoryEventEmitter extends
   List<SingleEventEmitter> nonFinalSEEs() {
     return nonFinals;
   }
-
 }

+ 26 - 2
mapreduce/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.List;
+
 import org.apache.hadoop.mapred.TaskStatus.State;
 
 /**
@@ -26,11 +28,33 @@ import org.apache.hadoop.mapred.TaskStatus.State;
 public class MapTaskAttemptInfo extends TaskAttemptInfo {
   private long runtime;
 
-  public MapTaskAttemptInfo(State state, TaskInfo taskInfo, long runtime) {
-    super(state, taskInfo);
+  public MapTaskAttemptInfo(State state, TaskInfo taskInfo,
+                            long runtime, List<List<Integer>> allSplits) {
+    super(state, taskInfo,
+          allSplits == null
+            ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
+           : allSplits);
     this.runtime = runtime;
   }
 
+  /**
+   * @deprecated please use the constructor with
+   *             {@code (state, taskInfo, runtime,
+   *              List<List<Integer>> allSplits)} instead.
+   *
+   * See {@link LoggedTaskAttempt} for an explanation of
+   * {@code allSplits}.
+   *
+   * If there are no known splits, use {@code null}.
+   */
+  @Deprecated
+  public MapTaskAttemptInfo(State state, TaskInfo taskInfo,
+                            long runtime) {
+    this(state, taskInfo, runtime, null);
+  }
+
   @Override
   public long getRuntime() {
     return getMapRuntime();

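Usage sketch of the two constructors; {@code taskInfo} and {@code loggedAttempt} are assumed to be in scope, and the runtime is illustrative:

    import org.apache.hadoop.mapred.TaskStatus.State;

    // Deprecated form: compiles as before, silently passes null splits.
    MapTaskAttemptInfo legacy =
        new MapTaskAttemptInfo(State.SUCCEEDED, taskInfo, 30000L);

    // Preferred form: null also works and is replaced by the all-empty vector.
    MapTaskAttemptInfo current =
        new MapTaskAttemptInfo(State.SUCCEEDED, taskInfo, 30000L,
                               loggedAttempt.allSplitVectors());
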
+ 11 - 6
mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java

@@ -28,8 +28,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
 
-public class ReduceAttempt20LineHistoryEventEmitter extends
-    TaskAttempt20LineEventEmitter {
+public class ReduceAttempt20LineHistoryEventEmitter
+     extends TaskAttempt20LineEventEmitter {
 
   static List<SingleEventEmitter> nonFinals =
       new LinkedList<SingleEventEmitter>();
@@ -71,10 +71,15 @@ public class ReduceAttempt20LineHistoryEventEmitter extends
           ReduceAttempt20LineHistoryEventEmitter that =
               (ReduceAttempt20LineHistoryEventEmitter) thatg;
 
-          return new ReduceAttemptFinishedEvent(taskAttemptID,
-              that.originalTaskType, status, Long.parseLong(shuffleFinish),
-              Long.parseLong(sortFinish), Long.parseLong(finishTime), hostName,
-              state, maybeParseCounters(counters));
+          return new ReduceAttemptFinishedEvent
+            (taskAttemptID,
+             that.originalTaskType, status,
+             Long.parseLong(shuffleFinish),
+             Long.parseLong(sortFinish),
+             Long.parseLong(finishTime),
+             hostName,
+             state, maybeParseCounters(counters),
+             null);
         }
       }
 

+ 26 - 3
mapreduce/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.List;
+
 import org.apache.hadoop.mapred.TaskStatus.State;
 
 /**
@@ -29,13 +31,35 @@ public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
   private long reduceTime;
 
   public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
-      long mergeTime, long reduceTime) {
-    super(state, taskInfo);
+      long mergeTime, long reduceTime, List<List<Integer>> allSplits) {
+    super(state, taskInfo,
+          allSplits == null
+            ? LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector()
+           : allSplits);
     this.shuffleTime = shuffleTime;
     this.mergeTime = mergeTime;
     this.reduceTime = reduceTime;
   }
 
+
+  /**
+   * @deprecated please use the constructor with
+   *             {@code (state, taskInfo, shuffleTime, mergeTime, reduceTime,
+   *              List<List<Integer>> allSplits)} instead.
+   *
+   * See {@link LoggedTaskAttempt} for an explanation of
+   * {@code allSplits}.
+   *
+   * If there are no known splits, use {@code null}.
+   */
+  @Deprecated
+  public ReduceTaskAttemptInfo(State state, TaskInfo taskInfo, long shuffleTime,
+      long mergeTime, long reduceTime) {
+    this(state, taskInfo, shuffleTime, mergeTime, reduceTime, null);
+  }
+
   /**
    * Get the runtime for the <b>reduce</b> phase of the reduce task-attempt.
    * 
@@ -67,5 +91,4 @@ public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
   public long getRuntime() {
     return (getShuffleRuntime() + getMergeRuntime() + getReduceRuntime());
   }
-
 }

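The reduce-side analogue ({@code taskInfo} again assumed in scope); getRuntime() above is simply the sum of the three phases:

    // Sketch: phase durations in milliseconds, no known splits.
    ReduceTaskAttemptInfo info = new ReduceTaskAttemptInfo(
        State.SUCCEEDED, taskInfo,
        15000L,   // shuffle
        5000L,    // merge/sort
        40000L,   // reduce
        null);    // substituted by the null splits vector
    assert info.getRuntime() == 15000L + 5000L + 40000L;
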
+ 4 - 3
mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java

@@ -138,9 +138,10 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
         TaskAttempt20LineEventEmitter that =
             (TaskAttempt20LineEventEmitter) thatg;
 
-        return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID,
-            that.originalTaskType, status, Long.parseLong(finishTime),
-            hostName, error);
+        return new TaskAttemptUnsuccessfulCompletionEvent
+          (taskAttemptID,
+           that.originalTaskType, status, Long.parseLong(finishTime),
+           hostName, error, null);
       }
 
       return null;

+ 16 - 1
mapreduce/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.List;
+
 import org.apache.hadoop.mapred.TaskStatus.State;
 
 /**
@@ -27,13 +29,22 @@ public abstract class TaskAttemptInfo {
   protected final State state;
   protected final TaskInfo taskInfo;
 
-  protected TaskAttemptInfo(State state, TaskInfo taskInfo) {
+  protected final List<List<Integer>> allSplits;
+
+  protected TaskAttemptInfo
+       (State state, TaskInfo taskInfo, List<List<Integer>> allSplits) {
     if (state == State.SUCCEEDED || state == State.FAILED) {
       this.state = state;
     } else {
       throw new IllegalArgumentException("status cannot be " + state);
     }
     this.taskInfo = taskInfo;
+    this.allSplits = allSplits;
+  }
+
+  protected TaskAttemptInfo
+       (State state, TaskInfo taskInfo) {
+    this(state, taskInfo, LoggedTaskAttempt.SplitVectorKind.getNullSplitsVector());
   }
 
   /**
@@ -60,4 +71,8 @@ public abstract class TaskAttemptInfo {
   public TaskInfo getTaskInfo() {
     return taskInfo;
   }
+
+  public List<Integer> getSplitVector(LoggedTaskAttempt.SplitVectorKind kind) {
+    return kind.get(allSplits);
+  }
 }

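Consumer-side sketch: a vector is fetched by kind, and an attempt recorded without split data yields an empty list rather than null ({@code taskAttemptInfo} assumed in scope):

    List<Integer> vmem = taskAttemptInfo.getSplitVector(
        LoggedTaskAttempt.SplitVectorKind.VIRTUAL_MEMORY_KBYTES);
    if (vmem.isEmpty()) {
      // no per-progress virtual-memory samples for this attempt
    }
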
+ 13 - 7
mapreduce/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

@@ -537,7 +537,8 @@ public class ZombieJob implements JobStory {
       }
       taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
       taskTime *= scaleFactor;
-      return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+      return new MapTaskAttemptInfo
+        (state, taskInfo, taskTime, loggedAttempt.allSplitVectors());
     } else {
       throw new IllegalArgumentException("taskType can only be MAP: "
           + loggedTask.getTaskType());
@@ -584,6 +585,9 @@ public class ZombieJob implements JobStory {
   private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
       LoggedTaskAttempt loggedAttempt) {
     TaskInfo taskInfo = getTaskInfo(loggedTask);
+    
+    List<List<Integer>> allSplitVectors = loggedAttempt.allSplitVectors();
+
     State state = convertState(loggedAttempt.getResult());
     if (loggedTask.getTaskType() == Values.MAP) {
       long taskTime;
@@ -594,7 +598,7 @@ public class ZombieJob implements JobStory {
         taskTime = loggedAttempt.getFinishTime() - loggedAttempt.getStartTime();
       }
       taskTime = sanitizeTaskRuntime(taskTime, loggedAttempt.getAttemptID());
-      return new MapTaskAttemptInfo(state, taskInfo, taskTime);
+      return new MapTaskAttemptInfo(state, taskInfo, taskTime, allSplitVectors);
     } else if (loggedTask.getTaskType() == Values.REDUCE) {
       long startTime = loggedAttempt.getStartTime();
       long mergeDone = loggedAttempt.getSortFinished();
@@ -605,7 +609,8 @@ public class ZombieJob implements JobStory {
         // haven't seen reduce task with startTime=0 ever. But if this happens,
         // make up a reduceTime with no shuffle/merge.
         long reduceTime = makeUpReduceRuntime(state);
-        return new ReduceTaskAttemptInfo(state, taskInfo, 0, 0, reduceTime);
+        return new ReduceTaskAttemptInfo
+          (state, taskInfo, 0, 0, reduceTime, allSplitVectors);
       } else {
         if (shuffleDone <= 0) {
           shuffleDone = startTime;
@@ -619,7 +624,7 @@ public class ZombieJob implements JobStory {
         reduceTime = sanitizeTaskRuntime(reduceTime, loggedAttempt.getAttemptID());
         
         return new ReduceTaskAttemptInfo(state, taskInfo, shuffleTime,
-            mergeTime, reduceTime);
+            mergeTime, reduceTime, allSplitVectors);
       }
     } else {
       throw new IllegalArgumentException("taskType for "
@@ -700,7 +705,8 @@ public class ZombieJob implements JobStory {
       runtime = makeUpMapRuntime(state, locality);
       runtime = sanitizeTaskRuntime(runtime, makeTaskAttemptID(taskType,
           taskNumber, taskAttemptNumber).toString());
-      TaskAttemptInfo tai = new MapTaskAttemptInfo(state, taskInfo, runtime);
+      TaskAttemptInfo tai
+        = new MapTaskAttemptInfo(state, taskInfo, runtime, null);
       return tai;
     } else if (taskType == TaskType.REDUCE) {
       State state = State.SUCCEEDED;
@@ -711,8 +717,8 @@ public class ZombieJob implements JobStory {
       // TODO make up state
       // state = makeUpState(taskAttemptNumber, job.getReducerTriesToSucceed());
       reduceTime = makeUpReduceRuntime(state);
-      TaskAttemptInfo tai = new ReduceTaskAttemptInfo(state, taskInfo,
-          shuffleTime, sortTime, reduceTime);
+      TaskAttemptInfo tai = new ReduceTaskAttemptInfo
+        (state, taskInfo, shuffleTime, sortTime, reduceTime, null);
       return tai;
     }