|
@@ -112,14 +112,14 @@ class PendingDataNodeMessages {
|
|
|
if (storageInfo == null || block == null) {
|
|
|
return;
|
|
|
}
|
|
|
- block = new Block(block);
|
|
|
- Queue<ReportedBlockInfo> queue = null;
|
|
|
+ Block blk = new Block(block);
|
|
|
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
|
|
|
- Block blkId = new Block(BlockIdManager.convertToStripedID(block
|
|
|
+ blk = new Block(BlockIdManager.convertToStripedID(block
|
|
|
.getBlockId()));
|
|
|
- queue = getBlockQueue(blkId);
|
|
|
- } else {
|
|
|
- queue = getBlockQueue(block);
|
|
|
+ }
|
|
|
+ Queue<ReportedBlockInfo> queue = queueByBlockId.get(blk);
|
|
|
+ if (queue == null) {
|
|
|
+ return;
|
|
|
}
|
|
|
// We only want the latest non-future reported block to be queued for each
|
|
|
// DataNode. Otherwise, there can be a race condition that causes an old
|
|
@@ -130,6 +130,11 @@ class PendingDataNodeMessages {
|
|
|
if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo))) {
|
|
|
count -= (size - queue.size());
|
|
|
}
|
|
|
+ // If the block message queue is now empty, we should remove the block
|
|
|
+ // from the queue.
|
|
|
+ if (queue.isEmpty()) {
|
|
|
+ queueByBlockId.remove(blk);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|