Browse Source

HADOOP-6433. Introduce asychronous deletion of files via a pool of
threads. This can be used to delete files in the Distributed Cache.
(Zheng Shao via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@890502 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 15 years ago
parent
commit
c4e1126980

+ 4 - 0
CHANGES.txt

@@ -22,6 +22,10 @@ Trunk (unreleased changes)
     HADOOP-6323. Add comparators to the serialization API.
     (Aaron Kimball via cutting)
 
+    HADOOP-6433. Introduce asychronous deletion of files via a pool of
+    threads. This can be used to delete files in the Distributed
+    Cache. (Zheng Shao via dhruba)
+
   IMPROVEMENTS
 
     HADOOP-6283. Improve the exception messages thrown by

+ 155 - 0
src/java/org/apache/hadoop/util/AsyncDiskService.java

@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * This class is a container of multiple thread pools, each for a volume,
+ * so that we can schedule async disk operations easily.
+ * 
+ * Examples of async disk operations are deletion of files.
+ * We can move the files to a "TO_BE_DELETED" folder before asychronously
+ * deleting it, to make sure the caller can run it faster.
+ */
+public class AsyncDiskService {
+  
+  public static final Log LOG = LogFactory.getLog(AsyncDiskService.class);
+  
+  // ThreadPool core pool size
+  private static final int CORE_THREADS_PER_VOLUME = 1;
+  // ThreadPool maximum pool size
+  private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+  // ThreadPool keep-alive time for threads over core pool size
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60; 
+  
+  private final ThreadGroup threadGroup = new ThreadGroup("async disk service");
+  
+  private ThreadFactory threadFactory;
+  
+  private HashMap<String, ThreadPoolExecutor> executors
+      = new HashMap<String, ThreadPoolExecutor>();
+  
+  /**
+   * Create a AsyncDiskServices with a set of volumes (specified by their
+   * root directories).
+   * 
+   * The AsyncDiskServices uses one ThreadPool per volume to do the async
+   * disk operations.
+   * 
+   * @param volumes The roots of the file system volumes.
+   */
+  public AsyncDiskService(String[] volumes) throws IOException {
+    
+    threadFactory = new ThreadFactory() {
+      public Thread newThread(Runnable r) {
+        return new Thread(threadGroup, r);
+      }
+    };
+    
+    // Create one ThreadPool per volume
+    for (int v = 0 ; v < volumes.length; v++) {
+      ThreadPoolExecutor executor = new ThreadPoolExecutor(
+          CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, 
+          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, 
+          new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+      // This can reduce the number of running threads
+      executor.allowCoreThreadTimeOut(true);
+      executors.put(volumes[v], executor);
+    }
+    
+  }
+  
+  /**
+   * Execute the task sometime in the future, using ThreadPools.
+   */
+  public synchronized void execute(String root, Runnable task) {
+    ThreadPoolExecutor executor = executors.get(root);
+    if (executor == null) {
+      throw new RuntimeException("Cannot find root " + root
+          + " for execution of task " + task);
+    } else {
+      executor.execute(task);
+    }
+  }
+  
+  /**
+   * Gracefully start the shut down of all ThreadPools.
+   */
+  public synchronized void shutdown() {
+    
+    LOG.info("Shutting down all AsyncDiskService threads...");
+    
+    for (Map.Entry<String, ThreadPoolExecutor> e
+        : executors.entrySet()) {
+      e.getValue().shutdown();
+    }
+  }
+
+  /**
+   * Wait for the termination of the thread pools.
+   * 
+   * @param milliseconds  The number of milliseconds to wait
+   * @return   true if all thread pools are terminated without time limit
+   * @throws InterruptedException 
+   */
+  public synchronized boolean awaitTermination(long milliseconds) 
+      throws InterruptedException {
+
+    long end = System.currentTimeMillis() + milliseconds;
+    for (Map.Entry<String, ThreadPoolExecutor> e:
+        executors.entrySet()) {
+      ThreadPoolExecutor executor = e.getValue();
+      if (!executor.awaitTermination(
+          Math.max(end - System.currentTimeMillis(), 0),
+          TimeUnit.MILLISECONDS)) {
+        LOG.warn("AsyncDiskService awaitTermination timeout.");
+        return false;
+      }
+    }
+    LOG.info("All AsyncDiskService threads are terminated.");
+    return true;
+  }
+  
+  /**
+   * Shut down all ThreadPools immediately.
+   */
+  public synchronized List<Runnable> shutdownNow() {
+    
+    LOG.info("Shutting down all AsyncDiskService threads immediately...");
+    
+    List<Runnable> list = new ArrayList<Runnable>();
+    for (Map.Entry<String, ThreadPoolExecutor> e
+        : executors.entrySet()) {
+      list.addAll(e.getValue().shutdownNow());
+    }
+    return list;
+  }
+  
+}

+ 84 - 0
src/test/core/org/apache/hadoop/util/TestAsyncDiskService.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.AsyncDiskService;
+import org.junit.Test;
+
+/**
+ * A test for AsyncDiskService.
+ */
+public class TestAsyncDiskService extends TestCase {
+  
+  public static final Log LOG = LogFactory.getLog(TestAsyncDiskService.class);
+  
+  // Access by multiple threads from the ThreadPools in AsyncDiskService.
+  volatile int count;
+  
+  /** An example task for incrementing a counter.  
+   */
+  class ExampleTask implements Runnable {
+
+    ExampleTask() {
+    }
+    
+    @Override
+    public void run() {
+      synchronized (TestAsyncDiskService.this) {
+        count ++;
+      }
+    }
+  };
+  
+  
+  /**
+   * This test creates some ExampleTasks and runs them. 
+   */
+  @Test
+  public void testAsyncDiskService() throws Throwable {
+  
+    String[] vols = new String[]{"/0", "/1"};
+    AsyncDiskService service = new AsyncDiskService(vols);
+    
+    int total = 100;
+    
+    for (int i = 0; i < total; i++) {
+      service.execute(vols[i%2], new ExampleTask());
+    }
+
+    Exception e = null;
+    try {
+      service.execute("no_such_volume", new ExampleTask());
+    } catch (RuntimeException ex) {
+      e = ex;
+    }
+    assertNotNull("Executing a task on a non-existing volume should throw an "
+        + "Exception.", e);
+    
+    service.shutdown();
+    if (!service.awaitTermination(5000)) {
+      fail("AsyncDiskService didn't shutdown in 5 seconds.");
+    }
+    
+    assertEquals(total, count);
+  }
+}