|
@@ -89,7 +89,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
int xmitsInProgress = 0;
|
|
|
Daemon dataXceiveServer = null;
|
|
|
long blockReportInterval;
|
|
|
- private long datanodeStartupPeriod;
|
|
|
|
|
|
/**
|
|
|
* Create the DataNode given a configuration and a dataDir.
|
|
@@ -128,8 +127,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
|
|
|
this.blockReportInterval =
|
|
|
blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
|
|
|
- this.datanodeStartupPeriod =
|
|
|
- conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -159,7 +156,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
*/
|
|
|
public void offerService() throws Exception {
|
|
|
long lastHeartbeat = 0, lastBlockReport = 0;
|
|
|
- long sendStart = System.currentTimeMillis();
|
|
|
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
|
|
|
|
|
|
//
|
|
@@ -171,100 +167,91 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
//
|
|
|
// Every so often, send heartbeat or block-report
|
|
|
//
|
|
|
- synchronized (receivedBlockList) {
|
|
|
- if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
|
|
|
+ if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
|
|
|
+ //
|
|
|
+ // All heartbeat messages include following info:
|
|
|
+ // -- Datanode name
|
|
|
+ // -- data transfer port
|
|
|
+ // -- Total capacity
|
|
|
+ // -- Bytes remaining
|
|
|
+ //
|
|
|
+ BlockCommand cmd = namenode.sendHeartbeat(localName,
|
|
|
+ data.getCapacity(), data.getRemaining(), xmitsInProgress);
|
|
|
+ //LOG.info("Just sent heartbeat, with name " + localName);
|
|
|
+ lastHeartbeat = now;
|
|
|
+
|
|
|
+ if (cmd != null && cmd.transferBlocks()) {
|
|
|
//
|
|
|
- // All heartbeat messages include following info:
|
|
|
- // -- Datanode name
|
|
|
- // -- data transfer port
|
|
|
- // -- Total capacity
|
|
|
- // -- Bytes remaining
|
|
|
+ // Send a copy of a block to another datanode
|
|
|
//
|
|
|
- namenode.sendHeartbeat(localName, data.getCapacity(), data.getRemaining());
|
|
|
- //LOG.info("Just sent heartbeat, with name " + localName);
|
|
|
- lastHeartbeat = now;
|
|
|
- }
|
|
|
- if (now - lastBlockReport > blockReportInterval) {
|
|
|
+ Block blocks[] = cmd.getBlocks();
|
|
|
+ DatanodeInfo xferTargets[][] = cmd.getTargets();
|
|
|
+
|
|
|
+ for (int i = 0; i < blocks.length; i++) {
|
|
|
+ if (!data.isValidBlock(blocks[i])) {
|
|
|
+ String errStr = "Can't send invalid block " + blocks[i];
|
|
|
+ LOG.info(errStr);
|
|
|
+ namenode.errorReport(localName, errStr);
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ if (xferTargets[i].length > 0) {
|
|
|
+ LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
|
|
|
+ new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if (cmd != null && cmd.invalidateBlocks()) {
|
|
|
//
|
|
|
- // Send latest blockinfo report if timer has expired.
|
|
|
- // Get back a list of local block(s) that are obsolete
|
|
|
- // and can be safely GC'ed.
|
|
|
+ // Some local block(s) are obsolete and can be
|
|
|
+ // safely garbage-collected.
|
|
|
//
|
|
|
- Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
|
|
|
- data.invalidate(toDelete);
|
|
|
- lastBlockReport = now;
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (receivedBlockList.size() > 0) {
|
|
|
+ data.invalidate(cmd.getBlocks());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // send block report
|
|
|
+ if (now - lastBlockReport > blockReportInterval) {
|
|
|
+ //
|
|
|
+ // Send latest blockinfo report if timer has expired.
|
|
|
+ // Get back a list of local block(s) that are obsolete
|
|
|
+ // and can be safely GC'ed.
|
|
|
+ //
|
|
|
+ Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
|
|
|
+ data.invalidate(toDelete);
|
|
|
+ lastBlockReport = now;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // check if there are newly received blocks
|
|
|
+ Block [] blockArray=null;
|
|
|
+ synchronized( receivedBlockList ) {
|
|
|
+ if (receivedBlockList.size() > 0) {
|
|
|
//
|
|
|
// Send newly-received blockids to namenode
|
|
|
//
|
|
|
- Block blockArray[] = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
|
|
|
+ blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
|
|
|
receivedBlockList.removeAllElements();
|
|
|
- namenode.blockReceived(localName, blockArray);
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- // Only perform block operations (transfer, delete) after
|
|
|
- // a startup quiet period. The assumption is that all the
|
|
|
- // datanodes will be started together, but the namenode may
|
|
|
- // have been started some time before. (This is esp. true in
|
|
|
- // the case of network interruptions.) So, wait for some time
|
|
|
- // to pass from the time of connection to the first block-transfer.
|
|
|
- // Otherwise we transfer a lot of blocks unnecessarily.
|
|
|
- //
|
|
|
- if (now - sendStart > datanodeStartupPeriod) {
|
|
|
- //
|
|
|
- // Check to see if there are any block-instructions from the
|
|
|
- // namenode that this datanode should perform.
|
|
|
- //
|
|
|
- BlockCommand cmd = namenode.getBlockwork(localName, xmitsInProgress);
|
|
|
- if (cmd != null && cmd.transferBlocks()) {
|
|
|
- //
|
|
|
- // Send a copy of a block to another datanode
|
|
|
- //
|
|
|
- Block blocks[] = cmd.getBlocks();
|
|
|
- DatanodeInfo xferTargets[][] = cmd.getTargets();
|
|
|
-
|
|
|
- for (int i = 0; i < blocks.length; i++) {
|
|
|
- if (!data.isValidBlock(blocks[i])) {
|
|
|
- String errStr = "Can't send invalid block " + blocks[i];
|
|
|
- LOG.info(errStr);
|
|
|
- namenode.errorReport(localName, errStr);
|
|
|
- break;
|
|
|
- } else {
|
|
|
- if (xferTargets[i].length > 0) {
|
|
|
- LOG.info("Starting thread to transfer block " + blocks[i]
|
|
|
- + " to " + xferTargets[i][0].getName()
|
|
|
- + (xferTargets[i].length > 1 ? " and "
|
|
|
- + (xferTargets[i].length-1) + " more destination(s)" : "" ));
|
|
|
- new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- } else if (cmd != null && cmd.invalidateBlocks()) {
|
|
|
- //
|
|
|
- // Some local block(s) are obsolete and can be
|
|
|
- // safely garbage-collected.
|
|
|
- //
|
|
|
- data.invalidate(cmd.getBlocks());
|
|
|
- }
|
|
|
}
|
|
|
-
|
|
|
- //
|
|
|
- // There is no work to do; sleep until hearbeat timer elapses,
|
|
|
- // or work arrives, and then iterate again.
|
|
|
- //
|
|
|
- long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
|
|
|
+ }
|
|
|
+ if( blockArray != null ) {
|
|
|
+ namenode.blockReceived(localName, blockArray);
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // There is no work to do; sleep until hearbeat timer elapses,
|
|
|
+ // or work arrives, and then iterate again.
|
|
|
+ //
|
|
|
+ long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);
|
|
|
+ synchronized( receivedBlockList ) {
|
|
|
if (waitTime > 0 && receivedBlockList.size() == 0) {
|
|
|
try {
|
|
|
receivedBlockList.wait(waitTime);
|
|
|
} catch (InterruptedException ie) {
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ } // synchronized
|
|
|
+ } // while (shouldRun)
|
|
|
+ } // offerService
|
|
|
|
|
|
/**
|
|
|
* Server used for receiving/sending a block of data.
|