Kaynağa Gözat

MAPREDUCE-2666. Retrieve shuffle port number from JobHistory on MR AM restart. Contributed by Jonathan Eagles.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1182613 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 yıl önce
ebeveyn
işleme
277e520579

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1597,6 +1597,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed
     MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed
     task-attempts for obtaining task's progress. (Hitesh Shah via vinodkv)
     task-attempts for obtaining task's progress. (Hitesh Shah via vinodkv)
 
 
+    MAPREDUCE-2666. Retrieve shuffle port number from JobHistory on MR AM
+    restart. (Jonathan Eagles via acmurthy) 
+
 Release 0.22.0 - Unreleased
 Release 0.22.0 - Unreleased
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1095,6 +1095,8 @@ public abstract class TaskAttemptImpl implements
 
 
       //set the launch time
       //set the launch time
       taskAttempt.launchTime = taskAttempt.clock.getTime();
       taskAttempt.launchTime = taskAttempt.clock.getTime();
+      taskAttempt.shufflePort = event.getShufflePort();
+
       // register it to TaskAttemptListener so that it start listening
       // register it to TaskAttemptListener so that it start listening
       // for it
       // for it
       taskAttempt.taskAttemptListener.register(
       taskAttempt.taskAttemptListener.register(
@@ -1116,7 +1118,7 @@ public abstract class TaskAttemptImpl implements
         new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
         new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
             TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
             TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
             taskAttempt.launchTime,
             taskAttempt.launchTime,
-            nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort());
+            nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(), taskAttempt.shufflePort);
       taskAttempt.eventHandler.handle
       taskAttempt.eventHandler.handle
           (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
           (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
       taskAttempt.eventHandler.handle
       taskAttempt.eventHandler.handle
@@ -1125,7 +1127,6 @@ public abstract class TaskAttemptImpl implements
       //make remoteTask reference as null as it is no more needed
       //make remoteTask reference as null as it is no more needed
       //and free up the memory
       //and free up the memory
       taskAttempt.remoteTask = null;
       taskAttempt.remoteTask = null;
-      taskAttempt.shufflePort = event.getShufflePort();
       
       
       //tell the Task that attempt has started
       //tell the Task that attempt has started
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr

@@ -173,7 +173,8 @@
           {"name": "attemptId", "type": "string"},
           {"name": "attemptId", "type": "string"},
           {"name": "startTime", "type": "long"},
           {"name": "startTime", "type": "long"},
           {"name": "trackerName", "type": "string"},
           {"name": "trackerName", "type": "string"},
-          {"name": "httpPort", "type": "int"}
+          {"name": "httpPort", "type": "int"},
+          {"name": "shufflePort", "type": "int"}
       ]
       ]
      },
      },
 
 

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java

@@ -240,6 +240,7 @@ public class JobHistoryParser {
     attemptInfo.httpPort = event.getHttpPort();
     attemptInfo.httpPort = event.getHttpPort();
     attemptInfo.trackerName = event.getTrackerName();
     attemptInfo.trackerName = event.getTrackerName();
     attemptInfo.taskType = event.getTaskType();
     attemptInfo.taskType = event.getTaskType();
+    attemptInfo.shufflePort = event.getShufflePort();
     
     
     taskInfo.attemptsMap.put(attemptId, attemptInfo);
     taskInfo.attemptsMap.put(attemptId, attemptInfo);
   }
   }
@@ -506,6 +507,7 @@ public class JobHistoryParser {
     String trackerName;
     String trackerName;
     Counters counters;
     Counters counters;
     int httpPort;
     int httpPort;
+    int shufflePort;
     String hostname;
     String hostname;
 
 
     /** Create a Task Attempt Info which will store attempt level information
     /** Create a Task Attempt Info which will store attempt level information
@@ -516,6 +518,7 @@ public class JobHistoryParser {
         mapFinishTime = -1;
         mapFinishTime = -1;
       error =  state =  trackerName = hostname = "";
       error =  state =  trackerName = hostname = "";
       httpPort = -1;
       httpPort = -1;
+      shufflePort = -1;
     }
     }
     /**
     /**
      * Print all the information about this attempt.
      * Print all the information about this attempt.
@@ -530,6 +533,7 @@ public class JobHistoryParser {
       System.out.println("TASK_TYPE:" + taskType);
       System.out.println("TASK_TYPE:" + taskType);
       System.out.println("TRACKER_NAME:" + trackerName);
       System.out.println("TRACKER_NAME:" + trackerName);
       System.out.println("HTTP_PORT:" + httpPort);
       System.out.println("HTTP_PORT:" + httpPort);
+      System.out.println("SHUFFLE_PORT:" + shufflePort);
       if (counters != null) {
       if (counters != null) {
         System.out.println("COUNTERS:" + counters.toString());
         System.out.println("COUNTERS:" + counters.toString());
       }
       }
@@ -563,5 +567,7 @@ public class JobHistoryParser {
     public Counters getCounters() { return counters; }
     public Counters getCounters() { return counters; }
     /** @return the HTTP port for the tracker */
     /** @return the HTTP port for the tracker */
     public int getHttpPort() { return httpPort; }
     public int getHttpPort() { return httpPort; }
+    /** @return the Shuffle port for the tracker */
+    public int getShufflePort() { return shufflePort; }
   }
   }
 }
 }

+ 5 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java

@@ -44,16 +44,18 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
    * @param startTime Start time of the attempt
    * @param startTime Start time of the attempt
    * @param trackerName Name of the Task Tracker where attempt is running
    * @param trackerName Name of the Task Tracker where attempt is running
    * @param httpPort The port number of the tracker
    * @param httpPort The port number of the tracker
+   * @param shufflePort The shuffle port number of the container
    */
    */
   public TaskAttemptStartedEvent( TaskAttemptID attemptId,  
   public TaskAttemptStartedEvent( TaskAttemptID attemptId,  
       TaskType taskType, long startTime, String trackerName,
       TaskType taskType, long startTime, String trackerName,
-      int httpPort) {
+      int httpPort, int shufflePort) {
     datum.attemptId = new Utf8(attemptId.toString());
     datum.attemptId = new Utf8(attemptId.toString());
     datum.taskid = new Utf8(attemptId.getTaskID().toString());
     datum.taskid = new Utf8(attemptId.getTaskID().toString());
     datum.startTime = startTime;
     datum.startTime = startTime;
     datum.taskType = new Utf8(taskType.name());
     datum.taskType = new Utf8(taskType.name());
     datum.trackerName = new Utf8(trackerName);
     datum.trackerName = new Utf8(trackerName);
     datum.httpPort = httpPort;
     datum.httpPort = httpPort;
+    datum.shufflePort = shufflePort;
   }
   }
 
 
   TaskAttemptStartedEvent() {}
   TaskAttemptStartedEvent() {}
@@ -75,6 +77,8 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   }
   }
   /** Get the HTTP port */
   /** Get the HTTP port */
   public int getHttpPort() { return datum.httpPort; }
   public int getHttpPort() { return datum.httpPort; }
+  /** Get the shuffle port */
+  public int getShufflePort() { return datum.shufflePort; }
   /** Get the attempt id */
   /** Get the attempt id */
   public TaskAttemptID getTaskAttemptId() {
   public TaskAttemptID getTaskAttemptId() {
     return TaskAttemptID.forName(datum.attemptId.toString());
     return TaskAttemptID.forName(datum.attemptId.toString());

+ 21 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

@@ -35,10 +35,13 @@ import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
@@ -101,12 +104,27 @@ public class TestJobHistoryParsing {
     Assert.assertEquals("total number of tasks is incorrect  ", 3, totalTasks);
     Assert.assertEquals("total number of tasks is incorrect  ", 3, totalTasks);
 
 
     //Assert at taskAttempt level
     //Assert at taskAttempt level
-    for (TaskInfo taskInfo :  jobInfo.getAllTasks().values()) {
+    for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
       int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
       int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
-      Assert.assertEquals("total number of task attempts ", 
+      Assert.assertEquals("total number of task attempts ",
           1, taskAttemptCount);
           1, taskAttemptCount);
     }
     }
-    
+
+    // Deep compare Job and JobInfo
+    for (Task task : job.getTasks().values()) {
+      TaskInfo taskInfo = jobInfo.getAllTasks().get(
+          TypeConverter.fromYarn(task.getID()));
+      Assert.assertNotNull("TaskInfo not found", taskInfo);
+      for (TaskAttempt taskAttempt : task.getAttempts().values()) {
+        TaskAttemptInfo taskAttemptInfo =
+          taskInfo.getAllTaskAttempts().get(
+              TypeConverter.fromYarn((taskAttempt.getID())));
+        Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
+        Assert.assertEquals("Incorrect shuffle port for task attempt",
+            taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
+      }
+    }
+
     String summaryFileName = JobHistoryUtils
     String summaryFileName = JobHistoryUtils
         .getIntermediateSummaryFileName(jobId);
         .getIntermediateSummaryFileName(jobId);
     Path summaryFile = new Path(jobhistoryDir, summaryFileName);
     Path summaryFile = new Path(jobhistoryDir, summaryFileName);

+ 2 - 2
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -2676,7 +2676,7 @@ public class JobInProgress {
 
 
     TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
     TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
         status.getTaskID(), taskType, status.getStartTime(), 
         status.getTaskID(), taskType, status.getStartTime(), 
-        status.getTaskTracker(),  ttStatus.getHttpPort());
+        status.getTaskTracker(),  ttStatus.getHttpPort(), -1);
     
     
     jobHistory.logEvent(tse, status.getTaskID().getJobID());
     jobHistory.logEvent(tse, status.getTaskID().getJobID());
     TaskAttemptID statusAttemptID = status.getTaskID();
     TaskAttemptID statusAttemptID = status.getTaskID();
@@ -3197,7 +3197,7 @@ public class JobInProgress {
       StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
       StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
     TaskType taskType = getTaskType(tip);
     TaskType taskType = getTaskType(tip);
     TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
     TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
-        taskid, taskType, startTime, taskTrackerName, taskTrackerPort);
+        taskid, taskType, startTime, taskTrackerName, taskTrackerPort, -1);
     
     
     jobHistory.logEvent(tse, taskid.getJobID());
     jobHistory.logEvent(tse, taskid.getJobID());
 
 

+ 1 - 1
hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java

@@ -48,7 +48,7 @@ public class TestJobHistoryEvents extends TestCase {
                                                       TaskType[] types) {
                                                       TaskType[] types) {
     for (TaskType t : types) {
     for (TaskType t : types) {
       TaskAttemptStartedEvent tase = 
       TaskAttemptStartedEvent tase = 
-        new TaskAttemptStartedEvent(id, t, 0L, "", 0);
+        new TaskAttemptStartedEvent(id, t, 0L, "", 0, -1);
       assertEquals(expected, tase.getEventType());
       assertEquals(expected, tase.getEventType());
     }
     }
   }
   }

+ 1 - 1
hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java

@@ -79,7 +79,7 @@ public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter
                 .parseInt(httpPort);
                 .parseInt(httpPort);
 
 
         return new TaskAttemptStartedEvent(taskAttemptID,
         return new TaskAttemptStartedEvent(taskAttemptID,
-            that.originalTaskType, that.originalStartTime, trackerName, port);
+            that.originalTaskType, that.originalStartTime, trackerName, port, -1);
       }
       }
 
 
       return null;
       return null;