|
@@ -118,6 +118,7 @@ public class Dispatcher {
|
|
|
|
|
|
/** The maximum number of concurrent blocks moves at a datanode */
|
|
|
private final int maxConcurrentMovesPerNode;
|
|
|
+ private final int maxMoverThreads;
|
|
|
|
|
|
private final long getBlocksSize;
|
|
|
private final long getBlocksMinBlockSize;
|
|
@@ -131,11 +132,13 @@ public class Dispatcher {
|
|
|
static class Allocator {
|
|
|
private final int max;
|
|
|
private int count = 0;
|
|
|
+ private int lotSize = 1;
|
|
|
|
|
|
Allocator(int max) {
|
|
|
this.max = max;
|
|
|
}
|
|
|
|
|
|
+ /** Allocate specified number of items */
|
|
|
synchronized int allocate(int n) {
|
|
|
final int remaining = max - count;
|
|
|
if (remaining <= 0) {
|
|
@@ -147,9 +150,19 @@ public class Dispatcher {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /** Aloocate a single lot of items */
|
|
|
+ int allocate() {
|
|
|
+ return allocate(lotSize);
|
|
|
+ }
|
|
|
+
|
|
|
synchronized void reset() {
|
|
|
count = 0;
|
|
|
}
|
|
|
+
|
|
|
+ /** Set the lot size */
|
|
|
+ synchronized void setLotSize(int lotSize) {
|
|
|
+ this.lotSize = lotSize;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static class GlobalBlockMap {
|
|
@@ -905,6 +918,7 @@ public class Dispatcher {
|
|
|
this.dispatchExecutor = dispatcherThreads == 0? null
|
|
|
: Executors.newFixedThreadPool(dispatcherThreads);
|
|
|
this.moverThreadAllocator = new Allocator(moverThreads);
|
|
|
+ this.maxMoverThreads = moverThreads;
|
|
|
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
|
|
|
|
|
|
this.getBlocksSize = getBlocksSize;
|
|
@@ -999,7 +1013,7 @@ public class Dispatcher {
|
|
|
final DDatanode targetDn = p.target.getDDatanode();
|
|
|
ExecutorService moveExecutor = targetDn.getMoveExecutor();
|
|
|
if (moveExecutor == null) {
|
|
|
- final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
|
|
|
+ final int nThreads = moverThreadAllocator.allocate();
|
|
|
if (nThreads > 0) {
|
|
|
moveExecutor = targetDn.initMoveExecutor(nThreads);
|
|
|
}
|
|
@@ -1050,6 +1064,25 @@ public class Dispatcher {
|
|
|
LOG.debug("Disperse Interval sec = " +
|
|
|
concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
|
|
|
}
|
|
|
+
|
|
|
+ // Determine the size of each mover thread pool per target
|
|
|
+ int threadsPerTarget = maxMoverThreads/targets.size();
|
|
|
+ if (threadsPerTarget == 0) {
|
|
|
+ // Some scheduled moves will get ignored as some targets won't have
|
|
|
+ // any threads allocated.
|
|
|
+ moverThreadAllocator.setLotSize(1);
|
|
|
+ LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
|
|
|
+ maxMoverThreads + " is too small for moving blocks to " +
|
|
|
+ targets.size() + " targets. Balancing may be slower.");
|
|
|
+ } else {
|
|
|
+ if (threadsPerTarget > maxConcurrentMovesPerNode) {
|
|
|
+ threadsPerTarget = maxConcurrentMovesPerNode;
|
|
|
+ LOG.info("Limiting threads per target to the specified max.");
|
|
|
+ }
|
|
|
+ moverThreadAllocator.setLotSize(threadsPerTarget);
|
|
|
+ LOG.info("Allocating " + threadsPerTarget + " threads per target.");
|
|
|
+ }
|
|
|
+
|
|
|
long dSec = 0;
|
|
|
final Iterator<Source> i = sources.iterator();
|
|
|
for (int j = 0; j < futures.length; j++) {
|