
MAPREDUCE-2611. Fix counters, finish times etc. in job history. (Siddharth Seth via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1138166 13f79535-47bb-0310-9956-ffa450edef68
Luke Lu 14 years ago
parent commit d3d1c584a3
20 files changed, 279 additions and 31 deletions
  1. +3 -0  mapreduce/CHANGES.txt
  2. +2 -3  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
  3. +21 -3  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
  4. +15 -0  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java
  5. +52 -0  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java
  6. +1 -0  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
  7. +1 -0  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java
  8. +5 -0  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
  9. +29 -2  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  10. +93 -9  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  11. +1 -1  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
  12. +1 -1  mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
  13. +1 -0  mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
  14. +1 -0  mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
  15. +14 -2  mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
  16. +6 -0  mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java
  17. +19 -0  mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
  18. +1 -1  mapreduce/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
  19. +9 -9  mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
  20. +4 -0  mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

+ 3 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    MAPREDUCE-2611. Fix counters, finish times etc. in job history.
+    (Siddharth Seth via llu)
+
     Fixing scheduling deadlock in AM because of incorrect headRoom values from
     RM. The bug happens when AM releases containers and RM decrements current
     memory usage twice for all those containers. (vinodkv)

+ 2 - 3
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java

@@ -126,9 +126,8 @@ public class MapReduceChildJVM {
       }
     }
 
-    // TODO: Put a random pid in env for now.
-    // Long term we will need to get it from the Child
-    env.put("JVM_PID", "12344");
+    // This should not be set here (if an OS check is required, it has moved to ContainerLaunch)
+    // env.put("JVM_PID", "`echo $$`");
 
     env.put(Constants.STDOUT_LOGFILE_ENV,
         getTaskLogFile(containerLogDir, TaskLog.LogName.STDOUT).toString());

+ 21 - 3
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -181,7 +181,7 @@ public class TaskAttemptListenerImpl extends CompositeService
         TypeConverter.toYarn(taskAttemptID);
 
     taskHeartbeatHandler.receivedPing(attemptID);
-
+    //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, 
             TaskAttemptEventType.TA_COMMIT_PENDING));
@@ -314,6 +314,24 @@ public class TaskAttemptListenerImpl extends CompositeService
     taskAttemptStatus.counters =
         TypeConverter.toYarn(taskStatus.getCounters());
 
+    // Map Finish time set by the task (map only)
+    if (taskStatus.getIsMap() && taskStatus.getMapFinishTime() != 0) {
+      taskAttemptStatus.mapFinishTime = taskStatus.getMapFinishTime();
+    }
+
+    // Shuffle Finish time set by the task (reduce only).
+    if (!taskStatus.getIsMap() && taskStatus.getShuffleFinishTime() != 0) {
+      taskAttemptStatus.shuffleFinishTime = taskStatus.getShuffleFinishTime();
+    }
+
+    // Sort finish time set by the task (reduce only).
+    if (!taskStatus.getIsMap() && taskStatus.getSortFinishTime() != 0) {
+      taskAttemptStatus.sortFinishTime = taskStatus.getSortFinishTime();
+    }
+
+    // Not Setting the task state. Used by speculation - will be set in TaskAttemptImpl
+    //taskAttemptStatus.taskState =  TypeConverter.toYarn(taskStatus.getRunState());
+    
     //set the fetch failures
     if (taskStatus.getFetchFailedMaps() != null 
         && taskStatus.getFetchFailedMaps().size() > 0) {
@@ -325,8 +343,8 @@ public class TaskAttemptListenerImpl extends CompositeService
       }
     }
 
-    // Task sends the information about the nextRecordRange to the TT
-
+ // Task sends the information about the nextRecordRange to the TT
+    
 //    TODO: The following are not needed here, but needed to be set somewhere inside AppMaster.
 //    taskStatus.getRunState(); // Set by the TT/JT. Transform into a state TODO
 //    taskStatus.getStartTime(); // Used to be set by the TaskTracker. This should be set by getTask().

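A note on the guards above: the task reports 0 for any per-phase finish time it has not reached yet, so the listener only copies non-zero values into the status event. A minimal, self-contained sketch of that convention (names here are hypothetical, not the actual Hadoop types):

    // Hypothetical sketch of the "0 means not reached yet" convention used
    // when copying per-phase finish times from a task report into a status event.
    class FinishTimesSketch {
      static final class Report {
        boolean isMap;
        long mapFinishTime;      // 0 until the map phase completes
        long shuffleFinishTime;  // 0 until shuffle completes (reduce only)
        long sortFinishTime;     // 0 until sort completes (reduce only)
      }

      static final class Status {
        long mapFinishTime, shuffleFinishTime, sortFinishTime;
      }

      static void copyNonZero(Report r, Status s) {
        if (r.isMap && r.mapFinishTime != 0) s.mapFinishTime = r.mapFinishTime;
        if (!r.isMap && r.shuffleFinishTime != 0) s.shuffleFinishTime = r.shuffleFinishTime;
        if (!r.isMap && r.sortFinishTime != 0) s.sortFinishTime = r.sortFinishTime;
      }
    }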
+ 15 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedPeriodicStatsAccumulator.java

@@ -0,0 +1,15 @@
+package org.apache.hadoop.mapred;
+
+//Workaround for PeriodicStatsAccumulator being package access
+public class WrappedPeriodicStatsAccumulator {
+
+  private PeriodicStatsAccumulator real;
+
+  public WrappedPeriodicStatsAccumulator(PeriodicStatsAccumulator real) {
+    this.real = real;
+  }
+  
+  public void extend(double newProgress, int newValue) {
+    real.extend(newProgress, newValue);
+  }
+}

+ 52 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/WrappedProgressSplitsBlock.java

@@ -0,0 +1,52 @@
+package org.apache.hadoop.mapred;
+
+// Workaround for ProgressSplitBlock being package access
+public class WrappedProgressSplitsBlock extends ProgressSplitsBlock {
+
+  public static final int DEFAULT_NUMBER_PROGRESS_SPLITS = 12;
+
+  private WrappedPeriodicStatsAccumulator wrappedProgressWallclockTime;
+  private WrappedPeriodicStatsAccumulator wrappedProgressCPUTime;
+  private WrappedPeriodicStatsAccumulator wrappedProgressVirtualMemoryKbytes;
+  private WrappedPeriodicStatsAccumulator wrappedProgressPhysicalMemoryKbytes;
+
+  public WrappedProgressSplitsBlock(int numberSplits) {
+    super(numberSplits);
+  }
+
+  public int[][] burst() {
+    return super.burst();
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressWallclockTime() {
+    if (wrappedProgressWallclockTime == null) {
+      wrappedProgressWallclockTime = new WrappedPeriodicStatsAccumulator(
+          progressWallclockTime);
+    }
+    return wrappedProgressWallclockTime;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressCPUTime() {
+    if (wrappedProgressCPUTime == null) {
+      wrappedProgressCPUTime = new WrappedPeriodicStatsAccumulator(
+          progressCPUTime);
+    }
+    return wrappedProgressCPUTime;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressVirtualMemoryKbytes() {
+    if (wrappedProgressVirtualMemoryKbytes == null) {
+      wrappedProgressVirtualMemoryKbytes = new WrappedPeriodicStatsAccumulator(
+          progressVirtualMemoryKbytes);
+    }
+    return wrappedProgressVirtualMemoryKbytes;
+  }
+
+  public WrappedPeriodicStatsAccumulator getProgressPhysicalMemoryKbytes() {
+    if (wrappedProgressPhysicalMemoryKbytes == null) {
+      wrappedProgressPhysicalMemoryKbytes = new WrappedPeriodicStatsAccumulator(
+          progressPhysicalMemoryKbytes);
+    }
+    return wrappedProgressPhysicalMemoryKbytes;
+  }
+}

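The wrapper above only re-exports the package-private accumulators; the underlying idea is that attempt progress in [0, 1) is carved into a fixed number of splits (12 by default) with one recorded value per split, which burst() then returns as arrays for the history file. A rough, self-contained sketch of that bucketing with invented names (the real PeriodicStatsAccumulator subclasses also smooth values across skipped buckets):

    // Hypothetical sketch only - not the real ProgressSplitsBlock internals.
    // Progress [0,1) is mapped to a fixed number of buckets; extend() records
    // the latest value for the bucket the current progress falls into.
    class ProgressSplitsSketch {
      private final int[] values;

      ProgressSplitsSketch(int numberSplits) {
        values = new int[numberSplits];
      }

      void extend(double progress, int value) {
        int bucket = (int) Math.min(values.length - 1,
            Math.max(0.0, progress * values.length));
        values[bucket] = value; // the real accumulators also backfill skipped buckets
      }

      int[] burst() {
        return values.clone();
      }
    }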
+ 1 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java

@@ -103,6 +103,7 @@ class YarnChild {
 
     // report non-pid to application master
     JvmContext context = new JvmContext(jvmId, "-1000");
+    LOG.debug("PID: " + System.getenv().get("JVM_PID"));
     Task task = null;
     UserGroupInformation childUGI = null;
 

+ 1 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AMConstants.java

@@ -70,4 +70,5 @@ public interface AMConstants {
 
   public static final String NODE_BLACKLISTING_ENABLE = MRConstants.YARN_MR_PREFIX
   + "node.blacklisting.enable";
+  
 }

+ 5 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java

@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 
 
 public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
@@ -52,5 +53,9 @@ public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
     public Phase phase;
     public long outputSize;
     public List<TaskAttemptId> fetchFailedMaps;
+    public long mapFinishTime;
+    public long shuffleFinishTime;
+    public long sortFinishTime;
+    public TaskAttemptState taskState;
   }
 }

+ 29 - 2
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -452,6 +452,33 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   }
 
+  private Counters getTypeCounters(Set<TaskId> taskIds) {
+    Counters counters = newCounters();
+    for (TaskId taskId : taskIds) {
+      Task task = tasks.get(taskId);
+      incrAllCounters(counters, task.getCounters());
+    }
+    return counters;
+  }
+
+  private Counters getMapCounters() {
+    readLock.lock();
+    try {
+      return getTypeCounters(mapTasks);
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  private Counters getReduceCounters() {
+    readLock.lock();
+    try {
+      return getTypeCounters(reduceTasks);
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
   public static Counters newCounters() {
     Counters counters = RecordFactoryProvider.getRecordFactory(null)
         .newRecordInstance(Counters.class);
@@ -1066,8 +1093,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         job.oldJobId, job.finishTime,
         job.succeededMapTaskCount, job.succeededReduceTaskCount,
         job.failedMapTaskCount, job.failedReduceTaskCount,
-        TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounters
-        TypeConverter.fromYarn(job.getCounters()), //TODO replace with ReduceCoutners
+        TypeConverter.fromYarn(job.getMapCounters()),
+        TypeConverter.fromYarn(job.getReduceCounters()),
         TypeConverter.fromYarn(job.getCounters()));
     return jfe;
   }

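getMapCounters() and getReduceCounters() above follow a fold-under-read-lock pattern: take the read lock, start from a fresh Counters instance, and fold in each task of the requested type so concurrent updates cannot tear the aggregation. A self-contained sketch of the same pattern with plain Java types (names are hypothetical, not the JobImpl API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical sketch of aggregating per-task counters for one task type
    // while holding a read lock.
    class CounterAggregationSketch {
      private final Map<String, Map<String, Long>> countersByTask = new HashMap<>();
      private final ReadWriteLock lock = new ReentrantReadWriteLock();

      Map<String, Long> aggregate(Set<String> taskIds) {
        lock.readLock().lock();
        try {
          Map<String, Long> total = new HashMap<>();
          for (String taskId : taskIds) {
            Map<String, Long> taskCounters = countersByTask.get(taskId);
            if (taskCounters == null) {
              continue;
            }
            for (Map.Entry<String, Long> e : taskCounters.entrySet()) {
              Long current = total.get(e.getKey());
              total.put(e.getKey(), (current == null ? 0L : current) + e.getValue());
            }
          }
          return total;
        } finally {
          lock.readLock().unlock();
        }
      }
    }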
+ 93 - 9
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -43,13 +43,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceChildJVM;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
 import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -60,6 +63,8 @@ import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionE
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -88,6 +93,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
@@ -124,6 +130,7 @@ public abstract class TaskAttemptImpl implements
       EventHandler<TaskAttemptEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
+  private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
   private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   protected final Configuration conf;
@@ -147,6 +154,7 @@ public abstract class TaskAttemptImpl implements
 
   private long launchTime;
   private long finishTime;
+  private WrappedProgressSplitsBlock progressSplitBlock;
 
   private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
     new CleanupContainerTransition();
@@ -900,11 +908,82 @@ public abstract class TaskAttemptImpl implements
         TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
         attemptState.toString(), taskAttempt.finishTime,
         taskAttempt.containerMgrAddress == null ? "UNKNOWN" : taskAttempt.containerMgrAddress,
-        taskAttempt.reportedStatus.diagnosticInfo.toString());
-      //TODO Different constructor - allSplits
+        taskAttempt.reportedStatus.diagnosticInfo.toString(),
+        taskAttempt.getProgressSplitBlock().burst());
     return tauce;
   }
 
+  private WrappedProgressSplitsBlock getProgressSplitBlock() {
+    readLock.lock();
+    try {
+      if (progressSplitBlock == null) {
+        progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
+            JHConfig.JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS_KEY,
+            WrappedProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS));
+      }
+      return progressSplitBlock;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  private void updateProgressSplits() {
+    double newProgress = reportedStatus.progress;
+    Counters counters = reportedStatus.counters;
+    if (counters == null)
+      return;
+
+    WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
+    if (splitsBlock != null) {
+      long now = clock.getTime();
+      long start = getLaunchTime(); // TODO Ensure not 0
+
+      if (start != 0 && now - start <= Integer.MAX_VALUE) {
+        splitsBlock.getProgressWallclockTime().extend(newProgress,
+            (int) (now - start));
+      }
+
+      // TODO Fix the Counter API
+      CounterGroup cpuCounterGroup = counters
+          .getCounterGroup(TaskCounter.CPU_MILLISECONDS.getDeclaringClass()
+              .getName());
+      if (cpuCounterGroup != null) {
+        Counter cpuCounter = cpuCounterGroup
+            .getCounter(TaskCounter.CPU_MILLISECONDS.name());
+        if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
+          splitsBlock.getProgressCPUTime().extend(newProgress,
+              (int) cpuCounter.getValue());
+        }
+      }
+
+      // TODO Fix the Counter API
+      CounterGroup vbCounterGroup = counters
+          .getCounterGroup(TaskCounter.VIRTUAL_MEMORY_BYTES.getDeclaringClass()
+              .getName());
+      if (vbCounterGroup != null) {
+        Counter virtualBytes = vbCounterGroup
+            .getCounter(TaskCounter.VIRTUAL_MEMORY_BYTES.name());
+        if (virtualBytes != null) {
+          splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
+              (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
+        }
+      }
+
+      // TODO Fix the Counter API
+      CounterGroup pbCounterGroup = counters
+          .getCounterGroup(TaskCounter.PHYSICAL_MEMORY_BYTES
+              .getDeclaringClass().getName());
+      if (pbCounterGroup != null) {
+        Counter physicalBytes = pbCounterGroup
+            .getCounter(TaskCounter.PHYSICAL_MEMORY_BYTES.name());
+        if (physicalBytes != null) {
+          splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
+              (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
+        }
+      }
+    }
+  }
+
   private static class RequestContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     private final boolean rescheduled;
@@ -1150,10 +1229,11 @@ public abstract class TaskAttemptImpl implements
          new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
          state.toString(),
-         finishTime, //TODO TaskAttemptStatus changes. MapFinishTime
+         this.reportedStatus.mapFinishTime,
          finishTime, this.containerMgrAddress == null ? "UNKNOWN" : this.containerMgrAddress,
-         state.toString(), //TODO state is a progress string.
-         TypeConverter.fromYarn(getCounters()),null);
+         this.reportedStatus.stateString,
+         TypeConverter.fromYarn(getCounters()),
+         getProgressSplitBlock().burst());
          eventHandler.handle(
            new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
     } else {
@@ -1161,11 +1241,12 @@ public abstract class TaskAttemptImpl implements
          new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
          state.toString(),
-         finishTime, //TODO TaskAttemptStatus changes. ShuffleFinishTime
-         finishTime, //TODO TaskAttemptStatus changes. SortFinishTime
+         this.reportedStatus.shuffleFinishTime,
+         this.reportedStatus.sortFinishTime,
          finishTime, this.containerMgrAddress == null ? "UNKNOWN" : this.containerMgrAddress,
-         state.toString(),
-         TypeConverter.fromYarn(getCounters()),null);
+         this.reportedStatus.stateString,
+         TypeConverter.fromYarn(getCounters()),
+         getProgressSplitBlock().burst());
          eventHandler.handle(
            new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
     }
@@ -1252,6 +1333,7 @@ public abstract class TaskAttemptImpl implements
               .getReportedTaskAttemptStatus();
       // Now switch the information in the reportedStatus
       taskAttempt.reportedStatus = newReportedStatus;
+      taskAttempt.reportedStatus.taskState = taskAttempt.getState();
 
       // send event to speculator about the reported status
       taskAttempt.eventHandler.handle
@@ -1260,6 +1342,7 @@ public abstract class TaskAttemptImpl implements
       
       //add to diagnostic
       taskAttempt.addDiagnosticInfo(newReportedStatus.diagnosticInfo);
+      taskAttempt.updateProgressSplits();
       
       //if fetch failures are present, send the fetch failure event to job
       //this only will happen in reduce attempt type
@@ -1289,6 +1372,7 @@ public abstract class TaskAttemptImpl implements
     result.diagnosticInfo = new String("");
     result.phase = Phase.STARTING;
     result.stateString = new String("NEW");
+    result.taskState = TaskAttemptState.NEW;
     Counters counters = recordFactory.newRecordInstance(Counters.class);
 //    counters.groups = new HashMap<String, CounterGroup>();
     result.counters = counters;

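updateProgressSplits() above has to squeeze 64-bit samples into the int-valued splits: wall-clock samples are skipped when they would overflow an int, and memory counters are scaled from bytes to kilobytes via MEMORY_SPLITS_RESOLUTION (1024). A small sketch of those conversions in isolation (hypothetical helper, not part of TaskAttemptImpl):

    // Hypothetical sketch of the narrowing rules used before feeding samples
    // into the int-valued progress splits.
    class SplitSampleSketch {
      static final long MEMORY_SPLITS_RESOLUTION = 1024;

      // Returns -1 when the elapsed time cannot be represented as an int,
      // mirroring the "skip the sample" behavior above.
      static int elapsedSample(long start, long now) {
        if (start == 0 || now - start > Integer.MAX_VALUE) {
          return -1;
        }
        return (int) (now - start);
      }

      // Bytes are scaled to kilobytes so typical values fit in an int.
      static int memorySampleKb(long bytes) {
        return (int) (bytes / MEMORY_SPLITS_RESOLUTION);
      }
    }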
+ 1 - 1
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

@@ -331,7 +331,7 @@ public class RecoveryService extends CompositeService implements Recovery {
       taskAttemptStatus.id = yarnAttemptID;
       taskAttemptStatus.progress = 1.0f;
       taskAttemptStatus.diagnosticInfo = "";
-      taskAttemptStatus.stateString = attemptInfo.getState();
+      taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
       // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
       taskAttemptStatus.phase = Phase.CLEANUP;
       org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters();

+ 1 - 1
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java

@@ -298,7 +298,7 @@ public class DefaultSpeculator extends AbstractService implements
    */
   protected void statusUpdate(TaskAttemptStatus reportedStatus, long timestamp) {
 
-    String stateString = reportedStatus.stateString.toString();
+    String stateString = reportedStatus.taskState.toString();
 
     TaskAttemptId attemptID = reportedStatus.id;
     TaskId taskID = attemptID.getTaskId();

+ 1 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java

@@ -85,6 +85,7 @@ public class TestMRClientService {
     taskAttemptStatus.progress = 0.5f;
     taskAttemptStatus.diagnosticInfo = diagnostic2;
     taskAttemptStatus.stateString = "RUNNING";
+    taskAttemptStatus.taskState = TaskAttemptState.RUNNING;
     taskAttemptStatus.phase = Phase.MAP;
     taskAttemptStatus.outputSize = 3;
     // send the status update

+ 1 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -191,6 +191,7 @@ public class TestRuntimeEstimators {
             status.id = attempt.getID();
             status.progress = attempt.getProgress();
             status.stateString = attempt.getState().name();
+            status.taskState = attempt.getState();
             SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime());
             speculator.handle(event);
           }

+ 14 - 2
mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -125,6 +126,16 @@ public class TypeConverter {
     return taskId;
   }
 
+  public static TaskAttemptState toYarn(org.apache.hadoop.mapred.TaskStatus.State state) {
+    if (state == org.apache.hadoop.mapred.TaskStatus.State.KILLED_UNCLEAN) {
+      return TaskAttemptState.KILLED;
+    }
+    if (state == org.apache.hadoop.mapred.TaskStatus.State.FAILED_UNCLEAN) {
+      return TaskAttemptState.FAILED;
+    }
+    return TaskAttemptState.valueOf(state.toString());
+  }
+  
   public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) {
     switch (phase) {
     case STARTING:
@@ -211,10 +222,11 @@
     org.apache.hadoop.mapreduce.Counters counters = 
       new org.apache.hadoop.mapreduce.Counters();
     for (CounterGroup yGrp : yCntrs.getAllCounterGroups().values()) {
+      counters.addGroup(yGrp.getName(), yGrp.getDisplayName());
       for (Counter yCntr : yGrp.getAllCounters().values()) {
         org.apache.hadoop.mapreduce.Counter c = 
-          counters.findCounter(yGrp.getDisplayName(), 
-              yCntr.getDisplayName());
+          counters.findCounter(yGrp.getName(), 
+              yCntr.getName());
         c.setValue(yCntr.getValue());
       }
     }

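The new toYarn(TaskStatus.State) overload above normalizes the *_UNCLEAN variants onto their clean counterparts and maps everything else by enum name. A self-contained analogue with stand-in enums (not the real TaskStatus.State or TaskAttemptState types) showing the same shape:

    // Hypothetical analogue of the state normalization: the unclean variants
    // collapse onto KILLED/FAILED; all other names map one-to-one.
    enum OldState { RUNNING, SUCCEEDED, FAILED, KILLED, FAILED_UNCLEAN, KILLED_UNCLEAN }
    enum NewState { RUNNING, SUCCEEDED, FAILED, KILLED }

    class StateMappingSketch {
      static NewState toNew(OldState state) {
        switch (state) {
          case KILLED_UNCLEAN:
            return NewState.KILLED;
          case FAILED_UNCLEAN:
            return NewState.FAILED;
          default:
            return NewState.valueOf(state.name()); // maps by matching name
        }
      }
    }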
+ 6 - 0
mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java

@@ -90,4 +90,10 @@ public class JHConfig {
   public static final String HS_CLIENT_THREADS = 
     HS_PREFIX + "client.threads";
   public static final int DEFAULT_HS_CLIENT_THREADS = 10;
+  
+//From JTConfig. May need to be moved elsewhere.
+  public static final String JOBHISTORY_TASKPROGRESS_NUMBER_SPLITS_KEY = 
+    "mapreduce.jobtracker.jobhistory.task.numberprogresssplits";
+  
+  public static int DEFAULT_NUMBER_PROGRESS_SPLITS = 12;
 }

+ 19 - 0
mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -123,10 +124,28 @@ public class JobHistoryUtils {
     }
   };
 
+  /**
+   * Checks whether the provided path string is a valid job history file.
+   * @param pathString the path to be checked.
+   * @return true if the path ends with the job history file extension.
+   */
   public static boolean isValidJobHistoryFileName(String pathString) {
     return pathString.endsWith(JOB_HISTORY_FILE_EXTENSION);
   }
 
+  /**
+   * Returns the jobId from a job history file name.
+   * @param pathString the path string.
+   * @return the JobId
+   * @throws IOException if the filename format is invalid.
+   */
+  public static JobID getJobIDFromHistoryFilePath(String pathString) throws IOException {
+    String[] parts = pathString.split(File.separator);
+    String fileNamePart = parts[parts.length - 1];
+    JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fileNamePart);
+    return TypeConverter.fromYarn(jobIndexInfo.getJobId());
+  }
+
   /**
    * Gets a PathFilter which would match configuration files.
    * @return

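getJobIDFromHistoryFilePath() above takes the last path component and hands it to FileNameIndexUtils for parsing. A stripped-down sketch of just the filename extraction (hypothetical class; note that String.split takes a regex, so splitting on File.separator only behaves as intended where the separator is a plain "/"):

    import java.io.File;

    // Hypothetical sketch of the filename-part extraction used above.
    class HistoryFileNameSketch {

      // Returns the last component of a path string, as the real method does
      // before delegating to FileNameIndexUtils.getIndexInfo().
      static String fileNamePart(String pathString) {
        // Caution: split() interprets its argument as a regex; this mirrors
        // the original's assumption that File.separator is "/".
        String[] parts = pathString.split(File.separator);
        return parts[parts.length - 1];
      }

      public static void main(String[] args) {
        // Prints "job_201106210130_0001.jhist" (a made-up history file name).
        System.out.println(fileNamePart("/history/done/job_201106210130_0001.jhist"));
      }
    }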
+ 1 - 1
mapreduce/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java

@@ -480,7 +480,7 @@ public class TaskLog {
     // Export the pid of taskJvm to env variable JVM_PID.
     // Currently pid is not used on Windows
     if (!Shell.WINDOWS) {
-      mergedCmd.append("export JVM_PID=`echo $$` ; ");
+      mergedCmd.append("export JVM_PID=$$; ");
     }
 
     if (setup != null) {

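On the export JVM_PID=$$ change above: both the old and new forms resolve to the launching shell's pid, but the backtick form forks a subshell just to echo it, while plain $$ expands in place. A hypothetical sketch of the round trip (invented class, not TaskLog itself): the literal $$ is written into the launch script, the shell expands it at run time, and the exec'd child reads it back from its environment:

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PrintWriter;

    // Hypothetical sketch: Java writes the literal text "$$" into the script;
    // the shell, not Java, substitutes its own pid when the script executes.
    class LaunchLineSketch {
      public static void main(String[] args) throws IOException {
        StringBuilder mergedCmd = new StringBuilder();
        mergedCmd.append("export JVM_PID=$$; ");  // expanded by the shell at run time
        mergedCmd.append("exec java -version");   // stand-in for the real child command
        try (PrintWriter out = new PrintWriter(new FileWriter("launch.sh"))) {
          out.println(mergedCmd);
        }
        // The exec'd child would read the value back with System.getenv("JVM_PID"),
        // matching the LOG.debug line added to YarnChild above.
      }
    }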
+ 9 - 9
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java

@@ -152,15 +152,15 @@ public abstract class ResourceCalculatorPlugin extends Configured {
 
     // FIXME
     // No class given, try a os specific class
-//    try {
-//      String osName = System.getProperty("os.name");
-//      if (osName.startsWith("Linux")) {
-//        return new LinuxResourceCalculatorPlugin();
-//      }
-//    } catch (SecurityException se) {
-//      // Failed to get Operating System name.
-//      return null;
-//    }
+    try {
+      String osName = System.getProperty("os.name");
+      if (osName.startsWith("Linux")) {
+        return new LinuxResourceCalculatorPlugin();
+      }
+    } catch (SecurityException se) {
+      // Failed to get Operating System name.
+      return null;
+    }
 
     // Not supported on this system.
     return null;

+ 4 - 0
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -287,6 +288,9 @@ public class ContainerLaunch implements Callable<Integer> {
       sb.env("YARN_HOME", System.getenv("YARN_HOME"));
     }
     sb.env(ApplicationConstants.LOCAL_DIR_ENV, StringUtils.join(",", appDirs));
+    if (!Shell.WINDOWS) {
+      sb.env("JVM_PID", "$$");
+    }
     if (environment != null) {
       for (Map.Entry<String,String> env : environment.entrySet()) {
         sb.env(env.getKey().toString(), env.getValue().toString());
         sb.env(env.getKey().toString(), env.getValue().toString());