浏览代码

MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down and comes up while job is getting executed. (Eric Payne via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190122 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 13 年之前
父节点
当前提交
b304062f1f

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1824,6 +1824,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3282. bin/mapred job -list throws exception. (acmurthy via 
     mahadev)
 
+    MAPREDUCE-3186. User jobs are getting hanged if the Resource manager process goes down 
+    and comes up while job is getting executed. (Eric Payne via mahadev)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 45 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java

@@ -23,19 +23,23 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -57,8 +61,10 @@ public class LocalContainerAllocator extends RMCommunicator
       LogFactory.getLog(LocalContainerAllocator.class);
 
   private final EventHandler eventHandler;
-  private final ApplicationId appID;
+//  private final ApplicationId appID;
   private AtomicInteger containerCount = new AtomicInteger();
+  private long retryInterval;
+  private long retrystartTime;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -67,7 +73,19 @@ public class LocalContainerAllocator extends RMCommunicator
                                  AppContext context) {
     super(clientService, context);
     this.eventHandler = context.getEventHandler();
-    this.appID = context.getApplicationID();
+//    this.appID = context.getApplicationID();
+    
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    retryInterval =
+        getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+            MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    // Init startTime to current time. If all goes well, it will be reset after
+    // first attempt to contact RM.
+    retrystartTime = System.currentTimeMillis();
   }
 
   @Override
@@ -77,10 +95,32 @@ public class LocalContainerAllocator extends RMCommunicator
             .getApplicationProgress(), new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
     AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response = allocateResponse.getAMResponse();
+    AMResponse response;
+    try {
+      response = allocateResponse.getAMResponse();
+      // Reset retry count if no exception occurred.
+      retrystartTime = System.currentTimeMillis();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                         JobEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+                                retryInterval + " milliseconds.");
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
     if (response.getReboot()) {
-      // TODO
       LOG.info("Event from RM: shutting down Application Master");
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                       JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+                               this.getContext().getApplicationID());
     }
   }
 

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -233,6 +233,9 @@ public abstract class RMCommunicator extends AbstractService  {
             Thread.sleep(rmPollInterval);
             try {
               heartbeat();
+            } catch (YarnException e) {
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
               // TODO: for other exceptions

+ 40 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -128,6 +129,8 @@ public class RMContainerAllocator extends RMContainerRequestor
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
   private float reduceSlowStart = 0;
+  private long retryInterval;
+  private long retrystartTime;
   
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
@@ -146,6 +149,11 @@ public class RMContainerAllocator extends RMContainerRequestor
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
     RackResolver.init(conf);
+    retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+                                MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    // Init startTime to current time. If all goes well, it will be reset after
+    // first attempt to contact RM.
+    retrystartTime = System.currentTimeMillis();
   }
 
   @Override
@@ -429,11 +437,41 @@ public class RMContainerAllocator extends RMContainerRequestor
         " rackLocalAssigned:" + rackLocalAssigned +
         " availableResources(headroom):" + getAvailableResources();
   }
-  
+
   @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
-    AMResponse response = makeRemoteRequest();
+    AMResponse response;
+    /*
+     * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+     * milliseconds before aborting. During this interval, AM will still try
+     * to contact the RM.
+     */
+    try {
+      response = makeRemoteRequest();
+      // Reset retry count if no exception occurred.
+      retrystartTime = System.currentTimeMillis();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                         JobEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+                                retryInterval + " milliseconds.");
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
+    if (response.getReboot()) {
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                       JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+                               this.getContext().getApplicationID());
+    }
     int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
     List<Container> newContainers = response.getAllocatedContainers();
     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();

+ 9 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -403,6 +403,15 @@ public interface MRJobConfig {
     MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
   public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 2000;
 
+  /**
+   * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+   * milliseconds before aborting. During this interval, AM will still try
+   * to contact the RM.
+   */
+  public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS =
+    MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
+  public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
+
   /**
    * Boolean. Create the base dirs in the JobHistoryEventHandler
    * Set to false for multi-user clusters.  This is an internal config that