|
@@ -182,6 +182,9 @@ public class Balancer {
|
|
|
*/
|
|
|
public static final int MAX_NUM_CONCURRENT_MOVES = 5;
|
|
|
|
|
|
+ public static final long DELAY_AFTER_ERROR = 10*1000L; // 10 seconds
|
|
|
+ public static final int BLOCK_MOVE_READ_TIMEOUT = 20*60*1000; // 20 minutes
|
|
|
+
|
|
|
private final NameNodeConnector nnc;
|
|
|
private final BalancingPolicy policy;
|
|
|
private final double threshold;
|
|
@@ -307,7 +310,13 @@ public class Balancer {
|
|
|
sock.connect(NetUtils.createSocketAddr(
|
|
|
target.datanode.getName()), HdfsServerConstants.READ_TIMEOUT);
|
|
|
sock.setKeepAlive(true);
|
|
|
- sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
|
|
|
+ /* Unfortunately we don't have a good way to know if the Datanode is
|
|
|
+ * taking a really long time to move a block, OR something has
|
|
|
+ * gone wrong and it's never going to finish. To deal with this
|
|
|
+ * scenario, we set a long timeout (20 minutes) to avoid hanging
|
|
|
+ * the balancer indefinitely.
|
|
|
+ */
|
|
|
+ sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT);
|
|
|
out = new DataOutputStream( new BufferedOutputStream(
|
|
|
sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
|
|
|
sendRequest(out);
|
|
@@ -326,6 +335,13 @@ public class Balancer {
|
|
|
target.getName() + " through " +
|
|
|
proxySource.getName() +
|
|
|
": "+e.getMessage());
|
|
|
+ /* proxy or target may have an issue, insert a small delay
|
|
|
+ * before using these nodes further. This avoids a potential storm
|
|
|
+ * of "threads quota exceeded" Warnings when the balancer
|
|
|
+ * gets out of sync with work going on in datanode.
|
|
|
+ */
|
|
|
+ proxySource.activateDelay(DELAY_AFTER_ERROR);
|
|
|
+ target.activateDelay(DELAY_AFTER_ERROR);
|
|
|
} finally {
|
|
|
IOUtils.closeStream(out);
|
|
|
IOUtils.closeStream(in);
|
|
@@ -468,6 +484,7 @@ public class Balancer {
|
|
|
final double utilization;
|
|
|
final long maxSize2Move;
|
|
|
protected long scheduledSize = 0L;
|
|
|
+ protected long delayUntil = 0L;
|
|
|
// blocks being moved but not confirmed yet
|
|
|
private List<PendingBlockMove> pendingBlocks =
|
|
|
new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
|
|
@@ -530,6 +547,18 @@ public class Balancer {
|
|
|
scheduledSize += size;
|
|
|
}
|
|
|
|
|
|
+ synchronized private void activateDelay(long delta) {
|
|
|
+ delayUntil = Util.now() + delta;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized private boolean isDelayActive() {
|
|
|
+ if (delayUntil == 0 || Util.now() > delayUntil){
|
|
|
+ delayUntil = 0;
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
/* Check if the node can schedule more blocks to move */
|
|
|
synchronized private boolean isPendingQNotFull() {
|
|
|
if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
|
|
@@ -546,7 +575,7 @@ public class Balancer {
|
|
|
/* Add a scheduled block move to the node */
|
|
|
private synchronized boolean addPendingBlock(
|
|
|
PendingBlockMove pendingBlock) {
|
|
|
- if (isPendingQNotFull()) {
|
|
|
+ if (!isDelayActive() && isPendingQNotFull()) {
|
|
|
return pendingBlocks.add(pendingBlock);
|
|
|
}
|
|
|
return false;
|