Browse Source

MAPREDUCE-3616. Thread pool for launching containers in MR AM not expanding as expected. (Contributed by Vinod Kumar Vavilapalli)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1229394 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 13 years ago
parent
commit
239a5549ea

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -431,6 +431,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3624. Remove unnecessary dependency on JDK's tools.jar. (mahadev
     via acmurthy)
 
+    MAPREDUCE-3616. Thread pool for launching containers in MR AM not
+    expanding as expected. (vinodkv via sseth)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 16 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -76,8 +76,8 @@ public class ContainerLauncherImpl extends AbstractService implements
   int nmTimeOut;
 
   private AppContext context;
-  private ThreadPoolExecutor launcherPool;
-  private static final int INITIAL_POOL_SIZE = 10;
+  protected ThreadPoolExecutor launcherPool;
+  protected static final int INITIAL_POOL_SIZE = 10;
   private int limitOnPoolSize;
   private Thread eventHandlingThread;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
@@ -102,6 +102,7 @@ public class ContainerLauncherImpl extends AbstractService implements
     this.limitOnPoolSize = conf.getInt(
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
+    LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
     this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
         ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
     this.rpc = YarnRPC.create(conf);
@@ -141,20 +142,21 @@ public class ContainerLauncherImpl extends AbstractService implements
             int numNodes = allNodes.size();
             int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
 
-            if (poolSize <= idealPoolSize) {
+            if (poolSize < idealPoolSize) {
               // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
               // later is just a buffer so we are not always increasing the
               // pool-size
-              int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE;
-              LOG.info("Setting ContainerLauncher pool size to "
-                  + newPoolSize);
+              int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+                  + INITIAL_POOL_SIZE);
+              LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+                  + " as number-of-nodes to talk to is " + numNodes);
               launcherPool.setCorePoolSize(newPoolSize);
             }
           }
 
           // the events from the queue are handled in parallel
           // using a thread pool
-          launcherPool.execute(new EventProcessor(event));
+          launcherPool.execute(createEventProcessor(event));
 
           // TODO: Group launching of multiple containers to a single
           // NodeManager into a single connection
@@ -172,14 +174,16 @@ public class ContainerLauncherImpl extends AbstractService implements
     super.stop();
   }
 
+  protected EventProcessor createEventProcessor(ContainerLauncherEvent event) {
+    return new EventProcessor(event);
+  }
+
   protected ContainerManager getCMProxy(ContainerId containerID,
       final String containerManagerBindAddr, ContainerToken containerToken)
       throws IOException {
 
     UserGroupInformation user = UserGroupInformation.getCurrentUser();
 
-    this.allNodes.add(containerManagerBindAddr);
-
     if (UserGroupInformation.isSecurityEnabled()) {
       Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
           containerToken.getIdentifier().array(), containerToken
@@ -244,7 +248,7 @@ public class ContainerLauncherImpl extends AbstractService implements
   /**
    * Setup and start the container on remote nodemanager.
    */
-  private class EventProcessor implements Runnable {
+  class EventProcessor implements Runnable {
     private ContainerLauncherEvent event;
 
     EventProcessor(ContainerLauncherEvent event) {
@@ -280,7 +284,7 @@ public class ContainerLauncherImpl extends AbstractService implements
           proxy = getCMProxy(containerID, containerManagerBindAddr,
               containerToken);
 
-          // Interruped during getProxy, but that didn't throw exception
+          // Interrupted during getProxy, but that didn't throw exception
           if (Thread.interrupted()) {
             // The timer cancelled the command in the mean while.
             String message = "Container launch failed for " + containerID
@@ -438,6 +442,7 @@ public class ContainerLauncherImpl extends AbstractService implements
   public void handle(ContainerLauncherEvent event) {
     try {
       eventQueue.put(event);
+      this.allNodes.add(event.getContainerMgrAddress());
     } catch (InterruptedException e) {
       throw new YarnException(e);
     }

+ 0 - 140
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestContainerLauncher.java

@@ -1,140 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.mapreduce.v2.app;
-
-import java.io.IOException;
-import java.util.Map;
-
-import junit.framework.Assert;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
-import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerToken;
-import org.junit.Test;
-
-public class TestContainerLauncher {
-
-  static final Log LOG = LogFactory
-      .getLog(TestContainerLauncher.class);
-
-  @Test
-  public void testSlowNM() throws Exception {
-    test(false);
-  }
-
-  @Test
-  public void testSlowNMWithInterruptsSwallowed() throws Exception {
-    test(true);
-  }
-
-  private void test(boolean swallowInterrupts) throws Exception {
-
-    MRApp app = new MRAppWithSlowNM(swallowInterrupts);
-
-    Configuration conf = new Configuration();
-    int maxAttempts = 1;
-    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
-    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-
-    // Set low timeout for NM commands
-    conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000);
-
-    Job job = app.submit(conf);
-    app.waitForState(job, JobState.RUNNING);
-
-    Map<TaskId, Task> tasks = job.getTasks();
-    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
-
-    Task task = tasks.values().iterator().next();
-    app.waitForState(task, TaskState.SCHEDULED);
-
-    Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
-        .next().getAttempts();
-    Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
-        .size());
-
-    TaskAttempt attempt = attempts.values().iterator().next();
-    app.waitForState(attempt, TaskAttemptState.ASSIGNED);
-
-    app.waitForState(job, JobState.FAILED);
-
-    String diagnostics = attempt.getDiagnostics().toString();
-    LOG.info("attempt.getDiagnostics: " + diagnostics);
-    if (swallowInterrupts) {
-      Assert.assertEquals("[Container launch failed for "
-          + "container_0_0000_01_000000 : Start-container for "
-          + "container_0_0000_01_000000 got interrupted. Returning.]",
-          diagnostics);
-    } else {
-      Assert.assertTrue(diagnostics.contains("Container launch failed for "
-          + "container_0_0000_01_000000 : "));
-      Assert.assertTrue(diagnostics
-          .contains(": java.lang.InterruptedException"));
-    }
-
-    app.stop();
-  }
-
-  private static class MRAppWithSlowNM extends MRApp {
-
-    final boolean swallowInterrupts;
-
-    public MRAppWithSlowNM(boolean swallowInterrupts) {
-      super(1, 0, false, "TestContainerLauncher", true);
-      this.swallowInterrupts = swallowInterrupts;
-    }
-
-    @Override
-    protected ContainerLauncher createContainerLauncher(AppContext context) {
-      return new ContainerLauncherImpl(context) {
-        @Override
-        protected ContainerManager getCMProxy(ContainerId containerID,
-            String containerManagerBindAddr, ContainerToken containerToken)
-            throws IOException {
-          try {
-            synchronized (this) {
-              wait(); // Just hang the thread simulating a very slow NM.
-            }
-          } catch (InterruptedException e) {
-            LOG.info(e);
-            if (!MRAppWithSlowNM.this.swallowInterrupts) {
-              throw new IOException(e);
-            }
-            Thread.currentThread().interrupt();
-          }
-          return null;
-        }
-      };
-    };
-  }
-}

+ 321 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java

@@ -0,0 +1,321 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+public class TestContainerLauncher {
+
+  static final Log LOG = LogFactory
+      .getLog(TestContainerLauncher.class);
+
+  @Test
+  public void testPoolSize() throws InterruptedException {
+
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+      appId, 3);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 8);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
+    TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
+
+    AppContext context = mock(AppContext.class);
+    CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
+      context);
+    containerLauncher.init(new Configuration());
+    containerLauncher.start();
+
+    ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
+
+    // No events yet
+    Assert.assertEquals(0, threadPool.getPoolSize());
+    Assert.assertEquals(ContainerLauncherImpl.INITIAL_POOL_SIZE,
+      threadPool.getCorePoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    for (int i = 0; i < 10; i++) {
+      containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
+        containerId, "host" + i + ":1234", null,
+        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
+    }
+    waitForEvents(containerLauncher, 10);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // Same set of hosts, so no change
+    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    containerLauncher.finishEventHandling = true;
+    for (int i = 0; i < 10; i++) {
+      containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
+        containerId, "host" + i + ":1234", null,
+        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
+    }
+    waitForEvents(containerLauncher, 20);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // Different hosts, there should be an increase in core-thread-pool size to
+    // 21(11hosts+10buffer)
+    // Core pool size should be 21 but the live pool size should be only 11.
+    containerLauncher.expectedCorePoolSize = 12 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    for (int i = 1; i <= 2; i++) {
+      containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
+        containerId, "host1" + i + ":1234", null,
+        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
+    }
+    waitForEvents(containerLauncher, 22);
+    Assert.assertEquals(12, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    containerLauncher.stop();
+  }
+
+  @Test
+  public void testPoolLimits() throws InterruptedException {
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 67);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+      appId, 3);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 8);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
+    TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
+
+    AppContext context = mock(AppContext.class);
+    CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
+      context);
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, 12);
+    containerLauncher.init(conf);
+    containerLauncher.start();
+
+    ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
+
+    // 10 different hosts
+    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    for (int i = 0; i < 10; i++) {
+      containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
+        containerId, "host" + i + ":1234", null,
+        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
+    }
+    waitForEvents(containerLauncher, 10);
+    Assert.assertEquals(10, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // 4 more different hosts, but thread pool size should be capped at 12
+    containerLauncher.expectedCorePoolSize = 12 ;
+    for (int i = 1; i <= 4; i++) {
+      containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
+        containerId, "host1" + i + ":1234", null,
+        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
+    }
+    waitForEvents(containerLauncher, 12);
+    Assert.assertEquals(12, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    // Make some threads ideal so that remaining events are also done.
+    containerLauncher.finishEventHandling = true;
+    waitForEvents(containerLauncher, 14);
+    Assert.assertEquals(12, threadPool.getPoolSize());
+    Assert.assertNull(containerLauncher.foundErrors);
+
+    containerLauncher.stop();
+  }
+
+  private void waitForEvents(CustomContainerLauncher containerLauncher,
+      int expectedNumEvents) throws InterruptedException {
+    int timeOut = 20;
+    while (expectedNumEvents != containerLauncher.numEventsProcessed
+        || timeOut++ < 20) {
+      LOG.info("Waiting for number of events to become " + expectedNumEvents
+          + ". It is now " + containerLauncher.numEventsProcessed);
+      Thread.sleep(1000);
+    }
+    Assert
+      .assertEquals(expectedNumEvents, containerLauncher.numEventsProcessed);
+  }
+
+  @Test
+  public void testSlowNM() throws Exception {
+    test(false);
+  }
+
+  @Test
+  public void testSlowNMWithInterruptsSwallowed() throws Exception {
+    test(true);
+  }
+
+  private void test(boolean swallowInterrupts) throws Exception {
+
+    MRApp app = new MRAppWithSlowNM(swallowInterrupts);
+
+    Configuration conf = new Configuration();
+    int maxAttempts = 1;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    // Set low timeout for NM commands
+    conf.setInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT, 3000);
+
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+
+    Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
+        .next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
+        .size());
+
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.ASSIGNED);
+
+    app.waitForState(job, JobState.FAILED);
+
+    String diagnostics = attempt.getDiagnostics().toString();
+    LOG.info("attempt.getDiagnostics: " + diagnostics);
+    if (swallowInterrupts) {
+      Assert.assertEquals("[Container launch failed for "
+          + "container_0_0000_01_000000 : Start-container for "
+          + "container_0_0000_01_000000 got interrupted. Returning.]",
+          diagnostics);
+    } else {
+      Assert.assertTrue(diagnostics.contains("Container launch failed for "
+          + "container_0_0000_01_000000 : "));
+      Assert.assertTrue(diagnostics
+          .contains(": java.lang.InterruptedException"));
+    }
+
+    app.stop();
+  }
+
+  private final class CustomContainerLauncher extends ContainerLauncherImpl {
+
+    private volatile int expectedCorePoolSize = 0;
+    private volatile int numEventsProcessed = 0;
+    private volatile String foundErrors = null;
+    private volatile boolean finishEventHandling;
+    private CustomContainerLauncher(AppContext context) {
+      super(context);
+    }
+
+    public ThreadPoolExecutor getThreadPool() {
+      return super.launcherPool;
+    }
+
+    protected ContainerLauncherImpl.EventProcessor createEventProcessor(
+        ContainerLauncherEvent event) {
+      // At this point of time, the EventProcessor is being created and so no
+      // additional threads would have been created.
+
+      // Core-pool-size should have increased by now.
+      if (expectedCorePoolSize != launcherPool.getCorePoolSize()) {
+        foundErrors = "Expected " + expectedCorePoolSize + " but found "
+            + launcherPool.getCorePoolSize();
+      }
+
+      return new ContainerLauncherImpl.EventProcessor(event) {
+        @Override
+        public void run() {
+          // do nothing substantial
+          numEventsProcessed++;
+          // Stall
+          synchronized(this) {
+            try {
+              while(!finishEventHandling) {
+                wait(1000);
+              }
+            } catch (InterruptedException e) {
+              ;
+            }
+          }
+        }
+      };
+    }
+  }
+
+  private static class MRAppWithSlowNM extends MRApp {
+
+    final boolean swallowInterrupts;
+
+    public MRAppWithSlowNM(boolean swallowInterrupts) {
+      super(1, 0, false, "TestContainerLauncher", true);
+      this.swallowInterrupts = swallowInterrupts;
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new ContainerLauncherImpl(context) {
+        @Override
+        protected ContainerManager getCMProxy(ContainerId containerID,
+            String containerManagerBindAddr, ContainerToken containerToken)
+            throws IOException {
+          try {
+            synchronized (this) {
+              wait(); // Just hang the thread simulating a very slow NM.
+            }
+          } catch (InterruptedException e) {
+            LOG.info(e);
+            if (!MRAppWithSlowNM.this.swallowInterrupts) {
+              throw new IOException(e);
+            }
+            Thread.currentThread().interrupt();
+          }
+          return null;
+        }
+      };
+    };
+  }
+}

+ 22 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -181,23 +182,31 @@ public class MRApps extends Apps {
       String mrAppGeneratedClasspathFile = "mrapp-generated-classpath";
       classpathFileStream =
           thisClassLoader.getResourceAsStream(mrAppGeneratedClasspathFile);
+
       // Put the file itself on classpath for tasks.
-      String classpathElement = thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile();
-      if (classpathElement.contains("!")) {
-        classpathElement = classpathElement.substring(0, classpathElement.indexOf("!"));
+      URL classpathResource = thisClassLoader
+        .getResource(mrAppGeneratedClasspathFile);
+      if (classpathResource != null) {
+        String classpathElement = classpathResource.getFile();
+        if (classpathElement.contains("!")) {
+          classpathElement = classpathElement.substring(0,
+            classpathElement.indexOf("!"));
+        } else {
+          classpathElement = new File(classpathElement).getParent();
+        }
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          classpathElement);
       }
-      else {
-        classpathElement = new File(classpathElement).getParent();
+
+      if (classpathFileStream != null) {
+        reader = new BufferedReader(new InputStreamReader(classpathFileStream));
+        String cp = reader.readLine();
+        if (cp != null) {
+          Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+            cp.trim());
+        }
       }
-      Apps.addToEnvironment(
-          environment,
-          Environment.CLASSPATH.name(), classpathElement);
 
-      reader = new BufferedReader(new InputStreamReader(classpathFileStream));
-      String cp = reader.readLine();
-      if (cp != null) {
-        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim());
-      }      
       // Add standard Hadoop classes
       for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
         Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c);