|
@@ -146,6 +146,45 @@ public class DataNode extends Configured
|
|
|
private static final int MAX_XCEIVER_COUNT = 256;
|
|
|
private int maxXceiverCount = MAX_XCEIVER_COUNT;
|
|
|
|
|
|
+ /** A throttler that keeps cluster balancing from taking too many
|
|
|
+ * resources on this DataNode.
|
|
|
+ *
|
|
|
+ * It caps the number of block moves that may run at once (acquire/release)
|
|
|
+ * and hands the balancing bandwidth budget to the Throttler superclass.
|
|
|
+ */
|
|
|
+ private static class BlockBalanceThrottler extends Throttler {
|
|
|
+ private int numThreads; // block moves currently holding a slot; changed only in acquire()/release()
|
|
|
+
|
|
|
+ /** Creates the throttler and logs the bandwidth it was given.
|
|
|
+ *
|
|
|
+ * @param bandwidth total bandwidth that balancing may use; passed to Throttler and logged as bytes/s
|
|
|
+ */
|
|
|
+ private BlockBalanceThrottler(long bandwidth) {
|
|
|
+ super(bandwidth);
|
|
|
+ LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Tries to reserve a slot for one block move; never waits for a slot to free up.
|
|
|
+ *
|
|
|
+ * @return true if numThreads was below Balancer.MAX_NUM_CONCURRENT_MOVES and
|
|
|
+ * has been incremented; false if the quota is already used up (counter unchanged).
|
|
|
+ */
|
|
|
+ private synchronized boolean acquire() {
|
|
|
+ if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) { // quota reached: refuse instead of blocking
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ numThreads++;
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Marks a move as completed by decrementing the thread counter. NOTE(review): there is no lower bound, so call this only after a successful acquire(). */
|
|
|
+ private synchronized void release() {
|
|
|
+ numThreads--;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private BlockBalanceThrottler balancingThrottler;
|
|
|
+
|
|
|
/**
|
|
|
* We need an estimate for block size to check if the disk partition has
|
|
|
* enough space. For now we set it to be the default block size set
|
|
@@ -156,12 +195,6 @@ public class DataNode extends Configured
|
|
|
*/
|
|
|
private long estimateBlockSize;
|
|
|
|
|
|
- // The following three fields are to support balancing
|
|
|
- final static short MAX_BALANCING_THREADS = 5;
|
|
|
- private Semaphore balancingSem = new Semaphore(MAX_BALANCING_THREADS);
|
|
|
- long balanceBandwidth;
|
|
|
- private Throttler balancingThrottler;
|
|
|
-
|
|
|
// For InterDataNodeProtocol
|
|
|
Server ipcServer;
|
|
|
|
|
@@ -307,10 +340,8 @@ public class DataNode extends Configured
|
|
|
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
|
|
|
DataNode.nameNodeAddr = nameNodeAddr;
|
|
|
|
|
|
- //set up parameter for cluster balancing
|
|
|
- this.balanceBandwidth = conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024);
|
|
|
- LOG.info("Balancing bandwith is "+balanceBandwidth + " bytes/s");
|
|
|
- this.balancingThrottler = new Throttler(balanceBandwidth);
|
|
|
+ this.balancingThrottler = new BlockBalanceThrottler(
|
|
|
+ conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
|
|
|
|
|
|
//initialize periodic block scanner
|
|
|
String reason = null;
|
|
@@ -1364,13 +1395,18 @@ public class DataNode extends Configured
|
|
|
DatanodeInfo target = new DatanodeInfo(); // read target
|
|
|
target.readFields(in);
|
|
|
|
|
|
+ if (!balancingThrottler.acquire()) { // not able to start
|
|
|
+ LOG.info("Not able to copy block " + blockId + " to "
|
|
|
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
|
|
|
+ sendResponse(s, (short)OP_STATUS_ERROR, socketWriteTimeout);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
Socket targetSock = null;
|
|
|
short opStatus = OP_STATUS_SUCCESS;
|
|
|
BlockSender blockSender = null;
|
|
|
DataOutputStream targetOut = null;
|
|
|
try {
|
|
|
- balancingSem.acquireUninterruptibly();
|
|
|
-
|
|
|
// check if the block exists or not
|
|
|
blockSender = new BlockSender(block, 0, -1, false, false, false);
|
|
|
|
|
@@ -1410,6 +1446,9 @@ public class DataNode extends Configured
|
|
|
+ target.getName() + ": " + StringUtils.stringifyException(ioe));
|
|
|
throw ioe;
|
|
|
} finally {
|
|
|
+ // now release the thread resource
|
|
|
+ balancingThrottler.release();
|
|
|
+
|
|
|
/* send response to the requester */
|
|
|
try {
|
|
|
sendResponse(s, opStatus, socketWriteTimeout);
|
|
@@ -1420,7 +1459,6 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
IOUtils.closeStream(targetOut);
|
|
|
IOUtils.closeStream(blockSender);
|
|
|
- balancingSem.release();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1433,12 +1471,17 @@ public class DataNode extends Configured
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void replaceBlock(DataInputStream in) throws IOException {
|
|
|
- balancingSem.acquireUninterruptibly();
|
|
|
-
|
|
|
/* read header */
|
|
|
- Block block = new Block(in.readLong(), estimateBlockSize, in.readLong()); // block id & len
|
|
|
+ long blockId = in.readLong();
|
|
|
+ Block block = new Block(blockId, estimateBlockSize, in.readLong()); // block id & len
|
|
|
String sourceID = Text.readString(in);
|
|
|
|
|
|
+ if (!balancingThrottler.acquire()) { // not able to start
|
|
|
+ LOG.warn("Not able to receive block " + blockId + " from "
|
|
|
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
short opStatus = OP_STATUS_SUCCESS;
|
|
|
BlockReceiver blockReceiver = null;
|
|
|
try {
|
|
@@ -1458,6 +1501,8 @@ public class DataNode extends Configured
|
|
|
opStatus = OP_STATUS_ERROR;
|
|
|
throw ioe;
|
|
|
} finally {
|
|
|
+ balancingThrottler.release();
|
|
|
+
|
|
|
// send response back
|
|
|
try {
|
|
|
sendResponse(s, opStatus, socketWriteTimeout);
|
|
@@ -1465,7 +1510,6 @@ public class DataNode extends Configured
|
|
|
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
|
|
|
}
|
|
|
IOUtils.closeStream(blockReceiver);
|
|
|
- balancingSem.release();
|
|
|
}
|
|
|
}
|
|
|
}
|