瀏覽代碼

MAPREDUCE-2821. Added missing fields (resourcePerMap & resourcePerReduce) to JobSummary logs. Contributed by Mahadev Konar.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188528 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 年之前
父節點
當前提交
fffdf661e3
共有 14 個文件被更改,包括 240 次插入122 次删除
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 30 11
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
  3. 20 25
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
  4. 5 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  5. 34 10
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  6. 13 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  7. 2 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
  8. 0 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
  9. 1 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
  10. 74 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java
  11. 2 6
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
  12. 1 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
  13. 54 51
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
  14. 1 4
      hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1761,6 +1761,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2746. Yarn servers can't communicate with each other with 
     hadoop.security.authorization set to true (acmurthy via mahadev)
 
+    MAPREDUCE-2821. Added missing fields (resourcePerMap & resourcePerReduce)
+    to JobSummary logs. (mahadev via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 30 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -91,7 +92,8 @@ public class JobHistoryEventHandler extends AbstractService
   }
 
   /* (non-Javadoc)
-   * @see org.apache.hadoop.yarn.service.AbstractService#init(org.apache.hadoop.conf.Configuration)
+   * @see org.apache.hadoop.yarn.service.AbstractService#init(org.
+   * apache.hadoop.conf.Configuration)
    * Initializes the FileSystem and Path objects for the log and done directories.
    * Creates these directories if they do not already exist.
    */
@@ -155,14 +157,15 @@ public class JobHistoryEventHandler extends AbstractService
                 + doneDirPath
                 + "] based on conf: "
                 + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR
-                + ". Either set to true or pre-create this directory with appropriate permissions";
+                + ". Either set to true or pre-create this directory with" +
+                " appropriate permissions";
         LOG.error(message);
         throw new YarnException(message);
       }
       }
     } catch (IOException e) {
-      LOG.error("Failed checking for the existance of history intermediate done directory: ["
-          + doneDirPath + "]");
+      LOG.error("Failed checking for the existance of history intermediate " +
+      		"done directory: [" + doneDirPath + "]");
       throw new YarnException(e);
     }
 
@@ -380,8 +383,11 @@ public class JobHistoryEventHandler extends AbstractService
       MetaInfo mi = fileMap.get(event.getJobID());
       try {
         HistoryEvent historyEvent = event.getHistoryEvent();
-        mi.writeEvent(historyEvent);
-        processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID());
+        if (! (historyEvent instanceof NormalizedResourceEvent)) {
+          mi.writeEvent(historyEvent);
+        }
+        processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
+            event.getJobID());
         LOG.info("In HistoryEventHandler "
             + event.getHistoryEvent().getEventType());
       } catch (IOException e) {
@@ -395,7 +401,7 @@ public class JobHistoryEventHandler extends AbstractService
             (JobSubmittedEvent) event.getHistoryEvent();
         mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
       }
-
+     
       // If this is JobFinishedEvent, close the writer and setup the job-index
       if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
         try {
@@ -415,7 +421,8 @@ public class JobHistoryEventHandler extends AbstractService
       if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
           || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
         try {
-          JobUnsuccessfulCompletionEvent jucEvent = (JobUnsuccessfulCompletionEvent) event
+          JobUnsuccessfulCompletionEvent jucEvent = 
+              (JobUnsuccessfulCompletionEvent) event
               .getHistoryEvent();
           mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
           mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
@@ -429,7 +436,8 @@ public class JobHistoryEventHandler extends AbstractService
     }
   }
 
-  private void processEventForJobSummary(HistoryEvent event, JobSummary summary, JobId jobId) {
+  public void processEventForJobSummary(HistoryEvent event, JobSummary summary, 
+      JobId jobId) {
     // context.getJob could be used for some of this info as well.
     switch (event.getEventType()) {
     case JOB_SUBMITTED:
@@ -438,6 +446,15 @@ public class JobHistoryEventHandler extends AbstractService
       summary.setQueue(jse.getJobQueueName());
       summary.setJobSubmitTime(jse.getSubmitTime());
       break;
+    case NORMALIZED_RESOURCE:
+      NormalizedResourceEvent normalizedResourceEvent = 
+            (NormalizedResourceEvent) event;
+      if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
+        summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
+      } else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
+        summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
+      }
+      break;  
     case JOB_INITED:
       JobInitedEvent jie = (JobInitedEvent) event;
       summary.setJobLaunchTime(jie.getLaunchTime());
@@ -503,7 +520,8 @@ public class JobHistoryEventHandler extends AbstractService
 
     if (!mi.isWriterActive()) {
       throw new IOException(
-          "Inactive Writer: Likely received multiple JobFinished / JobUnsuccessful events for JobId: ["
+          "Inactive Writer: Likely received multiple JobFinished / " +
+          "JobUnsuccessful events for JobId: ["
               + jobId + "]");
     }
 
@@ -594,7 +612,8 @@ public class JobHistoryEventHandler extends AbstractService
       this.historyFile = historyFile;
       this.confFile = conf;
       this.writer = writer;
-      this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
+      this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1,
+          null);
       this.jobSummary = new JobSummary();
     }
 

+ 20 - 25
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java

@@ -34,7 +34,8 @@ public class JobSummary {
   private int numFailedMaps;
   private int numFinishedReduces;
   private int numFailedReduces;
-  // private int numSlotsPerMap; | Doesn't make sense with potentially different
+  private int resourcesPerMap; // resources used per map/min resource
+  private int resourcesPerReduce; // resources used per reduce/min resource
   // resource models
   // private int numSlotsPerReduce; | Doesn't make sense with potentially
   // different resource models
@@ -112,14 +113,14 @@ public class JobSummary {
     this.numFailedMaps = numFailedMaps;
   }
 
-  // public int getNumSlotsPerMap() {
-  // return numSlotsPerMap;
-  // }
-  //
-  // public void setNumSlotsPerMap(int numSlotsPerMap) {
-  // this.numSlotsPerMap = numSlotsPerMap;
-  // }
-
+  public int getResourcesPerMap() {
+    return resourcesPerMap;
+  }
+  
+  public void setResourcesPerMap(int resourcesPerMap) {
+    this.resourcesPerMap = resourcesPerMap;
+  }
+  
   public int getNumFinishedReduces() {
     return numFinishedReduces;
   }
@@ -136,14 +137,14 @@ public class JobSummary {
     this.numFailedReduces = numFailedReduces;
   }
 
-  // public int getNumSlotsPerReduce() {
-  // return numSlotsPerReduce;
-  // }
-  //
-  // public void setNumSlotsPerReduce(int numSlotsPerReduce) {
-  // this.numSlotsPerReduce = numSlotsPerReduce;
-  // }
-
+  public int getResourcesPerReduce() {
+    return this.resourcesPerReduce;
+  }
+  
+  public void setResourcesPerReduce(int resourcesPerReduce) {
+    this.resourcesPerReduce = resourcesPerReduce;
+  }
+  
   public String getUser() {
     return user;
   }
@@ -184,14 +185,6 @@ public class JobSummary {
     this.reduceSlotSeconds = reduceSlotSeconds;
   }
 
-  // public int getClusterSlotCapacity() {
-  // return clusterSlotCapacity;
-  // }
-  //
-  // public void setClusterSlotCapacity(int clusterSlotCapacity) {
-  // this.clusterSlotCapacity = clusterSlotCapacity;
-  // }
-
   public String getJobSummaryString() {
     SummaryBuilder summary = new SummaryBuilder()
       .add("jobId", jobId)
@@ -200,6 +193,8 @@ public class JobSummary {
       .add("firstMapTaskLaunchTime", firstMapTaskLaunchTime)
       .add("firstReduceTaskLaunchTime", firstReduceTaskLaunchTime)
       .add("finishTime", jobFinishTime)
+      .add("resourcesPerMap", resourcesPerMap)
+      .add("resourcesPerReduce", resourcesPerReduce)
       .add("numMaps", numFinishedMaps + numFailedMaps)
       .add("numReduces", numFinishedReduces + numFailedReduces)
       .add("user", user)

+ 5 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -91,12 +91,12 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -115,10 +115,10 @@ import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.hadoop.util.StringUtils;
 
 
 /**
@@ -856,7 +856,7 @@ public abstract class TaskAttemptImpl implements
   private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     int slotMemoryReq =
-        taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
+       taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
     int simSlotsRequired =
         slotMemoryReq
             / (taskType == TaskType.MAP ? MAP_MEMORY_MB_DEFAULT
@@ -994,7 +994,7 @@ public abstract class TaskAttemptImpl implements
 
   private static class ContainerAssignedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "deprecation" })
     @Override
     public void transition(final TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
@@ -1164,6 +1164,7 @@ public abstract class TaskAttemptImpl implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
+      @SuppressWarnings("deprecation")
       TaskAttemptContext taskContext =
         new TaskAttemptContextImpl(new JobConf(taskAttempt.conf),
             TypeConverter.fromYarn(taskAttempt.attemptId));

+ 34 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,7 +35,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -125,7 +128,7 @@ public class RMContainerAllocator extends RMContainerRequestor
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
   private float reduceSlowStart = 0;
-
+  
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
   }
@@ -169,6 +172,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     LOG.info("Final Stats: " + getStat());
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public synchronized void handle(ContainerAllocatorEvent event) {
     LOG.info("Processing the event " + event.toString());
@@ -179,7 +183,13 @@ public class RMContainerAllocator extends RMContainerRequestor
         if (mapResourceReqt == 0) {
           mapResourceReqt = reqEvent.getCapability().getMemory();
           int minSlotMemSize = getMinContainerCapability().getMemory();
-          mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) * minSlotMemSize;
+          mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
+              * minSlotMemSize;
+          JobID id = TypeConverter.fromYarn(applicationId);
+          JobId jobId = TypeConverter.toYarn(id);
+          eventHandler.handle(new JobHistoryEvent(jobId, 
+              new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
+              mapResourceReqt)));
           LOG.info("mapResourceReqt:"+mapResourceReqt);
           if (mapResourceReqt > getMaxContainerCapability().getMemory()) {
             String diagMsg = "MAP capability required is more than the supported " +
@@ -199,12 +209,20 @@ public class RMContainerAllocator extends RMContainerRequestor
           reduceResourceReqt = reqEvent.getCapability().getMemory();
           int minSlotMemSize = getMinContainerCapability().getMemory();
           //round off on slotsize
-          reduceResourceReqt = (int) Math.ceil((float) reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
+          reduceResourceReqt = (int) Math.ceil((float) 
+              reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
+          JobID id = TypeConverter.fromYarn(applicationId);
+          JobId jobId = TypeConverter.toYarn(id);
+          eventHandler.handle(new JobHistoryEvent(jobId, 
+              new NormalizedResourceEvent(
+                  org.apache.hadoop.mapreduce.TaskType.REDUCE,
+              reduceResourceReqt)));
           LOG.info("reduceResourceReqt:"+reduceResourceReqt);
           if (reduceResourceReqt > getMaxContainerCapability().getMemory()) {
-            String diagMsg = "REDUCE capability required is more than the supported " +
-            "max container capability in the cluster. Killing the Job. reduceResourceReqt: " + 
-            reduceResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory();
+            String diagMsg = "REDUCE capability required is more than the " +
+            		"supported max container capability in the cluster. Killing the " +
+            		"Job. reduceResourceReqt: " + reduceResourceReqt +
+            		" maxContainerCapability:" + getMaxContainerCapability().getMemory();
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
                 getJob().getID(), diagMsg));
@@ -217,7 +235,8 @@ public class RMContainerAllocator extends RMContainerRequestor
           //add to the front of queue for fail fast
           pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
         } else {
-          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));//reduces are added to pending and are slowly ramped up
+          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+          //reduces are added to pending and are slowly ramped up
         }
       }
       
@@ -411,6 +430,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         " availableResources(headroom):" + getAvailableResources();
   }
   
+  @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
     AMResponse response = makeRemoteRequest();
@@ -538,6 +558,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       addContainerReq(req);
     }
     
+    @SuppressWarnings("unchecked")
     private void assign(List<Container> allocatedContainers) {
       Iterator<Container> it = allocatedContainers.iterator();
       LOG.info("Got allocated containers " + allocatedContainers.size());
@@ -694,6 +715,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
     
     
+    @SuppressWarnings("unchecked")
     private ContainerRequest assignToFailedMap(Container allocated) {
       //try to assign to earlierFailedMaps if present
       ContainerRequest assigned = null;
@@ -723,6 +745,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       return assigned;
     }
     
+    @SuppressWarnings("unchecked")
     private ContainerRequest assignToMap(Container allocated) {
     //try to assign to maps if present 
       //first by host, then by rack, followed by *
@@ -798,7 +821,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
 
     void preemptReduce(int toPreempt) {
-      List<TaskAttemptId> reduceList = new ArrayList(reduces.keySet());
+      List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
+        (reduces.keySet());
       //sort reduces on progress
       Collections.sort(reduceList,
           new Comparator<TaskAttemptId>() {

+ 13 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -31,9 +31,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -360,6 +363,16 @@ public class MRApp extends MRAppMaster {
         NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
         Container container = BuilderUtils.newContainer(cId, nodeId,
             "localhost:9999", null, null, null);
+        JobID id = TypeConverter.fromYarn(applicationId);
+        JobId jobId = TypeConverter.toYarn(id);
+        getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
+            new NormalizedResourceEvent(
+                org.apache.hadoop.mapreduce.TaskType.REDUCE,
+            100)));
+        getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
+            new NormalizedResourceEvent(
+                org.apache.hadoop.mapreduce.TaskType.MAP,
+            100)));
         getContext().getEventHandler().handle(
             new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
                 container, null));

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr

@@ -225,7 +225,7 @@
           {"name": "counters", "type": "JhCounters"}
       ]
      },
-
+     	
      {"type": "record", "name": "TaskStarted",
       "fields": [
           {"name": "taskid", "type": "string"},
@@ -256,6 +256,7 @@
           "TASK_FINISHED",
           "TASK_FAILED",
           "TASK_UPDATED",
+          "NORMALIZED_RESOURCE",
           "MAP_ATTEMPT_STARTED",
           "MAP_ATTEMPT_FINISHED",
           "MAP_ATTEMPT_FAILED",

+ 0 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java

@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 

+ 1 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java

@@ -18,14 +18,11 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
 
-import org.apache.avro.util.Utf8;
-
 /**
  * Event to record Failed and Killed completion of jobs
  *

+ 74 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record the normalized map/reduce requirements.
+ * 
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NormalizedResourceEvent implements HistoryEvent {
+  private int memory;
+  private TaskType taskType;
+  
+  /**
+   * Normalized request when sent to the Resource Manager.
+   * @param taskType the tasktype of the request.
+   * @param memory the normalized memory requirements.
+   */
+  public NormalizedResourceEvent(TaskType taskType, int memory) {
+    this.memory = memory;
+    this.taskType = taskType;
+  }
+  
+  /**
+   * the tasktype for the event.
+   * @return the tasktype for the event.
+   */
+  public TaskType getTaskType() {
+    return this.taskType;
+  }
+  
+  /**
+   * the normalized memory
+   * @return the normalized memory
+   */
+  public int getMemory() {
+    return this.memory;
+  }
+  
+  @Override
+  public EventType getEventType() {
+    return EventType.NORMALIZED_RESOURCE;
+  }
+
+  @Override
+  public Object getDatum() {
+    throw new UnsupportedOperationException("Not a seriable object");
+  }
+
+  @Override
+  public void setDatum(Object datum) {
+    throw new UnsupportedOperationException("Not a seriable object");
+  }
+}

+ 2 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java

@@ -18,19 +18,15 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
-import org.apache.hadoop.mapred.ProgressSplitsBlock;
-
-import org.apache.avro.util.Utf8;
-
 /**
  * Event to record successful completion of a reduce attempt
  *

+ 1 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java

@@ -18,15 +18,12 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
-import org.apache.avro.util.Utf8;
-
 /**
  * Event to record the start of a task
  *

+ 54 - 51
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
@@ -54,27 +54,32 @@ import org.junit.Test;
 
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
+
   @Test
   public void testHistoryParsing() throws Exception {
     Configuration conf = new Configuration();
     long amStartTimeEst = System.currentTimeMillis();
-    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
+    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
+        true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobId jobId = job.getID();
     LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
     app.waitForState(job, JobState.SUCCEEDED);
-    
-    //make sure all events are flushed
+
+    // make sure all events are flushed
     app.waitForState(Service.STATE.STOPPED);
-    
-    String jobhistoryDir = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
+
+    String jobhistoryDir = JobHistoryUtils
+        .getHistoryIntermediateDoneDirForUser(conf);
     JobHistory jobHistory = new JobHistory();
     jobHistory.init(conf);
-    
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId).getJobIndexInfo();
-    String jobhistoryFileName = FileNameIndexUtils.getDoneFileName(jobIndexInfo);
-    
+
+    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+        .getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils
+        .getDoneFileName(jobIndexInfo);
+
     Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
     FSDataInputStream in = null;
     LOG.info("JobHistoryFile is: " + historyFilePath);
@@ -86,27 +91,24 @@ public class TestJobHistoryParsing {
       LOG.info("Can not open history file: " + historyFilePath, ioe);
       throw (new Exception("Can not open History File"));
     }
-    
+
     JobHistoryParser parser = new JobHistoryParser(in);
     JobInfo jobInfo = parser.parse();
-    
-    Assert.assertEquals ("Incorrect username ",
-        "mapred", jobInfo.getUsername());
-    Assert.assertEquals("Incorrect jobName ",
-        "test", jobInfo.getJobname());
-    Assert.assertEquals("Incorrect queuename ",
-        "default", jobInfo.getJobQueueName());
-    Assert.assertEquals("incorrect conf path",
-        "test", jobInfo.getJobConfPath());
-    Assert.assertEquals("incorrect finishedMap ",
-        2, jobInfo.getFinishedMaps());
-    Assert.assertEquals("incorrect finishedReduces ",
-        1, jobInfo.getFinishedReduces());
-    Assert.assertEquals("incorrect uberized ",
-        job.isUber(), jobInfo.getUberized());
+
+    Assert.assertEquals("Incorrect username ", "mapred", jobInfo.getUsername());
+    Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
+    Assert.assertEquals("Incorrect queuename ", "default",
+        jobInfo.getJobQueueName());
+    Assert
+        .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
+    Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps());
+    Assert.assertEquals("incorrect finishedReduces ", 1,
+        jobInfo.getFinishedReduces());
+    Assert.assertEquals("incorrect uberized ", job.isUber(),
+        jobInfo.getUberized());
     int totalTasks = jobInfo.getAllTasks().size();
     Assert.assertEquals("total number of tasks is incorrect  ", 3, totalTasks);
-    
+
     // Verify aminfo
     Assert.assertEquals(1, jobInfo.getAMInfos().size());
     Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0)
@@ -120,15 +122,15 @@ public class TestJobHistoryParsing {
         && amInfo.getStartTime() >= amStartTimeEst);
 
     ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
-    //Assert at taskAttempt level
+    // Assert at taskAttempt level
     for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
       int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
-      Assert.assertEquals("total number of task attempts ",
-          1, taskAttemptCount);
-      TaskAttemptInfo taInfo =
-          taskInfo.getAllTaskAttempts().values().iterator().next();
+      Assert
+          .assertEquals("total number of task attempts ", 1, taskAttemptCount);
+      TaskAttemptInfo taInfo = taskInfo.getAllTaskAttempts().values()
+          .iterator().next();
       Assert.assertNotNull(taInfo.getContainerId());
-      //Verify the wrong ctor is not being used. Remove after mrv1 is removed.
+      // Verify the wrong ctor is not being used. Remove after mrv1 is removed.
       Assert.assertFalse(taInfo.getContainerId().equals(fakeCid));
     }
 
@@ -138,9 +140,8 @@ public class TestJobHistoryParsing {
           TypeConverter.fromYarn(task.getID()));
       Assert.assertNotNull("TaskInfo not found", taskInfo);
       for (TaskAttempt taskAttempt : task.getAttempts().values()) {
-        TaskAttemptInfo taskAttemptInfo =
-          taskInfo.getAllTaskAttempts().get(
-              TypeConverter.fromYarn((taskAttempt.getID())));
+        TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
+            TypeConverter.fromYarn((taskAttempt.getID())));
         Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
         Assert.assertEquals("Incorrect shuffle port for task attempt",
             taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
@@ -151,6 +152,8 @@ public class TestJobHistoryParsing {
         .getIntermediateSummaryFileName(jobId);
     Path summaryFile = new Path(jobhistoryDir, summaryFileName);
     String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+    Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+    Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
     Assert.assertNotNull(jobSummaryString);
 
     Map<String, String> jobSummaryElements = new HashMap<String, String>();

+ 1 - 4
hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java

@@ -17,16 +17,13 @@
  */
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.util.List;
-import java.util.ArrayList;
+import junit.framework.TestCase;
 
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 
-import junit.framework.TestCase;
-
 /**
  * Test various jobhistory events
  */