瀏覽代碼

HADOOP-3487. Balancer uses thread pools for managing its threads. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@670695 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 17 年之前
父節點
當前提交
6fab39c786
共有 2 個文件被更改,包括 39 次插入28 次删除
  1. 3 0
      CHANGES.txt
  2. 36 28
      src/hdfs/org/apache/hadoop/dfs/Balancer.java

+ 3 - 0
CHANGES.txt

@@ -668,6 +668,9 @@ Release 0.17.1 - Unreleased
     HADOOP-3522. Improve documentation on reduce pointing out that
     input keys and values will be reused. (omalley)
 
+    HADOOP-3487. Balancer uses thread pools for managing its threads;
+    therefore provides better resource management. (hairong)
+
   BUG FIXES
 
     HADOOP-2159 Namenode stuck in safemode. The counter blockSafe should

+ 36 - 28
src/hdfs/org/apache/hadoop/dfs/Balancer.java

@@ -42,6 +42,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -204,6 +208,13 @@ public class Balancer implements Tool {
   
   private double avgUtilization = 0.0D;
   
+  final private int MOVER_THREAD_POOL_SIZE = 1000;
+  final private ExecutorService moverExecutor = 
+    Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
+  final private int DISPATCHER_THREAD_POOL_SIZE = 200;
+  final private ExecutorService dispatcherExecutor =
+    Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
+  
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
     private BalancerBlock block;
@@ -372,22 +383,13 @@ public class Balancer implements Tool {
     
     /* start a thread to dispatch the block move */
     private void scheduleBlockMove() {
-      BlockMover blockMover = new BlockMover();
-      blockMover.setDaemon(true);
-      blockMover.setName("Block mover for "+ block.getBlockId() +
-          " from " + proxySource.getName() + " to " + target.getName());
-      LOG.info("Starting " + blockMover.getName());
-      blockMover.start();
-    }
-    
-    /* A thread for moving a block */
-    private class BlockMover extends Thread {
-      BlockMover() {
-      }
-      
-      public void run() {
-        dispatch();
-      }
+      moverExecutor.execute(new Runnable() {
+        public void run() {
+          LOG.info("Starting moving "+ block.getBlockId() +
+              " from " + proxySource.getName() + " to " + target.getName());
+          dispatch();
+        }
+      });
     }
   }
   
@@ -580,7 +582,7 @@ public class Balancer implements Tool {
     
     /* A thread that initiates a block move 
      * and waits for block move to complete */
-    private class BlockMoveDispatcher extends Thread {
+    private class BlockMoveDispatcher implements Runnable {
       public void run() {
         dispatchBlocks();
       }
@@ -1187,25 +1189,26 @@ public class Balancer implements Tool {
    * blocked if there are too many un-confirmed block moves.
    * Return the total number of bytes successfully moved in this iteration.
    */
-  private long dispatchBlockMoves() {
+  private long dispatchBlockMoves() throws InterruptedException {
     long bytesLastMoved = bytesMoved.get();
-    Source.BlockMoveDispatcher dispatchers[] =
-      new Source.BlockMoveDispatcher[sources.size()];
+    Future<?>[] futures = new Future<?>[sources.size()];
     int i=0;
     for (Source source : sources) {
-      dispatchers[i] = source.new BlockMoveDispatcher();
-      dispatchers[i].setName("Dispatcher for source " + source.getName());
-      LOG.info("Starting " + dispatchers[i].getName());
-      dispatchers[i++].start();
+      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
     }
-    for (Source.BlockMoveDispatcher dispatcher : dispatchers) {
+    
+    // wait for all dispatcher threads to finish
+    for (Future<?> future : futures) {
       try {
-        dispatcher.join();
-      } catch (InterruptedException e) {
-        LOG.info(StringUtils.stringifyException(e));
+        future.get();
+      } catch (ExecutionException e) {
+        LOG.warn("Dispatcher thread failed", e.getCause());
       }
     }
+    
+    // wait for all block moving to be done
     waitForMoveCompletion();
+    
     return bytesMoved.get()-bytesLastMoved;
   }
   
@@ -1449,6 +1452,11 @@ public class Balancer implements Tool {
           " . Exiting...");
       return IO_EXCEPTION;
     } finally {
+      // shutdown thread pools
+      dispatcherExecutor.shutdownNow();
+      moverExecutor.shutdownNow();
+
+      // close the output file
       IOUtils.closeStream(out); 
       try {
         fs.delete(BALANCER_ID_PATH, true);