|
@@ -34,6 +34,7 @@ import java.util.Map;
|
|
|
import java.util.Queue;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
+import java.util.TreeSet;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -44,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.hdfs.HAUtil;
|
|
|
+import org.apache.hadoop.hdfs.StorageType;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
|
|
@@ -70,8 +72,10 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
|
|
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
|
|
|
import org.apache.hadoop.net.Node;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -470,8 +474,8 @@ public class BlockManager {
|
|
|
private void dumpBlockMeta(Block block, PrintWriter out) {
|
|
|
List<DatanodeDescriptor> containingNodes =
|
|
|
new ArrayList<DatanodeDescriptor>();
|
|
|
- List<DatanodeDescriptor> containingLiveReplicasNodes =
|
|
|
- new ArrayList<DatanodeDescriptor>();
|
|
|
+ List<DatanodeStorageInfo> containingLiveReplicasNodes =
|
|
|
+ new ArrayList<DatanodeStorageInfo>();
|
|
|
|
|
|
NumberReplicas numReplicas = new NumberReplicas();
|
|
|
// source node returned is not used
|
|
@@ -498,9 +502,8 @@ public class BlockManager {
|
|
|
Collection<DatanodeDescriptor> corruptNodes =
|
|
|
corruptReplicas.getNodes(block);
|
|
|
|
|
|
- for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
|
|
|
- jt.hasNext();) {
|
|
|
- DatanodeDescriptor node = jt.next();
|
|
|
+ for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
|
|
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
|
|
String state = "";
|
|
|
if (corruptNodes != null && corruptNodes.contains(node)) {
|
|
|
state = "(corrupt)";
|
|
@@ -509,7 +512,7 @@ public class BlockManager {
|
|
|
state = "(decommissioned)";
|
|
|
}
|
|
|
|
|
|
- if (node.areBlockContentsStale()) {
|
|
|
+ if (storage.areBlockContentsStale()) {
|
|
|
state += " (block deletions maybe out of date)";
|
|
|
}
|
|
|
out.print(" " + node + state + " : ");
|
|
@@ -660,10 +663,9 @@ public class BlockManager {
|
|
|
assert oldBlock == getStoredBlock(oldBlock) :
|
|
|
"last block of the file is not in blocksMap";
|
|
|
|
|
|
- DatanodeDescriptor[] targets = getNodes(oldBlock);
|
|
|
+ DatanodeStorageInfo[] targets = getStorages(oldBlock);
|
|
|
|
|
|
- BlockInfoUnderConstruction ucBlock =
|
|
|
- bc.setLastBlock(oldBlock, targets);
|
|
|
+ BlockInfoUnderConstruction ucBlock = bc.setLastBlock(oldBlock, targets);
|
|
|
blocksMap.replaceBlock(ucBlock);
|
|
|
|
|
|
// Remove block from replication queue.
|
|
@@ -673,9 +675,8 @@ public class BlockManager {
|
|
|
pendingReplications.remove(ucBlock);
|
|
|
|
|
|
// remove this block from the list of pending blocks to be deleted.
|
|
|
- for (DatanodeDescriptor dd : targets) {
|
|
|
- String datanodeId = dd.getStorageID();
|
|
|
- invalidateBlocks.remove(datanodeId, oldBlock);
|
|
|
+ for (DatanodeStorageInfo storage : targets) {
|
|
|
+ invalidateBlocks.remove(storage.getDatanodeDescriptor().getDatanodeUuid(), oldBlock); // invalidateBlocks is keyed by datanode UUID (see add(b, node, ...)), not by storage ID
|
|
|
}
|
|
|
|
|
|
// Adjust safe-mode totals, since under-construction blocks don't
|
|
@@ -694,18 +695,17 @@ public class BlockManager {
|
|
|
/**
|
|
|
* Get all valid locations of the block
|
|
|
*/
|
|
|
- private List<String> getValidLocations(Block block) {
|
|
|
- ArrayList<String> machineSet =
|
|
|
- new ArrayList<String>(blocksMap.numNodes(block));
|
|
|
- for(Iterator<DatanodeDescriptor> it =
|
|
|
- blocksMap.nodeIterator(block); it.hasNext();) {
|
|
|
- String storageID = it.next().getStorageID();
|
|
|
+ private List<DatanodeStorageInfo> getValidLocations(Block block) {
|
|
|
+ final List<DatanodeStorageInfo> locations
|
|
|
+ = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
|
|
+ final String storageID = storage.getDatanodeDescriptor().getDatanodeUuid(); // lookup key for invalidateBlocks, which is keyed per datanode (UUID), not per storage
|
|
|
// filter invalidate replicas
|
|
|
if(!invalidateBlocks.contains(storageID, block)) {
|
|
|
- machineSet.add(storageID);
|
|
|
+ locations.add(storage);
|
|
|
}
|
|
|
}
|
|
|
- return machineSet;
|
|
|
+ return locations;
|
|
|
}
|
|
|
|
|
|
private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
|
|
@@ -773,9 +773,9 @@ public class BlockManager {
|
|
|
+ ", blk=" + blk);
|
|
|
}
|
|
|
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
|
|
|
- final DatanodeDescriptor[] locations = uc.getExpectedLocations();
|
|
|
+ final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
|
|
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
|
|
|
- return new LocatedBlock(eb, locations, pos, false);
|
|
|
+ return new LocatedBlock(eb, storages, pos, false);
|
|
|
}
|
|
|
|
|
|
// get block locations
|
|
@@ -790,15 +790,14 @@ public class BlockManager {
|
|
|
final int numNodes = blocksMap.numNodes(blk);
|
|
|
final boolean isCorrupt = numCorruptNodes == numNodes;
|
|
|
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
|
|
|
- final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
|
|
|
+ final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
|
|
|
int j = 0;
|
|
|
if (numMachines > 0) {
|
|
|
- for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
|
|
|
- it.hasNext();) {
|
|
|
- final DatanodeDescriptor d = it.next();
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
|
|
|
+ final DatanodeDescriptor d = storage.getDatanodeDescriptor();
|
|
|
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
|
|
|
if (isCorrupt || (!isCorrupt && !replicaCorrupt))
|
|
|
- machines[j++] = d;
|
|
|
+ machines[j++] = storage;
|
|
|
}
|
|
|
}
|
|
|
assert j == machines.length :
|
|
@@ -990,13 +989,20 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
node.resetBlocks();
|
|
|
- invalidateBlocks.remove(node.getStorageID());
|
|
|
+ invalidateBlocks.remove(node.getDatanodeUuid());
|
|
|
|
|
|
// If the DN hasn't block-reported since the most recent
|
|
|
// failover, then we may have been holding up on processing
|
|
|
// over-replicated blocks because of it. But we can now
|
|
|
// process those blocks.
|
|
|
- if (node.areBlockContentsStale()) {
|
|
|
+ boolean stale = false;
|
|
|
+ for(DatanodeStorageInfo storage : node.getStorageInfos()) {
|
|
|
+ if (storage.areBlockContentsStale()) {
|
|
|
+ stale = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (stale) {
|
|
|
rescanPostponedMisreplicatedBlocks();
|
|
|
}
|
|
|
}
|
|
@@ -1015,9 +1021,8 @@ public class BlockManager {
|
|
|
*/
|
|
|
private void addToInvalidates(Block b) {
|
|
|
StringBuilder datanodes = new StringBuilder();
|
|
|
- for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
|
|
|
- .hasNext();) {
|
|
|
- DatanodeDescriptor node = it.next();
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
|
|
|
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
|
|
invalidateBlocks.add(b, node, false);
|
|
|
datanodes.append(node).append(" ");
|
|
|
}
|
|
@@ -1035,7 +1040,7 @@ public class BlockManager {
|
|
|
* for logging purposes
|
|
|
*/
|
|
|
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
|
|
|
- final DatanodeInfo dn, String reason) throws IOException {
|
|
|
+ final DatanodeInfo dn, String storageID, String reason) throws IOException {
|
|
|
assert namesystem.hasWriteLock();
|
|
|
final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
|
|
|
if (storedBlock == null) {
|
|
@@ -1048,11 +1053,11 @@ public class BlockManager {
|
|
|
return;
|
|
|
}
|
|
|
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
|
|
|
- Reason.CORRUPTION_REPORTED), dn);
|
|
|
+ Reason.CORRUPTION_REPORTED), dn, storageID);
|
|
|
}
|
|
|
|
|
|
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|
|
|
- DatanodeInfo dn) throws IOException {
|
|
|
+ DatanodeInfo dn, String storageID) throws IOException {
|
|
|
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
|
|
|
if (node == null) {
|
|
|
throw new IOException("Cannot mark " + b
|
|
@@ -1068,7 +1073,7 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
// Add replica to the data-node if it is not already there
|
|
|
- node.addBlock(b.stored);
|
|
|
+ node.addBlock(storageID, b.stored);
|
|
|
|
|
|
// Add this replica to corruptReplicas Map
|
|
|
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
|
|
@@ -1193,7 +1198,7 @@ public class BlockManager {
|
|
|
@VisibleForTesting
|
|
|
int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
|
|
|
int requiredReplication, numEffectiveReplicas;
|
|
|
- List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
|
|
|
+ List<DatanodeDescriptor> containingNodes;
|
|
|
DatanodeDescriptor srcNode;
|
|
|
BlockCollection bc = null;
|
|
|
int additionalReplRequired;
|
|
@@ -1219,7 +1224,7 @@ public class BlockManager {
|
|
|
|
|
|
// get a source data-node
|
|
|
containingNodes = new ArrayList<DatanodeDescriptor>();
|
|
|
- liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
|
|
|
+ List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
|
|
|
NumberReplicas numReplicas = new NumberReplicas();
|
|
|
srcNode = chooseSourceDatanode(
|
|
|
block, containingNodes, liveReplicaNodes, numReplicas,
|
|
@@ -1279,7 +1284,7 @@ public class BlockManager {
|
|
|
namesystem.writeLock();
|
|
|
try {
|
|
|
for(ReplicationWork rw : work){
|
|
|
- DatanodeDescriptor[] targets = rw.targets;
|
|
|
+ final DatanodeStorageInfo[] targets = rw.targets;
|
|
|
if(targets == null || targets.length == 0){
|
|
|
rw.targets = null;
|
|
|
continue;
|
|
@@ -1319,7 +1324,8 @@ public class BlockManager {
|
|
|
|
|
|
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
|
|
|
(!blockHasEnoughRacks(block)) ) {
|
|
|
- if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
|
|
|
+ if (rw.srcNode.getNetworkLocation().equals(
|
|
|
+ targets[0].getDatanodeDescriptor().getNetworkLocation())) {
|
|
|
//No use continuing, unless a new rack in this case
|
|
|
continue;
|
|
|
}
|
|
@@ -1328,15 +1334,13 @@ public class BlockManager {
|
|
|
// Add block to the to be replicated list
|
|
|
rw.srcNode.addBlockToBeReplicated(block, targets);
|
|
|
scheduledWork++;
|
|
|
-
|
|
|
- for (DatanodeDescriptor dn : targets) {
|
|
|
- dn.incBlocksScheduled();
|
|
|
- }
|
|
|
+ DatanodeStorageInfo.incrementBlocksScheduled(targets);
|
|
|
|
|
|
// Move the block-replication into a "pending" state.
|
|
|
// The reason we use 'pending' is so we can retry
|
|
|
// replications that fail after an appropriate amount of time.
|
|
|
- pendingReplications.increment(block, targets);
|
|
|
+ pendingReplications.increment(block,
|
|
|
+ DatanodeStorageInfo.toDatanodeDescriptors(targets));
|
|
|
if(blockLog.isDebugEnabled()) {
|
|
|
blockLog.debug(
|
|
|
"BLOCK* block " + block
|
|
@@ -1357,12 +1361,12 @@ public class BlockManager {
|
|
|
if (blockLog.isInfoEnabled()) {
|
|
|
// log which blocks have been scheduled for replication
|
|
|
for(ReplicationWork rw : work){
|
|
|
- DatanodeDescriptor[] targets = rw.targets;
|
|
|
+ DatanodeStorageInfo[] targets = rw.targets;
|
|
|
if (targets != null && targets.length != 0) {
|
|
|
StringBuilder targetList = new StringBuilder("datanode(s)");
|
|
|
for (int k = 0; k < targets.length; k++) {
|
|
|
targetList.append(' ');
|
|
|
- targetList.append(targets[k]);
|
|
|
+ targetList.append(targets[k].getDatanodeDescriptor());
|
|
|
}
|
|
|
blockLog.info("BLOCK* ask " + rw.srcNode
|
|
|
+ " to replicate " + rw.block + " to " + targetList);
|
|
@@ -1386,15 +1390,16 @@ public class BlockManager {
|
|
|
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
|
|
|
* List, boolean, Set, long)
|
|
|
*/
|
|
|
- public DatanodeDescriptor[] chooseTarget(final String src,
|
|
|
+ public DatanodeStorageInfo[] chooseTarget(final String src,
|
|
|
final int numOfReplicas, final DatanodeDescriptor client,
|
|
|
final Set<Node> excludedNodes,
|
|
|
final long blocksize, List<String> favoredNodes) throws IOException {
|
|
|
List<DatanodeDescriptor> favoredDatanodeDescriptors =
|
|
|
getDatanodeDescriptors(favoredNodes);
|
|
|
- final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
|
|
|
+ final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
|
|
|
numOfReplicas, client, excludedNodes, blocksize,
|
|
|
- favoredDatanodeDescriptors);
|
|
|
+ // TODO: get storage type from file
|
|
|
+ favoredDatanodeDescriptors, StorageType.DEFAULT);
|
|
|
if (targets.length < minReplication) {
|
|
|
throw new IOException("File " + src + " could only be replicated to "
|
|
|
+ targets.length + " nodes instead of minReplication (="
|
|
@@ -1455,12 +1460,11 @@ public class BlockManager {
|
|
|
* the given block
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- DatanodeDescriptor chooseSourceDatanode(
|
|
|
- Block block,
|
|
|
- List<DatanodeDescriptor> containingNodes,
|
|
|
- List<DatanodeDescriptor> nodesContainingLiveReplicas,
|
|
|
- NumberReplicas numReplicas,
|
|
|
- int priority) {
|
|
|
+ DatanodeDescriptor chooseSourceDatanode(Block block,
|
|
|
+ List<DatanodeDescriptor> containingNodes,
|
|
|
+ List<DatanodeStorageInfo> nodesContainingLiveReplicas,
|
|
|
+ NumberReplicas numReplicas,
|
|
|
+ int priority) {
|
|
|
containingNodes.clear();
|
|
|
nodesContainingLiveReplicas.clear();
|
|
|
DatanodeDescriptor srcNode = null;
|
|
@@ -1468,12 +1472,12 @@ public class BlockManager {
|
|
|
int decommissioned = 0;
|
|
|
int corrupt = 0;
|
|
|
int excess = 0;
|
|
|
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
|
|
|
+
|
|
|
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
|
|
|
- while(it.hasNext()) {
|
|
|
- DatanodeDescriptor node = it.next();
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
|
|
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
|
|
LightWeightLinkedSet<Block> excessBlocks =
|
|
|
- excessReplicateMap.get(node.getStorageID());
|
|
|
+ excessReplicateMap.get(node.getDatanodeUuid());
|
|
|
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
|
|
|
corrupt++;
|
|
|
else if (node.isDecommissionInProgress() || node.isDecommissioned())
|
|
@@ -1481,7 +1485,7 @@ public class BlockManager {
|
|
|
else if (excessBlocks != null && excessBlocks.contains(block)) {
|
|
|
excess++;
|
|
|
} else {
|
|
|
- nodesContainingLiveReplicas.add(node);
|
|
|
+ nodesContainingLiveReplicas.add(storage);
|
|
|
live++;
|
|
|
}
|
|
|
containingNodes.add(node);
|
|
@@ -1613,10 +1617,11 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * The given datanode is reporting all its blocks.
|
|
|
- * Update the (machine-->blocklist) and (block-->machinelist) maps.
|
|
|
+ * The given storage is reporting all its blocks.
|
|
|
+ * Update the (storage-->block list) and (block-->storage list) maps.
|
|
|
*/
|
|
|
- public void processReport(final DatanodeID nodeID, final String poolId,
|
|
|
+ public void processReport(final DatanodeID nodeID,
|
|
|
+ final DatanodeStorage storage, final String poolId,
|
|
|
final BlockListAsLongs newReport) throws IOException {
|
|
|
namesystem.writeLock();
|
|
|
final long startTime = Time.now(); //after acquiring write lock
|
|
@@ -1630,26 +1635,28 @@ public class BlockManager {
|
|
|
|
|
|
// To minimize startup time, we discard any second (or later) block reports
|
|
|
// that we receive while still in startup phase.
|
|
|
- if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
|
|
|
+ final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
|
|
|
+ if (namesystem.isInStartupSafeMode()
|
|
|
+ && storageInfo.getBlockReportCount() > 0) {
|
|
|
blockLog.info("BLOCK* processReport: "
|
|
|
+ "discarded non-initial block report from " + nodeID
|
|
|
+ " because namenode still in startup phase");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if (node.numBlocks() == 0) {
|
|
|
+ if (storageInfo.numBlocks() == 0) {
|
|
|
// The first block report can be processed a lot more efficiently than
|
|
|
// ordinary block reports. This shortens restart times.
|
|
|
- processFirstBlockReport(node, newReport);
|
|
|
+ processFirstBlockReport(node, storage.getStorageID(), newReport);
|
|
|
} else {
|
|
|
- processReport(node, newReport);
|
|
|
+ processReport(node, storage, newReport);
|
|
|
}
|
|
|
|
|
|
// Now that we have an up-to-date block report, we know that any
|
|
|
// deletions from a previous NN iteration have been accounted for.
|
|
|
- boolean staleBefore = node.areBlockContentsStale();
|
|
|
- node.receivedBlockReport();
|
|
|
- if (staleBefore && !node.areBlockContentsStale()) {
|
|
|
+ boolean staleBefore = storageInfo.areBlockContentsStale();
|
|
|
+ storageInfo.receivedBlockReport();
|
|
|
+ if (staleBefore && !storageInfo.areBlockContentsStale()) {
|
|
|
LOG.info("BLOCK* processReport: Received first block report from "
|
|
|
+ node + " after starting up or becoming active. Its block "
|
|
|
+ "contents are no longer considered stale");
|
|
@@ -1703,28 +1710,30 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
private void processReport(final DatanodeDescriptor node,
|
|
|
+ final DatanodeStorage storage,
|
|
|
final BlockListAsLongs report) throws IOException {
|
|
|
// Normal case:
|
|
|
// Modify the (block-->datanode) map, according to the difference
|
|
|
// between the old and new block report.
|
|
|
//
|
|
|
Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
|
|
|
- Collection<Block> toRemove = new LinkedList<Block>();
|
|
|
+ Collection<Block> toRemove = new TreeSet<Block>();
|
|
|
Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
|
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
|
|
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
|
|
- reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
|
|
|
+ reportDiff(node, storage, report,
|
|
|
+ toAdd, toRemove, toInvalidate, toCorrupt, toUC);
|
|
|
|
|
|
// Process the blocks on each queue
|
|
|
for (StatefulBlockInfo b : toUC) {
|
|
|
- addStoredBlockUnderConstruction(b, node);
|
|
|
+ addStoredBlockUnderConstruction(b, node, storage.getStorageID());
|
|
|
}
|
|
|
for (Block b : toRemove) {
|
|
|
removeStoredBlock(b, node);
|
|
|
}
|
|
|
int numBlocksLogged = 0;
|
|
|
for (BlockInfo b : toAdd) {
|
|
|
- addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
|
|
|
+ addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
|
|
|
numBlocksLogged++;
|
|
|
}
|
|
|
if (numBlocksLogged > maxNumBlocksToLog) {
|
|
@@ -1738,7 +1747,7 @@ public class BlockManager {
|
|
|
addToInvalidates(b, node);
|
|
|
}
|
|
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
|
|
- markBlockAsCorrupt(b, node);
|
|
|
+ markBlockAsCorrupt(b, node, storage.getStorageID());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1754,10 +1763,11 @@ public class BlockManager {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void processFirstBlockReport(final DatanodeDescriptor node,
|
|
|
+ final String storageID,
|
|
|
final BlockListAsLongs report) throws IOException {
|
|
|
if (report == null) return;
|
|
|
assert (namesystem.hasWriteLock());
|
|
|
- assert (node.numBlocks() == 0);
|
|
|
+ assert (node.getStorageInfo(storageID).numBlocks() == 0);
|
|
|
BlockReportIterator itBR = report.getBlockReportIterator();
|
|
|
|
|
|
while(itBR.hasNext()) {
|
|
@@ -1766,7 +1776,7 @@ public class BlockManager {
|
|
|
|
|
|
if (shouldPostponeBlocksFromFuture &&
|
|
|
namesystem.isGenStampInFuture(iblk)) {
|
|
|
- queueReportedBlock(node, iblk, reportedState,
|
|
|
+ queueReportedBlock(node, storageID, iblk, reportedState,
|
|
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
|
|
continue;
|
|
|
}
|
|
@@ -1783,10 +1793,10 @@ public class BlockManager {
|
|
|
if (shouldPostponeBlocksFromFuture) {
|
|
|
// In the Standby, we may receive a block report for a file that we
|
|
|
// just have an out-of-date gen-stamp or state for, for example.
|
|
|
- queueReportedBlock(node, iblk, reportedState,
|
|
|
+ queueReportedBlock(node, storageID, iblk, reportedState,
|
|
|
QUEUE_REASON_CORRUPT_STATE);
|
|
|
} else {
|
|
|
- markBlockAsCorrupt(c, node);
|
|
|
+ markBlockAsCorrupt(c, node, storageID);
|
|
|
}
|
|
|
continue;
|
|
|
}
|
|
@@ -1794,7 +1804,7 @@ public class BlockManager {
|
|
|
// If block is under construction, add this replica to its list
|
|
|
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
|
|
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
|
|
|
- node, iblk, reportedState);
|
|
|
+ node.getStorageInfo(storageID), iblk, reportedState);
|
|
|
// OpenFileBlocks only inside snapshots also will be added to safemode
|
|
|
// threshold. So we need to update such blocks to safemode
|
|
|
// refer HDFS-5283
|
|
@@ -1807,22 +1817,25 @@ public class BlockManager {
|
|
|
}
|
|
|
//add replica if appropriate
|
|
|
if (reportedState == ReplicaState.FINALIZED) {
|
|
|
- addStoredBlockImmediate(storedBlock, node);
|
|
|
+ addStoredBlockImmediate(storedBlock, node, storageID);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void reportDiff(DatanodeDescriptor dn,
|
|
|
+ private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
|
|
|
BlockListAsLongs newReport,
|
|
|
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
|
|
|
Collection<Block> toRemove, // remove from DatanodeDescriptor
|
|
|
Collection<Block> toInvalidate, // should be removed from DN
|
|
|
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
|
|
|
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
|
|
|
+
|
|
|
+ final DatanodeStorageInfo storageInfo = dn.updateStorage(storage);
|
|
|
+
|
|
|
// place a delimiter in the list which separates blocks
|
|
|
// that have been reported from those that have not
|
|
|
BlockInfo delimiter = new BlockInfo(new Block(), 1);
|
|
|
- boolean added = dn.addBlock(delimiter);
|
|
|
+ boolean added = storageInfo.addBlock(delimiter);
|
|
|
assert added : "Delimiting block cannot be present in the node";
|
|
|
int headIndex = 0; //currently the delimiter is in the head of the list
|
|
|
int curIndex;
|
|
@@ -1834,20 +1847,21 @@ public class BlockManager {
|
|
|
while(itBR.hasNext()) {
|
|
|
Block iblk = itBR.next();
|
|
|
ReplicaState iState = itBR.getCurrentReplicaState();
|
|
|
- BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
|
|
|
- toAdd, toInvalidate, toCorrupt, toUC);
|
|
|
+ BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
|
|
|
+ iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
|
|
|
+
|
|
|
// move block to the head of the list
|
|
|
if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
|
|
|
- headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
|
|
|
+ headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
// collect blocks that have not been reported
|
|
|
// all of them are next to the delimiter
|
|
|
- Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
|
|
|
- delimiter.getNext(0), dn);
|
|
|
+ Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
|
|
|
while(it.hasNext())
|
|
|
toRemove.add(it.next());
|
|
|
- dn.removeBlock(delimiter);
|
|
|
+ storageInfo.removeBlock(delimiter);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1881,7 +1895,8 @@ public class BlockManager {
|
|
|
* @return the up-to-date stored block, if it should be kept.
|
|
|
* Otherwise, null.
|
|
|
*/
|
|
|
- private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
|
|
|
+ private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
|
|
|
+ final String storageID,
|
|
|
final Block block, final ReplicaState reportedState,
|
|
|
final Collection<BlockInfo> toAdd,
|
|
|
final Collection<Block> toInvalidate,
|
|
@@ -1896,7 +1911,7 @@ public class BlockManager {
|
|
|
|
|
|
if (shouldPostponeBlocksFromFuture &&
|
|
|
namesystem.isGenStampInFuture(block)) {
|
|
|
- queueReportedBlock(dn, block, reportedState,
|
|
|
+ queueReportedBlock(dn, storageID, block, reportedState,
|
|
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
|
|
return null;
|
|
|
}
|
|
@@ -1917,7 +1932,7 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
// Ignore replicas already scheduled to be removed from the DN
|
|
|
- if(invalidateBlocks.contains(dn.getStorageID(), block)) {
|
|
|
+ if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
|
|
|
/* TODO: following assertion is incorrect, see HDFS-2668
|
|
|
assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
+ " in recentInvalidatesSet should not appear in DN " + dn; */
|
|
@@ -1931,7 +1946,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
// If the block is an out-of-date generation stamp or state,
|
|
|
// but we're the standby, we shouldn't treat it as corrupt,
|
|
|
// but instead just queue it for later processing.
|
|
|
- queueReportedBlock(dn, storedBlock, reportedState,
|
|
|
+ queueReportedBlock(dn, storageID, storedBlock, reportedState,
|
|
|
QUEUE_REASON_CORRUPT_STATE);
|
|
|
} else {
|
|
|
toCorrupt.add(c);
|
|
@@ -1960,7 +1975,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
* standby node. @see PendingDataNodeMessages.
|
|
|
* @param reason a textual reason to report in the debug logs
|
|
|
*/
|
|
|
- private void queueReportedBlock(DatanodeDescriptor dn, Block block,
|
|
|
+ private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
|
|
|
ReplicaState reportedState, String reason) {
|
|
|
assert shouldPostponeBlocksFromFuture;
|
|
|
|
|
@@ -1970,7 +1985,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
" from datanode " + dn + " for later processing " +
|
|
|
"because " + reason + ".");
|
|
|
}
|
|
|
- pendingDNMessages.enqueueReportedBlock(dn, block, reportedState);
|
|
|
+ pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1993,8 +2008,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Processing previouly queued message " + rbi);
|
|
|
}
|
|
|
- processAndHandleReportedBlock(
|
|
|
- rbi.getNode(), rbi.getBlock(), rbi.getReportedState(), null);
|
|
|
+ processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(),
|
|
|
+ rbi.getBlock(), rbi.getReportedState(), null);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2111,19 +2126,21 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
|
|
|
- DatanodeDescriptor node) throws IOException {
|
|
|
+ DatanodeDescriptor node, String storageID) throws IOException {
|
|
|
BlockInfoUnderConstruction block = ucBlock.storedBlock;
|
|
|
- block.addReplicaIfNotPresent(node, ucBlock.reportedBlock, ucBlock.reportedState);
|
|
|
+ block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
|
|
|
+ ucBlock.reportedBlock, ucBlock.reportedState);
|
|
|
+
|
|
|
if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
|
|
|
- addStoredBlock(block, node, null, true);
|
|
|
+ addStoredBlock(block, node, storageID, null, true);
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Faster version of
|
|
|
- * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
|
|
|
+ * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
|
|
|
* , intended for use with initial block report at startup. If not in startup
|
|
|
* safe mode, will call standard addStoredBlock(). Assumes this method is
|
|
|
* called "immediately" so there is no need to refresh the storedBlock from
|
|
@@ -2134,17 +2151,17 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void addStoredBlockImmediate(BlockInfo storedBlock,
|
|
|
- DatanodeDescriptor node)
|
|
|
+ DatanodeDescriptor node, String storageID)
|
|
|
throws IOException {
|
|
|
assert (storedBlock != null && namesystem.hasWriteLock());
|
|
|
if (!namesystem.isInStartupSafeMode()
|
|
|
|| namesystem.isPopulatingReplQueues()) {
|
|
|
- addStoredBlock(storedBlock, node, null, false);
|
|
|
+ addStoredBlock(storedBlock, node, storageID, null, false);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// just add it
|
|
|
- node.addBlock(storedBlock);
|
|
|
+ node.addBlock(storageID, storedBlock);
|
|
|
|
|
|
// Now check for completion of blocks and safe block count
|
|
|
int numCurrentReplica = countLiveNodes(storedBlock);
|
|
@@ -2167,6 +2184,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
*/
|
|
|
private Block addStoredBlock(final BlockInfo block,
|
|
|
DatanodeDescriptor node,
|
|
|
+ String storageID,
|
|
|
DatanodeDescriptor delNodeHint,
|
|
|
boolean logEveryBlock)
|
|
|
throws IOException {
|
|
@@ -2192,7 +2210,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
assert bc != null : "Block must belong to a file";
|
|
|
|
|
|
// add block to the datanode
|
|
|
- boolean added = node.addBlock(storedBlock);
|
|
|
+ boolean added = node.addBlock(storageID, storedBlock);
|
|
|
|
|
|
int curReplicaDelta;
|
|
|
if (added) {
|
|
@@ -2452,19 +2470,19 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
|
|
|
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
|
|
|
.getNodes(block);
|
|
|
- for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
|
|
|
- it.hasNext();) {
|
|
|
- DatanodeDescriptor cur = it.next();
|
|
|
- if (cur.areBlockContentsStale()) {
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
|
|
+ final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
|
|
|
+ if (storage.areBlockContentsStale()) {
|
|
|
LOG.info("BLOCK* processOverReplicatedBlock: " +
|
|
|
"Postponing processing of over-replicated " +
|
|
|
- block + " since datanode " + cur + " does not yet have up-to-date " +
|
|
|
+ block + " since storage " + storage
|
|
|
+ + " on datanode " + cur + " does not yet have up-to-date " +
|
|
|
"block information.");
|
|
|
postponeBlock(block);
|
|
|
return;
|
|
|
}
|
|
|
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
|
|
|
- .getStorageID());
|
|
|
+ .getDatanodeUuid());
|
|
|
if (excessBlocks == null || !excessBlocks.contains(block)) {
|
|
|
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
|
|
// exclude corrupt replicas
|
|
@@ -2553,10 +2571,10 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
|
|
|
private void addToExcessReplicate(DatanodeInfo dn, Block block) {
|
|
|
assert namesystem.hasWriteLock();
|
|
|
- LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
|
|
|
+ LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
|
|
|
if (excessBlocks == null) {
|
|
|
excessBlocks = new LightWeightLinkedSet<Block>();
|
|
|
- excessReplicateMap.put(dn.getStorageID(), excessBlocks);
|
|
|
+ excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
|
|
|
}
|
|
|
if (excessBlocks.add(block)) {
|
|
|
excessBlocksCount.incrementAndGet();
|
|
@@ -2604,7 +2622,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
// in "excess" there.
|
|
|
//
|
|
|
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
|
|
|
- .getStorageID());
|
|
|
+ .getDatanodeUuid());
|
|
|
if (excessBlocks != null) {
|
|
|
if (excessBlocks.remove(block)) {
|
|
|
excessBlocksCount.decrementAndGet();
|
|
@@ -2613,7 +2631,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
+ block + " is removed from excessBlocks");
|
|
|
}
|
|
|
if (excessBlocks.size() == 0) {
|
|
|
- excessReplicateMap.remove(node.getStorageID());
|
|
|
+ excessReplicateMap.remove(node.getDatanodeUuid());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2628,12 +2646,18 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
* return the length of the added block; 0 if the block is not added
|
|
|
*/
|
|
|
private long addBlock(Block block, List<BlockWithLocations> results) {
|
|
|
- final List<String> machineSet = getValidLocations(block);
|
|
|
- if(machineSet.size() == 0) {
|
|
|
+ final List<DatanodeStorageInfo> locations = getValidLocations(block);
|
|
|
+ if(locations.size() == 0) {
|
|
|
return 0;
|
|
|
} else {
|
|
|
- results.add(new BlockWithLocations(block,
|
|
|
- machineSet.toArray(new String[machineSet.size()])));
|
|
|
+ final String[] datanodeUuids = new String[locations.size()];
|
|
|
+ final String[] storageIDs = new String[datanodeUuids.length];
|
|
|
+ for(int i = 0; i < locations.size(); i++) {
|
|
|
+ final DatanodeStorageInfo s = locations.get(i);
|
|
|
+ datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
|
|
|
+ storageIDs[i] = s.getStorageID();
|
|
|
+ }
|
|
|
+ results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
|
|
|
return block.getNumBytes();
|
|
|
}
|
|
|
}
|
|
@@ -2642,12 +2666,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
* The given node is reporting that it received a certain block.
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- void addBlock(DatanodeDescriptor node, Block block, String delHint)
|
|
|
+ void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
|
|
|
throws IOException {
|
|
|
- // decrement number of blocks scheduled to this datanode.
|
|
|
+ // Decrement number of blocks scheduled to this datanode.
|
|
|
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
|
|
|
// RECEIVED_BLOCK), we currently also decrease the approximate number.
|
|
|
- node.decBlocksScheduled();
|
|
|
+ node.decrementBlocksScheduled();
|
|
|
|
|
|
// get the deletion hint node
|
|
|
DatanodeDescriptor delHintNode = null;
|
|
@@ -2663,11 +2687,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
// Modify the blocks->datanode map and node's map.
|
|
|
//
|
|
|
pendingReplications.decrement(block, node);
|
|
|
- processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
|
|
|
+ processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
|
|
|
delHintNode);
|
|
|
}
|
|
|
|
|
|
- private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
|
|
|
+ private void processAndHandleReportedBlock(DatanodeDescriptor node,
|
|
|
+ String storageID, Block block,
|
|
|
ReplicaState reportedState, DatanodeDescriptor delHintNode)
|
|
|
throws IOException {
|
|
|
// blockReceived reports a finalized block
|
|
@@ -2675,7 +2700,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
|
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
|
|
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
|
|
- processReportedBlock(node, block, reportedState,
|
|
|
+ processReportedBlock(node, storageID, block, reportedState,
|
|
|
toAdd, toInvalidate, toCorrupt, toUC);
|
|
|
// the block is only in one of the to-do lists
|
|
|
// if it is in none then data-node already has it
|
|
@@ -2683,11 +2708,11 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
: "The block should be only in one of the lists.";
|
|
|
|
|
|
for (StatefulBlockInfo b : toUC) {
|
|
|
- addStoredBlockUnderConstruction(b, node);
|
|
|
+ addStoredBlockUnderConstruction(b, node, storageID);
|
|
|
}
|
|
|
long numBlocksLogged = 0;
|
|
|
for (BlockInfo b : toAdd) {
|
|
|
- addStoredBlock(b, node, delHintNode, numBlocksLogged < maxNumBlocksToLog);
|
|
|
+ addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
|
|
|
numBlocksLogged++;
|
|
|
}
|
|
|
if (numBlocksLogged > maxNumBlocksToLog) {
|
|
@@ -2701,7 +2726,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
addToInvalidates(b, node);
|
|
|
}
|
|
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
|
|
- markBlockAsCorrupt(b, node);
|
|
|
+ markBlockAsCorrupt(b, node, storageID);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2713,7 +2738,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
* This method must be called with FSNamesystem lock held.
|
|
|
*/
|
|
|
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
|
|
- final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
|
|
|
+ final String poolId, final StorageReceivedDeletedBlocks srdb)
|
|
|
throws IOException {
|
|
|
assert namesystem.hasWriteLock();
|
|
|
int received = 0;
|
|
@@ -2729,19 +2754,19 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
"Got incremental block report from unregistered or dead node");
|
|
|
}
|
|
|
|
|
|
- for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
|
|
|
+ for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
|
|
switch (rdbi.getStatus()) {
|
|
|
case DELETED_BLOCK:
|
|
|
removeStoredBlock(rdbi.getBlock(), node);
|
|
|
deleted++;
|
|
|
break;
|
|
|
case RECEIVED_BLOCK:
|
|
|
- addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
|
|
|
+ addBlock(node, srdb.getStorageID(), rdbi.getBlock(), rdbi.getDelHints());
|
|
|
received++;
|
|
|
break;
|
|
|
case RECEIVING_BLOCK:
|
|
|
receiving++;
|
|
|
- processAndHandleReportedBlock(node, rdbi.getBlock(),
|
|
|
+ processAndHandleReportedBlock(node, srdb.getStorageID(), rdbi.getBlock(),
|
|
|
ReplicaState.RBW, null);
|
|
|
break;
|
|
|
default:
|
|
@@ -2773,24 +2798,23 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
int corrupt = 0;
|
|
|
int excess = 0;
|
|
|
int stale = 0;
|
|
|
- Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
|
|
|
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
|
|
|
- while (nodeIter.hasNext()) {
|
|
|
- DatanodeDescriptor node = nodeIter.next();
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
|
|
|
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
|
|
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
|
|
|
corrupt++;
|
|
|
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
|
|
decommissioned++;
|
|
|
} else {
|
|
|
LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
|
|
|
- .getStorageID());
|
|
|
+ .getDatanodeUuid());
|
|
|
if (blocksExcess != null && blocksExcess.contains(b)) {
|
|
|
excess++;
|
|
|
} else {
|
|
|
live++;
|
|
|
}
|
|
|
}
|
|
|
- if (node.areBlockContentsStale()) {
|
|
|
+ if (storage.areBlockContentsStale()) {
|
|
|
stale++;
|
|
|
}
|
|
|
}
|
|
@@ -2813,10 +2837,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
}
|
|
|
// else proceed with fast case
|
|
|
int live = 0;
|
|
|
- Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
|
|
|
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
|
|
|
- while (nodeIter.hasNext()) {
|
|
|
- DatanodeDescriptor node = nodeIter.next();
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
|
|
|
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
|
|
if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
|
|
|
live++;
|
|
|
}
|
|
@@ -2828,10 +2851,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
int curReplicas = num.liveReplicas();
|
|
|
int curExpectedReplicas = getReplication(block);
|
|
|
BlockCollection bc = blocksMap.getBlockCollection(block);
|
|
|
- Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
|
|
|
StringBuilder nodeList = new StringBuilder();
|
|
|
- while (nodeIter.hasNext()) {
|
|
|
- DatanodeDescriptor node = nodeIter.next();
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
|
|
|
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
|
|
nodeList.append(node);
|
|
|
nodeList.append(" ");
|
|
|
}
|
|
@@ -2936,14 +2958,13 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
return blocksMap.size();
|
|
|
}
|
|
|
|
|
|
- public DatanodeDescriptor[] getNodes(BlockInfo block) {
|
|
|
- DatanodeDescriptor[] nodes =
|
|
|
- new DatanodeDescriptor[block.numNodes()];
|
|
|
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
|
|
|
- for (int i = 0; it != null && it.hasNext(); i++) {
|
|
|
- nodes[i] = it.next();
|
|
|
+ public DatanodeStorageInfo[] getStorages(BlockInfo block) {
|
|
|
+ final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
|
|
|
+ int i = 0;
|
|
|
+ for(DatanodeStorageInfo s : blocksMap.getStorages(block)) {
|
|
|
+ storages[i++] = s;
|
|
|
}
|
|
|
- return nodes;
|
|
|
+ return storages;
|
|
|
}
|
|
|
|
|
|
public int getTotalBlocks() {
|
|
@@ -3056,9 +3077,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
corruptReplicas.getNodes(b);
|
|
|
int numExpectedReplicas = getReplication(b);
|
|
|
String rackName = null;
|
|
|
- for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
|
|
|
- it.hasNext();) {
|
|
|
- DatanodeDescriptor cur = it.next();
|
|
|
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
|
|
|
+ final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
|
|
|
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
|
|
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
|
|
|
if (numExpectedReplicas == 1 ||
|
|
@@ -3102,8 +3122,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
}
|
|
|
|
|
|
/** @return an iterator of the datanodes. */
|
|
|
- public Iterator<DatanodeDescriptor> datanodeIterator(final Block block) {
|
|
|
- return blocksMap.nodeIterator(block);
|
|
|
+ public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
|
|
|
+ return blocksMap.getStorages(block);
|
|
|
}
|
|
|
|
|
|
public int numCorruptReplicas(Block block) {
|
|
@@ -3247,24 +3267,24 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
|
|
|
private DatanodeDescriptor srcNode;
|
|
|
private List<DatanodeDescriptor> containingNodes;
|
|
|
- private List<DatanodeDescriptor> liveReplicaNodes;
|
|
|
+ private List<DatanodeStorageInfo> liveReplicaStorages;
|
|
|
private int additionalReplRequired;
|
|
|
|
|
|
- private DatanodeDescriptor targets[];
|
|
|
+ private DatanodeStorageInfo targets[];
|
|
|
private int priority;
|
|
|
|
|
|
public ReplicationWork(Block block,
|
|
|
BlockCollection bc,
|
|
|
DatanodeDescriptor srcNode,
|
|
|
List<DatanodeDescriptor> containingNodes,
|
|
|
- List<DatanodeDescriptor> liveReplicaNodes,
|
|
|
+ List<DatanodeStorageInfo> liveReplicaStorages,
|
|
|
int additionalReplRequired,
|
|
|
int priority) {
|
|
|
this.block = block;
|
|
|
this.bc = bc;
|
|
|
this.srcNode = srcNode;
|
|
|
this.containingNodes = containingNodes;
|
|
|
- this.liveReplicaNodes = liveReplicaNodes;
|
|
|
+ this.liveReplicaStorages = liveReplicaStorages;
|
|
|
this.additionalReplRequired = additionalReplRequired;
|
|
|
this.priority = priority;
|
|
|
this.targets = null;
|
|
@@ -3273,8 +3293,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
|
|
|
private void chooseTargets(BlockPlacementPolicy blockplacement,
|
|
|
Set<Node> excludedNodes) {
|
|
|
targets = blockplacement.chooseTarget(bc.getName(),
|
|
|
- additionalReplRequired, srcNode, liveReplicaNodes, false,
|
|
|
- excludedNodes, block.getNumBytes());
|
|
|
+ additionalReplRequired, srcNode, liveReplicaStorages, false,
|
|
|
+ excludedNodes, block.getNumBytes(), StorageType.DEFAULT);
|
|
|
}
|
|
|
}
|
|
|
|