Преглед на файлове

HADOOP-13709. Ability to clean up subprocesses spawned by Shell when the process exits. Contributed by Eric Badger

Jason Lowe преди 8 години
родител
ревизия
9947aeb60c

+ 24 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java

@@ -26,9 +26,11 @@ import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.WeakHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -48,6 +50,8 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class Shell {
+  private static final Map <Process, Object> CHILD_PROCESSES =
+      Collections.synchronizedMap(new WeakHashMap<Process, Object>());
   public static final Logger LOG = LoggerFactory.getLogger(Shell.class);
 
   /**
@@ -916,6 +920,7 @@ public abstract class Shell {
     } else {
       process = builder.start();
     }
+    CHILD_PROCESSES.put(process, null);
 
     if (timeOutInterval > 0) {
       timeOutTimer = new Timer("Shell command timeout");
@@ -1012,6 +1017,7 @@ public abstract class Shell {
         LOG.warn("Error while closing the error stream", ioe);
       }
       process.destroy();
+      CHILD_PROCESSES.remove(process);
       lastTime = Time.monotonicNow();
     }
   }
@@ -1310,4 +1316,22 @@ public abstract class Shell {
       }
     }
   }
+
+  /**
+   * Static method to destroy all running <code>Shell</code> processes
+   * Iterates through a list of all currently running <code>Shell</code>
+   * processes and destroys them one by one. This method is thread safe and
+   * is intended to be used in a shutdown hook.
+   */
+  public static void destroyAllProcesses() {
+    synchronized (CHILD_PROCESSES) {
+      for (Process key : CHILD_PROCESSES.keySet()) {
+        Process process = key;
+        if (key != null) {
+          process.destroy();
+        }
+      }
+      CHILD_PROCESSES.clear();
+    }
+  }
 }

+ 68 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.util;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider;
 import org.junit.Assert;
@@ -471,4 +472,71 @@ public class TestShell extends Assert {
     assertEquals("'foo'\\''bar'", Shell.bashQuote("foo'bar"));
     assertEquals("''\\''foo'\\''bar'\\'''", Shell.bashQuote("'foo'bar'"));
   }
+
+  @Test(timeout=120000)
+  public void testShellKillAllProcesses() throws Throwable {
+    Assume.assumeFalse(WINDOWS);
+    StringBuffer sleepCommand = new StringBuffer();
+    sleepCommand.append("sleep 200");
+    String[] shellCmd = {"bash", "-c", sleepCommand.toString()};
+    final ShellCommandExecutor shexc1 = new ShellCommandExecutor(shellCmd);
+    final ShellCommandExecutor shexc2 = new ShellCommandExecutor(shellCmd);
+
+    Thread shellThread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          shexc1.execute();
+        } catch(IOException ioe) {
+          //ignore IOException from thread interrupt
+        }
+      }
+    };
+    Thread shellThread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          shexc2.execute();
+        } catch(IOException ioe) {
+          //ignore IOException from thread interrupt
+        }
+      }
+    };
+
+    shellThread1.start();
+    shellThread2.start();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return shexc1.getProcess() != null;
+      }
+    }, 10, 10000);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return shexc2.getProcess() != null;
+      }
+    }, 10, 10000);
+
+    Shell.destroyAllProcesses();
+    final Process process1 = shexc1.getProcess();
+    final Process process2 = shexc2.getProcess();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !process1.isAlive();
+      }
+    }, 10, 10000);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !process2.isAlive();
+      }
+    }, 10, 10000);
+
+    assertFalse("Process 1 was not killed within timeout", process1.isAlive());
+    assertFalse("Process 2 was not killed within timeout", process2.isAlive());
+  }
 }