Ver Fonte

YARN-3626. On Windows localized resources are not moved to the front of the classpath when they should be. Contributed by Craig Welch

(cherry picked from commit 0f95921447ea547bdf9caf18f7fde46bc66031f8)
Xuan há 10 anos atrás
pai
commit
487d9b0f3f

+ 6 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -241,7 +241,12 @@ public class MRApps extends Apps {
     boolean userClassesTakesPrecedence = 
       conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
 
-    String classpathEnvVar = 
+    if (userClassesTakesPrecedence) {
+      conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE,
+        "true");
+    }
+
+    String classpathEnvVar =
       conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
         ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();
 

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -438,6 +438,9 @@ Release 2.7.1 - UNRELEASED
     YARN-3493. RM fails to come up with error "Failed to load/recover state" 
     when mem settings are changed. (Jian He via wangda)
 
+    YARN-3626. On Windows localized resources are not moved to the front
+    of the classpath when they should be. (Craig Welch via xgong)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1248,6 +1248,16 @@ public class YarnConfiguration extends Configuration {
   public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
       + "application.classpath";
 
+  /**
+   * Whether or not entries from the distributed cache should be preferred over
+   * the rest of the YARN CLASSPATH
+   */
+  public static final String YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE =
+    YARN_PREFIX + "application.classpath.prepend.distcache";
+
+  public static final boolean
+    DEFAULT_YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE = false;
+
   /**
    * Default platform-agnostic CLASSPATH for YARN applications. A
    * comma-separated list of CLASSPATH entries. The parameter expansion marker

+ 33 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -726,8 +726,28 @@ public class ContainerLaunch implements Callable<Integer> {
     if (Shell.WINDOWS) {
       
       String inputClassPath = environment.get(Environment.CLASSPATH.name());
+
       if (inputClassPath != null && !inputClassPath.isEmpty()) {
-        StringBuilder newClassPath = new StringBuilder(inputClassPath);
+
+        //On non-windows, localized resources
+        //from distcache are available via the classpath as they were placed
+        //there but on windows they are not available when the classpath
+        //jar is created and so they "are lost" and have to be explicitly
+        //added to the classpath instead.  This also means that their position
+        //is lost relative to other non-distcache classpath entries which will
+        //break things like mapreduce.job.user.classpath.first.
+
+        boolean preferLocalizedJars = conf.getBoolean(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE
+          );
+
+        boolean needsSeparator = false;
+        StringBuilder newClassPath = new StringBuilder();
+        if (!preferLocalizedJars) {
+          newClassPath.append(inputClassPath);
+          needsSeparator = true;
+        }
 
         // Localized resources do not exist at the desired paths yet, because the
         // container launch script has not run to create symlinks yet.  This
@@ -741,7 +761,12 @@ public class ContainerLaunch implements Callable<Integer> {
 
           for (String linkName : entry.getValue()) {
             // Append resource.
-            newClassPath.append(File.pathSeparator).append(pwd.toString())
+            if (needsSeparator) {
+              newClassPath.append(File.pathSeparator);
+            } else {
+              needsSeparator = true;
+            }
+            newClassPath.append(pwd.toString())
               .append(Path.SEPARATOR).append(linkName);
 
             // FileUtil.createJarWithClassPath must use File.toURI to convert
@@ -758,6 +783,12 @@ public class ContainerLaunch implements Callable<Integer> {
             }
           }
         }
+        if (preferLocalizedJars) {
+          if (needsSeparator) {
+            newClassPath.append(File.pathSeparator);
+          }
+          newClassPath.append(inputClassPath);
+        }
 
         // When the container launches, it takes the parent process's environment
         // and then adds/overwrites with the entries from the container launch

+ 129 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java

@@ -39,6 +39,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
@@ -65,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -78,12 +82,18 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.ShellScriptBuilder;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
@@ -97,6 +107,17 @@ import org.junit.Test;
 
 public class TestContainerLaunch extends BaseContainerManagerTest {
 
+  protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
+    conf), new NMTokenSecretManagerInNM(), null,
+    new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+    public int getHttpPort() {
+      return HTTP_PORT;
+    };
+    public NodeId getNodeId() {
+      return NodeId.newInstance("ahost", 1234);
+    };
+  };
+
   public TestContainerLaunch() throws UnsupportedFileSystemException {
     super();
   }
@@ -371,6 +392,114 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     }
   }
 
+  @Test
+  public void testPrependDistcache() throws Exception {
+
+    // Test is only relevant on Windows
+    Assume.assumeTrue(Shell.WINDOWS);
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    ContainerId cId = ContainerId.newContainerId(appAttemptId, 0);
+    Map<String, String> userSetEnv = new HashMap<String, String>();
+    userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
+    userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
+    userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
+    userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
+    userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
+    userSetEnv.put(Environment.USER.key(), "user_set_" +
+      Environment.USER.key());
+    userSetEnv.put(Environment.LOGNAME.name(), "user_set_LOGNAME");
+    userSetEnv.put(Environment.PWD.name(), "user_set_PWD");
+    userSetEnv.put(Environment.HOME.name(), "user_set_HOME");
+    userSetEnv.put(Environment.CLASSPATH.name(), "SYSTEM_CLPATH");
+    containerLaunchContext.setEnvironment(userSetEnv);
+    Container container = mock(Container.class);
+    when(container.getContainerId()).thenReturn(cId);
+    when(container.getLaunchContext()).thenReturn(containerLaunchContext);
+    when(container.getLocalizedResources()).thenReturn(null);
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler eventHandler = new EventHandler() {
+      public void handle(Event event) {
+        Assert.assertTrue(event instanceof ContainerExitEvent);
+        ContainerExitEvent exitEvent = (ContainerExitEvent) event;
+        Assert.assertEquals(ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+            exitEvent.getType());
+      }
+    };
+    when(dispatcher.getEventHandler()).thenReturn(eventHandler);
+
+    Configuration conf = new Configuration();
+
+    ContainerLaunch launch = new ContainerLaunch(distContext, conf,
+        dispatcher, exec, null, container, dirsHandler, containerManager);
+
+    String testDir = System.getProperty("test.build.data",
+        "target/test-dir");
+    Path pwd = new Path(testDir);
+    List<Path> appDirs = new ArrayList<Path>();
+    List<String> containerLogs = new ArrayList<String>();
+
+    Map<Path, List<String>> resources = new HashMap<Path, List<String>>();
+    Path userjar = new Path("user.jar");
+    List<String> lpaths = new ArrayList<String>();
+    lpaths.add("userjarlink.jar");
+    resources.put(userjar, lpaths);
+
+    Path nmp = new Path(testDir);
+
+    launch.sanitizeEnv(
+      userSetEnv, pwd, appDirs, containerLogs, resources, nmp);
+
+    List<String> result =
+      getJarManifestClasspath(userSetEnv.get(Environment.CLASSPATH.name()));
+
+    Assert.assertTrue(result.size() > 1);
+    Assert.assertTrue(
+      result.get(result.size() - 1).endsWith("userjarlink.jar"));
+
+    //Now move userjar to the front
+
+    cId = ContainerId.newContainerId(appAttemptId, 1);
+    when(container.getContainerId()).thenReturn(cId);
+
+    conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH_PREPEND_DISTCACHE,
+      "true");
+
+    launch = new ContainerLaunch(distContext, conf,
+        dispatcher, exec, null, container, dirsHandler, containerManager);
+
+    launch.sanitizeEnv(
+      userSetEnv, pwd, appDirs, containerLogs, resources, nmp);
+
+    result =
+      getJarManifestClasspath(userSetEnv.get(Environment.CLASSPATH.name()));
+
+    Assert.assertTrue(result.size() > 1);
+    Assert.assertTrue(
+      result.get(0).endsWith("userjarlink.jar"));
+
+  }
+
+  private static List<String> getJarManifestClasspath(String path)
+      throws Exception {
+    List<String> classpath = new ArrayList<String>();
+    JarFile jarFile = new JarFile(path);
+    Manifest manifest = jarFile.getManifest();
+    String cps = manifest.getMainAttributes().getValue("Class-Path");
+    StringTokenizer cptok = new StringTokenizer(cps);
+    while (cptok.hasMoreTokens()) {
+      String cpentry = cptok.nextToken();
+      classpath.add(cpentry);
+    }
+    return classpath;
+  }
+
   /**
    * See if environment variable is forwarded using sanitizeEnv.
    * @throws Exception