
MAPREDUCE-4838. Add additional fields like Locality, Avataar to the JobHistory logs. Contributed by Arun C Murthy and Zhijie Shen

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1440703 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth, 12 years ago
parent
commit
a0fd5cf321

+ 3 - 0
CHANGES.txt

@@ -163,6 +163,9 @@ Release 1.2.0 - unreleased
 
     MAPREDUCE-4837. Add webservices for Jobtracker. (Arun C Murthy via hitesh)
 
+    MAPREDUCE-4838. Add additional fields like Locality, Avataar to the 
+    JobHistory logs. (Arun C Murthy and Zhijie Shen via sseth)
+
   OPTIMIZATIONS
 
     HDFS-2533. Backport: Remove needless synchronization on some FSDataSet

+ 24 - 0
src/mapred/org/apache/hadoop/mapred/Avataar.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+enum Avataar {
+  VIRGIN,
+  SPECULATIVE
+}
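
For context, the rule that assigns these values (mirroring the JobInProgress change later in this commit): the first attempt launched for a task is VIRGIN, and an attempt launched while another attempt of the same task is still active is SPECULATIVE. A minimal sketch, with tip standing in for a TaskInProgress:

    // A second concurrent attempt of the same task is, by definition,
    // a speculative one; otherwise this is the task's first attempt.
    Avataar avataar = (tip.getActiveTasks().size() > 1)
        ? Avataar.SPECULATIVE   // another attempt is already running
        : Avataar.VIRGIN;       // first attempt of this task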

+ 13 - 0
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -313,6 +313,19 @@ public class JobConf extends Configuration {
   public static final String MAPRED_REDUCE_TASK_ENV =
     "mapred.reduce.child.env";
 
+  public static final String WORKFLOW_ID = "mapreduce.workflow.id";
+
+  public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
+
+  public static final String WORKFLOW_NODE_NAME =
+      "mapreduce.workflow.node.name";
+
+  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
+      "mapreduce.workflow.adjacency.";
+
+  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
+      "^mapreduce\\.workflow\\.adjacency\\..+";
+
   private Credentials credentials = new Credentials();
   
   /**
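
For orientation, a sketch of how a workflow engine might stamp these properties onto a job before submission. All names and values below are hypothetical, not part of this change:

    JobConf conf = new JobConf();
    conf.set(JobConf.WORKFLOW_ID, "wf-0001");           // hypothetical workflow run id
    conf.set(JobConf.WORKFLOW_NAME, "daily-etl");       // hypothetical workflow name
    conf.set(JobConf.WORKFLOW_NODE_NAME, "join-step");  // this job's node in the DAG
    // One property per adjacency: the suffix after the prefix names a node,
    // and the value names an adjacent node.
    conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "join-step", "report-step");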

+ 102 - 30
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -475,7 +475,8 @@ public class JobHistory {
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
     SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
     TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
-    VIEW_JOB, MODIFY_JOB, JOB_QUEUE, FAIL_REASON
+    VIEW_JOB, MODIFY_JOB, JOB_QUEUE, FAIL_REASON, LOCALITY, AVATAAR,
+    WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES
   }
 
   /**
@@ -787,6 +788,10 @@ public class JobHistory {
                   Keys[] keys, String[] values) {
     log(writers, recordType, keys, values, null);
   }
+
+  static class JobHistoryLogger {
+    static final Log LOG = LogFactory.getLog(JobHistoryLogger.class);
+  }
   
   /**
    * Log a number of keys and values with record. the array length of keys and values
@@ -820,15 +825,19 @@ public class JobHistory {
       builder.append(DELIMITER); 
     }
     builder.append(LINE_DELIMITER_CHAR);
-    
+
+    String logLine = builder.toString();
     for (Iterator<PrintWriter> iter = writers.iterator(); iter.hasNext();) {
       PrintWriter out = iter.next();
-      out.println(builder.toString());
+      out.println(logLine);
       if (out.checkError() && id != null) {
         LOG.info("Logging failed for job " + id + "removing PrintWriter from FileManager");
         iter.remove();
       }
     }
+    if (recordType != RecordTypes.Meta) {
+      JobHistoryLogger.LOG.debug(logLine);
+    }
   }
   
   /**
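
The JobHistoryLogger hook added above mirrors every serialized history line (Meta records excepted) to a dedicated commons-logging category at DEBUG level, so operators can route history records into the regular log stream. A sketch of enabling it, assuming a log4j backend; the category name is an assumption derived from LogFactory.getLog on the nested class:

    // log4j.properties equivalent (assumption: log4j is the backend):
    //   log4j.logger.org.apache.hadoop.mapred.JobHistory$JobHistoryLogger=DEBUG
    org.apache.log4j.Logger
        .getLogger("org.apache.hadoop.mapred.JobHistory$JobHistoryLogger")
        .setLevel(org.apache.log4j.Level.DEBUG);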
@@ -1266,6 +1275,34 @@ public class JobHistory {
       return user;
     }
     
+    /**
+     * Get the workflow adjacencies from the job conf.
+     * The string returned is of the form "key"="value" "key"="value" ...
+     */
+    public static String getWorkflowAdjacencies(Configuration conf) {
+      int prefixLen = JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
+      Map<String,String> adjacencies = 
+          conf.getValByRegex(JobConf.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
+      if (adjacencies.isEmpty())
+        return "";
+      int size = 0;
+      for (Entry<String,String> entry : adjacencies.entrySet()) {
+        int keyLen = entry.getKey().length();
+        size += keyLen - prefixLen;
+        size += entry.getValue().length() + 6;
+      }
+      StringBuilder sb = new StringBuilder(size);
+      for (Entry<String,String> entry : adjacencies.entrySet()) {
+        int keyLen = entry.getKey().length();
+        sb.append("\"");
+        sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
+        sb.append("\"=\"");
+        sb.append(escapeString(entry.getValue()));
+        sb.append("\" ");
+      }
+      return sb.toString();
+    }
+    
     /**
      * Get the job history file path given the history filename
      */
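
To make the encoding concrete: getWorkflowAdjacencies() strips the mapreduce.workflow.adjacency. prefix from each matching key, escapes both halves, and emits space-separated "key"="value" pairs. A small illustration with hypothetical node names (the method is called unqualified here; its enclosing class is not shown in the hunk):

    // Hypothetical input properties:
    //   mapreduce.workflow.adjacency.A = B
    //   mapreduce.workflow.adjacency.B = C
    Configuration conf = new Configuration();
    conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "A", "B");
    conf.set(JobConf.WORKFLOW_ADJACENCY_PREFIX_STRING + "B", "C");
    // Result, modulo map iteration order: "A"="B" "B"="C"
    // (the "+ 6" in the size estimate covers the four quotes, the '=',
    // and the trailing space added per entry)
    String adjacencies = getWorkflowAdjacencies(conf);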
@@ -1729,11 +1766,19 @@ public class JobHistory {
                        new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
                                   Keys.SUBMIT_TIME, Keys.JOBCONF,
                                   Keys.VIEW_JOB, Keys.MODIFY_JOB,
-                                  Keys.JOB_QUEUE}, 
+                                  Keys.JOB_QUEUE, Keys.WORKFLOW_ID,
+                                  Keys.WORKFLOW_NAME, Keys.WORKFLOW_NODE_NAME,
+                                  Keys.WORKFLOW_ADJACENCIES}, 
                        new String[]{jobId.toString(), jobName, user, 
                                     String.valueOf(submitTime) , jobConfPath,
                                     viewJobACL, modifyJobACL,
-                                    jobConf.getQueueName()}, jobId
+                                    jobConf.getQueueName(),
+                                    jobConf.get(JobConf.WORKFLOW_ID, ""),
+                                    jobConf.get(JobConf.WORKFLOW_NAME, ""),
+                                    jobConf.get(JobConf.WORKFLOW_NODE_NAME, ""),
+                                    getWorkflowAdjacencies(jobConf)
+                                    }, 
+                                    jobId
                       ); 
            
       }catch(IOException e){
@@ -2134,7 +2179,14 @@ public class JobHistory {
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
       logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
     }
-    
+
+    @Deprecated
+    public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
+        String trackerName, int httpPort, String taskType) {
+      logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType, 
+          Locality.OFF_SWITCH, Avataar.VIRGIN);
+    }
+
     /**
      * Log start time of this map task attempt.
      *  
@@ -2142,25 +2194,31 @@ public class JobHistory {
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param trackerName name of the tracker executing the task attempt.
      * @param httpPort http port of the task tracker executing the task attempt
-     * @param taskType Whether the attempt is cleanup or setup or map 
+     * @param taskType Whether the attempt is cleanup or setup or map
+     * @param locality the data locality of the task attempt
+     * @param avataar the avataar of the task attempt
      */
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                   String trackerName, int httpPort, 
-                                  String taskType) {
+                                  String taskType,
+                                  Locality locality, Avataar avataar) {
       JobID id = taskAttemptId.getJobID();
       ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
       if (null != writer){
         JobHistory.log(writer, RecordTypes.MapAttempt, 
                        new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
-                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
-                                   Keys.TRACKER_NAME, Keys.HTTP_PORT},
+                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+                                   Keys.TRACKER_NAME, Keys.HTTP_PORT,
+                                   Keys.LOCALITY, Keys.AVATAAR},
                        new String[]{taskType,
                                     taskAttemptId.getTaskID().toString(), 
                                     taskAttemptId.toString(), 
                                     String.valueOf(startTime), trackerName,
-                                    httpPort == -1 ? "" : 
-                                      String.valueOf(httpPort)}, id); 
+                                    httpPort == -1 ? "" : String.valueOf(httpPort),
+                                    locality.toString(), avataar.toString()},
+                       id
+                       ); 
       }
     }
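
A sketch of a call site using the widened signature; the attempt id, tracker name, port, and timestamp below are hypothetical:

    JobHistory.MapAttempt.logStarted(attemptId, System.currentTimeMillis(),
        "tracker_host1",            // hypothetical tracker name
        50060,                      // hypothetical tracker HTTP port
        Values.MAP.name(),
        Locality.NODE_LOCAL, Avataar.VIRGIN);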
     
@@ -2322,7 +2380,15 @@ public class JobHistory {
                                   long startTime, String hostName){
       logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
     }
-    
+
+    @Deprecated
+    public static void logStarted(TaskAttemptID taskAttemptId, 
+        long startTime, String trackerName, 
+        int httpPort, 
+        String taskType) {
+      logStarted(taskAttemptId, startTime, trackerName, httpPort, taskType, 
+          Locality.OFF_SWITCH, Avataar.VIRGIN);
+    }
     /**
      * Log start time of  Reduce task attempt. 
      * 
@@ -2330,27 +2396,33 @@ public class JobHistory {
      * @param startTime start time
      * @param trackerName tracker name 
      * @param httpPort the http port of the tracker executing the task attempt
-     * @param taskType Whether the attempt is cleanup or setup or reduce 
+     * @param taskType Whether the attempt is cleanup or setup or reduce
+     * @param locality the data locality of the task attempt
+     * @param avataar the avataar of the task attempt
      */
     public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String trackerName, 
                                   int httpPort, 
-                                  String taskType) {
-              JobID id = taskAttemptId.getJobID();
-	      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
-
-              if (null != writer){
-		  JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-				 new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
-					      Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
-					      Keys.TRACKER_NAME, Keys.HTTP_PORT},
-				 new String[]{taskType,
-					      taskAttemptId.getTaskID().toString(), 
-					      taskAttemptId.toString(), 
-					      String.valueOf(startTime), trackerName,
-					      httpPort == -1 ? "" : 
-						String.valueOf(httpPort)}, id); 
-              }
+                                  String taskType,
+                                  Locality locality, Avataar avataar) {
+      JobID id = taskAttemptId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.ReduceAttempt,
+            new Keys[] {Keys.TASK_TYPE, Keys.TASKID,
+                Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+                Keys.TRACKER_NAME, Keys.HTTP_PORT,
+                Keys.LOCALITY, Keys.AVATAAR},
+            new String[]{taskType,
+                taskAttemptId.getTaskID().toString(), 
+                taskAttemptId.toString(), 
+                String.valueOf(startTime), trackerName,
+                httpPort == -1 ? "" : 
+                String.valueOf(httpPort),
+                locality.toString(), avataar.toString()}, 
+            id); 
+      }
 	    }
 	    
 	    /**

+ 49 - 16
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -1761,6 +1761,7 @@ public class JobInProgress {
     //
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
+    Locality locality = Locality.OFF_SWITCH;
     if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       // increment the data locality counter for maps
       Node tracker = jobtracker.getNode(tts.getHost());
@@ -1780,31 +1781,39 @@ public class JobInProgress {
           }
         }
       }
-      logAndIncreJobCounters(tip, level, jobtracker.isNodeGroupAware());
+      locality = logAndIncreJobCounters(tip, level, jobtracker.isNodeGroupAware());
     }
+    // Set locality
+    tip.setTaskAttemptLocality(id, locality);
+
+    // Set avataar
+    Avataar avataar = (tip.getActiveTasks().size() > 1) ? Avataar.SPECULATIVE :
+        Avataar.VIRGIN;
+    tip.setTaskAttemptAvataar(id, avataar);
   }
 
-  private void logAndIncreJobCounters(TaskInProgress tip, int level, 
+  private Locality logAndIncreJobCounters(TaskInProgress tip, int level, 
       boolean isNodeGroupAware) {
     switch (level) {
       case 0:
         // level 0 means data-local
         logAndIncrDataLocalMaps(tip);
-        break;
+        return Locality.NODE_LOCAL;
       case 1:
         if (isNodeGroupAware) {
           // level 1 in case of with-NodeGroup means nodegroup-local
           logAndIncrNodeGroupLocalMaps(tip);
+          return Locality.GROUP_LOCAL;
         } else {
           // level 1 in case of without-NodeGroup means rack-local
           logAndIncrRackLocalMaps(tip);
+          return Locality.RACK_LOCAL;
         }
-        break;
       case 2:
         if (isNodeGroupAware) {
           // level 2 in case of with-NodeGroup means rack-local
           logAndIncrRackLocalMaps(tip);
-          break;
+          return Locality.RACK_LOCAL;
         }
         // in case of without-NodeGroup, level 2 falls through to other-local
         // handled by default
@@ -1813,7 +1822,7 @@ public class JobInProgress {
         if (level != this.maxLevel) {
           logAndIncrOtherLocalMaps(tip, level);
         }
-        break;
+        return Locality.OFF_SWITCH;
     }
   }
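
Restating the level-to-Locality mapping the rewritten method now returns, as a standalone sketch (a hypothetical helper, not part of the commit; levels are network-topology distances between the split's location and the chosen tracker):

    static Locality localityForLevel(int level, boolean nodeGroupAware) {
      switch (level) {
        case 0:  return Locality.NODE_LOCAL;                      // data-local
        case 1:  return nodeGroupAware ? Locality.GROUP_LOCAL     // nodegroup-local
                                       : Locality.RACK_LOCAL;     // rack-local
        case 2:  if (nodeGroupAware) return Locality.RACK_LOCAL;
                 // without nodegroups, level 2 falls through
        default: return Locality.OFF_SWITCH;
      }
    }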
 
@@ -2592,22 +2601,27 @@ public class JobInProgress {
       this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
     String taskType = getTaskType(tip);
+    TaskAttemptID taskAttemptId = status.getTaskID();
+    Locality locality = checkLocality(tip, taskAttemptId);
+    Avataar avataar = checkAvataar(tip, taskAttemptId);
     if (status.getIsMap()){
-      JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
+      JobHistory.MapAttempt.logStarted(taskAttemptId, status.getStartTime(), 
                                        status.getTaskTracker(), 
-                                       ttStatus.getHttpPort(), 
-                                       taskType); 
-      JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
+                                       ttStatus.getHttpPort(),
+                                       taskType, locality, avataar);
+      JobHistory.MapAttempt.logFinished(taskAttemptId, status.getFinishTime(), 
                                         trackerHostname, taskType,
                                         status.getStateString(), 
                                         status.getCounters()); 
     }else{
-      JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
+      JobHistory.ReduceAttempt.logStarted(taskAttemptId, status.getStartTime(), 
                                           status.getTaskTracker(),
                                           ttStatus.getHttpPort(), 
-                                          taskType); 
-      JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
-                                           status.getSortFinishTime(), status.getFinishTime(), 
+                                          taskType, locality, avataar);
+      JobHistory.ReduceAttempt.logFinished(taskAttemptId,
+                                           status.getShuffleFinishTime(),
+                                           status.getSortFinishTime(),
+                                           status.getFinishTime(),
                                            trackerHostname, 
                                            taskType,
                                            status.getStateString(), 
@@ -3052,9 +3066,12 @@ public class JobInProgress {
     String diagInfo = taskDiagnosticInfo == null ? "" :
       StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
     String taskType = getTaskType(tip);
+    TaskAttemptID taskAttemptId = status.getTaskID();
+    Locality locality = checkLocality(tip, taskAttemptId);
+    Avataar avataar = checkAvataar(tip, taskAttemptId);
     if (taskStatus.getIsMap()) {
       JobHistory.MapAttempt.logStarted(taskid, startTime, 
-        taskTrackerName, taskTrackerPort, taskType);
+        taskTrackerName, taskTrackerPort, taskType, locality, avataar);
       if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.MapAttempt.logFailed(taskid, finishTime,
           taskTrackerHostName, diagInfo, taskType);
@@ -3064,7 +3081,7 @@ public class JobInProgress {
       }
     } else {
       JobHistory.ReduceAttempt.logStarted(taskid, startTime, 
-        taskTrackerName, taskTrackerPort, taskType);
+        taskTrackerName, taskTrackerPort, taskType, locality, avataar);
       if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
         JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
           taskTrackerHostName, diagInfo, taskType);
@@ -3160,6 +3177,22 @@ public class JobInProgress {
     }
   }
 
+  private Locality checkLocality(TaskInProgress tip, TaskAttemptID taskAttemptId) {
+    Locality locality = tip.getTaskAttemptLocality(taskAttemptId);
+    if (locality == null) {
+      locality = Locality.OFF_SWITCH;
+    }
+    return locality;
+  }
+
+  private Avataar checkAvataar(TaskInProgress tip, TaskAttemptID taskAttemptId) {
+    Avataar avataar = tip.getTaskAttemptAvataar(taskAttemptId);
+    if (avataar == null) {
+      avataar = Avataar.VIRGIN;
+    }
+    return avataar;
+  }
+
   void killSetupTip(boolean isMap) {
     if (isMap) {
       setup[0].kill();

+ 26 - 0
src/mapred/org/apache/hadoop/mapred/Locality.java

@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+enum Locality {
+  NODE_LOCAL,
+  GROUP_LOCAL,
+  RACK_LOCAL,
+  OFF_SWITCH
+}

+ 23 - 2
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -27,13 +27,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapred.TaskStatus.State;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.Node;
 
 
@@ -128,7 +129,11 @@ class TaskInProgress {
   private Counters counters = new Counters();
   
   private String user;
-  
+
+  private Map<TaskAttemptID, Locality> taskLocality = 
+      new ConcurrentHashMap<TaskAttemptID, Locality>();
+  private Map<TaskAttemptID, Avataar> taskAvataar = 
+      new ConcurrentHashMap<TaskAttemptID, Avataar>();
 
   /**
    * Constructor for MapTask
@@ -272,7 +277,23 @@ class TaskInProgress {
     execFinishTime = finishTime;
     JobHistory.Task.logUpdates(id, execFinishTime); // log the update
   }
+
+  public void setTaskAttemptLocality(TaskAttemptID taskAttemptID, Locality locality) {
+    taskLocality.put(taskAttemptID, locality);
+  }
   
+  public Locality getTaskAttemptLocality(TaskAttemptID taskAttemptId) {
+    return taskLocality.get(taskAttemptId);
+  }
+  
+  public void setTaskAttemptAvataar(TaskAttemptID taskAttemptId, Avataar avataar) {
+    taskAvataar.put(taskAttemptId, avataar);
+  }
+  
+  public Avataar getTaskAttemptAvataar(TaskAttemptID taskAttemptID) {
+    return taskAvataar.get(taskAttemptID);
+  }
+ 
   /**
    * Return the parent job
    */

+ 17 - 0
src/test/org/apache/hadoop/mapred/TestJobHistory.java

@@ -381,6 +381,23 @@ public class TestJobHistory extends TestCase {
                    (status.equals("SUCCESS") || status.equals("FAILED") ||
                     status.equals("KILLED")));
 
+        // Validate task Avataar
+        String avataar = attempt.get(Keys.AVATAAR);
+        assertTrue("Unexpected AVATAAR \"" + avataar + "\" is seen in " +
+            " history file for task attempt " + id,
+            (avataar.equals("VIRGIN") || avataar.equals("SPECULATIVE"))
+        );
+        
+        // Map Task Attempts should have valid LOCALITY
+        if (type.equals("MAP")) {
+          String locality = attempt.get(Keys.LOCALITY);
+          assertTrue("Unexpected LOCALITY \"" + locality + "\" is seen in " +
+              " history file for task attempt " + id,
+              (locality.equals("NODE_LOCAL") || locality.equals("GROUP_LOCAL") ||
+                  locality.equals("RACK_LOCAL") || locality.equals("OFF_SWITCH"))
+          );
+        }
+ 
         // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
         // SORT_FINISHED time
         if (type.equals("REDUCE") && status.equals("SUCCESS")) {

+ 5 - 0
src/tools/org/apache/hadoop/tools/rumen/Hadoop20JHParser.java

@@ -66,6 +66,11 @@ public class Hadoop20JHParser implements JobHistoryParser {
     reader = new LineReader(input);
   }
 
+  public Hadoop20JHParser(LineReader reader) throws IOException {
+    super();
+    this.reader = reader;
+  }
+
   Map<String, HistoryEventEmitter> liveEmitters =
       new HashMap<String, HistoryEventEmitter>();
   Queue<HistoryEvent> remainingEvents = new LinkedList<HistoryEvent>();
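
The new constructor lets callers hand the parser a pre-positioned LineReader instead of a raw input stream. A usage sketch; the file path is hypothetical, and the nextEvent()/close() contract is assumed from the existing JobHistoryParser interface:

    LineReader reader = new LineReader(
        new FileInputStream("/tmp/sample-job-history.log"));  // hypothetical file
    Hadoop20JHParser parser = new Hadoop20JHParser(reader);
    try {
      HistoryEvent event;
      while ((event = parser.nextEvent()) != null) {
        System.out.println(event.getEventType());
      }
    } finally {
      parser.close();
    }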

+ 20 - 2
src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java

@@ -66,6 +66,22 @@ public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
       String user = line.get("USER");
       String jobName = line.get("JOBNAME");
       String queueName = line.get("JOB_QUEUE");
+      String workflowId = line.get("WORKFLOW_ID");
+      if (workflowId == null) {
+        workflowId = "";
+      }
+      String workflowName = line.get("WORKFLOW_NAME");
+      if (workflowName == null) {
+        workflowName = "";
+      }
+      String workflowNodeName = line.get("WORKFLOW_NODE_NAME");
+      if (workflowNodeName == null) {
+        workflowNodeName = "";
+      }
+      String workflowAdjacencies = line.get("WORKFLOW_ADJACENCIES");
+      if (workflowAdjacencies == null) {
+        workflowAdjacencies = "";
+      }
 
       if (submitTime != null) {
         Job20LineHistoryEventEmitter that =
@@ -75,8 +91,10 @@ public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
 
         Map<JobACL, AccessControlList> jobACLs =
           new HashMap<JobACL, AccessControlList>();
-        return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
-            : user, that.originalSubmitTime, jobConf, jobACLs, queueName);
+        return new JobSubmittedEvent(jobID, jobName,
+            user == null ? "nulluser" : user, that.originalSubmitTime,
+            jobConf, jobACLs, queueName, workflowId, workflowName,
+            workflowNodeName, workflowAdjacencies);
       }
 
       return null;

+ 35 - 4
src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java

@@ -38,6 +38,10 @@ public class JobSubmittedEvent implements HistoryEvent {
   private String jobConfPath;
   private Map<JobACL, AccessControlList> jobAcls;
   private String queue;
+  private String workflowId;
+  private String workflowName;
+  private String workflowNodeName;
+  private String workflowAdjacencies;
 
   /**
    * @deprecated Use
@@ -49,7 +53,7 @@ public class JobSubmittedEvent implements HistoryEvent {
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath) {
     this(id, jobName, userName, submitTime, jobConfPath,
-        new HashMap<JobACL, AccessControlList>(), null);
+        new HashMap<JobACL, AccessControlList>(), null, "", "", "", "");
   }
 
   /**
@@ -62,7 +66,8 @@ public class JobSubmittedEvent implements HistoryEvent {
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
       Map<JobACL, AccessControlList> jobACLs) {
-    this(id, jobName, userName, submitTime, jobConfPath, jobACLs, null);
+    this(id, jobName, userName, submitTime, jobConfPath, jobACLs, null,
+        "", "", "", "");
   }
 
   /**
@@ -74,10 +79,16 @@ public class JobSubmittedEvent implements HistoryEvent {
    * @param jobConfPath Path of the Job Configuration file
    * @param jobACLs The configured acls for the job.
    * @param queue job queue name
+   * @param workflowId the workflow Id
+   * @param workflowName the workflow name
+   * @param workflowNodeName the workflow node name
+   * @param workflowAdjacencies the workflow adjacencies
    */
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
-      Map<JobACL, AccessControlList> jobACLs, String queue) {
+      Map<JobACL, AccessControlList> jobACLs, String queue,
+      String workflowId, String workflowName, String workflowNodeName,
+      String workflowAdjacencies) {
     this.jobId = id;
     this.jobName = jobName;
     this.userName = userName;
@@ -85,6 +96,10 @@ public class JobSubmittedEvent implements HistoryEvent {
     this.jobConfPath = jobConfPath;
     this.jobAcls = jobACLs;
     this.queue = queue;
+    this.workflowId = workflowId;
+    this.workflowName = workflowName;
+    this.workflowNodeName = workflowNodeName;
+    this.workflowAdjacencies = workflowAdjacencies;
   }
 
   /** Get the Job Id */
@@ -101,10 +116,26 @@ public class JobSubmittedEvent implements HistoryEvent {
   public Map<JobACL, AccessControlList> getJobAcls() {
     return jobAcls;
   }
-
+  /** Get the queue name for the job */
   public String getJobQueueName() {
     return queue;
   }
+  /** Get the workflow Id */
+  public String getWorkflowId() {
+    return workflowId;
+  }
+  /** Get the workflow name */
+  public String getWorkflowName() {
+    return workflowName;
+  }
+  /** Get the workflow node name */
+  public String getWorkflowNodeName() {
+    return workflowNodeName;
+  }
+  /** Get the workflow adjacencies */
+  public String getWorkflowAdjacencies() {
+    return workflowAdjacencies;
+  }
 
   /** Get the event type */
   public EventType getEventType() { return EventType.JOB_SUBMITTED; }
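
Putting the widened constructor to work, a hypothetical construction with workflow metadata; every value below is made up, jobId and submitTime are assumed in scope, and the adjacency string uses the "key"="value" format produced by getWorkflowAdjacencies():

    JobSubmittedEvent event = new JobSubmittedEvent(
        jobId, "word-count", "alice", submitTime, "/jobs/conf.xml",
        new HashMap<JobACL, AccessControlList>(), "default",
        "wf-0001", "daily-etl", "count-step",
        "\"count-step\"=\"report-step\" ");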

+ 10 - 1
src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java

@@ -61,6 +61,14 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
       String taskType = line.get("TASK_TYPE");
       String trackerName = line.get("TRACKER_NAME");
       String httpPort = line.get("HTTP_PORT");
+      String locality = line.get("LOCALITY");
+      if (locality == null) {
+        locality = "";
+      }
+      String avataar = line.get("AVATAAR");
+      if (avataar == null) {
+        avataar = "";
+      }
 
       if (startTime != null && taskType != null) {
         TaskAttempt20LineEventEmitter that =
@@ -75,7 +83,8 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
                 .parseInt(httpPort);
 
         return new TaskAttemptStartedEvent(taskAttemptID,
-            that.originalTaskType, that.originalStartTime, trackerName, port);
+            that.originalTaskType, that.originalStartTime, trackerName, port,
+            locality, avataar);
       }
 
       return null;

+ 7 - 1
src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java

@@ -84,7 +84,13 @@ public class TaskAttemptFinishedEvent  implements HistoryEvent {
   public JhCounters getCounters() { return counters; }
   /** Get the event type */
   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_FINISHED;
+    if (taskType == TaskType.JOB_SETUP) {
+      return EventType.SETUP_ATTEMPT_FINISHED;
+    } else if (taskType == TaskType.JOB_CLEANUP) {
+      return EventType.CLEANUP_ATTEMPT_FINISHED;
+    }
+    return attemptId.isMap() ? 
+        EventType.MAP_ATTEMPT_FINISHED : EventType.REDUCE_ATTEMPT_FINISHED;
   }
 
 }

+ 22 - 2
src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java

@@ -35,6 +35,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   private TaskType taskType;
   private String trackerName;
   private int httpPort;
+  private String locality;
+  private String avataar;
 
   /**
    * Create an event to record the start of an attempt
@@ -43,16 +45,20 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
    * @param startTime Start time of the attempt
    * @param trackerName Name of the Task Tracker where attempt is running
    * @param httpPort The port number of the tracker
+   * @param locality the locality of the task attempt
+   * @param avataar the avataar of the task attempt
    */
   public TaskAttemptStartedEvent( TaskAttemptID attemptId,  
       TaskType taskType, long startTime, String trackerName,
-      int httpPort) {
+      int httpPort, String locality, String avataar) {
     this.taskId = attemptId.getTaskID();
     this.attemptId = attemptId;
     this.startTime = startTime;
     this.taskType = taskType;
     this.trackerName = trackerName;
     this.httpPort = httpPort;
+    this.locality = locality;
+    this.avataar = avataar;
   }
 
   /** Get the task id */
@@ -71,9 +77,23 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   public TaskAttemptID getTaskAttemptId() {
     return attemptId;
   }
+  /** Get the locality of the task attempt */
+  public String getLocality() {
+    return locality;
+  }
+  /** Get the avataar of the task attempt */
+  public String getAvataar() {
+    return avataar;
+  }
   /** Get the event type */
   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_STARTED;
+    if (taskType == TaskType.JOB_SETUP) {
+      return EventType.SETUP_ATTEMPT_STARTED;
+    } else if (taskType == TaskType.JOB_CLEANUP) {
+      return EventType.CLEANUP_ATTEMPT_STARTED;
+    }
+    return attemptId.isMap() ? 
+        EventType.MAP_ATTEMPT_STARTED : EventType.REDUCE_ATTEMPT_STARTED;
   }
 
 }

+ 17 - 1
src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java

@@ -77,7 +77,23 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
   public String getTaskStatus() { return status; }
   /** Get the event type */
   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_KILLED;
+    if (status.equals("FAILED")) {
+      if (taskType == TaskType.JOB_SETUP) {
+        return EventType.SETUP_ATTEMPT_FAILED;
+      } else if (taskType == TaskType.JOB_CLEANUP) {
+        return EventType.CLEANUP_ATTEMPT_FAILED;
+      }
+      return attemptId.isMap() ? 
+          EventType.MAP_ATTEMPT_FAILED : EventType.REDUCE_ATTEMPT_FAILED;
+    } else {
+      if (taskType == TaskType.JOB_SETUP) {
+        return EventType.SETUP_ATTEMPT_KILLED;
+      } else if (taskType == TaskType.JOB_CLEANUP) {
+        return EventType.CLEANUP_ATTEMPT_KILLED;
+      }
+      return attemptId.isMap() ? 
+          EventType.MAP_ATTEMPT_KILLED : EventType.REDUCE_ATTEMPT_KILLED;
+    }
   }
 
 }