فهرست منبع

YARN-233. Added support for running containers in MS Windows to YARN. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-trunk-win@1418433 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 سال پیش
والد
کامیت
1f1b81db6d
10فایلهای تغییر یافته به همراه306 افزوده شده و 121 حذف شده
  1. 49 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
  2. 9 1
      hadoop-common-project/hadoop-common/src/main/winutils/task.c
  3. 0 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
  4. 4 0
      hadoop-yarn-project/CHANGES.branch-trunk-win.txt
  5. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
  6. 1 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
  7. 113 45
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
  8. 91 46
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  9. 21 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/ProcessIdFileReader.java
  10. 12 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestProcessIdFileReader.java

+ 49 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java

@@ -119,6 +119,32 @@ abstract public class Shell {
     return WINDOWS ? new String[] { WINUTILS, "symlink", link, target }
                    : new String[] { "ln", "-s", target, link };
   }
+  
+  /** Return a command to execute the given command in OS shell.
+   *  On Windows, the passed in groupId can be used to launch
+   *  and associate the given groupId in a process group. On
+   *  non-Windows, groupId is ignored. */
+  public static String[] getRunCommand(String command,
+                                       String groupId) {
+    if (WINDOWS) {
+      return new String[] { WINUTILS, "task", "create", groupId,
+        "cmd /c " + command };
+    } else {
+      return new String[] { "bash", "-c", command };
+    }
+  }
+
+  /** Return a command for determining if process with specified pid is alive. */
+  public static String[] getCheckProcessIsAliveCommand(String pid) {
+    return WINDOWS ? new String[] { WINUTILS, "task", "isAlive", pid } :
+      new String[] { "kill", "-0", isSetsidAvailable ? "-" + pid : pid };
+  }
+
+  /** Return a command to send a signal to a given pid */
+  public static String[] getSignalKillCommand(int code, String pid) {
+    return WINDOWS ? new String[] { WINUTILS, "task", "kill", pid } :
+      new String[] { "kill", "-" + code, isSetsidAvailable ? "-" + pid : pid };
+  }
 
   /** a Unix command to set permission */
   public static final String SET_PERMISSION_COMMAND = "chmod";
@@ -140,7 +166,30 @@ abstract public class Shell {
   /** Set to true on Windows platforms */
   public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
                 = System.getProperty("os.name").startsWith("Windows");
+
+  public static final boolean LINUX
+                = System.getProperty("os.name").startsWith("Linux");
   
+  public static final boolean isSetsidAvailable = isSetsidSupported();
+  private static boolean isSetsidSupported() {
+    if (WINDOWS) {
+      return true;
+    }
+    ShellCommandExecutor shexec = null;
+    boolean setsidSupported = true;
+    try {
+      String[] args = {"setsid", "bash", "-c", "echo $$"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("setsid is not available on this machine. So not using it.");
+      setsidSupported = false;
+    } finally { // handle the exit code
+      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+    }
+    return setsidSupported;
+  }
+
   private long    interval;   // refresh interval in msec
   private long    lastTime;   // last time the command was performed
   private Map<String, String> environment; // env for the command execution

+ 9 - 1
hadoop-common-project/hadoop-common/src/main/winutils/task.c

@@ -22,6 +22,8 @@
 #define PSAPI_VERSION 1
 #pragma comment(lib, "psapi.lib")
 
+#define ERROR_TASK_NOT_ALIVE 1
+
 // List of different task related command line options supported by
 // winutils.
 typedef enum TaskCommandOptionType
@@ -401,6 +403,12 @@ int Task(int argc, wchar_t *argv[])
     {
       fwprintf(stdout, L"IsAlive,%d\n", numProcs);
     }
+    else
+    {
+      dwErrorCode = ERROR_TASK_NOT_ALIVE;
+      ReportErrorCode(L"isTaskAlive returned false", dwErrorCode);
+      goto TaskExit;
+    }
   } else if (command == TaskKill)
   {
     // Check if task jobobject
@@ -450,4 +458,4 @@ void TaskUsage()
     and comma separated info per process\n\
     ProcessId,VirtualMemoryCommitted(bytes),\n\
     WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n");
-}
+}

+ 0 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java

@@ -164,7 +164,6 @@ public class MapReduceChildJVM {
 
     Vector<String> vargs = new Vector<String>(8);
 
-    vargs.add("exec");
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
     // Add child (task) java-vm options.

+ 4 - 0
hadoop-yarn-project/CHANGES.branch-trunk-win.txt

@@ -12,3 +12,7 @@ branch-trunk-win changes - unreleased
 
   YARN-213. YARN build script would be more readable using abspath.
   (Chris Nauroth via suresh)
+
+  YARN-233. Added support for running containers in MS Windows to YARN. (Chris
+  Nauroth via acmurthy)
+

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.api;
 
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
 
 /**
  * This is the API for the applications comprising of constants that YARN sets
@@ -187,7 +188,11 @@ public interface ApplicationConstants {
     }
     
     public String $() {
-      return "$" + variable;
+      if (Shell.WINDOWS) {
+        return "%" + variable + "%";
+      } else {
+        return "$" + variable;
+      }
     }
   }
 }

+ 1 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
+import org.apache.hadoop.util.Shell;
 
 public abstract class ContainerExecutor implements Configurable {
 
@@ -251,23 +252,6 @@ public abstract class ContainerExecutor implements Configurable {
     return pid;
   }
 
-  public static final boolean isSetsidAvailable = isSetsidSupported();
-  private static boolean isSetsidSupported() {
-    ShellCommandExecutor shexec = null;
-    boolean setsidSupported = true;
-    try {
-      String[] args = {"setsid", "bash", "-c", "echo $$"};
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (IOException ioe) {
-      LOG.warn("setsid is not available on this machine. So not using it.");
-      setsidSupported = false;
-    } finally { // handle the exit code
-      LOG.info("setsid exited with exit code " + shexec.getExitCode());
-    }
-    return setsidSupported;
-  }
-
   public static class DelayedProcessKiller extends Thread {
     private final String user;
     private final String pid;

+ 113 - 45
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java

@@ -37,6 +37,8 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -55,9 +57,6 @@ public class DefaultContainerExecutor extends ContainerExecutor {
 
   private final FileContext lfs;
 
-  private static final String WRAPPER_LAUNCH_SCRIPT = 
-      "default_container_executor.sh";
-
   public DefaultContainerExecutor() {
     try {
       this.lfs = FileContext.getLocalFSFileContext();
@@ -145,15 +144,13 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     lfs.util().copy(nmPrivateTokensPath, tokenDst);
 
     // Create new local launch wrapper script
-    Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT);
-    DataOutputStream wrapperScriptOutStream =
-        lfs.create(wrapperScriptDst,
-            EnumSet.of(CREATE, OVERWRITE));
+    LocalWrapperScriptBuilder sb = Shell.WINDOWS ?
+      new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) :
+      new UnixLocalWrapperScriptBuilder(containerWorkDir);
 
     Path pidFile = getPidFilePath(containerId);
     if (pidFile != null) {
-      writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri()
-          .getPath().toString(), pidFile.toString());
+      sb.writeLocalWrapperScript(launchDst, pidFile);
     } else {
       LOG.info("Container " + containerIdStr
           + " was marked as inactive. Returning terminated error");
@@ -166,12 +163,13 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     try {
       lfs.setPermission(launchDst,
           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
-      lfs.setPermission(wrapperScriptDst,
+      lfs.setPermission(sb.getWrapperScriptPath(),
           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
 
       // Setup command to run
-      String[] command = {"bash",
-          wrapperScriptDst.toUri().getPath().toString()};
+      String[] command = Shell.getRunCommand(
+        sb.getWrapperScriptPath().toUri().getPath().toString(), containerIdStr);
+
       LOG.info("launchContainer: " + Arrays.toString(command));
       shExec = new ShellCommandExecutor(
           command,
@@ -202,49 +200,102 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     return 0;
   }
 
-  private void writeLocalWrapperScript(DataOutputStream out,
-      String launchScriptDst, String pidFilePath) throws IOException {
-    // We need to do a move as writing to a file is not atomic
-    // Process reading a file being written to may get garbled data
-    // hence write pid to tmp file first followed by a mv
-    StringBuilder sb = new StringBuilder("#!/bin/bash\n\n");
-    sb.append("echo $$ > " + pidFilePath + ".tmp\n");
-    sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
-    sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
-    sb.append(" /bin/bash ");
-    sb.append("\"");
-    sb.append(launchScriptDst);
-    sb.append("\"\n");
-    PrintStream pout = null;
-    try {
-      pout = new PrintStream(out);
-      pout.append(sb);
-    } finally {
-      if (out != null) {
-        out.close();
+  private abstract class LocalWrapperScriptBuilder {
+
+    private final Path wrapperScriptPath;
+
+    public Path getWrapperScriptPath() {
+      return wrapperScriptPath;
+    }
+
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException {
+      DataOutputStream out = null;
+      PrintStream pout = null;
+
+      try {
+        out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
+        pout = new PrintStream(out);
+        writeLocalWrapperScript(launchDst, pidFile, pout);
+      } finally {
+        IOUtils.cleanup(LOG, pout, out);
       }
     }
+
+    protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile,
+        PrintStream pout);
+
+    protected LocalWrapperScriptBuilder(Path wrapperScriptPath) {
+      this.wrapperScriptPath = wrapperScriptPath;
+    }
+  }
+
+  private final class UnixLocalWrapperScriptBuilder
+      extends LocalWrapperScriptBuilder {
+
+    public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
+      super(new Path(containerWorkDir, "default_container_executor.sh"));
+    }
+
+    @Override
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile,
+        PrintStream pout) {
+
+      // We need to do a move as writing to a file is not atomic
+      // Process reading a file being written to may get garbled data
+      // hence write pid to tmp file first followed by a mv
+      pout.println("#!/bin/bash");
+      pout.println();
+      pout.println("echo $$ > " + pidFile.toString() + ".tmp");
+      pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
+      String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
+      pout.println(exec + " /bin/bash -c \"" +
+        launchDst.toUri().getPath().toString() + "\"");
+    }
+  }
+
+  private final class WindowsLocalWrapperScriptBuilder
+      extends LocalWrapperScriptBuilder {
+
+    private final String containerIdStr;
+
+    public WindowsLocalWrapperScriptBuilder(String containerIdStr,
+        Path containerWorkDir) {
+
+      super(new Path(containerWorkDir, "default_container_executor.cmd"));
+      this.containerIdStr = containerIdStr;
+    }
+
+    @Override
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile,
+        PrintStream pout) {
+
+      // On Windows, the pid is the container ID, so that it can also serve as
+      // the name of the job object created by winutils for task management.
+      // Write to temp file followed by atomic move.
+      String normalizedPidFile = new File(pidFile.toString()).getPath();
+      pout.println("@echo " + containerIdStr + " > " + normalizedPidFile +
+        ".tmp");
+      pout.println("@move /Y " + normalizedPidFile + ".tmp " +
+        normalizedPidFile);
+      pout.println("@call " + launchDst.toUri().getPath().toString());
+    }
   }
 
   @Override
   public boolean signalContainer(String user, String pid, Signal signal)
       throws IOException {
-    final String sigpid = ContainerExecutor.isSetsidAvailable
+    final String sigpid = Shell.isSetsidAvailable
         ? "-" + pid
         : pid;
     LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
         + " as user " + user);
-    try {
-      sendSignal(sigpid, Signal.NULL);
-    } catch (ExitCodeException e) {
+    if (!containerIsAlive(sigpid)) {
       return false;
     }
     try {
-      sendSignal(sigpid, signal);
+      killContainer(sigpid, signal);
     } catch (IOException e) {
-      try {
-        sendSignal(sigpid, Signal.NULL);
-      } catch (IOException ignore) {
+      if (!containerIsAlive(sigpid)) {
         return false;
       }
       throw e;
@@ -252,6 +303,25 @@ public class DefaultContainerExecutor extends ContainerExecutor {
     return true;
   }
 
+  /**
+   * Returns true if the process with the specified pid is alive.
+   * 
+   * @param pid String pid
+   * @return boolean true if the process is alive
+   */
+  private boolean containerIsAlive(String pid) throws IOException {
+    try {
+      new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid))
+        .execute();
+      // successful execution means process is alive
+      return true;
+    }
+    catch (ExitCodeException e) {
+      // failure (non-zero exit code) means process is not alive
+      return false;
+    }
+  }
+
   /**
    * Send a specified signal to the specified pid
    *
@@ -259,11 +329,9 @@ public class DefaultContainerExecutor extends ContainerExecutor {
    * @param signal signal to send
    * (for logging).
    */
-  protected void sendSignal(String pid, Signal signal) throws IOException {
-    ShellCommandExecutor shexec = null;
-    String[] arg = { "kill", "-" + signal.getValue(), pid };
-    shexec = new ShellCommandExecutor(arg);
-    shexec.execute();
+  private void killContainer(String pid, Signal signal) throws IOException {
+    new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid))
+      .execute();
   }
 
   @Override

+ 91 - 46
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -23,6 +23,7 @@ import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.File;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
@@ -69,7 +70,8 @@ public class ContainerLaunch implements Callable<Integer> {
 
   private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
 
-  public static final String CONTAINER_SCRIPT = "launch_container.sh";
+  public static final String CONTAINER_SCRIPT = Shell.WINDOWS ?
+    "launch_container.cmd" : "launch_container.sh";
   public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
 
   private static final String PID_FILE_NAME_FMT = "%s.pid";
@@ -411,28 +413,17 @@ public class ContainerLaunch implements Callable<Integer> {
         + appIdStr;
   }
 
-  private static class ShellScriptBuilder {
-    
-    private final StringBuilder sb;
-  
-    public ShellScriptBuilder() {
-      this(new StringBuilder("#!/bin/bash\n\n"));
-    }
-  
-    protected ShellScriptBuilder(StringBuilder sb) {
-      this.sb = sb;
-    }
-  
-    public ShellScriptBuilder env(String key, String value) {
-      line("export ", key, "=\"", value, "\"");
-      return this;
-    }
-  
-    public ShellScriptBuilder symlink(Path src, String dst) throws IOException {
-      return symlink(src, new Path(dst));
-    }
-  
-    public ShellScriptBuilder symlink(Path src, Path dst) throws IOException {
+  private static abstract class ShellScriptBuilder {
+
+    private static final String LINE_SEPARATOR =
+        System.getProperty("line.separator");
+    private final StringBuilder sb = new StringBuilder();
+
+    public abstract void command(List<String> command);
+
+    public abstract void env(String key, String value);
+
+    public final void symlink(Path src, Path dst) throws IOException {
       if (!src.isAbsolute()) {
         throw new IOException("Source must be absolute");
       }
@@ -440,28 +431,89 @@ public class ContainerLaunch implements Callable<Integer> {
         throw new IOException("Destination must be relative");
       }
       if (dst.toUri().getPath().indexOf('/') != -1) {
-        line("mkdir -p ", dst.getParent().toString());
+        mkdir(dst.getParent());
       }
-      line("ln -sf \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
-      return this;
+      link(src, dst);
     }
-  
-    public void write(PrintStream out) throws IOException {
+
+    @Override
+    public String toString() {
+      return sb.toString();
+    }
+
+    public final void write(PrintStream out) throws IOException {
       out.append(sb);
     }
-  
-    public void line(String... command) {
+
+    protected final void line(String... command) {
       for (String s : command) {
         sb.append(s);
       }
-      sb.append("\n");
+      sb.append(LINE_SEPARATOR);
     }
-  
+
+    protected abstract void link(Path src, Path dst) throws IOException;
+
+    protected abstract void mkdir(Path path);
+  }
+
+  private static final class UnixShellScriptBuilder extends ShellScriptBuilder {
+
+    public UnixShellScriptBuilder(){
+      line("#!/bin/bash");
+      line();
+    }
+
     @Override
-    public String toString() {
-      return sb.toString();
+    public void command(List<String> command) {
+      line("exec /bin/bash -c \"", StringUtils.join(" ", command), "\"");
+    }
+
+    @Override
+    public void env(String key, String value) {
+      line("export ", key, "=\"", value, "\"");
+    }
+
+    @Override
+    protected void link(Path src, Path dst) throws IOException {
+      line("ln -sf \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
     }
 
+    @Override
+    protected void mkdir(Path path) {
+      line("mkdir -p ", path.toString());
+    }
+  }
+
+  private static final class WindowsShellScriptBuilder
+      extends ShellScriptBuilder {
+
+    public WindowsShellScriptBuilder() {
+      line("@setlocal");
+      line();
+    }
+
+    @Override
+    public void command(List<String> command) {
+      line("@call ", StringUtils.join(" ", command));
+    }
+
+    @Override
+    public void env(String key, String value) {
+      line("@set ", key, "=", value);
+    }
+
+    @Override
+    protected void link(Path src, Path dst) throws IOException {
+      line(String.format("@%s symlink \"%s\" \"%s\"", Shell.WINUTILS,
+        new File(dst.toString()).getPath(),
+        new File(src.toUri().getPath()).getPath()));
+    }
+
+    @Override
+    protected void mkdir(Path path) {
+      line("@if not exist ", path.toString(), " mkdir ", path.toString());
+    }
   }
 
   private static void putEnvIfNotNull(
@@ -537,7 +589,8 @@ public class ContainerLaunch implements Callable<Integer> {
       Map<String,String> environment, Map<Path,List<String>> resources,
       List<String> command)
       throws IOException {
-    ShellScriptBuilder sb = new ShellScriptBuilder();
+    ShellScriptBuilder sb = Shell.WINDOWS ? new WindowsShellScriptBuilder() :
+      new UnixShellScriptBuilder();
     if (environment != null) {
       for (Map.Entry<String,String> env : environment.entrySet()) {
         sb.env(env.getKey().toString(), env.getValue().toString());
@@ -546,21 +599,13 @@ public class ContainerLaunch implements Callable<Integer> {
     if (resources != null) {
       for (Map.Entry<Path,List<String>> entry : resources.entrySet()) {
         for (String linkName : entry.getValue()) {
-          sb.symlink(entry.getKey(), linkName);
+          sb.symlink(entry.getKey(), new Path(linkName));
         }
       }
     }
 
-    ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
-    cmd.add("exec /bin/bash ");
-    cmd.add("-c ");
-    cmd.add("\"");
-    for (String cs : command) {
-      cmd.add(cs.toString());
-      cmd.add(" ");
-    }
-    cmd.add("\"");
-    sb.line(cmd.toArray(new String[cmd.size()]));
+    sb.command(command);
+
     PrintStream pout = null;
     try {
       pout = new PrintStream(out);

+ 21 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/ProcessIdFileReader.java

@@ -25,6 +25,8 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
  * Helper functionality to read the pid from a file.
@@ -62,14 +64,28 @@ public class ProcessIdFileReader {
           }
           String temp = line.trim(); 
           if (!temp.isEmpty()) {
-            try {
-              Long pid = Long.valueOf(temp);
-              if (pid > 0) {
+            if (Shell.WINDOWS) {
+              // On Windows, pid is expected to be a container ID, so find first
+              // line that parses successfully as a container ID.
+              try {
+                ConverterUtils.toContainerId(temp);
                 processId = temp;
                 break;
+              } catch (Exception e) {
+                // do nothing
+              }
+            }
+            else {
+              // Otherwise, find first line containing a numeric pid.
+              try {
+                Long pid = Long.valueOf(temp);
+                if (pid > 0) {
+                  processId = temp;
+                  break;
+                }
+              } catch (Exception e) {
+                // do nothing
               }
-            } catch (Exception e) {
-              // do nothing
             }
           }
         }

+ 12 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestProcessIdFileReader.java

@@ -26,6 +26,7 @@ import java.io.PrintWriter;
 import junit.framework.Assert;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
 import org.junit.Test;
 
@@ -49,17 +50,20 @@ public class TestProcessIdFileReader {
     String rootDir = new File(System.getProperty(
         "test.build.data", "/tmp")).getAbsolutePath();
     File testFile = null;
+    String expectedProcessId = Shell.WINDOWS ?
+      "container_1353742680940_0002_01_000001" :
+      "56789";
     
     try {
       testFile = new File(rootDir, "temp.txt");
       PrintWriter fileWriter = new PrintWriter(testFile);
-      fileWriter.println("56789");
+      fileWriter.println(expectedProcessId);
       fileWriter.close();      
       String processId = null; 
                   
       processId = ProcessIdFileReader.getProcessId(
           new Path(rootDir + Path.SEPARATOR + "temp.txt"));
-      Assert.assertEquals("56789", processId);      
+      Assert.assertEquals(expectedProcessId, processId);      
       
     } finally {
       if (testFile != null
@@ -75,7 +79,10 @@ public class TestProcessIdFileReader {
     String rootDir = new File(System.getProperty(
         "test.build.data", "/tmp")).getAbsolutePath();
     File testFile = null;
-    
+    String processIdInFile = Shell.WINDOWS ?
+      " container_1353742680940_0002_01_000001 " :
+      " 23 ";
+    String expectedProcessId = processIdInFile.trim();
     try {
       testFile = new File(rootDir, "temp.txt");
       PrintWriter fileWriter = new PrintWriter(testFile);
@@ -84,14 +91,14 @@ public class TestProcessIdFileReader {
       fileWriter.println("abc");
       fileWriter.println("-123");
       fileWriter.println("-123 ");
-      fileWriter.println(" 23 ");
+      fileWriter.println(processIdInFile);
       fileWriter.println("6236");
       fileWriter.close();      
       String processId = null; 
                   
       processId = ProcessIdFileReader.getProcessId(
           new Path(rootDir + Path.SEPARATOR + "temp.txt"));
-      Assert.assertEquals("23", processId);      
+      Assert.assertEquals(expectedProcessId, processId);
       
     } finally {
       if (testFile != null