|
@@ -106,6 +106,7 @@ import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
|
@@ -348,6 +349,8 @@ public class DataNode extends Configured
|
|
|
ThreadGroup threadGroup = null;
|
|
|
long blockReportInterval;
|
|
|
boolean resetBlockReportTime = true;
|
|
|
+ long deleteReportInterval;
|
|
|
+ long lastDeletedReport = 0;
|
|
|
long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
|
|
|
long heartBeatInterval;
|
|
|
private boolean heartbeatsDisabledForTests = false;
|
|
@@ -458,6 +461,7 @@ public class DataNode extends Configured
|
|
|
this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
|
|
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
|
|
|
|
|
|
+ this.deleteReportInterval = 100 * heartBeatInterval;
|
|
|
// do we need to sync block file contents to disk when blockfile is closed?
|
|
|
this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
|
|
|
DFS_DATANODE_SYNCONCLOSE_DEFAULT);
|
|
@@ -643,6 +647,17 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // calls specific to BP
|
|
|
+ protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
|
|
|
+ BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
|
|
|
+ if (bpos != null) {
|
|
|
+ bpos.notifyNamenodeDeletedBlock(block);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
|
|
|
+ + block.getBlockPoolId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void reportBadBlocks(ExtendedBlock block) throws IOException{
|
|
|
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
|
|
|
if(bpos == null || bpos.bpNamenode == null) {
|
|
@@ -677,8 +692,9 @@ public class DataNode extends Configured
|
|
|
private String blockPoolId;
|
|
|
private long lastHeartbeat = 0;
|
|
|
private volatile boolean initialized = false;
|
|
|
- private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
|
|
|
- private final LinkedList<String> delHints = new LinkedList<String>();
|
|
|
+ private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
|
|
|
+ = new LinkedList<ReceivedDeletedBlockInfo>();
|
|
|
+ private volatile int pendingReceivedRequests = 0;
|
|
|
private volatile boolean shouldServiceRun = true;
|
|
|
private boolean isBlockTokenInitialized = false;
|
|
|
UpgradeManagerDatanode upgradeManager = null;
|
|
@@ -848,41 +864,33 @@ public class DataNode extends Configured
|
|
|
|
|
|
/**
|
|
|
* Report received blocks and delete hints to the Namenode
|
|
|
+ *
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private void reportReceivedBlocks() throws IOException {
|
|
|
- //check if there are newly received blocks
|
|
|
- Block [] blockArray=null;
|
|
|
- String [] delHintArray=null;
|
|
|
- synchronized(receivedBlockList) {
|
|
|
- synchronized(delHints){
|
|
|
- int numBlocks = receivedBlockList.size();
|
|
|
- if (numBlocks > 0) {
|
|
|
- if(numBlocks!=delHints.size()) {
|
|
|
- LOG.warn("Panic: receiveBlockList and delHints are not of " +
|
|
|
- "the same length" );
|
|
|
- }
|
|
|
- //
|
|
|
- // Send newly-received blockids to namenode
|
|
|
- //
|
|
|
- blockArray = receivedBlockList.toArray(new Block[numBlocks]);
|
|
|
- delHintArray = delHints.toArray(new String[numBlocks]);
|
|
|
- }
|
|
|
+ private void reportReceivedDeletedBlocks() throws IOException {
|
|
|
+
|
|
|
+ // check if there are newly received blocks
|
|
|
+ ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
|
|
|
+ int currentReceivedRequestsCounter;
|
|
|
+ synchronized (receivedAndDeletedBlockList) {
|
|
|
+ currentReceivedRequestsCounter = pendingReceivedRequests;
|
|
|
+ int numBlocks = receivedAndDeletedBlockList.size();
|
|
|
+ if (numBlocks > 0) {
|
|
|
+ //
|
|
|
+ // Send newly-received and deleted blockids to namenode
|
|
|
+ //
|
|
|
+ receivedAndDeletedBlockArray = receivedAndDeletedBlockList
|
|
|
+ .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
|
|
|
}
|
|
|
}
|
|
|
- if (blockArray != null) {
|
|
|
- if(delHintArray == null || delHintArray.length != blockArray.length ) {
|
|
|
- LOG.warn("Panic: block array & delHintArray are not the same" );
|
|
|
- }
|
|
|
- bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
|
|
|
- delHintArray);
|
|
|
- synchronized(receivedBlockList) {
|
|
|
- synchronized(delHints){
|
|
|
- for(int i=0; i<blockArray.length; i++) {
|
|
|
- receivedBlockList.remove(blockArray[i]);
|
|
|
- delHints.remove(delHintArray[i]);
|
|
|
- }
|
|
|
+ if (receivedAndDeletedBlockArray != null) {
|
|
|
+ bpNamenode.blockReceivedAndDeleted(bpRegistration, blockPoolId,
|
|
|
+ receivedAndDeletedBlockArray);
|
|
|
+ synchronized (receivedAndDeletedBlockList) {
|
|
|
+ for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
|
|
|
+ receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
|
|
|
}
|
|
|
+ pendingReceivedRequests -= currentReceivedRequestsCounter;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -893,23 +901,39 @@ public class DataNode extends Configured
|
|
|
* client? For now we don't.
|
|
|
*/
|
|
|
void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
|
|
|
- if(block==null || delHint==null) {
|
|
|
- throw new IllegalArgumentException(
|
|
|
- block==null?"Block is null":"delHint is null");
|
|
|
+ if (block == null || delHint == null) {
|
|
|
+ throw new IllegalArgumentException(block == null ? "Block is null"
|
|
|
+ : "delHint is null");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (!block.getBlockPoolId().equals(blockPoolId)) {
|
|
|
- LOG.warn("BlockPool mismatch " + block.getBlockPoolId() +
|
|
|
- " vs. " + blockPoolId);
|
|
|
+ LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
|
|
|
+ + blockPoolId);
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- synchronized (receivedBlockList) {
|
|
|
- synchronized (delHints) {
|
|
|
- receivedBlockList.add(block.getLocalBlock());
|
|
|
- delHints.add(delHint);
|
|
|
- receivedBlockList.notifyAll();
|
|
|
- }
|
|
|
+
|
|
|
+ synchronized (receivedAndDeletedBlockList) {
|
|
|
+ receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
|
|
|
+ .getLocalBlock(), delHint));
|
|
|
+ pendingReceivedRequests++;
|
|
|
+ receivedAndDeletedBlockList.notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void notifyNamenodeDeletedBlock(ExtendedBlock block) {
|
|
|
+ if (block == null) {
|
|
|
+ throw new IllegalArgumentException("Block is null");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!block.getBlockPoolId().equals(blockPoolId)) {
|
|
|
+ LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
|
|
|
+ + blockPoolId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized (receivedAndDeletedBlockList) {
|
|
|
+ receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
|
|
|
+ .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1027,7 +1051,8 @@ public class DataNode extends Configured
|
|
|
* forever calling remote NameNode functions.
|
|
|
*/
|
|
|
private void offerService() throws Exception {
|
|
|
- LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
|
|
|
+ LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
|
|
|
+ + deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
|
|
|
+ blockReportInterval + "msec" + " Initial delay: "
|
|
|
+ initialBlockReportDelay + "msec" + "; heartBeatInterval="
|
|
|
+ heartBeatInterval);
|
|
@@ -1058,8 +1083,11 @@ public class DataNode extends Configured
|
|
|
continue;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- reportReceivedBlocks();
|
|
|
+ if (pendingReceivedRequests > 0
|
|
|
+ || (startTime - lastDeletedReport > deleteReportInterval)) {
|
|
|
+ reportReceivedDeletedBlocks();
|
|
|
+ lastDeletedReport = startTime;
|
|
|
+ }
|
|
|
|
|
|
DatanodeCommand cmd = blockReport();
|
|
|
processCommand(cmd);
|
|
@@ -1075,10 +1103,10 @@ public class DataNode extends Configured
|
|
|
//
|
|
|
long waitTime = heartBeatInterval -
|
|
|
(System.currentTimeMillis() - lastHeartbeat);
|
|
|
- synchronized(receivedBlockList) {
|
|
|
- if (waitTime > 0 && receivedBlockList.size() == 0) {
|
|
|
+ synchronized(receivedAndDeletedBlockList) {
|
|
|
+ if (waitTime > 0 && pendingReceivedRequests == 0) {
|
|
|
try {
|
|
|
- receivedBlockList.wait(waitTime);
|
|
|
+ receivedAndDeletedBlockList.wait(waitTime);
|
|
|
} catch (InterruptedException ie) {
|
|
|
LOG.warn("BPOfferService for block pool="
|
|
|
+ this.getBlockPoolId() + " received exception:" + ie);
|