|
@@ -98,7 +98,7 @@ class BPServiceActor implements Runnable {
|
|
|
* reported to the NN. Access should be synchronized on this object.
|
|
|
*/
|
|
|
private final Map<String, PerStoragePendingIncrementalBR>
|
|
|
- pendingIncrementalBRperStorage = Maps.newConcurrentMap();
|
|
|
+ pendingIncrementalBRperStorage = Maps.newHashMap();
|
|
|
|
|
|
private volatile int pendingReceivedRequests = 0;
|
|
|
private volatile boolean shouldServiceRun = true;
|
|
@@ -266,43 +266,54 @@ class BPServiceActor implements Runnable {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Report received blocks and delete hints to the Namenode
|
|
|
+ * Report received blocks and delete hints to the Namenode for each
|
|
|
+ * storage.
|
|
|
+ *
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void reportReceivedDeletedBlocks() throws IOException {
|
|
|
- // For each storage, check if there are newly received blocks and if
|
|
|
- // so then send an incremental report to the NameNode.
|
|
|
- for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
|
|
|
- pendingIncrementalBRperStorage.entrySet()) {
|
|
|
- final String storageUuid = entry.getKey();
|
|
|
- final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
|
|
|
- ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
|
|
|
- // TODO: We can probably use finer-grained synchronization now.
|
|
|
- synchronized (pendingIncrementalBRperStorage) {
|
|
|
+
|
|
|
+ // Generate a list of the pending reports for each storage under the lock
|
|
|
+ Map<String, ReceivedDeletedBlockInfo[]> blockArrays = Maps.newHashMap();
|
|
|
+ synchronized (pendingIncrementalBRperStorage) {
|
|
|
+ for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
|
|
|
+ pendingIncrementalBRperStorage.entrySet()) {
|
|
|
+ final String storageUuid = entry.getKey();
|
|
|
+ final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
|
|
|
+ ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
|
|
|
+
|
|
|
if (perStorageMap.getBlockInfoCount() > 0) {
|
|
|
// Send newly-received and deleted blockids to namenode
|
|
|
receivedAndDeletedBlockArray = perStorageMap.dequeueBlockInfos();
|
|
|
pendingReceivedRequests -= receivedAndDeletedBlockArray.length;
|
|
|
+ blockArrays.put(storageUuid, receivedAndDeletedBlockArray);
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if (receivedAndDeletedBlockArray != null) {
|
|
|
- StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
|
|
- storageUuid, receivedAndDeletedBlockArray) };
|
|
|
- boolean success = false;
|
|
|
- try {
|
|
|
- bpNamenode.blockReceivedAndDeleted(bpRegistration, bpos.getBlockPoolId(),
|
|
|
- report);
|
|
|
- success = true;
|
|
|
- } finally {
|
|
|
+ // Send incremental block reports to the Namenode outside the lock
|
|
|
+ for (Map.Entry<String, ReceivedDeletedBlockInfo[]> entry :
|
|
|
+ blockArrays.entrySet()) {
|
|
|
+ final String storageUuid = entry.getKey();
|
|
|
+ final ReceivedDeletedBlockInfo[] rdbi = entry.getValue();
|
|
|
+
|
|
|
+ StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
|
|
|
+ storageUuid, rdbi) };
|
|
|
+ boolean success = false;
|
|
|
+ try {
|
|
|
+ bpNamenode.blockReceivedAndDeleted(bpRegistration,
|
|
|
+ bpos.getBlockPoolId(), report);
|
|
|
+ success = true;
|
|
|
+ } finally {
|
|
|
+ if (!success) {
|
|
|
synchronized (pendingIncrementalBRperStorage) {
|
|
|
- if (!success) {
|
|
|
- // If we didn't succeed in sending the report, put all of the
|
|
|
- // blocks back onto our queue, but only in the case where we
|
|
|
- // didn't put something newer in the meantime.
|
|
|
- perStorageMap.putMissingBlockInfos(receivedAndDeletedBlockArray);
|
|
|
- pendingReceivedRequests += perStorageMap.getBlockInfoCount();
|
|
|
- }
|
|
|
+ // If we didn't succeed in sending the report, put all of the
|
|
|
+ // blocks back onto our queue, but only in the case where we
|
|
|
+ // didn't put something newer in the meantime.
|
|
|
+ PerStoragePendingIncrementalBR perStorageMap =
|
|
|
+ pendingIncrementalBRperStorage.get(storageUuid);
|
|
|
+ perStorageMap.putMissingBlockInfos(rdbi);
|
|
|
+ pendingReceivedRequests += perStorageMap.getBlockInfoCount();
|
|
|
}
|
|
|
}
|
|
|
}
|