|
@@ -226,13 +226,9 @@ public class Balancer {
|
|
|
= new HashMap<String, BalancerDatanode>();
|
|
|
|
|
|
private NetworkTopology cluster;
|
|
|
- final static private int MOVER_THREAD_POOL_SIZE = 1000;
|
|
|
- final private ExecutorService moverExecutor =
|
|
|
- Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
|
|
|
- final static private int DISPATCHER_THREAD_POOL_SIZE = 200;
|
|
|
- final private ExecutorService dispatcherExecutor =
|
|
|
- Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
|
|
|
-
|
|
|
+
|
|
|
+ private final ExecutorService moverExecutor;
|
|
|
+ private final ExecutorService dispatcherExecutor;
|
|
|
|
|
|
/* This class keeps track of a scheduled block move */
|
|
|
private class PendingBlockMove {
|
|
@@ -830,6 +826,13 @@ public class Balancer {
|
|
|
this.policy = p.policy;
|
|
|
this.nnc = theblockpool;
|
|
|
cluster = NetworkTopology.getInstance(conf);
|
|
|
+
|
|
|
+ this.moverExecutor = Executors.newFixedThreadPool(
|
|
|
+ conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
|
|
|
+ DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
|
|
|
+ this.dispatcherExecutor = Executors.newFixedThreadPool(
|
|
|
+ conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
|
|
|
+ DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
|
|
|
}
|
|
|
|
|
|
/* Shuffle datanode array */
|