|
@@ -134,6 +134,8 @@ public class Dispatcher {
|
|
private final boolean connectToDnViaHostname;
|
|
private final boolean connectToDnViaHostname;
|
|
private BlockPlacementPolicy placementPolicy;
|
|
private BlockPlacementPolicy placementPolicy;
|
|
|
|
|
|
|
|
+ private long maxIterationTime;
|
|
|
|
+
|
|
static class Allocator {
|
|
static class Allocator {
|
|
private final int max;
|
|
private final int max;
|
|
private int count = 0;
|
|
private int count = 0;
|
|
@@ -335,12 +337,18 @@ public class Dispatcher {
|
|
|
|
|
|
/** Dispatch the move to the proxy source & wait for the response. */
|
|
/** Dispatch the move to the proxy source & wait for the response. */
|
|
private void dispatch() {
|
|
private void dispatch() {
|
|
- LOG.info("Start moving " + this);
|
|
|
|
-
|
|
|
|
Socket sock = new Socket();
|
|
Socket sock = new Socket();
|
|
DataOutputStream out = null;
|
|
DataOutputStream out = null;
|
|
DataInputStream in = null;
|
|
DataInputStream in = null;
|
|
try {
|
|
try {
|
|
|
|
+ if (source.isIterationOver()){
|
|
|
|
+ LOG.info("Cancel moving " + this +
|
|
|
|
+ " as iteration is already cancelled due to" +
|
|
|
|
+ " dfs.balancer.max-iteration-time is passed.");
|
|
|
|
+ throw new IOException("Block move cancelled.");
|
|
|
|
+ }
|
|
|
|
+ LOG.info("Start moving " + this);
|
|
|
|
+
|
|
sock.connect(
|
|
sock.connect(
|
|
NetUtils.createSocketAddr(target.getDatanodeInfo().
|
|
NetUtils.createSocketAddr(target.getDatanodeInfo().
|
|
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
|
|
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
|
|
@@ -679,7 +687,10 @@ public class Dispatcher {
|
|
* Check if the iteration is over
|
|
* Check if the iteration is over
|
|
*/
|
|
*/
|
|
public boolean isIterationOver() {
|
|
public boolean isIterationOver() {
|
|
- return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
|
|
|
|
|
|
+ if (maxIterationTime < 0){
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ return (Time.monotonicNow()-startTime > maxIterationTime);
|
|
}
|
|
}
|
|
|
|
|
|
/** Add a task */
|
|
/** Add a task */
|
|
@@ -811,8 +822,6 @@ public class Dispatcher {
|
|
return blocksToReceive > 0;
|
|
return blocksToReceive > 0;
|
|
}
|
|
}
|
|
|
|
|
|
- private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* This method iteratively does the following: it first selects a block to
|
|
* This method iteratively does the following: it first selects a block to
|
|
* move, then sends a request to the proxy source to start the block move
|
|
* move, then sends a request to the proxy source to start the block move
|
|
@@ -881,7 +890,7 @@ public class Dispatcher {
|
|
}
|
|
}
|
|
|
|
|
|
if (isIterationOver()) {
|
|
if (isIterationOver()) {
|
|
- LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
|
|
|
|
|
|
+ LOG.info("The maximum iteration time (" + maxIterationTime/1000
|
|
+ " seconds) has been reached. Stopping " + this);
|
|
+ " seconds) has been reached. Stopping " + this);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -904,14 +913,14 @@ public class Dispatcher {
|
|
int maxNoMoveInterval, Configuration conf) {
|
|
int maxNoMoveInterval, Configuration conf) {
|
|
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
|
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
|
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
|
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
|
- 0L, 0L, 0, maxNoMoveInterval, conf);
|
|
|
|
|
|
+ 0L, 0L, 0, maxNoMoveInterval, -1, conf);
|
|
}
|
|
}
|
|
|
|
|
|
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
|
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
|
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
|
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
|
- long getBlocksSize, long getBlocksMinBlockSize,
|
|
|
|
- int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
|
|
|
|
|
|
+ long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout,
|
|
|
|
+ int maxNoMoveInterval, long maxIterationTime, Configuration conf) {
|
|
this.nnc = nnc;
|
|
this.nnc = nnc;
|
|
this.excludedNodes = excludedNodes;
|
|
this.excludedNodes = excludedNodes;
|
|
this.includedNodes = includedNodes;
|
|
this.includedNodes = includedNodes;
|
|
@@ -939,6 +948,7 @@ public class Dispatcher {
|
|
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
|
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
|
this.placementPolicy =
|
|
this.placementPolicy =
|
|
BlockPlacementPolicy.getInstance(conf, null, cluster, null);
|
|
BlockPlacementPolicy.getInstance(conf, null, cluster, null);
|
|
|
|
+ this.maxIterationTime = maxIterationTime;
|
|
}
|
|
}
|
|
|
|
|
|
public DistributedFileSystem getDistributedFileSystem() {
|
|
public DistributedFileSystem getDistributedFileSystem() {
|