Browse Source

HADOOP-6106. Provides an option in ShellCommandExecutor to timeout commands that do not complete within a certain amount of time. Contributed by Sreekanth Ramakrishnan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@788600 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 years ago
parent
commit
b089f4448d

+ 4 - 0
CHANGES.txt

@@ -461,6 +461,10 @@ Trunk (unreleased changes)
     HADOOP-5952. Change "-1 tests included" wording in test-patch.sh.
     (Gary Murry via szetszwo)
 
+    HADOOP-6106. Provides an option in ShellCommandExecutor to timeout 
+    commands that do not complete within a certain amount of time.
+    (Sreekanth Ramakrishnan via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

+ 129 - 18
src/java/org/apache/hadoop/util/Shell.java

@@ -22,6 +22,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,6 +58,11 @@ abstract public class Shell {
     return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
   }
 
+  /** Time in milliseconds after which the executing script is timed out; 0 means no timeout. */
+  protected long timeOutInterval = 0L;
+  /** Whether or not the script timed out. */
+  private AtomicBoolean timedOut;
+
   /** 
    * Get the Unix command for setting the maximum virtual memory available
    * to a given child process. This is only relevant when we are forking a
@@ -96,6 +104,9 @@ abstract public class Shell {
   private File dir;
   private Process process; // sub process used to execute the command
   private int exitCode;
+
+  /** Whether or not the script finished executing. */
+  private volatile AtomicBoolean completed;
   
   public Shell() {
     this(0L);
@@ -135,7 +146,10 @@ abstract public class Shell {
   /** Run a command */
   private void runCommand() throws IOException { 
     ProcessBuilder builder = new ProcessBuilder(getExecString());
-    boolean completed = false;
+    Timer timeOutTimer = null;
+    ShellTimeoutTimerTask timeoutTimerTask = null;
+    timedOut = new AtomicBoolean(false);
+    completed = new AtomicBoolean(false);
     
     if (environment != null) {
       builder.environment().putAll(this.environment);
@@ -145,6 +159,13 @@ abstract public class Shell {
     }
     
     process = builder.start();
+    if (timeOutInterval > 0) {
+      timeOutTimer = new Timer();
+      timeoutTimerTask = new ShellTimeoutTimerTask(
+          this);
+      //One time scheduling.
+      timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
+    }
     final BufferedReader errReader = 
             new BufferedReader(new InputStreamReader(process
                                                      .getErrorStream()));
@@ -181,27 +202,32 @@ abstract public class Shell {
         line = inReader.readLine();
       }
       // wait for the process to finish and check the exit code
-      exitCode = process.waitFor();
+      exitCode  = process.waitFor();
       try {
         // make sure that the error thread exits
         errThread.join();
       } catch (InterruptedException ie) {
         LOG.warn("Interrupted while reading the error stream", ie);
       }
-      completed = true;
+      completed.set(true);
+      // Handling of the timeout timer is
+      // taken care of in the finally block.
       if (exitCode != 0) {
         throw new ExitCodeException(exitCode, errMsg.toString());
       }
     } catch (InterruptedException ie) {
       throw new IOException(ie.toString());
     } finally {
+      if ((timeOutTimer!=null) && !timedOut.get()) {
+        timeOutTimer.cancel();
+      }
       // close the input stream
       try {
         inReader.close();
       } catch (IOException ioe) {
         LOG.warn("Error while closing the input stream", ioe);
       }
-      if (!completed) {
+      if (!completed.get()) {
         errThread.interrupt();
       }
       try {
@@ -264,21 +290,47 @@ abstract public class Shell {
     private String[] command;
     private StringBuffer output;
     
+    
     public ShellCommandExecutor(String[] execString) {
-      command = execString.clone();
+      this(execString, null);
     }
-
+    
     public ShellCommandExecutor(String[] execString, File dir) {
-      this(execString);
-      this.setWorkingDirectory(dir);
+      this(execString, dir, null);
     }
-
+   
     public ShellCommandExecutor(String[] execString, File dir, 
                                  Map<String, String> env) {
-      this(execString, dir);
-      this.setEnvironment(env);
+      this(execString, dir, env , 0L);
     }
-    
+
+    /**
+     * Create a new instance of the ShellCommandExecutor to execute a command.
+     * 
+     * @param execString The command to execute with arguments
+     * @param dir If not-null, specifies the directory which should be set
+     *            as the current working directory for the command.
+     *            If null, the current working directory is not modified.
+     * @param env If not-null, environment of the command will include the
+     *            key-value pairs specified in the map. If null, the current
+     *            environment is not modified.
+     * @param timeout Specifies the time in milliseconds, after which the
+     *                command will be killed and the status marked as timed out.
+     *                If 0, the command will not be timed out. 
+     */
+    public ShellCommandExecutor(String[] execString, File dir, 
+        Map<String, String> env, long timeout) {
+      command = execString.clone();
+      if (dir != null) {
+        setWorkingDirectory(dir);
+      }
+      if (env != null) {
+        setEnvironment(env);
+      }
+      timeOutInterval = timeout;
+    }
+        
+
     /** Execute the shell command. */
     public void execute() throws IOException {
       this.run();    
@@ -324,6 +376,24 @@ abstract public class Shell {
     }
   }
   
+  /**
+   * Checks whether the script passed to the shell command executor timed
+   * out.
+   * 
+   * @return true if the script timed out, false otherwise.
+   */
+  public boolean isTimedOut() {
+    return timedOut.get();
+  }
+  
+  /**
+   * Marks the command as having timed out.
+   * 
+   */
+  private void setTimedOut() {
+    this.timedOut.set(true);
+  }
+  
   /** 
    * Static method to execute a shell command. 
    * Covers most of the simple cases without requiring the user to implement  
@@ -332,9 +402,27 @@ abstract public class Shell {
    * @return the output of the executed command.
    */
   public static String execCommand(String ... cmd) throws IOException {
-    return execCommand(null, cmd);
+    return execCommand(null, cmd, 0L);
   }
   
+  /** 
+   * Static method to execute a shell command. 
+   * Covers most of the simple cases without requiring the user to implement  
+   * the <code>Shell</code> interface.
+   * @param env the map of environment key=value
+   * @param cmd shell command to execute.
+   * @param timeout time in milliseconds after which the script should be marked as timed out
+   * @return the output of the executed command.
+   */
+  
+  public static String execCommand(Map<String, String> env, String[] cmd,
+      long timeout) throws IOException {
+    ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, 
+                                                          timeout);
+    exec.execute();
+    return exec.getOutput();
+  }
+
   /** 
    * Static method to execute a shell command. 
    * Covers most of the simple cases without requiring the user to implement  
@@ -345,11 +433,34 @@ abstract public class Shell {
    */
   public static String execCommand(Map<String,String> env, String ... cmd) 
   throws IOException {
-    ShellCommandExecutor exec = new ShellCommandExecutor(cmd);
-    if (env != null) {
-      exec.setEnvironment(env);
+    return execCommand(env, cmd, 0L);
+  }
+  
+  /**
+   * Timer task which is used to time out scripts spawned off by shell.
+   */
+  private static class ShellTimeoutTimerTask extends TimerTask {
+
+    private Shell shell;
+
+    public ShellTimeoutTimerTask(Shell shell) {
+      this.shell = shell;
+    }
+
+    @Override
+    public void run() {
+      Process p = shell.getProcess();
+      try {
+        p.exitValue();
+      } catch (Exception e) {
+        // The process has not terminated,
+        // so check whether it has completed;
+        // if not, just destroy it.
+        if (p != null && !shell.completed.get()) {
+          shell.setTimedOut();
+          p.destroy();
+        }
+      }
     }
-    exec.execute();
-    return exec.getOutput();
   }
 }

+ 24 - 0
src/test/core/org/apache/hadoop/util/TestShell.java

@@ -20,7 +20,10 @@ package org.apache.hadoop.util;
 import junit.framework.TestCase;
 
 import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintWriter;
 
 public class TestShell extends TestCase {
 
@@ -71,6 +74,27 @@ public class TestShell extends TestCase {
     assertInString(command, " .. ");
     assertInString(command, "\"arg 2\"");
   }
+  
+  public void testShellCommandTimeout() throws Throwable {
+    String rootDir = new File(System.getProperty(
+        "test.build.data", "/tmp")).getAbsolutePath();
+    File shellFile = new File(rootDir, "timeout.sh");
+    String timeoutCommand = "sleep 4; echo \"hello\"";
+    PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+    writer.println(timeoutCommand);
+    writer.close();
+    shellFile.setExecutable(true);
+    Shell.ShellCommandExecutor shexc 
+    = new Shell.ShellCommandExecutor(new String[]{shellFile.getAbsolutePath()},
+                                      null, null, 100);
+    try {
+      shexc.execute();
+    } catch (Exception e) {
+      // An exception is thrown when the command times out.
+    }
+    shellFile.delete();
+    assertTrue("Script didnt not timeout" , shexc.isTimedOut());
+  }
 
   private void testInterval(long interval) throws IOException {
     Command command = new Command(interval);