@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
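
The only change above is the new INodeId import; it supplies the INVALID_INODE_ID sentinel that later hunks test when a block no longer maps to a file. A minimal sketch of the lookup idiom the rest of the patch repeats (`blockInfo` stands in for any stored block; the other identifiers appear in the hunks below):

```java
// The id-based indirection introduced by this patch: a BlockInfoContiguous
// now stores the inode id of its owning file instead of a direct
// BlockCollection back-reference.
long bcId = blockInfo.getBlockCollectionId();              // INVALID_INODE_ID if orphaned
BlockCollection bc = namesystem.getBlockCollection(bcId);  // null when the id resolves to nothing
```
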
@@ -542,7 +543,8 @@ public class BlockManager {
       numReplicas.decommissionedAndDecommissioning();

       if (block instanceof BlockInfoContiguous) {
-        BlockCollection bc = ((BlockInfoContiguous) block).getBlockCollection();
+        long bcId = ((BlockInfoContiguous) block).getBlockCollectionId();
+        BlockCollection bc = namesystem.getBlockCollection(bcId);
         String fileName = (bc == null) ? "[orphaned]" : bc.getName();
         out.print(fileName + ": ");
       }
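
metaSave tolerates orphaned blocks by printing a placeholder name when the id no longer resolves. Since the same resolve-then-null-check pattern recurs across the patch, a small helper (hypothetical, not part of this change) could centralize it:

```java
/**
 * Hypothetical helper: resolve the name of the file owning a block, or a
 * placeholder when the block is orphaned. Mirrors the metaSave hunk above.
 */
private String fileNameOf(BlockInfoContiguous block) {
  long bcId = block.getBlockCollectionId();
  BlockCollection bc = namesystem.getBlockCollection(bcId);
  return (bc == null) ? "[orphaned]" : bc.getName();
}
```
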
@@ -1275,7 +1277,8 @@ public class BlockManager {
     for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
       for (Block block : blocksToReplicate.get(priority)) {
         // block should belong to a file
-        bc = blocksMap.getBlockCollection(block);
+        long bcId = getBlockCollectionId(block);
+        bc = namesystem.getBlockCollection(bcId);
         BlockInfoContiguous bi = getStoredBlock(block);
         // abandoned block or block reopened for append
         if (bc == null || bi == null
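
In this scan both lookups can miss independently: the collection id may no longer resolve (file deleted) while getStoredBlock can also return null for an abandoned block. The guard is truncated by the hunk boundary; a sketch of the control flow, with the elided conditions left out:

```java
// Sketch of the per-block guard in the replication scan; the original
// condition continues beyond "bi == null" (elided in the hunk above).
long bcId = getBlockCollectionId(block);
bc = namesystem.getBlockCollection(bcId);
BlockInfoContiguous bi = getStoredBlock(block);
if (bc == null || bi == null /* ... further staleness checks ... */) {
  // abandoned block, or block reopened for append: drop the work item
  neededReplications.remove(block, priority);
  continue;
}
```
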
@@ -1364,7 +1367,8 @@ public class BlockManager {
       int priority = rw.priority;
       // Recheck since global lock was released
       // block should belong to a file
-      bc = blocksMap.getBlockCollection(block);
+      long bcId = getBlockCollectionId(block);
+      bc = namesystem.getBlockCollection(bcId);
       // abandoned block or block reopened for append
       if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
         neededReplications.remove(block, priority); // remove from neededReplications
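
The recheck exists because the global lock was released between choosing replication targets and validating them. Re-resolving through the id, rather than caching a BlockCollection reference across the unlocked window, guarantees a concurrent file deletion is observed:

```java
// Re-resolve under the re-acquired lock: the file may have been deleted
// or reopened for append while the lock was released.
long bcId = getBlockCollectionId(block);
bc = namesystem.getBlockCollection(bcId);
if (bc == null
    || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
  neededReplications.remove(block, priority); // no longer needs replication
  // ... skip this replication work item
}
```
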
@@ -1806,18 +1810,18 @@ public class BlockManager {
     if (metrics != null) {
       metrics.addBlockReport((int) (endTime - startTime));
     }
-    blockLog.info("BLOCK* processReport: from storage {} node {}, " +
-        "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage
-        .getStorageID(), nodeID, newReport.getNumberOfBlocks(),
+    blockLog.info(
+        "BLOCK* processReport: from storage {} node {}, " + "blocks: {}, hasStaleStorage: {}, processing time: {} msecs",
+        storage.getStorageID(), nodeID, newReport.getNumberOfBlocks(),
         node.hasStaleStorages(), (endTime - startTime));
     return !node.hasStaleStorages();
   }

   private void removeZombieReplicas(BlockReportContext context,
       DatanodeStorageInfo zombie) {
-    LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
-        "longer exists on the DataNode.",
-        Long.toHexString(context.getReportId()), zombie.getStorageID());
+    LOG.warn(
+        "processReport 0x{}: removing zombie storage {}, which no " + "longer exists on the DataNode.",
+        Long.toHexString(context.getReportId()), zombie.getStorageID());
     assert(namesystem.hasWriteLock());
     Iterator<BlockInfoContiguous> iter = zombie.getBlockIterator();
     int prevBlocks = zombie.numBlocks();
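
The re-wrapped log calls above are behavior-neutral: the {} placeholders defer formatting to SLF4J, and the remaining `"... " + "..."` joins of adjacent string literals are folded into one constant at compile time. Merging the literals by hand would read the same at runtime (a style observation, not part of the patch):

```java
// Equivalent to the rewrapped call above, with the adjacent string
// literals merged into one; the compiler folds both forms identically.
LOG.warn("processReport 0x{}: removing zombie storage {}, which no longer exists on the DataNode.",
    Long.toHexString(context.getReportId()), zombie.getStorageID());
```
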
@@ -1833,10 +1837,10 @@ public class BlockManager {
       invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
     }
     assert(zombie.numBlocks() == 0);
-    LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
-        "which no longer exists on the DataNode.",
-        Long.toHexString(context.getReportId()), prevBlocks,
-        zombie.getStorageID());
+    LOG.warn(
+        "processReport 0x{}: removed {} replicas from storage {}, " + "which no longer exists on the DataNode.",
+        Long.toHexString(context.getReportId()), prevBlocks,
+        zombie.getStorageID());
   }

   /**
@@ -2441,7 +2445,9 @@ public class BlockManager {
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
         && numCurrentReplica >= minReplication) {
-      completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
+      long bcId = storedBlock.getBlockCollectionId();
+      BlockCollection bc = namesystem.getBlockCollection(bcId);
+      completeBlock(bc, storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that.
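
The before/after here is the core mechanical transformation of the whole patch: a one-step back-pointer read becomes an id read plus a namesystem resolve.

```java
// Before: BlockInfoContiguous carried a direct back-reference.
completeBlock(storedBlock.getBlockCollection(), storedBlock, false);

// After: only the inode id is stored; the namesystem owns the mapping.
long bcId = storedBlock.getBlockCollectionId();
completeBlock(namesystem.getBlockCollection(bcId), storedBlock, false);
```
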
@@ -2479,7 +2485,8 @@ public class BlockManager {
       // it will happen in next block report otherwise.
       return block;
     }
-    BlockCollection bc = storedBlock.getBlockCollection();
+    long bcId = storedBlock.getBlockCollectionId();
+    BlockCollection bc = namesystem.getBlockCollection(bcId);
     assert bc != null : "Block must belong to a file";

     // add block to the datanode
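
addStoredBlock keeps its non-null assertion, which after this change guards two failure modes rather than one:

```java
long bcId = storedBlock.getBlockCollectionId();
BlockCollection bc = namesystem.getBlockCollection(bcId);
// Fails both when the id was never assigned and when the id's inode has
// since been deleted; previously only a cleared back-pointer tripped it.
assert bc != null : "Block must belong to a file";
```
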
@@ -2813,8 +2820,8 @@ public class BlockManager {
           + " for " + src);
       processOverReplicatedBlock(b, newRepl, null, null);
     } else { // replication factor is increased
-      LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
-          + " for " + src);
+      LOG.info(
+          "Increasing replication from " + oldRepl + " to " + newRepl + " for " + src);
     }
   }

@@ -2881,7 +2888,8 @@ public class BlockManager {
       BlockPlacementPolicy replicator) {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
-    BlockCollection bc = getBlockCollection(b);
+    long bcId = getBlockCollectionId(b);
+    BlockCollection bc = namesystem.getBlockCollection(bcId);
     final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
     final List<StorageType> excessTypes = storagePolicy.chooseExcess(
         replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
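
Unlike the scan sites, this path has no null guard: the collection is dereferenced immediately for its storage policy, so the caller contract (write lock held, block still live) is what keeps bc non-null. The lookup chain, condensed:

```java
// Condensed from the hunk: an orphaned block would NPE on bc here, which
// the asserted write lock and the caller contract are assumed to prevent.
long bcId = getBlockCollectionId(b);
BlockCollection bc = namesystem.getBlockCollection(bcId);
BlockStoragePolicy storagePolicy =
    storagePolicySuite.getPolicy(bc.getStoragePolicyID());
```
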
@@ -3004,8 +3012,8 @@ public class BlockManager {
     // necessary. In that case, put block on a possibly-will-
     // be-replicated list.
     //
-    BlockCollection bc = blocksMap.getBlockCollection(block);
-    if (bc != null) {
+    long bcId = getBlockCollectionId(block);
+    if (bcId != INodeId.INVALID_INODE_ID) {
       namesystem.decrementSafeBlockCount(block);
       updateNeededReplications(block, -1, 0);
     }
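
This is the one call site that tests the sentinel directly instead of resolving the collection; only existence matters here, so the namesystem lookup is skipped entirely:

```java
long bcId = getBlockCollectionId(block);
if (bcId != INodeId.INVALID_INODE_ID) {
  // Block still belongs to a file: keep safe-mode and replication
  // bookkeeping consistent with the replica removal.
  namesystem.decrementSafeBlockCount(block);
  updateNeededReplications(block, -1, 0);
}
```
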
@@ -3097,8 +3105,8 @@ public class BlockManager {
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();

-    processReportedBlock(storageInfo, block, reportedState,
-        toAdd, toInvalidate, toCorrupt, toUC);
+    processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
+        toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
     assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
@@ -3183,7 +3191,7 @@ public class BlockManager {
         break;
       }
       blockLog.debug("BLOCK* block {}: {} is received from {}",
-        rdbi.getStatus(), rdbi.getBlock(), nodeID);
+          rdbi.getStatus(), rdbi.getBlock(), nodeID);
     }
     blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
         + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
@@ -3274,8 +3282,9 @@ public class BlockManager {
         numOverReplicated++;
       }
     }
-    LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " +
-        srcNode + " during recommissioning");
+    LOG.info(
+        "Invalidated " + numOverReplicated + " over-replicated blocks on " +
+        srcNode + " during recommissioning");
   }

   /**
@@ -3302,11 +3311,11 @@ public class BlockManager {
     }

     LOG.warn("Node {} is dead " +
-        "while decommission is in progress. Cannot be safely " +
-        "decommissioned since there is risk of reduced " +
-        "data durability or data loss. Either restart the failed node or" +
-        " force decommissioning by removing, calling refreshNodes, " +
-        "then re-adding to the excludes files.", node);
+            "while decommission is in progress. Cannot be safely " +
+            "decommissioned since there is risk of reduced " +
+            "data durability or data loss. Either restart the failed node or" +
+            " force decommissioning by removing, calling refreshNodes, " +
+            "then re-adding to the excludes files.", node);
     return false;
   }

@@ -3417,6 +3426,7 @@ public class BlockManager {
    * otherwise, return the replication factor of the block.
    */
   private int getReplication(Block block) {
+    assert namesystem.hasReadLock();
     BlockInfoContiguous bi = blocksMap.getStoredBlock(block);
     return bi == null ? 0 : bi.getReplication();
   }
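
The new assertion documents that blocksMap may only be read under the namesystem read lock; assertions are disabled in production, so this costs nothing outside -ea runs. A hypothetical caller, assuming the readLock()/readUnlock() pair from the same RwLock interface that provides hasReadLock():

```java
// Hypothetical caller sketch: hold the read lock across the query.
namesystem.readLock();
try {
  int replication = getReplication(block); // assertion holds here
  // ... use replication ...
} finally {
  namesystem.readUnlock();
}
```
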
@@ -3517,8 +3527,9 @@ public class BlockManager {
     return blocksMap.addBlockCollection(block, bc);
   }

-  public BlockCollection getBlockCollection(Block b) {
-    return blocksMap.getBlockCollection(b);
+  public long getBlockCollectionId(Block b) {
+    BlockInfoContiguous bi = getStoredBlock(b);
+    return bi == null ? INodeId.INVALID_INODE_ID : bi.getBlockCollectionId();
   }

   /** @return an iterator of the datanodes. */
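
The public accessor swap above is the heart of the patch: BlockManager now hands out an id (INVALID_INODE_ID for unknown or orphaned blocks), and callers resolve objects through the namesystem, which is what lets BlockInfoContiguous drop its BlockCollection back-pointer. A sketch of the caller migration (`blockManager` stands in for a BlockManager reference; the other names come from the hunks):

```java
// Before this patch: one call, direct object.
BlockCollection bc = blockManager.getBlockCollection(block);

// After: two steps, with an explicit orphan sentinel to check.
long bcId = blockManager.getBlockCollectionId(block);
BlockCollection bc2 = (bcId == INodeId.INVALID_INODE_ID)
    ? null : namesystem.getBlockCollection(bcId);
```
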