@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
@@ -112,7 +113,7 @@ import org.slf4j.LoggerFactory;
* Keeps information related to the blocks stored in the Hadoop cluster.
*/
@InterfaceAudience.Private
-public class BlockManager implements BlockStatsMXBean {
+public class BlockManager implements RwLock, BlockStatsMXBean {
public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
public static final Logger blockLog = NameNode.blockStateChangeLog;
@@ -125,6 +126,7 @@ public class BlockManager implements BlockStatsMXBean {
private final Namesystem namesystem;
+ private final BlockManagerLock lock;
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
@@ -302,6 +304,7 @@ public class BlockManager implements BlockStatsMXBean {
public BlockManager(final Namesystem namesystem, final Configuration conf)
throws IOException {
this.namesystem = namesystem;
+ this.lock = new BlockManagerLock(namesystem);
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
@@ -519,7 +522,7 @@ public class BlockManager implements BlockStatsMXBean {
/** Dump meta data to out. */
public void metaSave(PrintWriter out) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
datanodeManager.fetchDatanodes(live, dead, false);
@@ -551,7 +554,7 @@ public class BlockManager implements BlockStatsMXBean {
// Dump all datanodes
getDatanodeManager().datanodeDump(out);
}
-
+
/**
* Dump the metadata for the given block in a human-readable
* form.
@@ -580,12 +583,12 @@ public class BlockManager implements BlockStatsMXBean {
out.print(fileName + ": ");
}
// l: == live:, d: == decommissioned c: == corrupt e: == excess
- out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
+ out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" +
" l: " + numReplicas.liveReplicas() +
" d: " + numReplicas.decommissionedAndDecommissioning() +
" c: " + numReplicas.corruptReplicas() +
- " e: " + numReplicas.excessReplicas() + ") ");
+ " e: " + numReplicas.excessReplicas() + ") ");
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(block);
@@ -956,7 +959,7 @@ public class BlockManager implements BlockStatsMXBean {
final boolean inSnapshot, FileEncryptionInfo feInfo,
ErasureCodingPolicy ecPolicy)
throws IOException {
- assert namesystem.hasReadLock();
+ assert hasReadLock();
if (blocks == null) {
return null;
} else if (blocks.length == 0) {
@@ -990,6 +993,41 @@ public class BlockManager implements BlockStatsMXBean {
}
}
+ @Override
+ public void readLock() {
+ lock.readLock().lock();
+ }
+
+ @Override
+ public void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ @Override
+ public boolean hasReadLock() {
+ return lock.hasReadLock();
+ }
+
+ @Override
+ public boolean hasWriteLock() {
+ return lock.hasWriteLock();
+ }
+
+ @Override
+ public void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ @Override
+ public void writeLockInterruptibly() throws InterruptedException {
+ lock.writeLock().lockInterruptibly();
+ }
+
+ @Override
+ public void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
/** @return current access keys. */
public ExportedBlockKeys getBlockKeys() {
return isBlockTokenEnabled()? blockTokenSecretManager.exportKeys()
@@ -1105,12 +1143,12 @@ public class BlockManager implements BlockStatsMXBean {
public BlocksWithLocations getBlocks(DatanodeID datanode, long size
) throws IOException {
namesystem.checkOperation(OperationCategory.READ);
- namesystem.readLock();
+ readLock();
try {
namesystem.checkOperation(OperationCategory.READ);
return getBlocksWithLocations(datanode, size);
} finally {
- namesystem.readUnlock();
+ readUnlock();
}
}
@@ -1173,7 +1211,7 @@ public class BlockManager implements BlockStatsMXBean {
/** Remove the blocks associated to the given DatanodeStorageInfo. */
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
@@ -1250,7 +1288,7 @@ public class BlockManager implements BlockStatsMXBean {
*/
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn, String storageID, String reason) throws IOException {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
final Block reportedBlock = blk.getLocalBlock();
final BlockInfo storedBlock = getStoredBlock(reportedBlock);
if (storedBlock == null) {
@@ -1428,13 +1466,13 @@ public class BlockManager implements BlockStatsMXBean {
*/
int computeBlockRecoveryWork(int blocksToProcess) {
List<List<BlockInfo>> blocksToReplicate = null;
- namesystem.writeLock();
+ writeLock();
try {
// Choose the blocks to be replicated
blocksToReplicate = neededReplications
.chooseUnderReplicatedBlocks(blocksToProcess);
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
return computeRecoveryWorkForBlocks(blocksToReplicate);
}
@@ -1452,7 +1490,7 @@ public class BlockManager implements BlockStatsMXBean {
List<BlockRecoveryWork> recovWork = new LinkedList<>();
// Step 1: categorize at-risk blocks into replication and EC tasks
- namesystem.writeLock();
+ writeLock();
try {
synchronized (neededReplications) {
for (int priority = 0; priority < blocksToRecover.size(); priority++) {
@@ -1465,7 +1503,7 @@ public class BlockManager implements BlockStatsMXBean {
}
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
// Step 2: choose target nodes for each recovery task
@@ -1487,7 +1525,7 @@ public class BlockManager implements BlockStatsMXBean {
}
// Step 3: add tasks to the DN
- namesystem.writeLock();
+ writeLock();
try {
for(BlockRecoveryWork rw : recovWork){
final DatanodeStorageInfo[] targets = rw.getTargets();
@@ -1503,7 +1541,7 @@ public class BlockManager implements BlockStatsMXBean {
}
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
if (blockLog.isInfoEnabled()) {
@@ -1877,7 +1915,7 @@ public class BlockManager implements BlockStatsMXBean {
private void processPendingReplications() {
BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
if (timedOutItems != null) {
- namesystem.writeLock();
+ writeLock();
try {
for (int i = 0; i < timedOutItems.length; i++) {
/*
@@ -1895,7 +1933,7 @@ public class BlockManager implements BlockStatsMXBean {
}
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
/* If we know the target datanodes where the replication timedout,
* we could invoke decBlocksScheduled() on it. Its ok for now.
@@ -1904,7 +1942,7 @@ public class BlockManager implements BlockStatsMXBean {
}
public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
- assert namesystem.hasReadLock();
+ assert hasReadLock();
DatanodeDescriptor node = null;
try {
node = datanodeManager.getDatanode(nodeReg);
@@ -1965,7 +2003,7 @@ public class BlockManager implements BlockStatsMXBean {
final DatanodeStorage storage,
final BlockListAsLongs newReport, BlockReportContext context,
boolean lastStorageInRpc) throws IOException {
- namesystem.writeLock();
+ writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
DatanodeDescriptor node;
@@ -2041,7 +2079,7 @@ public class BlockManager implements BlockStatsMXBean {
}
} finally {
endTime = Time.monotonicNow();
- namesystem.writeUnlock();
+ writeUnlock();
}
if (invalidatedBlocks != null) {
@@ -2068,7 +2106,7 @@ public class BlockManager implements BlockStatsMXBean {
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
"longer exists on the DataNode.",
Long.toHexString(context.getReportId()), zombie.getStorageID());
- assert(namesystem.hasWriteLock());
+ assert hasWriteLock();
Iterator<BlockInfo> iter = zombie.getBlockIterator();
int prevBlocks = zombie.numBlocks();
while (iter.hasNext()) {
@@ -2102,7 +2140,7 @@ public class BlockManager implements BlockStatsMXBean {
long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
long startPostponedMisReplicatedBlocksCount =
getPostponedMisreplicatedBlocksCount();
- namesystem.writeLock();
+ writeLock();
try {
// blocksPerRescan is the configured number of blocks per rescan.
// Randomly select blocksPerRescan consecutive blocks from the HashSet
@@ -2155,7 +2193,7 @@ public class BlockManager implements BlockStatsMXBean {
}
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
long endPostponedMisReplicatedBlocksCount =
getPostponedMisreplicatedBlocksCount();
LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
@@ -2217,7 +2255,7 @@ public class BlockManager implements BlockStatsMXBean {
BlockInfo block,
long oldGenerationStamp, long oldNumBytes,
DatanodeStorageInfo[] newStorages) throws IOException {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
BlockToMarkCorrupt b = null;
if (block.getGenerationStamp() != oldGenerationStamp) {
b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
@@ -2265,7 +2303,7 @@ public class BlockManager implements BlockStatsMXBean {
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
if (report == null) return;
- assert (namesystem.hasWriteLock());
+ assert (hasWriteLock());
assert (storageInfo.getBlockReportCount() == 0);
for (BlockReportReplica iblk : report) {
@@ -2703,7 +2741,7 @@ public class BlockManager implements BlockStatsMXBean {
private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
DatanodeStorageInfo storageInfo)
throws IOException {
- assert (storedBlock != null && namesystem.hasWriteLock());
+ assert (storedBlock != null && hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| isPopulatingReplQueues()) {
addStoredBlock(storedBlock, reported, storageInfo, null, false);
@@ -2738,7 +2776,7 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
- assert block != null && namesystem.hasWriteLock();
+ assert block != null && hasWriteLock();
BlockInfo storedBlock;
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (!block.isComplete()) {
@@ -2900,7 +2938,7 @@ public class BlockManager implements BlockStatsMXBean {
* over or under replicated. Place it into the respective queue.
*/
public void processMisReplicatedBlocks() {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
stopReplicationInitializer();
neededReplications.clear();
replicationQueuesInitializer = new Daemon() {
@@ -2957,7 +2995,7 @@ public class BlockManager implements BlockStatsMXBean {
while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
int processed = 0;
- namesystem.writeLockInterruptibly();
+ writeLockInterruptibly();
try {
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
BlockInfo block = blocksItr.next();
@@ -3011,7 +3049,7 @@ public class BlockManager implements BlockStatsMXBean {
break;
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
// Make sure it is out of the write lock for sufficiently long time.
Thread.sleep(sleepDuration);
}
@@ -3109,7 +3147,7 @@ public class BlockManager implements BlockStatsMXBean {
private void processOverReplicatedBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
}
@@ -3147,7 +3185,7 @@ public class BlockManager implements BlockStatsMXBean {
BlockInfo storedBlock, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
// first form a rack to datanodes map and
BlockCollection bc = getBlockCollection(storedBlock);
if (storedBlock.isStriped()) {
@@ -3285,7 +3323,7 @@ public class BlockManager implements BlockStatsMXBean {
}
private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
dn.getDatanodeUuid());
if (excessBlocks == null) {
@@ -3316,7 +3354,7 @@ public class BlockManager implements BlockStatsMXBean {
*/
public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
- assert (namesystem.hasWriteLock());
+ assert hasWriteLock();
{
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
@@ -3493,7 +3531,7 @@ public class BlockManager implements BlockStatsMXBean {
*/
public void processIncrementalBlockReport(final DatanodeID nodeID,
final StorageReceivedDeletedBlocks srdb) throws IOException {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
int received = 0;
int deleted = 0;
int receiving = 0;
@@ -3701,7 +3739,7 @@ public class BlockManager implements BlockStatsMXBean {
}
public void removeBlock(BlockInfo block) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
// No need to ACK blocks that are being removed entirely
// from the namespace, since the removal of the associated
// file already removes them from the block map below.
@@ -3735,7 +3773,7 @@ public class BlockManager implements BlockStatsMXBean {
/** updates a block in under replication queue */
private void updateNeededReplications(final BlockInfo block,
final int curReplicasDelta, int expectedReplicasDelta) {
- namesystem.writeLock();
+ writeLock();
try {
if (!isPopulatingReplQueues()) {
return;
@@ -3753,7 +3791,7 @@ public class BlockManager implements BlockStatsMXBean {
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
}
@@ -3815,7 +3853,7 @@ public class BlockManager implements BlockStatsMXBean {
private int invalidateWorkForOneNode(DatanodeInfo dn) {
final List<Block> toInvalidate;
- namesystem.writeLock();
+ writeLock();
try {
// blocks should not be replicated or removed if safe mode is on
if (namesystem.isInSafeMode()) {
@@ -3839,7 +3877,7 @@ public class BlockManager implements BlockStatsMXBean {
return 0;
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
blockLog.debug("BLOCK* {}: ask {} to delete {}", getClass().getSimpleName(),
dn, toInvalidate);
@@ -4042,12 +4080,12 @@ public class BlockManager implements BlockStatsMXBean {
int workFound = this.computeBlockRecoveryWork(blocksToProcess);
// Update counters
- namesystem.writeLock();
+ writeLock();
try {
this.updateState();
this.scheduledReplicationBlocksCount = workFound;
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
workFound += this.computeInvalidateWork(nodesToProcess);
return workFound;
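
Note: the BlockManagerLock class constructed above (new BlockManagerLock(namesystem)) and used by the
new RwLock methods is not included in these hunks. Purely as a hypothetical sketch of what those call
sites assume (readLock()/writeLock() views plus hasReadLock()/hasWriteLock() hold checks), it could be
a thin wrapper around java.util.concurrent.locks.ReentrantReadWriteLock; the class actually shipped
with this change may instead delegate to the namesystem's own lock, so treat the following as an
illustration only.

    package org.apache.hadoop.hdfs.server.blockmanagement;

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.hadoop.hdfs.server.namenode.Namesystem;

    // Hypothetical sketch only; not part of the patch above.
    class BlockManagerLock {
      // Fair ordering is an assumption here, not something stated by the patch.
      private final ReentrantReadWriteLock coarseLock = new ReentrantReadWriteLock(true);
      // Kept only to mirror the constructor call site new BlockManagerLock(namesystem).
      private final Namesystem namesystem;

      BlockManagerLock(Namesystem namesystem) {
        this.namesystem = namesystem;
      }

      // Used as lock.readLock().lock() / lock.readLock().unlock() in BlockManager.
      Lock readLock() {
        return coarseLock.readLock();
      }

      // Used as lock.writeLock().lock(), lockInterruptibly() and unlock().
      Lock writeLock() {
        return coarseLock.writeLock();
      }

      // True if the current thread holds the read lock (or the write lock).
      boolean hasReadLock() {
        return coarseLock.getReadHoldCount() > 0 || hasWriteLock();
      }

      // True if the current thread holds the write lock.
      boolean hasWriteLock() {
        return coarseLock.isWriteLockedByCurrentThread();
      }
    }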