@@ -48,6 +48,7 @@ import javax.management.ObjectName;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -109,6 +110,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -147,7 +149,7 @@ public class BlockManager implements BlockStatsMXBean {
   private final PendingDataNodeMessages pendingDNMessages =
     new PendingDataNodeMessages();
 
-  private volatile long pendingReplicationBlocksCount = 0L;
+  private volatile long pendingReconstructionBlocksCount = 0L;
   private volatile long corruptReplicaBlocksCount = 0L;
   private volatile long lowRedundancyBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
@@ -161,8 +163,8 @@ public class BlockManager implements BlockStatsMXBean {
   private ObjectName mxBeanName;
 
   /** Used by metrics */
-  public long getPendingReplicationBlocksCount() {
-    return pendingReplicationBlocksCount;
+  public long getPendingReconstructionBlocksCount() {
+    return pendingReconstructionBlocksCount;
   }
   /** Used by metrics */
   public long getUnderReplicatedBlocksCount() {
@@ -186,7 +188,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
   /** Used by metrics */
   public long getExcessBlocksCount() {
-    return excessReplicas.size();
+    return excessRedundancyMap.size();
   }
   /** Used by metrics */
   public long getPostponedMisreplicatedBlocksCount() {
@@ -246,7 +248,8 @@ public class BlockManager implements BlockStatsMXBean {
    * Maps a StorageID to the set of blocks that are "extra" for this
    * DataNode. We'll eventually remove these extras.
    */
-  private final ExcessReplicaMap excessReplicas = new ExcessReplicaMap();
+  private final ExcessRedundancyMap excessRedundancyMap =
+      new ExcessRedundancyMap();
 
   /**
    * Store set of Blocks that need to be replicated 1 or more times.
@@ -256,7 +259,7 @@ public class BlockManager implements BlockStatsMXBean {
       new LowRedundancyBlocks();
 
   @VisibleForTesting
-  final PendingReplicationBlocks pendingReplications;
+  final PendingReconstructionBlocks pendingReconstruction;
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
@@ -352,9 +355,10 @@ public class BlockManager implements BlockStatsMXBean {
         datanodeManager.getNetworkTopology(),
         datanodeManager.getHost2DatanodeMap());
     storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
-    pendingReplications = new PendingReplicationBlocks(conf.getInt(
-      DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
-      DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
+    pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
+        * 1000L);
 
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
@@ -542,7 +546,7 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public void activate(Configuration conf, long blockTotal) {
-    pendingReplications.start();
+    pendingReconstruction.start();
     datanodeManager.activate(conf);
     this.replicationThread.setName("ReplicationMonitor");
     this.replicationThread.start();
@@ -565,7 +569,7 @@ public class BlockManager implements BlockStatsMXBean {
     } catch (InterruptedException ie) {
     }
     datanodeManager.close();
-    pendingReplications.stop();
+    pendingReconstruction.stop();
     blocksMap.close();
   }
 
@@ -604,12 +608,54 @@ public class BlockManager implements BlockStatsMXBean {
       dumpBlockMeta(block, out);
     }
 
-    // Dump blocks from pendingReplication
-    pendingReplications.metaSave(out);
+    // Dump blocks from pendingReconstruction
+    pendingReconstruction.metaSave(out);
 
     // Dump blocks that are waiting to be deleted
     invalidateBlocks.dump(out);
 
+    // Dump corrupt blocks and their storageIDs
+    Set<Block> corruptBlocks = corruptReplicas.getCorruptBlocks();
+    out.println("Corrupt Blocks:");
+    for (Block block : corruptBlocks) {
+      Collection<DatanodeDescriptor> corruptNodes =
+          corruptReplicas.getNodes(block);
+      if (corruptNodes == null) {
+        LOG.warn(block.getBlockId() +
+            " is corrupt but has no associated node.");
+        continue;
+      }
+      int numNodesToFind = corruptNodes.size();
+      for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+        DatanodeDescriptor node = storage.getDatanodeDescriptor();
+        if (corruptNodes.contains(node)) {
+          String storageId = storage.getStorageID();
+          DatanodeStorageInfo storageInfo = node.getStorageInfo(storageId);
+          State state = (storageInfo == null) ? null : storageInfo.getState();
+          out.println("Block=" + block.getBlockId() + "\tNode=" + node.getName()
+              + "\tStorageID=" + storageId + "\tStorageState=" + state
+              + "\tTotalReplicas=" +
+              blocksMap.numNodes(block)
+              + "\tReason=" + corruptReplicas.getCorruptReason(block, node));
+          numNodesToFind--;
+          if (numNodesToFind == 0) {
+            break;
+          }
+        }
+      }
+      if (numNodesToFind > 0) {
+        String[] corruptNodesList = new String[corruptNodes.size()];
+        int i = 0;
+        for (DatanodeDescriptor d : corruptNodes) {
+          corruptNodesList[i] = d.getHostName();
+          i++;
+        }
+        out.println(block.getBlockId() + " corrupt on " +
+            StringUtils.join(",", corruptNodesList) + " but not all nodes are" +
+            " found in its block locations");
+      }
+    }
+
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
@@ -765,7 +811,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**
    * If IBR is not sent from expected locations yet, add the datanodes to
-   * pendingReplications in order to keep ReplicationMonitor from scheduling
+   * pendingReconstruction in order to keep ReplicationMonitor from scheduling
    * the block.
    */
   public void addExpectedReplicasToPending(BlockInfo blk) {
@@ -780,7 +826,7 @@ public class BlockManager implements BlockStatsMXBean {
             pendingNodes.add(dnd);
           }
         }
-        pendingReplications.increment(blk,
+        pendingReconstruction.increment(blk,
             pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
       }
     }
@@ -866,7 +912,7 @@ public class BlockManager implements BlockStatsMXBean {
     neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
         replicas.readOnlyReplicas(),
         replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
-    pendingReplications.remove(lastBlock);
+    pendingReconstruction.remove(lastBlock);
 
     // remove this block from the list of pending blocks to be deleted.
     for (DatanodeStorageInfo storage : targets) {
@@ -992,9 +1038,9 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     final int numNodes = blocksMap.numNodes(blk);
-    final boolean isCorrupt = numCorruptNodes != 0 &&
-        numCorruptNodes == numNodes;
-    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+    final boolean isCorrupt = numCorruptReplicas != 0 &&
+        numCorruptReplicas == numNodes;
+    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
     final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
     int j = 0, i = 0;
@@ -1320,11 +1366,22 @@ public class BlockManager implements BlockStatsMXBean {
           + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
           + ") does not exist");
     }
-
+    DatanodeStorageInfo storage = null;
+    if (storageID != null) {
+      storage = node.getStorageInfo(storageID);
+    }
+    if (storage == null) {
+      storage = storedBlock.findStorageInfo(node);
+    }
+
+    if (storage == null) {
+      blockLog.debug("BLOCK* findAndMarkBlockAsCorrupt: {} not found on {}",
+          blk, dn);
+      return;
+    }
     markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
         blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
-        storageID == null ? null : node.getStorageInfo(storageID),
-        node);
+        storage, node);
   }
 
   /**
@@ -1435,7 +1492,7 @@ public class BlockManager implements BlockStatsMXBean {
 
 
   void updateState() {
-    pendingReplicationBlocksCount = pendingReplications.size();
+    pendingReconstructionBlocksCount = pendingReconstruction.size();
     lowRedundancyBlocksCount = neededReconstruction.size();
     corruptReplicaBlocksCount = corruptReplicas.size();
   }
@@ -1578,8 +1635,8 @@ public class BlockManager implements BlockStatsMXBean {
       }
 
       blockLog.debug(
-          "BLOCK* neededReconstruction = {} pendingReplications = {}",
-          neededReconstruction.size(), pendingReplications.size());
+          "BLOCK* neededReconstruction = {} pendingReconstruction = {}",
+          neededReconstruction.size(), pendingReconstruction.size());
     }
 
     return scheduledWork;
@@ -1622,7 +1679,7 @@ public class BlockManager implements BlockStatsMXBean {
       // not included in the numReplicas.liveReplicas() count
       assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
 
-      int pendingNum = pendingReplications.getNumReplicas(block);
+      int pendingNum = pendingReconstruction.getNumReplicas(block);
       if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
           requiredReplication)) {
         neededReconstruction.remove(block, priority);
@@ -1690,7 +1747,7 @@ public class BlockManager implements BlockStatsMXBean {
       // do not schedule more if enough replicas is already pending
       final short requiredReplication = getExpectedReplicaNum(block);
       NumberReplicas numReplicas = countNodes(block);
-      final int pendingNum = pendingReplications.getNumReplicas(block);
+      final int pendingNum = pendingReconstruction.getNumReplicas(block);
       if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
           requiredReplication)) {
         neededReconstruction.remove(block, priority);
@@ -1718,8 +1775,8 @@ public class BlockManager implements BlockStatsMXBean {
 
       // Move the block-replication into a "pending" state.
       // The reason we use 'pending' is so we can retry
-      // replications that fail after an appropriate amount of time.
-      pendingReplications.increment(block,
+      // reconstructions that fail after an appropriate amount of time.
+      pendingReconstruction.increment(block,
           DatanodeStorageInfo.toDatanodeDescriptors(targets));
       blockLog.debug("BLOCK* block {} is moved from neededReplications to "
           + "pendingReplications", block);
@@ -1737,7 +1794,7 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
     return placementPolicies.getPolicy(false).chooseTarget(src, 1, clientnode,
         Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
-        blocksize, storagePolicySuite.getDefaultPolicy());
+        blocksize, storagePolicySuite.getDefaultPolicy(), null);
   }
 
   /** Choose target for getting additional datanodes for an existing pipeline. */
@@ -1752,7 +1809,7 @@ public class BlockManager implements BlockStatsMXBean {
     final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
     final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped);
     return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
-        chosen, true, excludes, blocksize, storagePolicy);
+        chosen, true, excludes, blocksize, storagePolicy, null);
   }
 
   /**
@@ -1769,14 +1826,15 @@ public class BlockManager implements BlockStatsMXBean {
       final long blocksize,
       final List<String> favoredNodes,
       final byte storagePolicyID,
-      final boolean isStriped) throws IOException {
+      final boolean isStriped,
+      final EnumSet<AddBlockFlag> flags) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors =
         getDatanodeDescriptors(favoredNodes);
     final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
     final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped);
     final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize,
-        favoredDatanodeDescriptors, storagePolicy);
+        favoredDatanodeDescriptors, storagePolicy, flags);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -1907,11 +1965,11 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * If there were any replication requests that timed out, reap them
-   * and put them back into the neededReplication queue
+   * If there were any reconstruction requests that timed out, reap them
+   * and put them back into the neededReconstruction queue
    */
-  private void processPendingReplications() {
-    BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
+  private void processPendingReconstructions() {
+    BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
     if (timedOutItems != null) {
       namesystem.writeLock();
       try {
@@ -2890,7 +2948,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Now check for completion of blocks and safe block count
     NumberReplicas num = countNodes(storedBlock);
     int numLiveReplicas = num.liveReplicas();
-    int pendingNum = pendingReplications.getNumReplicas(storedBlock);
+    int pendingNum = pendingReconstruction.getNumReplicas(storedBlock);
     int numCurrentReplica = numLiveReplicas + pendingNum;
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
@@ -3203,8 +3261,8 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**
    * Find how many of the containing nodes are "extra", if any.
-   * If there are any extras, call chooseExcessReplicates() to
-   * mark them in the excessReplicateMap.
+   * If there are any extras, call chooseExcessRedundancies() to
+   * mark them in the excessRedundancyMap.
    */
   private void processExtraRedundancyBlock(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
@@ -3237,11 +3295,11 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     }
-    chooseExcessReplicates(nonExcess, block, replication, addedNode,
+    chooseExcessRedundancies(nonExcess, block, replication, addedNode,
         delNodeHint);
   }
 
-  private void chooseExcessReplicates(
+  private void chooseExcessRedundancies(
       final Collection<DatanodeStorageInfo> nonExcess,
       BlockInfo storedBlock, short replication,
       DatanodeDescriptor addedNode,
@@ -3250,19 +3308,19 @@ public class BlockManager implements BlockStatsMXBean {
     // first form a rack to datanodes map and
    BlockCollection bc = getBlockCollection(storedBlock);
    if (storedBlock.isStriped()) {
-      chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint);
+      chooseExcessRedundancyStriped(bc, nonExcess, storedBlock, delNodeHint);
    } else {
      final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
          bc.getStoragePolicyID());
      final List<StorageType> excessTypes = storagePolicy.chooseExcess(
          replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
-      chooseExcessReplicasContiguous(nonExcess, storedBlock, replication,
+      chooseExcessRedundancyContiguous(nonExcess, storedBlock, replication,
          addedNode, delNodeHint, excessTypes);
    }
  }

  /**
-   * We want "replication" replicates for the block, but we now have too many.
+   * We want sufficient redundancy for the block, but we now have too many.
    * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
    *
    * srcNodes.size() - dstNodes.size() == replication
@@ -3275,7 +3333,7 @@ public class BlockManager implements BlockStatsMXBean {
    * If no such a node is available,
    * then pick a node with least free space
    */
-  private void chooseExcessReplicasContiguous(
+  private void chooseExcessRedundancyContiguous(
       final Collection<DatanodeStorageInfo> nonExcess, BlockInfo storedBlock,
       short replication, DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) {
@@ -3284,7 +3342,7 @@ public class BlockManager implements BlockStatsMXBean {
         .chooseReplicasToDelete(nonExcess, nonExcess, replication, excessTypes,
             addedNode, delNodeHint);
     for (DatanodeStorageInfo choosenReplica : replicasToDelete) {
-      processChosenExcessReplica(nonExcess, choosenReplica, storedBlock);
+      processChosenExcessRedundancy(nonExcess, choosenReplica, storedBlock);
     }
   }
 
@@ -3297,7 +3355,7 @@ public class BlockManager implements BlockStatsMXBean {
    * The block placement policy will make sure that the left internal blocks are
    * spread across racks and also try hard to pick one with least free space.
    */
-  private void chooseExcessReplicasStriped(BlockCollection bc,
+  private void chooseExcessRedundancyStriped(BlockCollection bc,
       final Collection<DatanodeStorageInfo> nonExcess,
       BlockInfo storedBlock,
       DatanodeDescriptor delNodeHint) {
@@ -3325,7 +3383,7 @@ public class BlockManager implements BlockStatsMXBean {
     if (delStorageHint != null) {
       Integer index = storage2index.get(delStorageHint);
       if (index != null && duplicated.get(index)) {
-        processChosenExcessReplica(nonExcess, delStorageHint, storedBlock);
+        processChosenExcessRedundancy(nonExcess, delStorageHint, storedBlock);
       }
     }
 
@@ -3357,7 +3415,7 @@ public class BlockManager implements BlockStatsMXBean {
           .chooseReplicasToDelete(nonExcess, candidates, (short) 1,
               excessTypes, null, null);
       for (DatanodeStorageInfo chosen : replicasToDelete) {
-        processChosenExcessReplica(nonExcess, chosen, storedBlock);
+        processChosenExcessRedundancy(nonExcess, chosen, storedBlock);
         candidates.remove(chosen);
       }
     }
@@ -3365,11 +3423,11 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  private void processChosenExcessReplica(
+  private void processChosenExcessRedundancy(
       final Collection<DatanodeStorageInfo> nonExcess,
       final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
     nonExcess.remove(chosen);
-    excessReplicas.add(chosen.getDatanodeDescriptor(), storedBlock);
+    excessRedundancyMap.add(chosen.getDatanodeDescriptor(), storedBlock);
     //
     // The 'excessblocks' tracks blocks until we get confirmation
     // that the datanode has deleted them; the only way we remove them
@@ -3381,7 +3439,7 @@ public class BlockManager implements BlockStatsMXBean {
     //
     final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
     addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
-    blockLog.debug("BLOCK* chooseExcessReplicates: "
+    blockLog.debug("BLOCK* chooseExcessRedundancies: "
         + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
   }
 
@@ -3433,7 +3491,7 @@ public class BlockManager implements BlockStatsMXBean {
         updateNeededReconstructions(storedBlock, -1, 0);
       }
 
-      excessReplicas.remove(node, storedBlock);
+      excessRedundancyMap.remove(node, storedBlock);
       corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node);
     }
   }
@@ -3504,7 +3562,7 @@ public class BlockManager implements BlockStatsMXBean {
     //
     BlockInfo storedBlock = getStoredBlock(block);
     if (storedBlock != null) {
-      pendingReplications.decrement(storedBlock, node);
+      pendingReconstruction.decrement(storedBlock, node);
     }
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
@@ -3749,11 +3807,11 @@ public class BlockManager implements BlockStatsMXBean {
 
   @VisibleForTesting
   int getExcessSize4Testing(String dnUuid) {
-    return excessReplicas.getSize4Testing(dnUuid);
+    return excessRedundancyMap.getSize4Testing(dnUuid);
   }
 
   public boolean isExcess(DatanodeDescriptor dn, BlockInfo blk) {
-    return excessReplicas.contains(dn, blk);
+    return excessRedundancyMap.contains(dn, blk);
   }
 
   /**
@@ -3813,7 +3871,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     updateState();
-    if (pendingReplicationBlocksCount == 0 &&
+    if (pendingReconstructionBlocksCount == 0 &&
         lowRedundancyBlocksCount == 0) {
       LOG.info("Node {} is dead and there are no low redundancy" +
           " blocks or blocks pending reconstruction. Safe to decommission.",
@@ -3860,8 +3918,8 @@ public class BlockManager implements BlockStatsMXBean {
     block.setNumBytes(BlockCommand.NO_ACK);
     addToInvalidates(block);
     removeBlockFromMap(block);
-    // Remove the block from pendingReplications and neededReconstruction
-    pendingReplications.remove(block);
+    // Remove the block from pendingReconstruction and neededReconstruction
+    pendingReconstruction.remove(block);
     neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
     if (postponedMisreplicatedBlocks.remove(block)) {
       postponedMisreplicatedBlocksCount.decrementAndGet();
@@ -3919,7 +3977,7 @@ public class BlockManager implements BlockStatsMXBean {
     for (BlockInfo block : bc.getBlocks()) {
       short expected = getExpectedReplicaNum(block);
       final NumberReplicas n = countNodes(block);
-      final int pending = pendingReplications.getNumReplicas(block);
+      final int pending = pendingReconstruction.getNumReplicas(block);
       if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
         neededReconstruction.add(block, n.liveReplicas() + pending,
             n.readOnlyReplicas(),
@@ -4059,7 +4117,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   public void removeBlockFromMap(BlockInfo block) {
     for(DatanodeStorageInfo info : blocksMap.getStorages(block)) {
-      excessReplicas.remove(info.getDatanodeDescriptor(), block);
+      excessRedundancyMap.remove(info.getDatanodeDescriptor(), block);
     }
 
     blocksMap.removeBlock(block);
@@ -4110,7 +4168,7 @@ public class BlockManager implements BlockStatsMXBean {
           // Process replication work only when active NN is out of safe mode.
           if (isPopulatingReplQueues()) {
             computeDatanodeWork();
-            processPendingReplications();
+            processPendingReconstructions();
             rescanPostponedMisreplicatedBlocks();
           }
           Thread.sleep(replicationRecheckInterval);
@@ -4258,8 +4316,8 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public void clearQueues() {
     neededReconstruction.clear();
-    pendingReplications.clear();
-    excessReplicas.clear();
+    pendingReconstruction.clear();
+    excessRedundancyMap.clear();
     invalidateBlocks.clear();
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();