Przeglądaj źródła

MAPREDUCE-3228. Fixed MR AM to timeout RPCs to bad NodeManagers. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1189879 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 lat temu
rodzic
commit
724f217343

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1813,6 +1813,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-3281. Fixed a bug in TestLinuxContainerExecutorWithMocks. (vinodkv)
 
+    MAPREDUCE-3228. Fixed MR AM to timeout RPCs to bad NodeManagers. (vinodkv
+    via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 9 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 public interface ContainerLauncher 
@@ -28,4 +29,12 @@ public interface ContainerLauncher
     CONTAINER_REMOTE_LAUNCH,
     CONTAINER_REMOTE_CLEANUP
   }
+
+  // Not a documented config. Only used for tests
+  static final String MR_AM_NM_COMMAND_TIMEOUT = MRJobConfig.MR_AM_PREFIX
+      + "nm-command-timeout";
+  /**
+   *  Maximum of 1 minute timeout for a Node to react to the command
+   */
+  static final int DEFAULT_NM__COMMAND_TIMEOUT = 60000;
 }

+ 116 - 35
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -69,7 +71,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class ContainerLauncherImpl extends AbstractService implements
     ContainerLauncher {
 
-  private static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+  static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+
+  int nmTimeOut;
 
   private AppContext context;
   private ThreadPoolExecutor launcherPool;
@@ -95,14 +99,17 @@ public class ContainerLauncherImpl extends AbstractService implements
     this.limitOnPoolSize = conf.getInt(
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
+    this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
+        ContainerLauncher.DEFAULT_NM__COMMAND_TIMEOUT);
     super.init(conf);
   }
 
   public void start() {
+
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "ContainerLauncher #%d").setDaemon(true).build();
+
     // Start with a default core-pool size of 10 and change it dynamically.
-    ThreadFactory tf = new ThreadFactoryBuilder()
-      .setNameFormat("ContainerLauncher #%d")
-      .build();
     launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
         Integer.MAX_VALUE, 1, TimeUnit.HOURS,
         new LinkedBlockingQueue<Runnable>(),
@@ -156,11 +163,11 @@ public class ContainerLauncherImpl extends AbstractService implements
 
   public void stop() {
     eventHandlingThread.interrupt();
-    launcherPool.shutdown();
+    launcherPool.shutdownNow();
     super.stop();
   }
 
-  protected ContainerManager getCMProxy(ContainerId containerID,
+  protected ContainerManager getCMProxy(
       final String containerManagerBindAddr, ContainerToken containerToken)
       throws IOException {
 
@@ -193,6 +200,27 @@ public class ContainerLauncherImpl extends AbstractService implements
     return proxy;
   }
 
+  private static class CommandTimer extends TimerTask {
+    private final Thread commandThread;
+    protected final ContainerLauncherEvent event;
+    protected final String message;
+
+    public CommandTimer(Thread thread, ContainerLauncherEvent event) {
+      this.commandThread = thread;
+      this.event = event;
+      this.message = "Couldn't complete " + event.getType() + " on "
+          + event.getContainerID() + "/" + event.getTaskAttemptID()
+          + ". Interrupting and returning";
+    }
+
+    
+    @Override
+    public void run() {
+      LOG.warn(this.message);
+      this.commandThread.interrupt();
+    }
+  }
+
   /**
    * Setup and start the container on remote nodemanager.
    */
@@ -213,27 +241,53 @@ public class ContainerLauncherImpl extends AbstractService implements
       final String containerManagerBindAddr = event.getContainerMgrAddress();
       ContainerId containerID = event.getContainerID();
       ContainerToken containerToken = event.getContainerToken();
+      TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+
+      Timer timer = new Timer(true);
 
       switch(event.getType()) {
 
       case CONTAINER_REMOTE_LAUNCH:
-        ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent) event;
+        ContainerRemoteLaunchEvent launchEvent
+            = (ContainerRemoteLaunchEvent) event;
 
-        TaskAttemptId taskAttemptID = launchEv.getTaskAttemptID();
         try {
-          
-          ContainerManager proxy = 
-            getCMProxy(containerID, containerManagerBindAddr, containerToken);
-          
+          timer.schedule(new CommandTimer(Thread.currentThread(), event),
+              nmTimeOut);
+
+          ContainerManager proxy = getCMProxy(containerManagerBindAddr,
+              containerToken);
+
+          // Interruped during getProxy, but that didn't throw exception
+          if (Thread.currentThread().isInterrupted()) {
+            // The timer cancelled the command in the mean while.
+            String message = "Start-container for " + event.getContainerID()
+                + " got interrupted. Returning.";
+            sendContainerLaunchFailedMsg(taskAttemptID, message);
+            return;
+          }
+
           // Construct the actual Container
           ContainerLaunchContext containerLaunchContext =
-              launchEv.getContainer();
+              launchEvent.getContainer();
 
           // Now launch the actual container
           StartContainerRequest startRequest = recordFactory
               .newRecordInstance(StartContainerRequest.class);
           startRequest.setContainerLaunchContext(containerLaunchContext);
           StartContainerResponse response = proxy.startContainer(startRequest);
+
+          // container started properly. Stop the timer
+          timer.cancel();
+          if (Thread.currentThread().isInterrupted()) {
+            // The timer cancelled the command in the mean while, but
+            // startContainer didn't throw exception
+            String message = "Start-container for " + event.getContainerID()
+                + " got interrupted. Returning.";
+            sendContainerLaunchFailedMsg(taskAttemptID, message);
+            return;
+          }
+
           ByteBuffer portInfo = response
               .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
           int port = -1;
@@ -255,12 +309,9 @@ public class ContainerLauncherImpl extends AbstractService implements
         } catch (Throwable t) {
           String message = "Container launch failed for " + containerID
               + " : " + StringUtils.stringifyException(t);
-          LOG.error(message);
-          context.getEventHandler().handle(
-              new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
-          context.getEventHandler().handle(
-              new TaskAttemptEvent(taskAttemptID,
-                  TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+          sendContainerLaunchFailedMsg(taskAttemptID, message);
+        } finally {
+          timer.cancel();
         }
 
         break;
@@ -272,24 +323,44 @@ public class ContainerLauncherImpl extends AbstractService implements
           eventQueue.remove(event); // TODO: Any synchro needed?
           //deallocate the container
           context.getEventHandler().handle(
-              new ContainerAllocatorEvent(event.getTaskAttemptID(),
-              ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
+              new ContainerAllocatorEvent(taskAttemptID,
+                  ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
         } else {
-          try {
-            ContainerManager proxy = 
-              getCMProxy(containerID, containerManagerBindAddr, containerToken);
-            // TODO:check whether container is launched
-
-            // kill the remote container if already launched
-            StopContainerRequest stopRequest = recordFactory
-                .newRecordInstance(StopContainerRequest.class);
-            stopRequest.setContainerId(event.getContainerID());
-            proxy.stopContainer(stopRequest);
 
+          try {
+            timer.schedule(new CommandTimer(Thread.currentThread(), event),
+                nmTimeOut);
+
+            ContainerManager proxy = getCMProxy(containerManagerBindAddr,
+                containerToken);
+
+            if (Thread.currentThread().isInterrupted()) {
+              // The timer cancelled the command in the mean while. No need to
+              // return, send cleanedup event anyways.
+              LOG.info("Stop-container for " + event.getContainerID()
+                  + " got interrupted.");
+            } else {
+
+              // TODO:check whether container is launched
+
+              // kill the remote container if already launched
+              StopContainerRequest stopRequest = recordFactory
+                  .newRecordInstance(StopContainerRequest.class);
+              stopRequest.setContainerId(event.getContainerID());
+              proxy.stopContainer(stopRequest);
+            }
           } catch (Throwable t) {
-            //ignore the cleanup failure
-            LOG.warn("cleanup failed for container " + event.getContainerID() ,
-                t);
+            // ignore the cleanup failure
+            String message = "cleanup failed for container "
+                + event.getContainerID() + " : "
+                + StringUtils.stringifyException(t);
+            context.getEventHandler()
+                .handle(
+                    new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
+                        message));
+            LOG.warn(message);
+          } finally {
+            timer.cancel();
           }
 
           // after killing, send killed event to taskattempt
@@ -300,7 +371,17 @@ public class ContainerLauncherImpl extends AbstractService implements
         break;
       }
     }
-    
+  }
+
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID,
+      String message) {
+    LOG.error(message);
+    context.getEventHandler().handle(
+        new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(taskAttemptID,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
   }
 
   @Override

+ 132 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java

@@ -0,0 +1,132 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.junit.Test;
+
+public class TestContainerLauncher {
+
+  static final Log LOG = LogFactory
+      .getLog(TestContainerLauncher.class);
+
+  @Test
+  public void testSlowNM() throws Exception {
+    test(false);
+  }
+
+  @Test
+  public void testSlowNMWithInterruptsSwallowed() throws Exception {
+    test(true);
+  }
+
+  private void test(boolean swallowInterrupts) throws Exception {
+
+    MRApp app = new MRAppWithSlowNM(swallowInterrupts);
+
+    Configuration conf = new Configuration();
+    int maxAttempts = 1;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    // Set low timeout for NM commands
+    conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000);
+
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+
+    Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
+        .next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
+        .size());
+
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.ASSIGNED);
+
+    app.waitForState(job, JobState.FAILED);
+
+    LOG.info("attempt.getDiagnostics: " + attempt.getDiagnostics());
+    Assert.assertTrue(attempt.getDiagnostics().toString().contains(
+        "Container launch failed for container_0_0000_01_000000 : "));
+    Assert.assertTrue(attempt.getDiagnostics().toString().contains(
+        ": java.lang.InterruptedException"));
+
+    app.stop();
+  }
+
+  private static class MRAppWithSlowNM extends MRApp {
+
+    final boolean swallowInterrupts;
+
+    public MRAppWithSlowNM(boolean swallowInterrupts) {
+      super(1, 0, false, "TestContainerLauncher", true);
+      this.swallowInterrupts = swallowInterrupts;
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new ContainerLauncherImpl(context) {
+        @Override
+        protected ContainerManager getCMProxy(
+            String containerManagerBindAddr, ContainerToken containerToken)
+            throws IOException {
+          try {
+            synchronized (this) {
+              wait(); // Just hang the thread simulating a very slow NM.
+            }
+          } catch (InterruptedException e) {
+            LOG.info(e);
+            if (!swallowInterrupts) {
+              throw new IOException(e);
+            } else {
+              Thread.currentThread().interrupt();
+            }
+          }
+          return null;
+        }
+      };
+    };
+  }
+}

+ 1 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.junit.Test;
 
@@ -219,7 +218,7 @@ public class TestFail {
         }
 
         @Override
-        protected ContainerManager getCMProxy(ContainerId containerID,
+        protected ContainerManager getCMProxy(
             String containerManagerBindAddr, ContainerToken containerToken)
             throws IOException {
           try {