Browse Source

YARN-966. Fixed ContainerLaunch to not fail quietly when there are no localized resources due to some other failure. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1508688 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 years ago
parent
commit
813efd25a1

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -42,6 +42,9 @@ Release 2.1.1-beta - UNRELEASED
     YARN-948. Changed ResourceManager to validate the release container list
     before actually releasing them. (Omkar Vinit Joshi via vinodkv)
 
+    YARN-966. Fixed ContainerLaunch to not fail quietly when there are no
+    localized resources due to some other failure. (Zhijie Shen via vinodkv)
+
 Release 2.1.0-beta - 2013-08-06
 
   INCOMPATIBLE CHANGES

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -335,8 +335,11 @@ public class ContainerImpl implements Container {
   public Map<Path,List<String>> getLocalizedResources() {
     this.readLock.lock();
     try {
-    assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!!
-    return localizedResources;
+      if (ContainerState.LOCALIZED == getContainerState()) {
+        return localizedResources;
+      } else {
+        return null;
+      }
     } finally {
       this.readLock.unlock();
     }

+ 9 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
@@ -120,14 +121,20 @@ public class ContainerLaunch implements Callable<Integer> {
   @SuppressWarnings("unchecked") // dispatcher not typed
   public Integer call() {
     final ContainerLaunchContext launchContext = container.getLaunchContext();
-    final Map<Path,List<String>> localResources =
-        container.getLocalizedResources();
+    Map<Path,List<String>> localResources = null;
     ContainerId containerID = container.getContainerId();
     String containerIdStr = ConverterUtils.toString(containerID);
     final List<String> command = launchContext.getCommands();
     int ret = -1;
 
     try {
+      localResources = container.getLocalizedResources();
+      if (localResources == null) {
+        RPCUtil.getRemoteException(
+            "Unable to get local resources when Container " + containerID +
+            " is at " + container.getContainerState());
+      }
+
       final String user = container.getUser();
       // /////////////////////////// Variable expansion
       // Before the container script gets written out.

+ 21 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -124,6 +126,7 @@ public class TestContainer {
 
       // all resources should be localized
       assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      assertNotNull(wc.c.getLocalizedResources());
       for (Entry<Path, List<String>> loc : wc.c.getLocalizedResources()
           .entrySet()) {
         assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
@@ -161,6 +164,7 @@ public class TestContainer {
       wc.containerKilledOnRequest();
       assertEquals(ContainerState.EXITED_WITH_FAILURE, 
           wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     }
     finally {
@@ -183,6 +187,7 @@ public class TestContainer {
       wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
       assertEquals(ContainerState.EXITED_WITH_FAILURE, 
           wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     }
     finally {
@@ -205,7 +210,7 @@ public class TestContainer {
       wc.containerSuccessful();
       assertEquals(ContainerState.EXITED_WITH_SUCCESS,
           wc.c.getContainerState());
-      
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     }
     finally {
@@ -228,10 +233,12 @@ public class TestContainer {
       wc.containerSuccessful();
       wc.containerResourcesCleanup();
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       // Now in DONE, issue INIT
       wc.initContainer();
       // Verify still in DONE
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     }
     finally {
@@ -255,10 +262,12 @@ public class TestContainer {
       wc.containerSuccessful();
       wc.containerResourcesCleanup();
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       // Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner
       wc.resourceFailedContainer();
       // Verify still in DONE
       assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     } finally {
       if (wc != null) {
@@ -279,6 +288,7 @@ public class TestContainer {
       reset(wc.localizerBus);
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.containerKilledOnRequest();
       
       verifyCleanupCall(wc);
@@ -297,8 +307,10 @@ public class TestContainer {
       wc.initContainer();
       wc.failLocalizeResources(wc.getLocalResourceCount());
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.killContainer();
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     } finally {
       if (wc != null) {
@@ -319,8 +331,10 @@ public class TestContainer {
       }
       wc.failLocalizeResources(failCount);
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.localizeResourcesFromInvalidState(failCount);
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
       Assert.assertTrue(wc.getDiagnostics().contains(FAKE_LOCALIZATION_ERROR));
     } finally {
@@ -342,8 +356,10 @@ public class TestContainer {
       String key2 = lRsrcKeys.next();
       wc.failLocalizeSpecificResource(key1);
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.failLocalizeSpecificResource(key2);
       assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     } finally {
       if (wc != null) {
@@ -363,8 +379,10 @@ public class TestContainer {
       String key1 = lRsrcKeys.next();
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.failLocalizeSpecificResource(key1);
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       verifyCleanupCall(wc);
     } finally {
       if (wc != null) {
@@ -425,8 +443,10 @@ public class TestContainer {
       wc.localizeResources();
       wc.killContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.launchContainer();
       assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
       wc.containerKilledOnRequest();
       verifyCleanupCall(wc);
     } finally {

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -28,12 +30,14 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
@@ -59,10 +63,16 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -617,6 +627,32 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     }
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testCallFailureWithNullLocalizedResources() {
+    Container container = mock(Container.class);
+    when(container.getContainerId()).thenReturn(ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(
+            System.currentTimeMillis(), 1), 1), 1));
+    ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
+    when(clc.getCommands()).thenReturn(Collections.<String>emptyList());
+    when(container.getLaunchContext()).thenReturn(clc);
+    when(container.getLocalizedResources()).thenReturn(null);
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler eventHandler = new EventHandler() {
+      public void handle(Event event) {
+        Assert.assertTrue(event instanceof ContainerExitEvent);
+        ContainerExitEvent exitEvent = (ContainerExitEvent) event;
+        Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+            exitEvent.getType());
+      }
+    };
+    when(dispatcher.getEventHandler()).thenReturn(eventHandler);
+    ContainerLaunch launch = new ContainerLaunch(context, new Configuration(),
+        dispatcher, exec, null, container, dirsHandler);
+    launch.call();
+  }
+
   protected Token createContainerToken(ContainerId cId) throws InvalidToken {
     Resource r = BuilderUtils.newResource(1024, 1);
     ContainerTokenIdentifier containerTokenIdentifier =