MAPREDUCE-6024. Shortened the time when Fetcher is stuck in retrying before concluding the failure by configuration. Contributed by Yunjiong Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618677 13f79535-47bb-0310-9956-ffa450edef68
Zhijie Shen 10 years ago
commit f8e871d01b

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -227,6 +227,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
     the current context's FS is different. (Benjamin Zhitomirsky via zjshen)
 
+    MAPREDUCE-6024. Shortened the time when Fetcher is stuck in retrying before
+    concluding the failure by configuration. (Yunjiong Zhao via zjshen)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 21 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -148,10 +148,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private static final Log LOG = LogFactory.getLog(JobImpl.class);
 
   //The maximum fraction of fetch failures allowed for a map
-  private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
-
-  // Maximum no. of fetch-failure notifications after which map task is failed
-  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+  private float maxAllowedFetchFailuresFraction;
+  
+  //Maximum no. of fetch-failure notifications after which map task is failed
+  private int maxFetchFailuresNotifications;
 
   public static final String JOB_KILLED_DIAG =
       "Job received Kill while in RUNNING state.";
@@ -704,6 +704,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     if(forcedDiagnostic != null) {
       this.diagnostics.add(forcedDiagnostic);
     }
+    
+    this.maxAllowedFetchFailuresFraction = conf.getFloat(
+        MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION,
+        MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION);
+    this.maxFetchFailuresNotifications = conf.getInt(
+        MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
+        MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
   }
 
   protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -1900,9 +1907,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
         float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
           (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
-        boolean isMapFaulty =
-            (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
-        if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
+        if (fetchFailures >= job.getMaxFetchFailuresNotifications()
+            && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
           LOG.info("Too many fetch-failures for output of task attempt: " + 
               mapId + " ... raising fetch failure to map");
           job.eventHandler.handle(new TaskAttemptEvent(mapId, 
@@ -2185,4 +2191,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     jobConf.addResource(fc.open(confPath), confPath.toString());
     return jobConf;
   }
+
+  public float getMaxAllowedFetchFailuresFraction() {
+    return maxAllowedFetchFailuresFraction;
+  }
+
+  public int getMaxFetchFailuresNotifications() {
+    return maxFetchFailuresNotifications;
+  }
 }
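
The net effect in JobImpl: the faulty-map decision now compares against values read from the job configuration rather than the removed constants. A minimal sketch of that decision as a standalone method (the helper name and parameter list are illustrative, not part of the patch):

    // Both thresholds must be crossed before a map's output is declared lost:
    // an absolute count of fetch-failure notifications (default 3) and a
    // fraction of the currently shuffling reducers (default 0.5).
    static boolean isMapFaulty(int fetchFailures, int shufflingReduceTasks,
        int maxFetchFailuresNotifications, float maxAllowedFetchFailuresFraction) {
      float failureRate = shufflingReduceTasks == 0
          ? 1.0f : (float) fetchFailures / shufflingReduceTasks;
      return fetchFailures >= maxFetchFailuresNotifications
          && failureRate >= maxAllowedFetchFailuresFraction;
    }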

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -293,11 +293,19 @@ public interface MRJobConfig {
   public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
 
   public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
+  public static final String MAX_ALLOWED_FETCH_FAILURES_FRACTION = "mapreduce.reduce.shuffle.max-fetch-failures-fraction";
+  public static final float DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;
+  
+  public static final String MAX_FETCH_FAILURES_NOTIFICATIONS = "mapreduce.reduce.shuffle.max-fetch-failures-notifications";
+  public static final int DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
 
   public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
 
   public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
   public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
+  
+  public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
+  public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;
 
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
 
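These new keys make the previously hard-coded limits tunable per job. A hypothetical driver snippet that tightens them so a reducer escalates fetch failures sooner (the values shown are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ShuffleTuningExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Declare a map faulty after 2 notifications instead of the default 3.
        conf.setInt("mapreduce.reduce.shuffle.max-fetch-failures-notifications", 2);
        // Require only 30% of shuffling reducers to fail, instead of the default 50%.
        conf.setFloat("mapreduce.reduce.shuffle.max-fetch-failures-fraction", 0.3f);
        // Report a fetch failure immediately once a host has failed more than
        // 3 times, instead of the default 5.
        conf.setInt("mapreduce.reduce.shuffle.max-host-failures", 3);
        Job job = Job.getInstance(conf, "shuffle-tuning-example");
        // ... configure mapper, reducer, and paths as usual, then:
        // job.waitForCompletion(true);
      }
    }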

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -319,6 +319,7 @@ class Fetcher<K,V> extends Thread {
 
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
+      scheduler.hostFailed(host.getHostName());
       for(TaskAttemptID left: remaining) {
         scheduler.copyFailed(left, host, false, connectExcpt);
       }
@@ -343,6 +344,7 @@ class Fetcher<K,V> extends Thread {
       
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+        scheduler.hostFailed(host.getHostName());
         for(TaskAttemptID left: failedTasks) {
           scheduler.copyFailed(left, host, true, false);
         }

+ 23 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
@@ -101,6 +100,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
 
   private final boolean reportReadErrorImmediately;
   private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
+  private int maxHostFailures;
 
   public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
                           TaskAttemptID reduceId,
@@ -132,6 +132,9 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
 
     this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
         MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
+    this.maxHostFailures = job.getInt(
+        MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
+        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
   }
 
   @Override
@@ -213,9 +216,18 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
     progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
+  
+  public synchronized void hostFailed(String hostname) {
+    if (hostFailures.containsKey(hostname)) {
+      IntWritable x = hostFailures.get(hostname);
+      x.set(x.get() + 1);
+    } else {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
+  }
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
-                                      boolean readError, boolean connectExcpt) {
+      boolean readError, boolean connectExcpt) {
     host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {
@@ -226,12 +238,9 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       failureCounts.put(mapId, new IntWritable(1));
     }
     String hostname = host.getHostName();
-    if (hostFailures.containsKey(hostname)) {
-      IntWritable x = hostFailures.get(hostname);
-      x.set(x.get() + 1);
-    } else {
-      hostFailures.put(hostname, new IntWritable(1));
-    }
+    //report failure if already retried maxHostFailures times
+    boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false;
+    
     if (failures >= abortFailureLimit) {
       try {
         throw new IOException(failures + " failures downloading " + mapId);
@@ -240,7 +249,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       }
     }
 
-    checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
+    checkAndInformJobTracker(failures, mapId, readError, connectExcpt, hostFail);
 
     checkReducerHealth();
 
@@ -270,9 +279,9 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
       int failures, TaskAttemptID mapId, boolean readError,
-      boolean connectExcpt) {
+      boolean connectExcpt, boolean hostFailed) {
     if (connectExcpt || (reportReadErrorImmediately && readError)
-        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+        || ((failures % maxFetchFailuresBeforeReporting) == 0) || hostFailed) {
       LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
       status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
     }
@@ -507,4 +516,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
     referee.join();
   }
 
+  public int getMaxHostFailures() {
+    return maxHostFailures;
+  }
 }
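
Taken together, the scheduler now counts failures per host (hostFailed) and folds that count into the reporting decision. A self-contained sketch of the extended predicate, with illustrative method and parameter names (the real logic lives in checkAndInformJobTracker above):

    // A fetch failure is reported upstream (via status.addFetchFailedMap) on a
    // connect exception, on a read error when immediate reporting is enabled,
    // on every Nth failure, or, new in this patch, once the serving host has
    // already failed more than the configured maximum
    // (mapreduce.reduce.shuffle.max-host-failures).
    static boolean shouldReport(boolean connectExcpt, boolean readError,
        boolean reportReadErrorImmediately, int failures,
        int maxFetchFailuresBeforeReporting, boolean hostFailed) {
      return connectExcpt
          || (reportReadErrorImmediately && readError)
          || (failures % maxFetchFailuresBeforeReporting == 0)
          || hostFailed;
    }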