|
@@ -2102,7 +2102,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
*/
|
|
|
int computeBlockReconstructionWork(int blocksToProcess) {
|
|
|
List<List<BlockInfo>> blocksToReconstruct = null;
|
|
|
- namesystem.writeLock();
|
|
|
+ // TODO: Change it to readLock(FSNamesystemLockMode.BM)
|
|
|
+ // since chooseLowRedundancyBlocks is thread safe.
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
try {
|
|
|
boolean reset = false;
|
|
|
if (replQueueResetToHeadThreshold > 0) {
|
|
@@ -2117,7 +2119,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
blocksToReconstruct = neededReconstruction
|
|
|
.chooseLowRedundancyBlocks(blocksToProcess, reset);
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("computeBlockReconstructionWork");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeBlockReconstructionWork");
|
|
|
}
|
|
|
return computeReconstructionWorkForBlocks(blocksToReconstruct);
|
|
|
}
|
|
@@ -2136,7 +2138,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
List<BlockReconstructionWork> reconWork = new ArrayList<>();
|
|
|
|
|
|
// Step 1: categorize at-risk blocks into replication and EC tasks
|
|
|
- namesystem.writeLock();
|
|
|
+ // TODO: Change to readLock(FSNamesystemLockMode.GLOBAL)
|
|
|
+ // since neededReconstruction is thread safe.
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
|
|
|
try {
|
|
|
synchronized (neededReconstruction) {
|
|
|
for (int priority = 0; priority < blocksToReconstruct
|
|
@@ -2151,7 +2155,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("computeReconstructionWorkForBlocks");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "computeReconstructionWorkForBlocks");
|
|
|
}
|
|
|
|
|
|
// Step 2: choose target nodes for each reconstruction task
|
|
@@ -2176,7 +2180,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
|
|
|
// Step 3: add tasks to the DN
|
|
|
- namesystem.writeLock();
|
|
|
+ // TODO: Change to readLock(FSNamesystemLockMode.BM)
|
|
|
+ // since pendingReconstruction and neededReconstruction are thread safe.
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
try {
|
|
|
for (BlockReconstructionWork rw : reconWork) {
|
|
|
final DatanodeStorageInfo[] targets = rw.getTargets();
|
|
@@ -2192,7 +2198,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("computeReconstructionWorkForBlocks");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeReconstructionWorkForBlocks");
|
|
|
}
|
|
|
|
|
|
if (blockLog.isDebugEnabled()) {
|
|
@@ -2681,7 +2687,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
void processPendingReconstructions() {
|
|
|
BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
|
|
|
if (timedOutItems != null) {
|
|
|
- namesystem.writeLock();
|
|
|
+ // TODO: Change to readLock(FSNamesystemLockMode.BM)
|
|
|
+ // since neededReconstruction is thread safe.
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
try {
|
|
|
for (int i = 0; i < timedOutItems.length; i++) {
|
|
|
/*
|
|
@@ -2700,7 +2708,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("processPendingReconstructions");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.BM, "processPendingReconstructions");
|
|
|
}
|
|
|
/* If we know the target datanodes where the replication timedout,
|
|
|
* we could invoke decBlocksScheduled() on it. Its ok for now.
|
|
@@ -2895,7 +2903,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
final DatanodeStorage storage,
|
|
|
final BlockListAsLongs newReport,
|
|
|
BlockReportContext context) throws IOException {
|
|
|
- namesystem.writeLock();
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
|
|
|
final long startTime = Time.monotonicNow(); //after acquiring write lock
|
|
|
final long endTime;
|
|
|
DatanodeDescriptor node;
|
|
@@ -2953,7 +2961,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
storageInfo.receivedBlockReport();
|
|
|
} finally {
|
|
|
endTime = Time.monotonicNow();
|
|
|
- namesystem.writeUnlock("processReport");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processReport");
|
|
|
}
|
|
|
|
|
|
if (blockLog.isDebugEnabled()) {
|
|
@@ -3026,7 +3034,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
if (getPostponedMisreplicatedBlocksCount() == 0) {
|
|
|
return;
|
|
|
}
|
|
|
- namesystem.writeLock();
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
|
|
|
long startTime = Time.monotonicNow();
|
|
|
long startSize = postponedMisreplicatedBlocks.size();
|
|
|
try {
|
|
@@ -3055,7 +3063,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
|
|
|
rescannedMisreplicatedBlocks.clear();
|
|
|
long endSize = postponedMisreplicatedBlocks.size();
|
|
|
- namesystem.writeUnlock("rescanPostponedMisreplicatedBlocks");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL,
|
|
|
+ "rescanPostponedMisreplicatedBlocks");
|
|
|
LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" +
|
|
|
" msecs. {} blocks are left. {} blocks were removed.",
|
|
|
(Time.monotonicNow() - startTime), endSize, (startSize - endSize));
|
|
@@ -3097,7 +3106,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
if (excessRedundancyMap.size() == 0) {
|
|
|
return;
|
|
|
}
|
|
|
- namesystem.writeLock();
|
|
|
+ // TODO: Change to readLock(FSNamesystemLockMode.BM) since invalidateBlocks is thread safe.
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
long now = Time.monotonicNow();
|
|
|
int processed = 0;
|
|
|
try {
|
|
@@ -3151,7 +3161,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("processTimedOutExcessBlocks");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.BM, "processTimedOutExcessBlocks");
|
|
|
LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now));
|
|
|
}
|
|
|
}
|
|
@@ -3257,7 +3267,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
final DatanodeStorageInfo storageInfo,
|
|
|
final BlockListAsLongs report) throws IOException {
|
|
|
if (report == null) return;
|
|
|
- assert (namesystem.hasWriteLock());
|
|
|
+ assert (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
|
|
|
assert (storageInfo.getBlockReportCount() == 0);
|
|
|
|
|
|
for (BlockReportReplica iblk : report) {
|
|
@@ -3309,6 +3319,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
// OpenFileBlocks only inside snapshots also will be added to safemode
|
|
|
// threshold. So we need to update such blocks to safemode
|
|
|
// refer HDFS-5283
|
|
|
+ // isInSnapshot involves the full path, so it needs FSReadLock.
|
|
|
if (namesystem.isInSnapshot(storedBlock.getBlockCollectionId())) {
|
|
|
int numOfReplicas = storedBlock.getUnderConstructionFeature()
|
|
|
.getNumExpectedLocations();
|
|
@@ -3724,7 +3735,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
|
|
|
DatanodeStorageInfo storageInfo)
|
|
|
throws IOException {
|
|
|
- assert (storedBlock != null && namesystem.hasWriteLock());
|
|
|
+ assert (storedBlock != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
|
|
|
if (!namesystem.isInStartupSafeMode()
|
|
|
|| isPopulatingReplQueues()) {
|
|
|
addStoredBlock(storedBlock, reported, storageInfo, null, false);
|
|
@@ -3759,7 +3770,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
DatanodeDescriptor delNodeHint,
|
|
|
boolean logEveryBlock)
|
|
|
throws IOException {
|
|
|
- assert block != null && namesystem.hasWriteLock();
|
|
|
+ assert block != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
|
|
|
BlockInfo storedBlock;
|
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
|
if (!block.isComplete()) {
|
|
@@ -3995,7 +4006,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
|
|
|
int processed = 0;
|
|
|
- namesystem.writeLockInterruptibly();
|
|
|
+ namesystem.writeLockInterruptibly(FSNamesystemLockMode.GLOBAL);
|
|
|
try {
|
|
|
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
|
|
|
BlockInfo block = blocksItr.next();
|
|
@@ -4054,7 +4065,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
break;
|
|
|
}
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("processMisReplicatesAsync");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMisReplicatesAsync");
|
|
|
LOG.info("Reconstruction queues initialisation progress: {}, total number of blocks " +
|
|
|
"processed: {}/{}", reconstructionQueuesInitProgress, totalProcessed, totalBlocks);
|
|
|
// Make sure it is out of the write lock for sufficiently long time.
|
|
@@ -4207,7 +4218,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
private boolean processExtraRedundancyBlockWithoutPostpone(final BlockInfo block,
|
|
|
final short replication, final DatanodeDescriptor addedNode,
|
|
|
DatanodeDescriptor delNodeHint) {
|
|
|
- assert namesystem.hasWriteLock();
|
|
|
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
|
|
|
if (addedNode == delNodeHint) {
|
|
|
delNodeHint = null;
|
|
|
}
|
|
@@ -4251,7 +4262,10 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
BlockInfo storedBlock, short replication,
|
|
|
DatanodeDescriptor addedNode,
|
|
|
DatanodeDescriptor delNodeHint) {
|
|
|
- assert namesystem.hasWriteLock();
|
|
|
+ // bc.getStoragePolicyID() needs FSReadLock.
|
|
|
+ // TODO: Change to hasReadLock(FSNamesystemLockMode.GLOBAL)
|
|
|
+ // since chooseExcessRedundancyContiguous is thread safe.
|
|
|
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
|
|
|
// first form a rack to datanodes map and
|
|
|
BlockCollection bc = getBlockCollection(storedBlock);
|
|
|
if (storedBlock.isStriped()) {
|
|
@@ -4620,7 +4634,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
*/
|
|
|
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
|
|
final StorageReceivedDeletedBlocks srdb) throws IOException {
|
|
|
- assert namesystem.hasWriteLock();
|
|
|
+ assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
|
|
|
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
|
|
if (node == null || !node.isRegistered()) {
|
|
|
blockLog.warn("BLOCK* processIncrementalBlockReport"
|
|
@@ -4984,6 +4998,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
/** updates a block in needed reconstruction queue. */
|
|
|
private void updateNeededReconstructions(final BlockInfo block,
|
|
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
|
|
+ // TODO: Change to readLock(FSNamesystemLockMode.BM)
|
|
|
+ // since pendingReconstruction and neededReconstruction are thread safe.
|
|
|
namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
try {
|
|
|
if (!isPopulatingReplQueues() || !block.isComplete()) {
|
|
@@ -5035,8 +5051,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
*/
|
|
|
private int invalidateWorkForOneNode(DatanodeInfo dn) {
|
|
|
final List<Block> toInvalidate;
|
|
|
-
|
|
|
- namesystem.writeLock();
|
|
|
+
|
|
|
+ // TODO: Change to readLock(FSNamesystemLockMode.BM) since invalidateBlocks is thread safe.
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
try {
|
|
|
// blocks should not be replicated or removed if safe mode is on
|
|
|
if (namesystem.isInSafeMode()) {
|
|
@@ -5060,7 +5077,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
return 0;
|
|
|
}
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("invalidateWorkForOneNode");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.BM, "invalidateWorkForOneNode");
|
|
|
}
|
|
|
if (blockLog.isDebugEnabled()) {
|
|
|
blockLog.debug("BLOCK* {}: ask {} to delete {}",
|
|
@@ -5283,7 +5300,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
private void remove(long time) {
|
|
|
if (checkToDeleteIterator()) {
|
|
|
- namesystem.writeLock();
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
try {
|
|
|
while (toDeleteIterator.hasNext()) {
|
|
|
removeBlock(toDeleteIterator.next());
|
|
@@ -5294,7 +5311,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("markedDeleteBlockScrubberThread");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.BM, "markedDeleteBlockScrubberThread");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -5408,12 +5425,13 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
int workFound = this.computeBlockReconstructionWork(blocksToProcess);
|
|
|
|
|
|
// Update counters
|
|
|
- namesystem.writeLock();
|
|
|
+ // TODO: Make corruptReplicas thread safe to remove this lock.
|
|
|
+ namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
try {
|
|
|
this.updateState();
|
|
|
this.scheduledReplicationBlocksCount = workFound;
|
|
|
} finally {
|
|
|
- namesystem.writeUnlock("computeDatanodeWork");
|
|
|
+ namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeDatanodeWork");
|
|
|
}
|
|
|
workFound += this.computeInvalidateWork(nodesToProcess);
|
|
|
return workFound;
|