
svn merge -c 1452372 FIXES: MAPREDUCE-5043 Fetch failure processing can cause AM event queue to backup and eventually OOM Contributed by Jason Lowe


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1454440 13f79535-47bb-0310-9956-ffa450edef68
Author: Robert Joseph Evans
Parent commit: f57594968a
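What the change does, read from the diff below (summary added for context, not part of the patch): the JobImpl transition that handles a JobTaskAttemptFetchFailureEvent used to recount the reducers still in the SHUFFLE phase inside its loop over every failed map attempt carried by the event, so each event cost on the order of (failed maps in the event) x (reduce tasks) of work on the AM's single dispatcher thread. Under heavy fetch-failure traffic the dispatcher fell behind, the event queue backed up, and the AM eventually ran out of memory. The patch hoists the count above the loop and adds a cheap TaskAttempt.getPhase() accessor so the per-attempt check no longer goes through getReport(). A minimal, self-contained sketch of the hoisting idea (all names below are illustrative stand-ins, not the actual Hadoop classes):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: illustrates hoisting a loop-invariant computation out of the
// per-map loop, which is the core of the JobImpl change in this commit.
class FetchFailureSketch {

  // fetch-failure counts per map attempt, keyed by a map attempt id string
  private final Map<String, Integer> fetchFailuresPerMap = new HashMap<String, Integer>();

  // Stand-in for "count running reduce attempts whose phase is SHUFFLE".
  private int countShufflingReducers(List<String> reducePhases) {
    int count = 0;
    for (String phase : reducePhases) {
      if ("SHUFFLE".equals(phase)) {
        count++;
      }
    }
    return count;
  }

  // After the patch: the O(reduces) count is taken once per event and reused
  // for every failed map attempt the event carries. Before the patch the same
  // count sat inside this loop, making each event O(maps * reduces).
  void onFetchFailureEvent(List<String> failedMapIds, List<String> reducePhases) {
    int shufflingReducers = countShufflingReducers(reducePhases);
    for (String mapId : failedMapIds) {
      Integer failures = fetchFailuresPerMap.get(mapId);
      failures = (failures == null) ? 1 : (failures + 1);
      fetchFailuresPerMap.put(mapId, failures);
      float failureRate = shufflingReducers == 0
          ? 1.0f : (float) failures / shufflingReducers;
      // ... compare failureRate against the configured threshold and, if it is
      // exceeded, declare mapId's output faulty so the map can be rerun ...
    }
  }
}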

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -88,6 +88,9 @@ Release 0.23.7 - UNRELEASED
     appropriately used and that on-disk segments are correctly sorted on
     file-size. (Anty Rao and Ravi Prakash via acmurthy) 
 
+    MAPREDUCE-5043. Fetch failure processing can cause AM event queue to
+    backup and eventually OOM (Jason Lowe via bobby)
+
     MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi
     Prakash via tgraves)
 

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.job;
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -36,6 +37,7 @@ public interface TaskAttempt {
   List<String> getDiagnostics();
   Counters getCounters();
   float getProgress();
+  Phase getPhase();
   TaskAttemptState getState();
 
   /** 

+ 14 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -1543,6 +1543,20 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
+      //get number of shuffling reduces
+      int shufflingReduceTasks = 0;
+      for (TaskId taskId : job.reduceTasks) {
+        Task task = job.tasks.get(taskId);
+        if (TaskState.RUNNING.equals(task.getState())) {
+          for(TaskAttempt attempt : task.getAttempts().values()) {
+            if(attempt.getPhase() == Phase.SHUFFLE) {
+              shufflingReduceTasks++;
+              break;
+            }
+          }
+        }
+      }
+
       JobTaskAttemptFetchFailureEvent fetchfailureEvent = 
         (JobTaskAttemptFetchFailureEvent) event;
       for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId : 
@@ -1551,20 +1565,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
         job.fetchFailuresMapping.put(mapId, fetchFailures);
         
-        //get number of shuffling reduces
-        int shufflingReduceTasks = 0;
-        for (TaskId taskId : job.reduceTasks) {
-          Task task = job.tasks.get(taskId);
-          if (TaskState.RUNNING.equals(task.getState())) {
-            for(TaskAttempt attempt : task.getAttempts().values()) {
-              if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
-                shufflingReduceTasks++;
-                break;
-              }
-            }
-          }
-        }
-        
         float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
           (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
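Note on the change above (added commentary, not from the patch): the shuffling-reduce count does not depend on which map attempt failed, so it is now computed once per fetch-failure event instead of once per failed map listed in the event. The moved block also switches from attempt.getReport().getPhase() to the new attempt.getPhase(), avoiding building a full TaskAttemptReport just to read the phase.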

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -947,6 +947,16 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+  @Override
+  public Phase getPhase() {
+    readLock.lock();
+    try {
+      return reportedStatus.phase;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public TaskAttemptState getState() {
     readLock.lock();
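The new getPhase() mirrors the existing getState() accessor shown above: take the attempt's read lock, return the phase from the last reported status, and release the lock in a finally block. A self-contained sketch of that read-lock-guarded getter pattern (illustrative names only; it assumes a ReentrantReadWriteLock, which is what the paired lock()/unlock() calls suggest):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of a read-lock-guarded getter; not the actual TaskAttemptImpl.
class ReportedStatusHolder {
  enum Phase { STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP }

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private Phase phase = Phase.STARTING;

  Phase getPhase() {
    lock.readLock().lock();
    try {
      return phase;               // readers may hold the read lock concurrently
    } finally {
      lock.readLock().unlock();   // always released, even if the body throws
    }
  }

  void updatePhase(Phase newPhase) {
    lock.writeLock().lock();
    try {
      phase = newPhase;           // status updates take the exclusive write lock
    } finally {
      lock.writeLock().unlock();
    }
  }
}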

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java

@@ -270,6 +270,11 @@ public class MockJobs extends MockApps {
         return report.getProgress();
       }
 
+      @Override
+      public Phase getPhase() {
+        return report.getPhase();
+      }
+
       @Override
       public TaskAttemptState getState() {
         return report.getTaskAttemptState();

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -632,6 +633,11 @@ public class TestRuntimeEstimators {
       return myAttemptID.getTaskId().getTaskType() == TaskType.MAP ? getMapProgress() : getReduceProgress();
     }
 
+    @Override
+    public Phase getPhase() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
     @Override
     public TaskAttemptState getState() {
       if (overridingState != null) {

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java

@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -100,6 +101,11 @@ public class CompletedTaskAttempt implements TaskAttempt {
     return report;
   }
 
+  @Override
+  public Phase getPhase() {
+    return Phase.CLEANUP;
+  }
+
   @Override
   public TaskAttemptState getState() {
     return state;
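Added commentary: CompletedTaskAttempt is the history server's read-only view of a finished attempt, so returning Phase.CLEANUP (the last phase an attempt passes through) is a safe constant answer here; in particular, a completed reduce attempt is never reported as SHUFFLE and so never inflates the shuffling-reducer count used by the JobImpl logic above.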