浏览代码

MAPREDUCE-2664. Implement JobCounters for MRv2. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1150993 13f79535-47bb-0310-9956-ffa450edef68
Sharad Agarwal 14 年之前
父节点
当前提交
e149a1eebc
共有 16 个文件被更改,包括 386 次插入和 38 次删除
  1. 3 0
      mapreduce/CHANGES.txt
  2. 16 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
  3. 19 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
  4. 42 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java
  5. 1 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
  6. 31 9
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  7. 82 21
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  8. 1 1
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
  9. 13 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
  10. 1 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
  11. 34 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  12. 3 1
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
  13. 2 0
      mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java
  14. 20 0
      mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java
  15. 54 6
      mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
  16. 64 0
      mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java

+ 3 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,9 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
     MAPREDUCE-279
 
 
+    MAPREDUCE-2664. Implement JobCounters for MRv2. (Siddharth Seth via 
+    sharad)
+
     MAPREDUCE-2667. mapred job -kill leaves application in RUNNING state 
     MAPREDUCE-2667. mapred job -kill leaves application in RUNNING state 
     (thomas graves via mahadev)
     (thomas graves via mahadev)
     
     

+ 16 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java

@@ -36,12 +36,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -205,9 +207,23 @@ public class LocalContainerLauncher extends AbstractService implements
           }
           }
 
 
           try {
           try {
+            if (remoteTask.isMapOrReduce()) {
+              JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+              jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+              if (remoteTask.isMapTask()) {
+                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+              } else {
+                jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+              }
+              context.getEventHandler().handle(jce);
+            }
             runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
             runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
                        (numReduceTasks > 0));
                        (numReduceTasks > 0));
+            
           } catch (RuntimeException re) {
           } catch (RuntimeException re) {
+            JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+            jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+            context.getEventHandler().handle(jce);
             // this is our signal that the subtask failed in some way, so
             // this is our signal that the subtask failed in some way, so
             // simulate a failed JVM/container and send a container-completed
             // simulate a failed JVM/container and send a container-completed
             // event to task attempt (i.e., move state machine from RUNNING
             // event to task attempt (i.e., move state machine from RUNNING

+ 19 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -462,6 +464,7 @@ public class JobHistoryEventHandler extends AbstractService
                 .toString());
                 .toString());
       // TODO JOB_FINISHED does not have state. Effectively job history does not
       // TODO JOB_FINISHED does not have state. Effectively job history does not
       // have state about the finished job.
       // have state about the finished job.
+      setSummarySlotSeconds(summary, jobId);
       break;
       break;
     case JOB_FAILED:
     case JOB_FAILED:
     case JOB_KILLED:
     case JOB_KILLED:
@@ -470,10 +473,26 @@ public class JobHistoryEventHandler extends AbstractService
       summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
       summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
       summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
       summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
       summary.setJobFinishTime(juce.getFinishTime());
       summary.setJobFinishTime(juce.getFinishTime());
+      setSummarySlotSeconds(summary, jobId);
       break;
       break;
     }
     }
   }
   }
 
 
+  private void setSummarySlotSeconds(JobSummary summary, JobId jobId) {
+    Counter slotMillisMapCounter =
+        context.getJob(jobId).getCounters()
+            .getCounter(JobCounter.SLOTS_MILLIS_MAPS);
+    if (slotMillisMapCounter != null) {
+      summary.setMapSlotSeconds(slotMillisMapCounter.getValue());
+    }
+    Counter slotMillisReduceCounter =
+        context.getJob(jobId).getCounters()
+            .getCounter(JobCounter.SLOTS_MILLIS_REDUCES);
+    if (slotMillisReduceCounter != null) {
+      summary.setMapSlotSeconds(slotMillisReduceCounter.getValue());
+    }
+  }
+
   protected void closeEventWriter(JobId jobId) throws IOException {
   protected void closeEventWriter(JobId jobId) throws IOException {
 
 
     final MetaInfo mi = fileMap.get(jobId);
     final MetaInfo mi = fileMap.get(jobId);

+ 42 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java

@@ -0,0 +1,42 @@
// NOTE(review): new file is missing the standard Apache license header
// required across the Hadoop codebase — add before committing.
package org.apache.hadoop.mapreduce.v2.app.job.event;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.v2.api.records.JobId;

/**
 * Job event carrying one or more incremental counter updates that the
 * job state machine folds into its job-level counters
 * (handled via {@code JobEventType.JOB_COUNTER_UPDATE}).
 */
public class JobCounterUpdateEvent extends JobEvent {

  // Accumulated (counter, delta) pairs, in insertion order. Private and
  // final (the original exposed a mutable package-private field that was
  // redundantly initialized to null before being assigned).
  private final List<CounterIncrementalUpdate> counterUpdates =
      new ArrayList<CounterIncrementalUpdate>();

  public JobCounterUpdateEvent(JobId jobId) {
    super(jobId, JobEventType.JOB_COUNTER_UPDATE);
  }

  /**
   * Queues an increment of {@code incrValue} for the given counter key.
   *
   * @param key       counter enum to increment
   * @param incrValue amount to add to the counter
   */
  public void addCounterUpdate(Enum<?> key, long incrValue) {
    counterUpdates.add(new CounterIncrementalUpdate(key, incrValue));
  }

  /** @return the queued incremental updates, in insertion order. */
  public List<CounterIncrementalUpdate> getCounterUpdates() {
    return counterUpdates;
  }

  /** Immutable (counter key, increment amount) pair. */
  public static class CounterIncrementalUpdate {
    private final Enum<?> key;
    private final long incrValue;

    public CounterIncrementalUpdate(Enum<?> key, long incrValue) {
      this.key = key;
      this.incrValue = incrValue;
    }

    public Enum<?> getCounterKey() {
      return key;
    }

    public long getIncrementValue() {
      return incrValue;
    }
  }
}

+ 1 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java

@@ -41,6 +41,7 @@ public enum JobEventType {
   //Producer:Any component
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,
   JOB_DIAGNOSTIC_UPDATE,
   INTERNAL_ERROR,
   INTERNAL_ERROR,
+  JOB_COUNTER_UPDATE,
   
   
   //Producer:TaskAttemptListener
   //Producer:TaskAttemptListener
   JOB_TASK_ATTEMPT_FETCH_FAILURE
   JOB_TASK_ATTEMPT_FETCH_FAILURE

+ 31 - 9
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -149,12 +150,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private boolean lazyTasksCopyNeeded = false;
   private boolean lazyTasksCopyNeeded = false;
   private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
   private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
   private Counters jobCounters = newCounters();
   private Counters jobCounters = newCounters();
-    // FIXME:  support job-level counters
+    // FIXME:  
     //
     //
-    // Presumably want to define new event type that job-related entities
-    // (e.g., MRAppMaster or LocalContainerLauncher) can emit with some sort
-    // of payload (maybe just Counters?); then define new Job state-machine
-    // transition to handle the event and update jobCounters with payload data.
     // Can then replace task-level uber counters (MR-2424) with job-level ones
     // Can then replace task-level uber counters (MR-2424) with job-level ones
     // sent from LocalContainerLauncher, and eventually including a count of
     // sent from LocalContainerLauncher, and eventually including a count of
     // of uber-AM attempts (probably sent from MRAppMaster).
     // of uber-AM attempts (probably sent from MRAppMaster).
@@ -184,6 +181,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private static final TaskAttemptCompletedEventTransition
   private static final TaskAttemptCompletedEventTransition
       TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
       TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new TaskAttemptCompletedEventTransition();
           new TaskAttemptCompletedEventTransition();
+  private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
+      new CounterUpdateTransition();
 
 
   protected static final
   protected static final
     StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
     StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
@@ -195,6 +194,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.NEW, JobState.NEW,
           .addTransition(JobState.NEW, JobState.NEW,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.NEW, JobState.NEW,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
           .addTransition
               (JobState.NEW,
               (JobState.NEW,
               EnumSet.of(JobState.INITED, JobState.FAILED),
               EnumSet.of(JobState.INITED, JobState.FAILED),
@@ -211,6 +212,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.INITED, JobState.INITED,
           .addTransition(JobState.INITED, JobState.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.INITED, JobState.INITED,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(JobState.INITED, JobState.RUNNING,
           .addTransition(JobState.INITED, JobState.RUNNING,
               JobEventType.JOB_START,
               JobEventType.JOB_START,
               new StartTransition())
               new StartTransition())
@@ -243,6 +246,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.RUNNING, JobState.RUNNING,
           .addTransition(JobState.RUNNING, JobState.RUNNING,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.RUNNING, JobState.RUNNING,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(JobState.RUNNING, JobState.RUNNING,
           .addTransition(JobState.RUNNING, JobState.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
               JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
               new TaskAttemptFetchFailureTransition())
               new TaskAttemptFetchFailureTransition())
@@ -263,6 +268,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
           .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
           .addTransition(
               JobState.KILL_WAIT,
               JobState.KILL_WAIT,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -277,6 +284,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
           .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
           .addTransition(
               JobState.SUCCEEDED,
               JobState.SUCCEEDED,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -290,6 +299,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.FAILED, JobState.FAILED,
           .addTransition(JobState.FAILED, JobState.FAILED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.FAILED, JobState.FAILED,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
           .addTransition(
               JobState.FAILED,
               JobState.FAILED,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -303,6 +314,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition(JobState.KILLED, JobState.KILLED,
           .addTransition(JobState.KILLED, JobState.KILLED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobState.KILLED, JobState.KILLED,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition(
           .addTransition(
               JobState.KILLED,
               JobState.KILLED,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
               JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -460,7 +473,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   @Override
   @Override
   public Counters getCounters() {
   public Counters getCounters() {
     Counters counters = newCounters();
     Counters counters = newCounters();
-    // TODO: compute job-level counters
     readLock.lock();
     readLock.lock();
     try {
     try {
       incrAllCounters(counters, jobCounters);
       incrAllCounters(counters, jobCounters);
@@ -500,7 +512,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   public static Counters newCounters() {
   public static Counters newCounters() {
     Counters counters = RecordFactoryProvider.getRecordFactory(null)
     Counters counters = RecordFactoryProvider.getRecordFactory(null)
         .newRecordInstance(Counters.class);
         .newRecordInstance(Counters.class);
-//    counters.groups = new HashMap<String, CounterGroup>();
     return counters;
     return counters;
   }
   }
 
 
@@ -519,7 +530,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         if (group == null) {
         if (group == null) {
           group = RecordFactoryProvider.getRecordFactory(null)
           group = RecordFactoryProvider.getRecordFactory(null)
               .newRecordInstance(CounterGroup.class);
               .newRecordInstance(CounterGroup.class);
-//          group.counters = new HashMap<CharSequence, Counter>();
           group.setName(otherGroup.getName());
           group.setName(otherGroup.getName());
           counters.setCounterGroup(group.getName(), group);
           counters.setCounterGroup(group.getName(), group);
         }
         }
@@ -1363,7 +1373,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private void addDiagnostic(String diag) {
   private void addDiagnostic(String diag) {
     diagnostics.add(diag);
     diagnostics.add(diag);
   }
   }
-
+  
   private static class DiagnosticsUpdateTransition implements
   private static class DiagnosticsUpdateTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
     @Override
@@ -1372,6 +1382,18 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .getDiagnosticUpdate());
           .getDiagnosticUpdate());
     }
     }
   }
   }
+  
+  private static class CounterUpdateTransition implements
+      SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event;
+      for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce
+          .getCounterUpdates()) {
+        job.jobCounters.incrCounter(ci.getCounterKey(), ci.getIncrementValue());
+      }
+    }
+  }
 
 
   private static class InternalErrorTransition implements
   private static class InternalErrorTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
       SingleArcTransition<JobImpl, JobEvent> {

+ 82 - 21
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
 import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -73,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -131,6 +133,8 @@ public abstract class TaskAttemptImpl implements
 
 
   private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
   private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
   private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
   private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
+  private static final int MAP_MEMORY_MB_DEFAULT = 1024;
+  private static final int REDUCE_MEMORY_MB_DEFAULT = 1024;
   private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
 
   protected final Configuration conf;
   protected final Configuration conf;
@@ -470,9 +474,9 @@ public abstract class TaskAttemptImpl implements
   private int getMemoryRequired(Configuration conf, TaskType taskType) {
   private int getMemoryRequired(Configuration conf, TaskType taskType) {
     int memory = 1024;
     int memory = 1024;
     if (taskType == TaskType.MAP)  {
     if (taskType == TaskType.MAP)  {
-      memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, 1024);
+      memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, MAP_MEMORY_MB_DEFAULT);
     } else if (taskType == TaskType.REDUCE) {
     } else if (taskType == TaskType.REDUCE) {
-      memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
+      memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, REDUCE_MEMORY_MB_DEFAULT);
     }
     }
     
     
     return memory;
     return memory;
@@ -903,6 +907,42 @@ public abstract class TaskAttemptImpl implements
       finishTime = clock.getTime();
       finishTime = clock.getTime();
     }
     }
   }
   }
+  
+  private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
+    TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+    int slotMemoryReq =
+        taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
+    int simSlotsRequired =
+        slotMemoryReq
+            / (taskType == TaskType.MAP ? MAP_MEMORY_MB_DEFAULT
+                : REDUCE_MEMORY_MB_DEFAULT);
+    // Simulating MRv1 slots for counters by assuming *_MEMORY_MB_DEFAULT
+    // corresponds to a MrV1 slot.
+    // Fallow slot millis is not applicable in MRv2 - since a container is
+    // either assigned with the required memory or is not. No partial
+    // reserveations
+    long slotMillisIncrement =
+        simSlotsRequired
+            * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
+    return slotMillisIncrement;
+  }
+  
+  private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
+      TaskAttemptImpl taskAttempt) {
+    TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
+    
+    long slotMillisIncrement = computeSlotMillis(taskAttempt);
+    
+    if (taskType == TaskType.MAP) {
+      jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
+      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+    } else {
+      jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
+      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+    }
+    return jce;
+  }
 
 
   private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
   private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
       TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
       TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
@@ -1080,8 +1120,11 @@ public abstract class TaskAttemptImpl implements
           break;
           break;
       }
       }
       if (taskAttempt.getLaunchTime() != 0) {
       if (taskAttempt.getLaunchTime() != 0) {
-      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, finalState);
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+                finalState);
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       } else {
       } else {
@@ -1106,6 +1149,15 @@ public abstract class TaskAttemptImpl implements
       InetSocketAddress nodeHttpInetAddr =
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
                                                                   // Costly?
                                                                   // Costly?
+      JobCounterUpdateEvent jce =
+          new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
+              .getJobId());
+      jce.addCounterUpdate(
+          taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
+              JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
+              , 1);
+      taskAttempt.eventHandler.handle(jce);
+      
       TaskAttemptStartedEvent tase =
       TaskAttemptStartedEvent tase =
         new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
         new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
             TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
             TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
@@ -1163,24 +1215,22 @@ public abstract class TaskAttemptImpl implements
       String taskType = 
       String taskType = 
           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
           TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
       LOG.info("In TaskAttemptImpl taskType: " + taskType);
       LOG.info("In TaskAttemptImpl taskType: " + taskType);
+      long slotMillis = computeSlotMillis(taskAttempt);
+      JobCounterUpdateEvent jce =
+          new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
+              .getJobId());
+      jce.addCounterUpdate(
+        taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ? 
+          JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
+          slotMillis);
+      taskAttempt.eventHandler.handle(jce);
       taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
       taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
-          /*
-      TaskAttemptFinishedEvent tfe =
-          new TaskAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
-          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
-          TaskAttemptState.SUCCEEDED.toString(), 
-          taskAttempt.reportedStatus.finishTime, "hostname", 
-          TaskAttemptState.SUCCEEDED.toString(), 
-          TypeConverter.fromYarn(taskAttempt.getCounters()));
-      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tfe));
-      */
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_SUCCEEDED));
           TaskEventType.T_ATTEMPT_SUCCEEDED));
       taskAttempt.eventHandler.handle
       taskAttempt.eventHandler.handle
       (new SpeculatorEvent
       (new SpeculatorEvent
           (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
           (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
-
    }
    }
   }
   }
 
 
@@ -1190,9 +1240,13 @@ public abstract class TaskAttemptImpl implements
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
       // set the finish time
       // set the finish time
       taskAttempt.setFinishTime();
       taskAttempt.setFinishTime();
+      
       if (taskAttempt.getLaunchTime() != 0) {
       if (taskAttempt.getLaunchTime() != 0) {
-      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, TaskAttemptState.FAILED);
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+                TaskAttemptState.FAILED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
         // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
         // taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
@@ -1245,9 +1299,13 @@ public abstract class TaskAttemptImpl implements
       taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
       taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
       //set the finish time
       //set the finish time
       taskAttempt.setFinishTime();
       taskAttempt.setFinishTime();
+      
       if (taskAttempt.getLaunchTime() != 0) {
       if (taskAttempt.getLaunchTime() != 0) {
-      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, TaskAttemptState.FAILED);
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+                TaskAttemptState.FAILED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
       }else {
@@ -1268,8 +1326,11 @@ public abstract class TaskAttemptImpl implements
       //set the finish time
       //set the finish time
       taskAttempt.setFinishTime();
       taskAttempt.setFinishTime();
       if (taskAttempt.getLaunchTime() != 0) {
       if (taskAttempt.getLaunchTime() != 0) {
-      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
-          taskAttempt, TaskAttemptState.KILLED);
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+        TaskAttemptUnsuccessfulCompletionEvent tauce =
+            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+                TaskAttemptState.KILLED);
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
       }else {

+ 1 - 1
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -254,7 +254,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.jobToken = jobToken;
     this.jobToken = jobToken;
     this.metrics = metrics;
     this.metrics = metrics;
 
 
-    if (completedTasksFromPreviousRun != null 
+    if (completedTasksFromPreviousRun != null
         && completedTasksFromPreviousRun.contains(taskId)) {
         && completedTasksFromPreviousRun.contains(taskId)) {
       LOG.info("Task is from previous run " + taskId);
       LOG.info("Task is from previous run " + taskId);
       startCount = startCount - 1;
       startCount = startCount - 1;

+ 13 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java

@@ -22,11 +22,15 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -74,6 +78,15 @@ public class LocalContainerAllocator extends RMCommunicator
       container.setContainerToken(null);
       container.setContainerToken(null);
       container.setNodeHttpAddress("localhost:9999");
       container.setNodeHttpAddress("localhost:9999");
       // send the container-assigned event to task attempt
       // send the container-assigned event to task attempt
+
+      if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
+        JobCounterUpdateEvent jce =
+            new JobCounterUpdateEvent(event.getAttemptID().getTaskId()
+                .getJobId());
+        // TODO Setting OTHER_LOCAL_MAP for now.
+        jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+        eventHandler.handle(jce);
+      }
       eventHandler.handle(new TaskAttemptContainerAssignedEvent(
       eventHandler.handle(new TaskAttemptContainerAssignedEvent(
           event.getAttemptID(), container));
           event.getAttemptID(), container));
     }
     }

+ 1 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java

@@ -46,6 +46,7 @@ public class ContainerRequestEvent extends ContainerAllocatorEvent {
   public static ContainerRequestEvent createContainerRequestEventForFailedContainer(
   public static ContainerRequestEvent createContainerRequestEventForFailedContainer(
       TaskAttemptId attemptID, 
       TaskAttemptId attemptID, 
       Resource capability) {
       Resource capability) {
+    //ContainerRequest for failed events does not consider rack / node locality?
     return new ContainerRequestEvent(attemptID, capability);
     return new ContainerRequestEvent(attemptID, capability);
   }
   }
 
 

+ 34 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -18,6 +18,8 @@
 
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Comparator;
@@ -34,12 +36,14 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -456,6 +460,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     private final LinkedList<TaskAttemptId> earlierFailedMaps = 
     private final LinkedList<TaskAttemptId> earlierFailedMaps = 
       new LinkedList<TaskAttemptId>();
       new LinkedList<TaskAttemptId>();
     
     
+    /** Maps from a host to a list of Map tasks with data on the host */
     private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping = 
     private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping = 
       new HashMap<String, LinkedList<TaskAttemptId>>();
       new HashMap<String, LinkedList<TaskAttemptId>>();
     private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = 
     private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = 
@@ -501,6 +506,18 @@ public class RMContainerAllocator extends RMContainerRequestor
         request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
         request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
       } else {
       } else {
         for (String host : event.getHosts()) {
         for (String host : event.getHosts()) {
+          //host comes from data splitLocations which are hostnames. Containers
+          // use IP addresses.
+          //TODO Temporary fix for locality. Use resolvers from h-common. 
+          // Cache to make this more efficient ?
+          InetAddress addr = null;
+          try {
+            addr = InetAddress.getByName(host);
+          } catch (UnknownHostException e) {
+            LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
+          }
+          if (addr != null) //Fallback to host if resolve fails.
+            host = addr.getHostAddress();
           LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
           LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
           if (list == null) {
           if (list == null) {
             list = new LinkedList<TaskAttemptId>();
             list = new LinkedList<TaskAttemptId>();
@@ -585,6 +602,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       //try to assign to earlierFailedMaps if present
       //try to assign to earlierFailedMaps if present
       assigned = assignToFailedMap(allocated);
       assigned = assignToFailedMap(allocated);
       
       
+      //Assign to reduces before assigning to maps ?
       if (assigned == null) {
       if (assigned == null) {
         assigned = assignToReduce(allocated);
         assigned = assignToReduce(allocated);
       }
       }
@@ -606,6 +624,10 @@ public class RMContainerAllocator extends RMContainerRequestor
         TaskAttemptId tId = earlierFailedMaps.removeFirst();
         TaskAttemptId tId = earlierFailedMaps.removeFirst();
         if (maps.containsKey(tId)) {
         if (maps.containsKey(tId)) {
           assigned = maps.remove(tId);
           assigned = maps.remove(tId);
+          JobCounterUpdateEvent jce =
+            new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+          jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+          eventHandler.handle(jce);
           LOG.info("Assigned from earlierFailedMaps");
           LOG.info("Assigned from earlierFailedMaps");
           break;
           break;
         }
         }
@@ -638,6 +660,10 @@ public class RMContainerAllocator extends RMContainerRequestor
           TaskAttemptId tId = list.removeFirst();
           TaskAttemptId tId = list.removeFirst();
           if (maps.containsKey(tId)) {
           if (maps.containsKey(tId)) {
             assigned = maps.remove(tId);
             assigned = maps.remove(tId);
+            JobCounterUpdateEvent jce =
+              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+            jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
+            eventHandler.handle(jce);
             hostLocalAssigned++;
             hostLocalAssigned++;
             LOG.info("Assigned based on host match " + host);
             LOG.info("Assigned based on host match " + host);
             break;
             break;
@@ -650,6 +676,10 @@ public class RMContainerAllocator extends RMContainerRequestor
             TaskAttemptId tId = list.removeFirst();
             TaskAttemptId tId = list.removeFirst();
             if (maps.containsKey(tId)) {
             if (maps.containsKey(tId)) {
               assigned = maps.remove(tId);
               assigned = maps.remove(tId);
+              JobCounterUpdateEvent jce =
+                new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+              jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
+              eventHandler.handle(jce);
               rackLocalAssigned++;
               rackLocalAssigned++;
               LOG.info("Assigned based on rack match " + rack);
               LOG.info("Assigned based on rack match " + rack);
               break;
               break;
@@ -658,6 +688,10 @@ public class RMContainerAllocator extends RMContainerRequestor
           if (assigned == null && maps.size() > 0) {
           if (assigned == null && maps.size() > 0) {
             TaskAttemptId tId = maps.keySet().iterator().next();
             TaskAttemptId tId = maps.keySet().iterator().next();
             assigned = maps.remove(tId);
             assigned = maps.remove(tId);
+            JobCounterUpdateEvent jce =
+              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+            jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+            eventHandler.handle(jce);
             LOG.info("Assigned based on * match");
             LOG.info("Assigned based on * match");
             break;
             break;
           }
           }

+ 3 - 1
mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -469,7 +469,9 @@ public class TestRMContainerAllocator {
         return new EventHandler() {
         return new EventHandler() {
           @Override
           @Override
           public void handle(Event event) {
           public void handle(Event event) {
-            events.add((TaskAttemptContainerAssignedEvent) event);
+            if (event instanceof TaskAttemptContainerAssignedEvent) {
+              events.add((TaskAttemptContainerAssignedEvent) event);
+            } //Ignoring JobCounterUpdateEvents
           }
           }
         };
         };
       }
       }

+ 2 - 0
mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java

@@ -11,4 +11,6 @@ public interface Counters {
   public abstract void setCounterGroup(String key, CounterGroup value);
   public abstract void setCounterGroup(String key, CounterGroup value);
   public abstract void removeCounterGroup(String key);
   public abstract void removeCounterGroup(String key);
   public abstract void clearCounterGroups();
   public abstract void clearCounterGroups();
+  
+  public abstract void incrCounter(Enum<?> key, long amount);
 }
 }

+ 20 - 0
mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java

@@ -78,6 +78,26 @@ public class CountersPBImpl extends ProtoBase<CountersProto> implements Counters
     CounterGroup group = getCounterGroup(key.getDeclaringClass().getName());
     CounterGroup group = getCounterGroup(key.getDeclaringClass().getName());
     return group == null ? null : group.getCounter(key.name());
     return group == null ? null : group.getCounter(key.name());
   }
   }
+
+  @Override
+  public void incrCounter(Enum<?> key, long amount) {
+    String groupName = key.getDeclaringClass().getName();
+    if (getCounterGroup(groupName) == null) {
+      CounterGroup cGrp = new CounterGroupPBImpl();
+      cGrp.setName(groupName);
+      cGrp.setDisplayName(groupName);
+      setCounterGroup(groupName, cGrp);
+    }
+    if (getCounterGroup(groupName).getCounter(key.name()) == null) {
+      Counter c = new CounterPBImpl();
+      c.setName(key.name());
+      c.setDisplayName(key.name());
+      c.setValue(0l);
+      getCounterGroup(groupName).setCounter(key.name(), c);
+    }
+    Counter counter = getCounterGroup(groupName).getCounter(key.name());
+    counter.setValue(counter.getValue() + amount);
+  }
  
  
   private void initCounterGroups() {
   private void initCounterGroups() {
     if (this.counterGroups != null) {
     if (this.counterGroups != null) {

+ 54 - 6
mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -48,7 +48,9 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -124,7 +126,6 @@ public class TestMRJobs {
   @Test
   @Test
   public void testSleepJob() throws IOException, InterruptedException,
   public void testSleepJob() throws IOException, InterruptedException,
       ClassNotFoundException { 
       ClassNotFoundException { 
-
     LOG.info("\n\n\nStarting testSleepJob().");
     LOG.info("\n\n\nStarting testSleepJob().");
 
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
@@ -148,17 +149,35 @@ public class TestMRJobs {
     boolean succeeded = job.waitForCompletion(true);
     boolean succeeded = job.waitForCompletion(true);
     Assert.assertTrue(succeeded);
     Assert.assertTrue(succeeded);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-
+    verifySleepJobCounters(job);
+    
+    
     // TODO later:  add explicit "isUber()" checks of some sort (extend
     // TODO later:  add explicit "isUber()" checks of some sort (extend
     // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
     // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
   }
   }
 
 
+  protected void verifySleepJobCounters(Job job) throws InterruptedException,
+      IOException {
+    Counters counters = job.getCounters();
+    Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert.assertEquals(2,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+  }
+
   @Test
   @Test
   public void testRandomWriter() throws IOException, InterruptedException,
   public void testRandomWriter() throws IOException, InterruptedException,
       ClassNotFoundException {
       ClassNotFoundException {
-
+    
     LOG.info("\n\n\nStarting testRandomWriter().");
     LOG.info("\n\n\nStarting testRandomWriter().");
-
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
       LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
                + " not found. Not running test.");
                + " not found. Not running test.");
@@ -169,8 +188,8 @@ public class TestMRJobs {
     mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
     mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
     mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
     mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
     Job job = randomWriterJob.createJob(mrCluster.getConfig());
     Job job = randomWriterJob.createJob(mrCluster.getConfig());
-    Path outputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
-        "random-output");
+    Path outputDir =
+        new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output");
     FileOutputFormat.setOutputPath(job, outputDir);
     FileOutputFormat.setOutputPath(job, outputDir);
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.setJarByClass(RandomTextWriterJob.class);
     job.setJarByClass(RandomTextWriterJob.class);
@@ -179,6 +198,7 @@ public class TestMRJobs {
     Assert.assertTrue(succeeded);
     Assert.assertTrue(succeeded);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
     // Make sure there are three files in the output-dir
     // Make sure there are three files in the output-dir
+    
     RemoteIterator<FileStatus> iterator =
     RemoteIterator<FileStatus> iterator =
         FileContext.getFileContext(mrCluster.getConfig()).listStatus(
         FileContext.getFileContext(mrCluster.getConfig()).listStatus(
             outputDir);
             outputDir);
@@ -191,9 +211,22 @@ public class TestMRJobs {
       }
       }
     }
     }
     Assert.assertEquals("Number of part files is wrong!", 3, count);
     Assert.assertEquals("Number of part files is wrong!", 3, count);
+    verifyRandomWriterCounters(job);
 
 
     // TODO later:  add explicit "isUber()" checks of some sort
     // TODO later:  add explicit "isUber()" checks of some sort
   }
   }
+  
+  protected void verifyRandomWriterCounters(Job job)
+      throws InterruptedException, IOException {
+    Counters counters = job.getCounters();
+    Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+  }
 
 
   @Test
   @Test
   public void testFailingMapper() throws IOException, InterruptedException,
   public void testFailingMapper() throws IOException, InterruptedException,
@@ -227,9 +260,24 @@ public class TestMRJobs {
     Assert.assertEquals(TaskCompletionEvent.Status.FAILED, 
     Assert.assertEquals(TaskCompletionEvent.Status.FAILED, 
         events[1].getStatus().FAILED);
         events[1].getStatus().FAILED);
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+    verifyFailingMapperCounters(job);
 
 
     // TODO later:  add explicit "isUber()" checks of some sort
     // TODO later:  add explicit "isUber()" checks of some sort
   }
   }
+  
+  protected void verifyFailingMapperCounters(Job job)
+      throws InterruptedException, IOException {
+    Counters counters = job.getCounters();
+    Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+        .getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+  }
 
 
   protected Job runFailingMapperJob()
   protected Job runFailingMapperJob()
   throws IOException, InterruptedException, ClassNotFoundException {
   throws IOException, InterruptedException, ClassNotFoundException {

+ 64 - 0
mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java

@@ -25,7 +25,9 @@ import junit.framework.Assert;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -55,6 +57,32 @@ public class TestUberAM extends TestMRJobs {
     }
     }
     super.testSleepJob();
     super.testSleepJob();
   }
   }
+  
+  @Override
+  protected void verifySleepJobCounters(Job job) throws InterruptedException,
+      IOException {
+    Counters counters = job.getCounters();
+
+    Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert.assertEquals(1,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+
+    Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+        .getValue());
+    Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES)
+        .getValue());
+    Assert.assertEquals(4,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+  }
 
 
   @Override
   @Override
   public void testRandomWriter()
   public void testRandomWriter()
@@ -62,6 +90,17 @@ public class TestUberAM extends TestMRJobs {
     super.testRandomWriter();
     super.testRandomWriter();
   }
   }
 
 
+  @Override
+  protected void verifyRandomWriterCounters(Job job)
+      throws InterruptedException, IOException {
+    super.verifyRandomWriterCounters(job);
+    Counters counters = job.getCounters();
+    Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+        .getValue());
+    Assert.assertEquals(3,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+  }
+
   @Override
   @Override
   public void testFailingMapper()
   public void testFailingMapper()
   throws IOException, InterruptedException, ClassNotFoundException {
   throws IOException, InterruptedException, ClassNotFoundException {
@@ -100,9 +139,34 @@ public class TestUberAM extends TestMRJobs {
     Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
     Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
         events[0].getStatus().FAILED);
         events[0].getStatus().FAILED);
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+    
+    //Disabling till UberAM honors MRJobConfig.MAP_MAX_ATTEMPTS
+    //verifyFailingMapperCounters(job);
 
 
     // TODO later:  add explicit "isUber()" checks of some sort
     // TODO later:  add explicit "isUber()" checks of some sort
   }
   }
+  
+  @Override
+  protected void verifyFailingMapperCounters(Job job)
+      throws InterruptedException, IOException {
+    Counters counters = job.getCounters();
+    Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+        .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+        .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+        .getValue());
+    Assert
+        .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+            && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+
+    Assert.assertEquals(2,
+        counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+        .getValue());
+    Assert.assertEquals(2, counters
+        .findCounter(JobCounter.NUM_FAILED_UBERTASKS).getValue());
+  }
 
 
 //@Test  //FIXME:  if/when the corresponding TestMRJobs test gets enabled, do so here as well (potentially with mods for ubermode)
 //@Test  //FIXME:  if/when the corresponding TestMRJobs test gets enabled, do so here as well (potentially with mods for ubermode)
   public void testSleepJobWithSecurityOn()
   public void testSleepJobWithSecurityOn()