|
@@ -445,11 +445,12 @@ public class DataNode extends Configured
|
|
|
// calls specific to BP
|
|
|
protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
|
|
|
BPOfferService bpos = bpMapping.get(block.getPoolId());
|
|
|
- if(bpos != null)
|
|
|
+ if(bpos != null) {
|
|
|
bpos.notifyNamenodeReceivedBlock(block, delHint);
|
|
|
- else
|
|
|
+ } else {
|
|
|
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
|
|
|
+ block.getPoolId());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -623,7 +624,7 @@ public class DataNode extends Configured
|
|
|
if(delHintArray == null || delHintArray.length != blockArray.length ) {
|
|
|
LOG.warn("Panic: block array & delHintArray are not the same" );
|
|
|
}
|
|
|
- bpNamenode.blockReceived(dnRegistration, blockPoolId, blockArray,
|
|
|
+ bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
|
|
|
delHintArray);
|
|
|
synchronized(receivedBlockList) {
|
|
|
synchronized(delHints){
|
|
@@ -647,8 +648,8 @@ public class DataNode extends Configured
|
|
|
block==null?"Block is null":"delHint is null");
|
|
|
}
|
|
|
|
|
|
- if (block.getPoolId().equals(blockPoolId)) {
|
|
|
- LOG.warn("BlockPool is mismaptch " + block.getBlockId() +
|
|
|
+ if (!block.getPoolId().equals(blockPoolId)) {
|
|
|
+ LOG.warn("BlockPool mismatch " + block.getPoolId() +
|
|
|
" vs. " + blockPoolId);
|
|
|
return;
|
|
|
}
|
|
@@ -1074,6 +1075,7 @@ public class DataNode extends Configured
|
|
|
|
|
|
// get all the NNs configured
|
|
|
nameNodeThreads = getAllNamenodes(conf);
|
|
|
+ this.namenode = namenode;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1323,12 +1325,14 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
|
|
|
// shutdown BPOS thread TODO:FEDERATION - review if this is enough
|
|
|
- for(BPOfferService bpos : nameNodeThreads) {
|
|
|
- if(bpos != null && bpos.bpThread!=null) {
|
|
|
- try {
|
|
|
- bpos.bpThread.interrupt();
|
|
|
- bpos.bpThread.join();
|
|
|
- } catch (InterruptedException ie) {}
|
|
|
+ if (nameNodeThreads != null) {
|
|
|
+ for(BPOfferService bpos : nameNodeThreads) {
|
|
|
+ if(bpos != null && bpos.bpThread!=null) {
|
|
|
+ try {
|
|
|
+ bpos.bpThread.interrupt();
|
|
|
+ bpos.bpThread.join();
|
|
|
+ } catch (InterruptedException ie) {}
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|