|
@@ -97,7 +97,6 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
|
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
|
|
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
|
-import org.apache.hadoop.hdfs.server.namenode.fgl.FSNamesystemLockMode;
|
|
|
|
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
|
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
|
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
|
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
|
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
|
|
@@ -121,6 +120,7 @@ import org.apache.hadoop.hdfs.server.namenode.CacheManager;
|
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
|
|
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
|
|
|
|
|
|
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
|
|
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
|
|
|
|
+import org.apache.hadoop.hdfs.util.RwLockMode;
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
import org.apache.hadoop.net.Node;
|
|
import org.apache.hadoop.net.Node;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -861,7 +861,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
/** Dump meta data to out. */
|
|
/** Dump meta data to out. */
|
|
public void metaSave(PrintWriter out) {
|
|
public void metaSave(PrintWriter out) {
|
|
- assert namesystem.hasReadLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasReadLock(RwLockMode.BM);
|
|
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
|
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
|
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
|
|
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
|
|
datanodeManager.fetchDatanodes(live, dead, false);
|
|
datanodeManager.fetchDatanodes(live, dead, false);
|
|
@@ -1584,7 +1584,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
final boolean inSnapshot, FileEncryptionInfo feInfo,
|
|
final boolean inSnapshot, FileEncryptionInfo feInfo,
|
|
ErasureCodingPolicy ecPolicy)
|
|
ErasureCodingPolicy ecPolicy)
|
|
throws IOException {
|
|
throws IOException {
|
|
- assert namesystem.hasReadLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasReadLock(RwLockMode.BM);
|
|
if (blocks == null) {
|
|
if (blocks == null) {
|
|
return null;
|
|
return null;
|
|
} else if (blocks.length == 0) {
|
|
} else if (blocks.length == 0) {
|
|
@@ -1830,7 +1830,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
/** Remove the blocks associated to the given DatanodeStorageInfo. */
|
|
/** Remove the blocks associated to the given DatanodeStorageInfo. */
|
|
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
|
|
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.BM);
|
|
final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
|
|
final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
while(it.hasNext()) {
|
|
while(it.hasNext()) {
|
|
@@ -1901,7 +1901,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
*/
|
|
*/
|
|
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
|
|
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
|
|
final DatanodeInfo dn, String storageID, String reason) throws IOException {
|
|
final DatanodeInfo dn, String storageID, String reason) throws IOException {
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.BM);
|
|
final Block reportedBlock = blk.getLocalBlock();
|
|
final Block reportedBlock = blk.getLocalBlock();
|
|
final BlockInfo storedBlock = getStoredBlock(reportedBlock);
|
|
final BlockInfo storedBlock = getStoredBlock(reportedBlock);
|
|
if (storedBlock == null) {
|
|
if (storedBlock == null) {
|
|
@@ -2107,9 +2107,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
*/
|
|
*/
|
|
int computeBlockReconstructionWork(int blocksToProcess) {
|
|
int computeBlockReconstructionWork(int blocksToProcess) {
|
|
List<List<BlockInfo>> blocksToReconstruct = null;
|
|
List<List<BlockInfo>> blocksToReconstruct = null;
|
|
- // TODO: Change it to readLock(FSNamesystemLockMode.BM)
|
|
|
|
|
|
+ // TODO: Change it to readLock(RwLockMode.BM)
|
|
// since chooseLowRedundancyBlocks is thread safe.
|
|
// since chooseLowRedundancyBlocks is thread safe.
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
try {
|
|
try {
|
|
boolean reset = false;
|
|
boolean reset = false;
|
|
if (replQueueResetToHeadThreshold > 0) {
|
|
if (replQueueResetToHeadThreshold > 0) {
|
|
@@ -2124,7 +2124,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
blocksToReconstruct = neededReconstruction
|
|
blocksToReconstruct = neededReconstruction
|
|
.chooseLowRedundancyBlocks(blocksToProcess, reset);
|
|
.chooseLowRedundancyBlocks(blocksToProcess, reset);
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeBlockReconstructionWork");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "computeBlockReconstructionWork");
|
|
}
|
|
}
|
|
return computeReconstructionWorkForBlocks(blocksToReconstruct);
|
|
return computeReconstructionWorkForBlocks(blocksToReconstruct);
|
|
}
|
|
}
|
|
@@ -2143,9 +2143,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
List<BlockReconstructionWork> reconWork = new ArrayList<>();
|
|
List<BlockReconstructionWork> reconWork = new ArrayList<>();
|
|
|
|
|
|
// Step 1: categorize at-risk blocks into replication and EC tasks
|
|
// Step 1: categorize at-risk blocks into replication and EC tasks
|
|
- // TODO: Change to readLock(FSNamesystemLockMode.GLOBAL)
|
|
|
|
|
|
+ // TODO: Change to readLock(RwLockMode.GLOBAL)
|
|
// since neededReconstruction is thread safe.
|
|
// since neededReconstruction is thread safe.
|
|
- namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.GLOBAL);
|
|
try {
|
|
try {
|
|
synchronized (neededReconstruction) {
|
|
synchronized (neededReconstruction) {
|
|
for (int priority = 0; priority < blocksToReconstruct
|
|
for (int priority = 0; priority < blocksToReconstruct
|
|
@@ -2160,7 +2160,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "computeReconstructionWorkForBlocks");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.GLOBAL, "computeReconstructionWorkForBlocks");
|
|
}
|
|
}
|
|
|
|
|
|
// Step 2: choose target nodes for each reconstruction task
|
|
// Step 2: choose target nodes for each reconstruction task
|
|
@@ -2185,9 +2185,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
// Step 3: add tasks to the DN
|
|
// Step 3: add tasks to the DN
|
|
- // TODO: Change to readLock(FSNamesystemLockMode.BM)
|
|
|
|
|
|
+ // TODO: Change to readLock(RwLockMode.BM)
|
|
// since pendingReconstruction and neededReconstruction are thread safe.
|
|
// since pendingReconstruction and neededReconstruction are thread safe.
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
try {
|
|
try {
|
|
for (BlockReconstructionWork rw : reconWork) {
|
|
for (BlockReconstructionWork rw : reconWork) {
|
|
final DatanodeStorageInfo[] targets = rw.getTargets();
|
|
final DatanodeStorageInfo[] targets = rw.getTargets();
|
|
@@ -2203,7 +2203,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeReconstructionWorkForBlocks");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "computeReconstructionWorkForBlocks");
|
|
}
|
|
}
|
|
|
|
|
|
if (blockLog.isDebugEnabled()) {
|
|
if (blockLog.isDebugEnabled()) {
|
|
@@ -2694,9 +2694,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
void processPendingReconstructions() {
|
|
void processPendingReconstructions() {
|
|
BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
|
|
BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
|
|
if (timedOutItems != null) {
|
|
if (timedOutItems != null) {
|
|
- // TODO: Change to readLock(FSNamesystemLockMode.BM)
|
|
|
|
|
|
+ // TODO: Change to readLock(RwLockMode.BM)
|
|
// since neededReconstruction is thread safe.
|
|
// since neededReconstruction is thread safe.
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
try {
|
|
try {
|
|
for (int i = 0; i < timedOutItems.length; i++) {
|
|
for (int i = 0; i < timedOutItems.length; i++) {
|
|
/*
|
|
/*
|
|
@@ -2715,7 +2715,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "processPendingReconstructions");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "processPendingReconstructions");
|
|
}
|
|
}
|
|
/* If we know the target datanodes where the replication timedout,
|
|
/* If we know the target datanodes where the replication timedout,
|
|
* we could invoke decBlocksScheduled() on it. Its ok for now.
|
|
* we could invoke decBlocksScheduled() on it. Its ok for now.
|
|
@@ -2724,7 +2724,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
|
|
public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
|
|
- assert namesystem.hasReadLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasReadLock(RwLockMode.BM);
|
|
DatanodeDescriptor node = null;
|
|
DatanodeDescriptor node = null;
|
|
try {
|
|
try {
|
|
node = datanodeManager.getDatanode(nodeReg);
|
|
node = datanodeManager.getDatanode(nodeReg);
|
|
@@ -2795,7 +2795,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
* list of blocks that need to be removed from blocksMap
|
|
* list of blocks that need to be removed from blocksMap
|
|
*/
|
|
*/
|
|
public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
|
|
public void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.BM);
|
|
// In the case that we are a Standby tailing edits from the
|
|
// In the case that we are a Standby tailing edits from the
|
|
// active while in safe-mode, we need to track the total number
|
|
// active while in safe-mode, we need to track the total number
|
|
// of blocks and safe blocks in the system.
|
|
// of blocks and safe blocks in the system.
|
|
@@ -2910,7 +2910,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
final DatanodeStorage storage,
|
|
final DatanodeStorage storage,
|
|
final BlockListAsLongs newReport,
|
|
final BlockListAsLongs newReport,
|
|
BlockReportContext context) throws IOException {
|
|
BlockReportContext context) throws IOException {
|
|
- namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.GLOBAL);
|
|
final long startTime = Time.monotonicNow(); //after acquiring write lock
|
|
final long startTime = Time.monotonicNow(); //after acquiring write lock
|
|
final long endTime;
|
|
final long endTime;
|
|
DatanodeDescriptor node;
|
|
DatanodeDescriptor node;
|
|
@@ -2968,7 +2968,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
storageInfo.receivedBlockReport();
|
|
storageInfo.receivedBlockReport();
|
|
} finally {
|
|
} finally {
|
|
endTime = Time.monotonicNow();
|
|
endTime = Time.monotonicNow();
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processReport");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.GLOBAL, "processReport");
|
|
}
|
|
}
|
|
|
|
|
|
if (blockLog.isDebugEnabled()) {
|
|
if (blockLog.isDebugEnabled()) {
|
|
@@ -3012,7 +3012,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
|
|
public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
|
|
final BlockReportContext context) throws IOException {
|
|
final BlockReportContext context) throws IOException {
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
DatanodeDescriptor node;
|
|
DatanodeDescriptor node;
|
|
try {
|
|
try {
|
|
node = datanodeManager.getDatanode(nodeID);
|
|
node = datanodeManager.getDatanode(nodeID);
|
|
@@ -3030,7 +3030,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "removeBRLeaseIfNeeded");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "removeBRLeaseIfNeeded");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -3041,7 +3041,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
if (getPostponedMisreplicatedBlocksCount() == 0) {
|
|
if (getPostponedMisreplicatedBlocksCount() == 0) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.GLOBAL);
|
|
long startTime = Time.monotonicNow();
|
|
long startTime = Time.monotonicNow();
|
|
long startSize = postponedMisreplicatedBlocks.size();
|
|
long startSize = postponedMisreplicatedBlocks.size();
|
|
try {
|
|
try {
|
|
@@ -3070,7 +3070,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
|
|
postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
|
|
rescannedMisreplicatedBlocks.clear();
|
|
rescannedMisreplicatedBlocks.clear();
|
|
long endSize = postponedMisreplicatedBlocks.size();
|
|
long endSize = postponedMisreplicatedBlocks.size();
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL,
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.GLOBAL,
|
|
"rescanPostponedMisreplicatedBlocks");
|
|
"rescanPostponedMisreplicatedBlocks");
|
|
LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" +
|
|
LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" +
|
|
" msecs. {} blocks are left. {} blocks were removed.",
|
|
" msecs. {} blocks are left. {} blocks were removed.",
|
|
@@ -3114,7 +3114,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
// TODO: Change to readLock(FSNamesysteLockMode.BM) since invalidateBlocks is thread safe.
|
|
// TODO: Change to readLock(FSNamesysteLockMode.BM) since invalidateBlocks is thread safe.
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
long now = Time.monotonicNow();
|
|
long now = Time.monotonicNow();
|
|
int processed = 0;
|
|
int processed = 0;
|
|
try {
|
|
try {
|
|
@@ -3168,7 +3168,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "processTimedOutExcessBlocks");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "processTimedOutExcessBlocks");
|
|
LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now));
|
|
LOG.info("processTimedOutExcessBlocks {} msecs.", (Time.monotonicNow() - now));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -3224,7 +3224,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
BlockInfo block,
|
|
BlockInfo block,
|
|
long oldGenerationStamp, long oldNumBytes,
|
|
long oldGenerationStamp, long oldNumBytes,
|
|
DatanodeStorageInfo[] newStorages) throws IOException {
|
|
DatanodeStorageInfo[] newStorages) throws IOException {
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.BM);
|
|
BlockToMarkCorrupt b = null;
|
|
BlockToMarkCorrupt b = null;
|
|
if (block.getGenerationStamp() != oldGenerationStamp) {
|
|
if (block.getGenerationStamp() != oldGenerationStamp) {
|
|
b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
|
|
b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
|
|
@@ -3274,7 +3274,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
final DatanodeStorageInfo storageInfo,
|
|
final DatanodeStorageInfo storageInfo,
|
|
final BlockListAsLongs report) throws IOException {
|
|
final BlockListAsLongs report) throws IOException {
|
|
if (report == null) return;
|
|
if (report == null) return;
|
|
- assert (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
|
|
|
|
|
|
+ assert (namesystem.hasWriteLock(RwLockMode.GLOBAL));
|
|
assert (storageInfo.getBlockReportCount() == 0);
|
|
assert (storageInfo.getBlockReportCount() == 0);
|
|
|
|
|
|
for (BlockReportReplica iblk : report) {
|
|
for (BlockReportReplica iblk : report) {
|
|
@@ -3742,7 +3742,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
|
|
private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
|
|
DatanodeStorageInfo storageInfo)
|
|
DatanodeStorageInfo storageInfo)
|
|
throws IOException {
|
|
throws IOException {
|
|
- assert (storedBlock != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL));
|
|
|
|
|
|
+ assert (storedBlock != null && namesystem.hasWriteLock(RwLockMode.GLOBAL));
|
|
if (!namesystem.isInStartupSafeMode()
|
|
if (!namesystem.isInStartupSafeMode()
|
|
|| isPopulatingReplQueues()) {
|
|
|| isPopulatingReplQueues()) {
|
|
addStoredBlock(storedBlock, reported, storageInfo, null, false);
|
|
addStoredBlock(storedBlock, reported, storageInfo, null, false);
|
|
@@ -3777,7 +3777,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
DatanodeDescriptor delNodeHint,
|
|
DatanodeDescriptor delNodeHint,
|
|
boolean logEveryBlock)
|
|
boolean logEveryBlock)
|
|
throws IOException {
|
|
throws IOException {
|
|
- assert block != null && namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ assert block != null && namesystem.hasWriteLock(RwLockMode.GLOBAL);
|
|
BlockInfo storedBlock;
|
|
BlockInfo storedBlock;
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
if (!block.isComplete()) {
|
|
if (!block.isComplete()) {
|
|
@@ -3954,7 +3954,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
* extra or low redundancy. Place it into the respective queue.
|
|
* extra or low redundancy. Place it into the respective queue.
|
|
*/
|
|
*/
|
|
public void processMisReplicatedBlocks() {
|
|
public void processMisReplicatedBlocks() {
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.BM);
|
|
stopReconstructionInitializer();
|
|
stopReconstructionInitializer();
|
|
neededReconstruction.clear();
|
|
neededReconstruction.clear();
|
|
reconstructionQueuesInitializer = new Daemon() {
|
|
reconstructionQueuesInitializer = new Daemon() {
|
|
@@ -4013,7 +4013,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
|
|
while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
|
|
int processed = 0;
|
|
int processed = 0;
|
|
- namesystem.writeLockInterruptibly(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ namesystem.writeLockInterruptibly(RwLockMode.GLOBAL);
|
|
try {
|
|
try {
|
|
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
|
|
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
|
|
BlockInfo block = blocksItr.next();
|
|
BlockInfo block = blocksItr.next();
|
|
@@ -4072,7 +4072,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMisReplicatesAsync");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.GLOBAL, "processMisReplicatesAsync");
|
|
LOG.info("Reconstruction queues initialisation progress: {}, total number of blocks " +
|
|
LOG.info("Reconstruction queues initialisation progress: {}, total number of blocks " +
|
|
"processed: {}/{}", reconstructionQueuesInitProgress, totalProcessed, totalBlocks);
|
|
"processed: {}/{}", reconstructionQueuesInitProgress, totalProcessed, totalBlocks);
|
|
// Make sure it is out of the write lock for sufficiently long time.
|
|
// Make sure it is out of the write lock for sufficiently long time.
|
|
@@ -4119,7 +4119,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
&& !Thread.currentThread().isInterrupted()
|
|
&& !Thread.currentThread().isInterrupted()
|
|
&& iter.hasNext()) {
|
|
&& iter.hasNext()) {
|
|
int limit = processed + numBlocksPerIteration;
|
|
int limit = processed + numBlocksPerIteration;
|
|
- namesystem.writeLockInterruptibly(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ namesystem.writeLockInterruptibly(RwLockMode.GLOBAL);
|
|
try {
|
|
try {
|
|
while (iter.hasNext() && processed < limit) {
|
|
while (iter.hasNext() && processed < limit) {
|
|
BlockInfo blk = iter.next();
|
|
BlockInfo blk = iter.next();
|
|
@@ -4129,7 +4129,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
blk, r);
|
|
blk, r);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processMisReplicatedBlocks");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.GLOBAL, "processMisReplicatedBlocks");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (InterruptedException ex) {
|
|
} catch (InterruptedException ex) {
|
|
@@ -4225,7 +4225,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
private boolean processExtraRedundancyBlockWithoutPostpone(final BlockInfo block,
|
|
private boolean processExtraRedundancyBlockWithoutPostpone(final BlockInfo block,
|
|
final short replication, final DatanodeDescriptor addedNode,
|
|
final short replication, final DatanodeDescriptor addedNode,
|
|
DatanodeDescriptor delNodeHint) {
|
|
DatanodeDescriptor delNodeHint) {
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.GLOBAL);
|
|
if (addedNode == delNodeHint) {
|
|
if (addedNode == delNodeHint) {
|
|
delNodeHint = null;
|
|
delNodeHint = null;
|
|
}
|
|
}
|
|
@@ -4270,9 +4270,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
DatanodeDescriptor addedNode,
|
|
DatanodeDescriptor addedNode,
|
|
DatanodeDescriptor delNodeHint) {
|
|
DatanodeDescriptor delNodeHint) {
|
|
// bc.getStoragePolicyID() needs FSReadLock.
|
|
// bc.getStoragePolicyID() needs FSReadLock.
|
|
- // TODO: Change to hasReadLock(FSNamesystemLockMode.GLOBAL)
|
|
|
|
|
|
+ // TODO: Change to hasReadLock(RwLockMode.GLOBAL)
|
|
// since chooseExcessRedundancyContiguous is thread safe.
|
|
// since chooseExcessRedundancyContiguous is thread safe.
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.GLOBAL);
|
|
// first form a rack to datanodes map and
|
|
// first form a rack to datanodes map and
|
|
BlockCollection bc = getBlockCollection(storedBlock);
|
|
BlockCollection bc = getBlockCollection(storedBlock);
|
|
if (storedBlock.isStriped()) {
|
|
if (storedBlock.isStriped()) {
|
|
@@ -4447,7 +4447,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
*/
|
|
*/
|
|
public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
|
|
public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
|
|
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
|
|
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
|
|
- assert (namesystem.hasWriteLock(FSNamesystemLockMode.BM));
|
|
|
|
|
|
+ assert (namesystem.hasWriteLock(RwLockMode.BM));
|
|
{
|
|
{
|
|
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
|
|
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
|
|
blockLog.debug("BLOCK* removeStoredBlock: {} has already been removed from node {}",
|
|
blockLog.debug("BLOCK* removeStoredBlock: {} has already been removed from node {}",
|
|
@@ -4641,7 +4641,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
*/
|
|
*/
|
|
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
|
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
|
final StorageReceivedDeletedBlocks srdb) throws IOException {
|
|
final StorageReceivedDeletedBlocks srdb) throws IOException {
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.GLOBAL);
|
|
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
|
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
|
|
if (node == null || !node.isRegistered()) {
|
|
if (node == null || !node.isRegistered()) {
|
|
blockLog.warn("BLOCK* processIncrementalBlockReport"
|
|
blockLog.warn("BLOCK* processIncrementalBlockReport"
|
|
@@ -4892,15 +4892,15 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// When called by tests like TestDefaultBlockPlacementPolicy.
|
|
// When called by tests like TestDefaultBlockPlacementPolicy.
|
|
// testPlacementWithLocalRackNodesDecommissioned, it is not protected by
|
|
// testPlacementWithLocalRackNodesDecommissioned, it is not protected by
|
|
// lock, only when called by DatanodeManager.refreshNodes have writeLock
|
|
// lock, only when called by DatanodeManager.refreshNodes have writeLock
|
|
- if (namesystem.hasWriteLock(FSNamesystemLockMode.GLOBAL)) {
|
|
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL,
|
|
|
|
|
|
+ if (namesystem.hasWriteLock(RwLockMode.GLOBAL)) {
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.GLOBAL,
|
|
"processExtraRedundancyBlocksOnInService");
|
|
"processExtraRedundancyBlocksOnInService");
|
|
try {
|
|
try {
|
|
Thread.sleep(1);
|
|
Thread.sleep(1);
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
Thread.currentThread().interrupt();
|
|
Thread.currentThread().interrupt();
|
|
}
|
|
}
|
|
- namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.GLOBAL);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
LOG.info("Invalidated {} extra redundancy blocks on {} after "
|
|
LOG.info("Invalidated {} extra redundancy blocks on {} after "
|
|
@@ -4964,7 +4964,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
public void removeBlock(BlockInfo block) {
|
|
public void removeBlock(BlockInfo block) {
|
|
- assert namesystem.hasWriteLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ assert namesystem.hasWriteLock(RwLockMode.BM);
|
|
// No need to ACK blocks that are being removed entirely
|
|
// No need to ACK blocks that are being removed entirely
|
|
// from the namespace, since the removal of the associated
|
|
// from the namespace, since the removal of the associated
|
|
// file already removes them from the block map below.
|
|
// file already removes them from the block map below.
|
|
@@ -5007,9 +5007,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
/** updates a block in needed reconstruction queue. */
|
|
/** updates a block in needed reconstruction queue. */
|
|
private void updateNeededReconstructions(final BlockInfo block,
|
|
private void updateNeededReconstructions(final BlockInfo block,
|
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
|
final int curReplicasDelta, int expectedReplicasDelta) {
|
|
- // TODO: Change to readLock(FSNamesystemLockMode.BM)
|
|
|
|
|
|
+ // TODO: Change to readLock(RwLockMode.BM)
|
|
// since pendingReconstruction and neededReconstruction are thread safe.
|
|
// since pendingReconstruction and neededReconstruction are thread safe.
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
try {
|
|
try {
|
|
if (!isPopulatingReplQueues() || !block.isComplete()) {
|
|
if (!isPopulatingReplQueues() || !block.isComplete()) {
|
|
return;
|
|
return;
|
|
@@ -5028,7 +5028,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
repl.outOfServiceReplicas(), oldExpectedReplicas);
|
|
repl.outOfServiceReplicas(), oldExpectedReplicas);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "updateNeededReconstructions");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "updateNeededReconstructions");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -5061,8 +5061,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
private int invalidateWorkForOneNode(DatanodeInfo dn) {
|
|
private int invalidateWorkForOneNode(DatanodeInfo dn) {
|
|
final List<Block> toInvalidate;
|
|
final List<Block> toInvalidate;
|
|
|
|
|
|
- // TODO: Change to readLock(FSNamesystemLockMode.BM) since invalidateBlocks is thread safe.
|
|
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ // TODO: Change to readLock(RwLockMode.BM) since invalidateBlocks is thread safe.
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
try {
|
|
try {
|
|
// blocks should not be replicated or removed if safe mode is on
|
|
// blocks should not be replicated or removed if safe mode is on
|
|
if (namesystem.isInSafeMode()) {
|
|
if (namesystem.isInSafeMode()) {
|
|
@@ -5086,7 +5086,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "invalidateWorkForOneNode");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "invalidateWorkForOneNode");
|
|
}
|
|
}
|
|
if (blockLog.isDebugEnabled()) {
|
|
if (blockLog.isDebugEnabled()) {
|
|
blockLog.debug("BLOCK* {}: ask {} to delete {}",
|
|
blockLog.debug("BLOCK* {}: ask {} to delete {}",
|
|
@@ -5314,7 +5314,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
private void remove(long time) {
|
|
private void remove(long time) {
|
|
if (checkToDeleteIterator()) {
|
|
if (checkToDeleteIterator()) {
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
try {
|
|
try {
|
|
while (toDeleteIterator.hasNext()) {
|
|
while (toDeleteIterator.hasNext()) {
|
|
removeBlock(toDeleteIterator.next());
|
|
removeBlock(toDeleteIterator.next());
|
|
@@ -5325,7 +5325,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "markedDeleteBlockScrubberThread");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "markedDeleteBlockScrubberThread");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -5440,12 +5440,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
// Update counters
|
|
// Update counters
|
|
// TODO: Make corruptReplicas thread safe to remove this lock.
|
|
// TODO: Make corruptReplicas thread safe to remove this lock.
|
|
- namesystem.writeLock(FSNamesystemLockMode.BM);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.BM);
|
|
try {
|
|
try {
|
|
this.updateState();
|
|
this.updateState();
|
|
this.scheduledReplicationBlocksCount = workFound;
|
|
this.scheduledReplicationBlocksCount = workFound;
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.BM, "computeDatanodeWork");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.BM, "computeDatanodeWork");
|
|
}
|
|
}
|
|
workFound += this.computeInvalidateWork(nodesToProcess);
|
|
workFound += this.computeInvalidateWork(nodesToProcess);
|
|
return workFound;
|
|
return workFound;
|
|
@@ -5672,7 +5672,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// batch as many operations in the write lock until the queue
|
|
// batch as many operations in the write lock until the queue
|
|
// runs dry, or the max lock hold is reached.
|
|
// runs dry, or the max lock hold is reached.
|
|
int processed = 0;
|
|
int processed = 0;
|
|
- namesystem.writeLock(FSNamesystemLockMode.GLOBAL);
|
|
|
|
|
|
+ namesystem.writeLock(RwLockMode.GLOBAL);
|
|
metrics.setBlockOpsQueued(queue.size() + 1);
|
|
metrics.setBlockOpsQueued(queue.size() + 1);
|
|
try {
|
|
try {
|
|
long start = Time.monotonicNow();
|
|
long start = Time.monotonicNow();
|
|
@@ -5685,7 +5685,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
action = queue.poll();
|
|
action = queue.poll();
|
|
} while (action != null);
|
|
} while (action != null);
|
|
} finally {
|
|
} finally {
|
|
- namesystem.writeUnlock(FSNamesystemLockMode.GLOBAL, "processQueue");
|
|
|
|
|
|
+ namesystem.writeUnlock(RwLockMode.GLOBAL, "processQueue");
|
|
metrics.addBlockOpsBatched(processed - 1);
|
|
metrics.addBlockOpsBatched(processed - 1);
|
|
}
|
|
}
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|