|
@@ -341,6 +341,27 @@ class BPServiceActor implements Runnable {
|
|
|
return mapForStorage;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Add a blockInfo for notification to NameNode. If another entry
|
|
|
+ * exists for the same block it is removed.
|
|
|
+ *
|
|
|
+ * Caller must synchronize access using pendingIncrementalBRperStorage.
|
|
|
+ * @param bInfo
|
|
|
+ * @param storageUuid
|
|
|
+ */
|
|
|
+ void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
|
|
|
+ String storageUuid) {
|
|
|
+ // Make sure another entry for the same block is first removed.
|
|
|
+ // There may only be one such entry.
|
|
|
+ for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
|
|
|
+ pendingIncrementalBRperStorage.entrySet()) {
|
|
|
+ if (entry.getValue().removeBlockInfo(bInfo)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* Informing the name node could take a long long time! Should we wait
|
|
|
* till namenode is informed before responding with success to the
|
|
@@ -349,7 +370,7 @@ class BPServiceActor implements Runnable {
|
|
|
void notifyNamenodeBlockImmediately(
|
|
|
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
|
|
|
synchronized (pendingIncrementalBRperStorage) {
|
|
|
- getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
|
|
|
+ addPendingReplicationBlockInfo(bInfo, storageUuid);
|
|
|
pendingReceivedRequests++;
|
|
|
pendingIncrementalBRperStorage.notifyAll();
|
|
|
}
|
|
@@ -358,7 +379,7 @@ class BPServiceActor implements Runnable {
|
|
|
void notifyNamenodeDeletedBlock(
|
|
|
ReceivedDeletedBlockInfo bInfo, String storageUuid) {
|
|
|
synchronized (pendingIncrementalBRperStorage) {
|
|
|
- getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
|
|
|
+ addPendingReplicationBlockInfo(bInfo, storageUuid);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -880,5 +901,17 @@ class BPServiceActor implements Runnable {
|
|
|
void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
|
|
|
pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Remove pending incremental block report for a single block if it
|
|
|
+ * exists.
|
|
|
+ *
|
|
|
+ * @param blockInfo
|
|
|
+ * @return true if a report was removed, false if no report existed for
|
|
|
+ * the given block.
|
|
|
+ */
|
|
|
+ boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) {
|
|
|
+ return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null);
|
|
|
+ }
|
|
|
}
|
|
|
}
|