|
@@ -193,6 +193,8 @@ public class Balancer implements Tool {
|
|
|
*/
|
|
|
public static final int MAX_NUM_CONCURRENT_MOVES = 5;
|
|
|
|
|
|
+ public static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
|
|
|
+
|
|
|
private Configuration conf;
|
|
|
|
|
|
private double threshold = 10D;
|
|
@@ -746,6 +748,7 @@ public class Balancer implements Tool {
|
|
|
long startTime = Util.now();
|
|
|
this.blocksToReceive = 2*scheduledSize;
|
|
|
boolean isTimeUp = false;
|
|
|
+ int noPendingBlockIteration = 0;
|
|
|
while(!isTimeUp && scheduledSize > 0 &&
|
|
|
(!srcBlockList.isEmpty() || blocksToReceive > 0)) {
|
|
|
PendingBlockMove pendingBlock = chooseNextBlockToMove();
|
|
@@ -769,7 +772,15 @@ public class Balancer implements Tool {
|
|
|
LOG.warn(StringUtils.stringifyException(e));
|
|
|
return;
|
|
|
}
|
|
|
- }
|
|
|
+ } else {
|
|
|
+ // source node cannot find a pendingBlockToMove, iteration +1
|
|
|
+ noPendingBlockIteration++;
|
|
|
+ // in case no blocks can be moved for source node's task,
|
|
|
+ // jump out of while-loop after 5 iterations.
|
|
|
+ if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
|
|
|
+ scheduledSize = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
// check if time is up or not
|
|
|
if (Util.now()-startTime > MAX_ITERATION_TIME) {
|
|
@@ -1496,7 +1507,11 @@ public class Balancer implements Tool {
|
|
|
Formatter formatter = new Formatter(System.out);
|
|
|
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
|
|
|
int iterations = 0;
|
|
|
+
|
|
|
while (true) {
|
|
|
+ // clean all lists at the beginning of balancer iteration.
|
|
|
+ resetData();
|
|
|
+
|
|
|
/* get all live datanodes of a cluster and their disk usage
|
|
|
* decide the number of bytes need to be moved
|
|
|
*/
|
|
@@ -1547,9 +1562,6 @@ public class Balancer implements Tool {
|
|
|
return NO_MOVE_PROGRESS;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- // clean all lists
|
|
|
- resetData();
|
|
|
|
|
|
try {
|
|
|
Thread.sleep(2*conf.getLong("dfs.heartbeat.interval", 3));
|