
HADOOP-8325. Add a ShutdownHookManager to be used by different components instead of the JVM shutdownhook (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1332345 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 13 years ago
parent
commit
8a0c7323ce

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -384,6 +384,9 @@ Release 2.0.0 - UNRELEASED
 
     HADOOP-8321. TestUrlStreamHandler fails. (tucu)
 
+    HADOOP-8325. Add a ShutdownHookManager to be used by different
+    components instead of the JVM shutdownhook (tucu)
+
   BREAKDOWN OF HADOOP-7454 SUBTASKS
 
     HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)

+ 10 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ShutdownHookManager;
 
 /**
  * The FileContext class provides an interface to the application writer for
@@ -171,7 +172,12 @@ public final class FileContext {
   
   public static final Log LOG = LogFactory.getLog(FileContext.class);
   public static final FsPermission DEFAULT_PERM = FsPermission.getDefault();
-  
+
+  /**
+   * Priority of the FileContext shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 20;
+
   /**
    * List of files that should be deleted on JVM shutdown.
    */
@@ -1456,8 +1462,8 @@ public final class FileContext {
       return false;
     }
     synchronized (DELETE_ON_EXIT) {
-      if (DELETE_ON_EXIT.isEmpty() && !FINALIZER.isAlive()) {
-        Runtime.getRuntime().addShutdownHook(FINALIZER);
+      if (DELETE_ON_EXIT.isEmpty()) {
+        ShutdownHookManager.get().addShutdownHook(FINALIZER, SHUTDOWN_HOOK_PRIORITY);
       }
       
       Set<Path> set = DELETE_ON_EXIT.get(this);
@@ -2215,7 +2221,7 @@ public final class FileContext {
   /**
    * Deletes all the paths in deleteOnExit on JVM shutdown.
    */
-  static class FileContextFinalizer extends Thread {
+  static class FileContextFinalizer implements Runnable {
     public synchronized void run() {
       processDeleteOnExit();
     }

+ 11 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
 
 /****************************************************************
  * An abstract base class for a fairly generic filesystem.  It
@@ -84,6 +85,11 @@ public abstract class FileSystem extends Configured implements Closeable {
 
   public static final Log LOG = LogFactory.getLog(FileSystem.class);
 
+  /**
+   * Priority of the FileSystem shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
+
   /** FileSystem cache */
   static final Cache CACHE = new Cache();
 
@@ -2176,8 +2182,8 @@ public abstract class FileSystem extends Configured implements Closeable {
         }
         
         // now insert the new file system into the map
-        if (map.isEmpty() && !clientFinalizer.isAlive()) {
-          Runtime.getRuntime().addShutdownHook(clientFinalizer);
+        if (map.isEmpty() ) {
+          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
         }
         fs.key = key;
         map.put(key, fs);
@@ -2192,11 +2198,8 @@ public abstract class FileSystem extends Configured implements Closeable {
       if (map.containsKey(key) && fs == map.get(key)) {
         map.remove(key);
         toAutoClose.remove(key);
-        if (map.isEmpty() && !clientFinalizer.isAlive()) {
-          if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
-            LOG.info("Could not cancel cleanup thread, though no " +
-                     "FileSystems are open");
-          }
+        if (map.isEmpty()) {
+          ShutdownHookManager.get().removeShutdownHook(clientFinalizer);
         }
       }
     }
@@ -2242,7 +2245,7 @@ public abstract class FileSystem extends Configured implements Closeable {
       }
     }
 
-    private class ClientFinalizer extends Thread {
+    private class ClientFinalizer implements Runnable {
       public synchronized void run() {
         try {
           closeAll(true);
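
Taken together, the two new constants order the FileContext finalizer (priority 20) ahead of the FileSystem client finalizer (priority 10) when the manager's single JVM hook fires. A minimal sketch of that ordering, assuming hypothetical no-op Runnables in place of the real finalizers (the class name HookOrderSketch is illustrative only):

    import org.apache.hadoop.util.ShutdownHookManager;

    // Sketch only: stand-ins for FileContext.FINALIZER and FileSystem's
    // clientFinalizer, registered with the same priorities as this patch.
    public class HookOrderSketch {
      public static void main(String[] args) {
        Runnable fileContextHook = new Runnable() {
          @Override
          public void run() { System.out.println("FileContext cleanup"); }
        };
        Runnable fileSystemHook = new Runnable() {
          @Override
          public void run() { System.out.println("FileSystem cleanup"); }
        };
        ShutdownHookManager mgr = ShutdownHookManager.get();
        mgr.addShutdownHook(fileSystemHook, 10);   // FileSystem.SHUTDOWN_HOOK_PRIORITY
        mgr.addShutdownHook(fileContextHook, 20);  // FileContext.SHUTDOWN_HOOK_PRIORITY
        // At JVM exit the manager runs the priority-20 hook before the
        // priority-10 hook, regardless of registration order.
      }
    }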

+ 10 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RunJar.java

@@ -50,6 +50,11 @@ public class RunJar {
   /** Pattern that matches any string */
   public static final Pattern MATCH_ANY = Pattern.compile(".*");
 
+  /**
+   * Priority of the RunJar shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
+
   /**
    * Unpack a jar file into a directory.
    *
@@ -167,11 +172,14 @@ public class RunJar {
     }
     ensureDirectory(workDir);
 
-    Runtime.getRuntime().addShutdownHook(new Thread() {
+    ShutdownHookManager.get().addShutdownHook(
+      new Runnable() {
+        @Override
         public void run() {
           FileUtil.fullyDelete(workDir);
         }
-      });
+      }, SHUTDOWN_HOOK_PRIORITY);
+
 
     unJar(file, workDir);
 

+ 181 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java

@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The <code>ShutdownHookManager</code> enables running shutdownHooks
+ * in a deterministic order, higher priority first.
+ * <p/>
+ * The JVM runs shutdown hooks in a non-deterministic order or in parallel.
+ * This class registers a single JVM shutdownHook and runs all the
+ * shutdownHooks registered to it (to this class) in order, based on their
+ * priority.
+ */
+public class ShutdownHookManager {
+
+  private static final ShutdownHookManager MGR = new ShutdownHookManager();
+
+  private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);
+
+  static {
+    Runtime.getRuntime().addShutdownHook(
+      new Thread() {
+        @Override
+        public void run() {
+          MGR.shutdownInProgress.set(true);
+          for (Runnable hook: MGR.getShutdownHooksInOrder()) {
+            try {
+              hook.run();
+            } catch (Throwable ex) {
+              LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
+                       "' failed, " + ex.toString(), ex);
+            }
+          }
+        }
+      }
+    );
+  }
+
+  /**
+   * Return <code>ShutdownHookManager</code> singleton.
+   *
+   * @return <code>ShutdownHookManager</code> singleton.
+   */
+  public static ShutdownHookManager get() {
+    return MGR;
+  }
+
+  /**
+   * Private structure to store ShutdownHook and its priority.
+   */
+  private static class HookEntry {
+    Runnable hook;
+    int priority;
+
+    public HookEntry(Runnable hook, int priority) {
+      this.hook = hook;
+      this.priority = priority;
+    }
+
+    @Override
+    public int hashCode() {
+      return hook.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      boolean eq = false;
+      if (obj != null) {
+        if (obj instanceof HookEntry) {
+          eq = (hook == ((HookEntry)obj).hook);
+        }
+      }
+      return eq;
+    }
+
+  }
+
+  private Set<HookEntry> hooks =
+    Collections.synchronizedSet(new HashSet<HookEntry>());
+
+  private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
+
+  //private constructor to ensure a single instance
+  private ShutdownHookManager() {
+  }
+
+  /**
+   * Returns the list of shutdownHooks in order of execution,
+   * highest priority first.
+   *
+   * @return the list of shutdownHooks in order of execution.
+   */
+  List<Runnable> getShutdownHooksInOrder() {
+    List<HookEntry> list;
+    synchronized (MGR.hooks) {
+      list = new ArrayList<HookEntry>(MGR.hooks);
+    }
+    Collections.sort(list, new Comparator<HookEntry>() {
+
+      //reversing comparison so highest priority hooks are first
+      @Override
+      public int compare(HookEntry o1, HookEntry o2) {
+        return o2.priority - o1.priority;
+      }
+    });
+    List<Runnable> ordered = new ArrayList<Runnable>();
+    for (HookEntry entry: list) {
+      ordered.add(entry.hook);
+    }
+    return ordered;
+  }
+
+  /**
+   * Adds a shutdownHook with a priority; the higher the priority,
+   * the earlier the hook runs. ShutdownHooks with the same priority run
+   * in a non-deterministic order.
+   *
+   * @param shutdownHook shutdownHook <code>Runnable</code>
+   * @param priority priority of the shutdownHook.
+   */
+  public void addShutdownHook(Runnable shutdownHook, int priority) {
+    if (shutdownHook == null) {
+      throw new IllegalArgumentException("shutdownHook cannot be NULL");
+    }
+    if (shutdownInProgress.get()) {
+      throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
+    }
+    hooks.add(new HookEntry(shutdownHook, priority));
+  }
+
+  /**
+   * Removes a shutdownHook.
+   *
+   * @param shutdownHook shutdownHook to remove.
+   * @return TRUE if the shutdownHook was registered and removed,
+   * FALSE otherwise.
+   */
+  public boolean removeShutdownHook(Runnable shutdownHook) {
+    if (shutdownInProgress.get()) {
+      throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook");
+    }
+    return hooks.remove(new HookEntry(shutdownHook, 0));
+  }
+
+  /**
+   * Indicates if a shutdownHook is registered or not.
+   *
+   * @param shutdownHook shutdownHook to check if registered.
+   * @return TRUE/FALSE depending on whether the shutdownHook is registered.
+   */
+  public boolean hasShutdownHook(Runnable shutdownHook) {
+    return hooks.contains(new HookEntry(shutdownHook, 0));
+  }
+
+}
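
For reference, a small usage sketch of the public API introduced above (registration, lookup, removal). The hook body, class name, and priority value 30 are placeholders, not part of the patch:

    import org.apache.hadoop.util.ShutdownHookManager;

    public class ShutdownHookManagerUsage {
      public static void main(String[] args) {
        ShutdownHookManager mgr = ShutdownHookManager.get();

        // Placeholder hook; real code would release a resource here.
        Runnable cleanup = new Runnable() {
          @Override
          public void run() { System.out.println("releasing resources"); }
        };

        mgr.addShutdownHook(cleanup, 30);                    // higher priority runs earlier
        System.out.println(mgr.hasShutdownHook(cleanup));    // true

        // Hooks can be removed again as long as shutdown has not started;
        // both add and remove throw IllegalStateException once it has.
        System.out.println(mgr.removeShutdownHook(cleanup)); // true
        System.out.println(mgr.hasShutdownHook(cleanup));    // false
      }
    }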

+ 14 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java

@@ -46,6 +46,11 @@ import org.apache.hadoop.net.NetUtils;
 @InterfaceStability.Unstable
 public class StringUtils {
 
+  /**
+   * Priority of the StringUtils shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 0;
+
   private static final DecimalFormat decimalFormat;
   static {
           NumberFormat numberFormat = NumberFormat.getNumberInstance(Locale.ENGLISH);
@@ -600,12 +605,15 @@ public class StringUtils {
         )
       );
 
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      public void run() {
-        LOG.info(toStartupShutdownString("SHUTDOWN_MSG: ", new String[]{
-          "Shutting down " + classname + " at " + hostname}));
-      }
-    });
+    ShutdownHookManager.get().addShutdownHook(
+      new Runnable() {
+        @Override
+        public void run() {
+          LOG.info(toStartupShutdownString("SHUTDOWN_MSG: ", new String[]{
+            "Shutting down " + classname + " at " + hostname}));
+        }
+      }, SHUTDOWN_HOOK_PRIORITY);
+
   }
 
   /**
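
With SHUTDOWN_HOOK_PRIORITY = 0, the SHUTDOWN_MSG hook sorts after the FileContext (20) and FileSystem (10) finalizers, so the message is logged only once the file-system cleanup hooks have run. A hedged sketch of the same pattern, with System.out standing in for LOG and hypothetical classname/hostname values:

    import org.apache.hadoop.util.ShutdownHookManager;

    public class ShutdownMsgSketch {
      public static void main(String[] args) {
        final String classname = "ExampleDaemon";     // hypothetical
        final String hostname = "host.example.com";   // hypothetical
        ShutdownHookManager.get().addShutdownHook(new Runnable() {
          @Override
          public void run() {
            // Priority 0 runs after the priority-10 and priority-20 hooks.
            System.out.println("SHUTDOWN_MSG: Shutting down " + classname
                + " at " + hostname);
          }
        }, 0);   // StringUtils.SHUTDOWN_HOOK_PRIORITY
      }
    }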

+ 3 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileContextDeleteOnExit.java

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Set;
 
 import junit.framework.Assert;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,7 +66,7 @@ public class TestFileContextDeleteOnExit {
     checkDeleteOnExitData(1, fc, file1);
     
     // Ensure shutdown hook is added
-    Assert.assertTrue(Runtime.getRuntime().removeShutdownHook(FileContext.FINALIZER));
+    Assert.assertTrue(ShutdownHookManager.get().hasShutdownHook(FileContext.FINALIZER));
     
     Path file2 = getTestRootPath(fc, "dir1/file2");
     createFile(fc, file2, numBlocks, blockSize);
@@ -79,8 +80,7 @@ public class TestFileContextDeleteOnExit {
     
     // trigger deleteOnExit and ensure the registered
     // paths are cleaned up
-    FileContext.FINALIZER.start();
-    FileContext.FINALIZER.join();
+    FileContext.FINALIZER.run();
     checkDeleteOnExitData(0, fc, new Path[0]);
     Assert.assertFalse(exists(fc, file1));
     Assert.assertFalse(exists(fc, file2));

+ 62 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShutdownHookManager.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestShutdownHookManager {
+
+  @Test
+  public void shutdownHookManager() {
+    ShutdownHookManager mgr = ShutdownHookManager.get();
+    Assert.assertNotNull(mgr);
+    Assert.assertEquals(0, mgr.getShutdownHooksInOrder().size());
+    Runnable hook1 = new Runnable() {
+      @Override
+      public void run() {
+      }
+    };
+    Runnable hook2 = new Runnable() {
+      @Override
+      public void run() {
+      }
+    };
+
+    mgr.addShutdownHook(hook1, 0);
+    Assert.assertTrue(mgr.hasShutdownHook(hook1));
+    Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
+    Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0));
+    mgr.removeShutdownHook(hook1);
+    Assert.assertFalse(mgr.hasShutdownHook(hook1));
+
+    mgr.addShutdownHook(hook1, 0);
+    Assert.assertTrue(mgr.hasShutdownHook(hook1));
+    Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
+    Assert.assertTrue(mgr.hasShutdownHook(hook1));
+    Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
+
+    mgr.addShutdownHook(hook2, 1);
+    Assert.assertTrue(mgr.hasShutdownHook(hook1));
+    Assert.assertTrue(mgr.hasShutdownHook(hook2));
+    Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size());
+    Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0));
+    Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1));
+
+  }
+}