@@ -126,6 +126,29 @@ import org.slf4j.LoggerFactory;

/**
* Keeps information related to the blocks stored in the Hadoop cluster.
+ * For block state management, it tries to maintain the safety
+ * property of "# of live replicas == # of expected redundancy" under
+ * any events such as decommission, namenode failover, datanode failure.
+ *
+ * The motivation of maintenance mode is to allow admins to quickly repair
+ * nodes without paying the cost of decommission. Thus with maintenance mode,
+ * # of live replicas doesn't have to be equal to # of expected redundancy.
+ * If any replica is in maintenance mode, the safety property
+ * is extended as follows. These properties still apply in the case of zero
+ * maintenance replicas, so they can be used as the safety properties for all scenarios.
+ * a. # of live replicas >= # of min replication for maintenance.
+ * b. # of live replicas <= # of expected redundancy.
+ * c. # of live replicas and maintenance replicas >= # of expected redundancy.
+ *
+ * For regular replication, # of min live replicas for maintenance is determined
+ * by DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY. This number has to be <=
+ * DFS_NAMENODE_REPLICATION_MIN_KEY.
+ * For erasure coding, # of min live replicas for maintenance is
+ * BlockInfoStriped#getRealDataBlockNum.
+ *
+ * Another safety property is to satisfy the block placement policy. While the
+ * policy is configurable, the replicas the policy is applied to are the live
+ * replicas + maintenance replicas.
*/
@InterfaceAudience.Private
public class BlockManager implements BlockStatsMXBean {
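The extended safety properties above can be read as a single predicate. The following standalone sketch is not part of the patch; all names are illustrative counts, not BlockManager fields.

// Illustrative only: restates the extended safety properties a-c.
static boolean satisfiesMaintenanceSafety(int live, int maintenance,
    int minMaintenanceReplication, int expected) {
  return live >= minMaintenanceReplication   // property a
      && live <= expected                    // property b
      && live + maintenance >= expected;     // property c
}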
@@ -341,6 +364,11 @@ public class BlockManager implements BlockStatsMXBean {

private final BlockIdManager blockIdManager;

+ /** Minimum live replicas needed for the datanode to be transitioned
+ * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
+ */
+ private final short minReplicationToBeInMaintenance;
+
public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -373,13 +401,13 @@ public class BlockManager implements BlockStatsMXBean {
this.maxCorruptFilesReturned = conf.getInt(
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
- this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
- DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+ this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);

- final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
- DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
+ final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
+ DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
if (minR <= 0)
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
@@ -407,7 +435,7 @@ public class BlockManager implements BlockStatsMXBean {
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);

- this.replicationRecheckInterval =
+ this.replicationRecheckInterval =
conf.getTimeDuration(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT,
TimeUnit.SECONDS) * 1000L;
@@ -428,7 +456,7 @@ public class BlockManager implements BlockStatsMXBean {
this.encryptDataTransfer =
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-
+
this.maxNumBlocksToLog =
conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -438,6 +466,25 @@ public class BlockManager implements BlockStatsMXBean {
this.getBlocksMinBlockSize = conf.getLongBytes(
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+
+ final int minMaintenanceR = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);
+
+ if (minMaintenanceR < 0) {
+ throw new IOException("Unexpected configuration parameters: "
+ + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+ + " = " + minMaintenanceR + " < 0");
+ }
+ if (minMaintenanceR > minR) {
+ throw new IOException("Unexpected configuration parameters: "
+ + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+ + " = " + minMaintenanceR + " > "
+ + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+ + " = " + minR);
+ }
+ this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
+
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);

bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
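A hedged illustration of the new configuration constraint, assuming DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY maps to dfs.namenode.maintenance.replication.min (not verified here):

// Hypothetical sketch only: with dfs.namenode.replication.min left at its
// default of 1, setting the maintenance minimum to 2 violates
// minMaintenanceR <= minR, so the BlockManager constructor above throws an
// IOException and the NameNode fails fast at startup.
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY, 2);
// new BlockManager(namesystem, haEnabled, conf) would now be rejected.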
@@ -668,7 +715,7 @@ public class BlockManager implements BlockStatsMXBean {
// Dump all datanodes
getDatanodeManager().datanodeDump(out);
}
-
+
/**
* Dump the metadata for the given block in a human-readable
* form.
@@ -697,12 +744,12 @@ public class BlockManager implements BlockStatsMXBean {
out.print(fileName + ": ");
}
// l: == live:, d: == decommissioned c: == corrupt e: == excess
- out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
+ out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" +
" l: " + numReplicas.liveReplicas() +
" d: " + numReplicas.decommissionedAndDecommissioning() +
" c: " + numReplicas.corruptReplicas() +
- " e: " + numReplicas.excessReplicas() + ") ");
+ " e: " + numReplicas.excessReplicas() + ") ");

Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(block);
@@ -750,6 +797,18 @@ public class BlockManager implements BlockStatsMXBean {
}
}

+ public short getMinReplicationToBeInMaintenance() {
+ return minReplicationToBeInMaintenance;
+ }
+
+ private short getMinMaintenanceStorageNum(BlockInfo block) {
+ if (block.isStriped()) {
+ return ((BlockInfoStriped) block).getRealDataBlockNum();
+ } else {
+ return minReplicationToBeInMaintenance;
+ }
+ }
+
public boolean hasMinStorage(BlockInfo block) {
return countNodes(block).liveReplicas() >= getMinStorageNum(block);
}
@@ -942,7 +1001,7 @@ public class BlockManager implements BlockStatsMXBean {
NumberReplicas replicas = countNodes(lastBlock);
neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
replicas.readOnlyReplicas(),
- replicas.decommissionedAndDecommissioning(), getRedundancy(lastBlock));
+ replicas.outOfServiceReplicas(), getExpectedRedundancyNum(lastBlock));
pendingReconstruction.remove(lastBlock);

// remove this block from the list of pending blocks to be deleted.
@@ -1078,7 +1137,8 @@ public class BlockManager implements BlockStatsMXBean {
} else {
isCorrupt = numCorruptReplicas != 0 && numCorruptReplicas == numNodes;
}
- final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+ int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+ numMachines -= numReplicas.maintenanceNotForReadReplicas();
DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
int j = 0, i = 0;
@@ -1086,11 +1146,17 @@ public class BlockManager implements BlockStatsMXBean {
final boolean noCorrupt = (numCorruptReplicas == 0);
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
if (storage.getState() != State.FAILED) {
+ final DatanodeDescriptor d = storage.getDatanodeDescriptor();
+ // Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states.
+ if (d.isInMaintenance()
+ || (d.isEnteringMaintenance() && !d.isAlive())) {
+ continue;
+ }
+
if (noCorrupt) {
machines[j++] = storage;
i = setBlockIndices(blk, blockIndices, i, storage);
} else {
- final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = isReplicaCorrupt(blk, d);
if (isCorrupt || !replicaCorrupt) {
machines[j++] = storage;
@@ -1106,7 +1172,7 @@ public class BlockManager implements BlockStatsMXBean {
}

assert j == machines.length :
- "isCorrupt: " + isCorrupt +
+ "isCorrupt: " + isCorrupt +
" numMachines: " + numMachines +
" numNodes: " + numNodes +
" numCorrupt: " + numCorruptNodes +
@@ -1347,6 +1413,8 @@ public class BlockManager implements BlockStatsMXBean {
}
}
checkSafeMode();
+ LOG.info("Removed blocks associated with storage {} from DataNode {}",
+ storageInfo, node);
}

/**
@@ -1698,8 +1766,11 @@ public class BlockManager implements BlockStatsMXBean {
return scheduledWork;
}

+ // Check if the number of live + pending replicas satisfies
+ // the expected redundancy.
boolean hasEnoughEffectiveReplicas(BlockInfo block,
- NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+ NumberReplicas numReplicas, int pendingReplicaNum) {
+ int required = getExpectedLiveRedundancyNum(block, numReplicas);
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
return (numEffectiveReplicas >= required) &&
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
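A hedged walk-through of the arithmetic above (the numbers are made up): for a replicated block with expected redundancy 3, one replica on a maintenance node, and a maintenance minimum of 1, the required live count becomes max(3 - 1, 1) = 2, so two live replicas, or one live plus one pending, already count as enough effective replicas.

// Illustrative arithmetic only; mirrors getExpectedLiveRedundancyNum().
int expected = 3;        // replication factor of the file
int maintenance = 1;     // replicas on ENTERING_MAINTENANCE/IN_MAINTENANCE nodes
int minMaintenance = 1;  // configured maintenance minimum
int required = Math.max(expected - maintenance, minMaintenance); // == 2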
@@ -1714,8 +1785,6 @@ public class BlockManager implements BlockStatsMXBean {
return null;
}

- short requiredRedundancy = getExpectedRedundancyNum(block);
-
// get a source data-node
List<DatanodeDescriptor> containingNodes = new ArrayList<>();
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
@@ -1724,6 +1793,8 @@ public class BlockManager implements BlockStatsMXBean {
final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
containingNodes, liveReplicaNodes, numReplicas,
liveBlockIndices, priority);
+ short requiredRedundancy = getExpectedLiveRedundancyNum(block,
+ numReplicas);
if(srcNodes == null || srcNodes.length == 0) {
// block can not be reconstructed from any node
LOG.debug("Block " + block + " cannot be reconstructed " +
@@ -1736,8 +1807,7 @@ public class BlockManager implements BlockStatsMXBean {
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();

int pendingNum = pendingReconstruction.getNumReplicas(block);
- if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
- requiredRedundancy)) {
+ if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
neededReconstruction.remove(block, priority);
blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
" it has enough replicas", block);
@@ -1761,9 +1831,11 @@ public class BlockManager implements BlockStatsMXBean {

// should reconstruct all the internal blocks before scheduling
// replication task for decommissioning node(s).
- if (additionalReplRequired - numReplicas.decommissioning() > 0) {
- additionalReplRequired = additionalReplRequired
- - numReplicas.decommissioning();
+ if (additionalReplRequired - numReplicas.decommissioning() -
+ numReplicas.liveEnteringMaintenanceReplicas() > 0) {
+ additionalReplRequired = additionalReplRequired -
+ numReplicas.decommissioning() -
+ numReplicas.liveEnteringMaintenanceReplicas();
}
byte[] indices = new byte[liveBlockIndices.size()];
for (int i = 0 ; i < liveBlockIndices.size(); i++) {
@@ -1805,11 +1877,11 @@ public class BlockManager implements BlockStatsMXBean {
}

// do not schedule more if enough replicas is already pending
- final short requiredRedundancy = getExpectedRedundancyNum(block);
NumberReplicas numReplicas = countNodes(block);
+ final short requiredRedundancy =
+ getExpectedLiveRedundancyNum(block, numReplicas);
final int pendingNum = pendingReconstruction.getNumReplicas(block);
- if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
- requiredRedundancy)) {
+ if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
neededReconstruction.remove(block, priority);
rw.resetTargets();
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
@@ -1878,7 +1950,7 @@ public class BlockManager implements BlockStatsMXBean {
* @throws IOException
* if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
- * Set, long, List, BlockStoragePolicy)
+ * Set, long, List, BlockStoragePolicy, EnumSet)
*/
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
final int numOfReplicas, final Node client,
@@ -1985,13 +2057,15 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}

- // never use already decommissioned nodes or unknown state replicas
- if (state == null || state == StoredReplicaState.DECOMMISSIONED) {
+ // never use already decommissioned nodes, maintenance nodes not
+ // suitable for read, or replicas in an unknown state.
+ if (state == null || state == StoredReplicaState.DECOMMISSIONED
+ || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
continue;
}

if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
- && !node.isDecommissionInProgress()
+ && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
continue; // already reached replication limit
}
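An illustrative restatement of the source-selection rule above (not code from the patch; it reuses the loop-local state and node variables): a stored replica can serve as a reconstruction source only if its state is known and it is neither DECOMMISSIONED nor MAINTENANCE_NOT_FOR_READ, while decommissioning and entering-maintenance nodes are additionally exempted from the maxReplicationStreams limit so data can be copied off them promptly.

// Illustrative only: the per-replica filter applied in the loop above.
boolean usableSource = state != null
    && state != StoredReplicaState.DECOMMISSIONED
    && state != StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
boolean exemptFromStreamLimit = node.isDecommissionInProgress()
    || node.isEnteringMaintenance();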
@@ -2043,10 +2117,10 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
NumberReplicas num = countNodes(timedOutItems[i]);
- if (isNeededReconstruction(bi, num.liveReplicas())) {
+ if (isNeededReconstruction(bi, num)) {
neededReconstruction.add(bi, num.liveReplicas(),
- num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
- getRedundancy(bi));
+ num.readOnlyReplicas(), num.outOfServiceReplicas(),
+ getExpectedRedundancyNum(bi));
}
}
} finally {
@@ -2191,7 +2265,7 @@ public class BlockManager implements BlockStatsMXBean {
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport,
- BlockReportContext context, boolean lastStorageInRpc) throws IOException {
+ BlockReportContext context) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
@@ -2245,32 +2319,6 @@ public class BlockManager implements BlockStatsMXBean {
}

storageInfo.receivedBlockReport();
- if (context != null) {
- storageInfo.setLastBlockReportId(context.getReportId());
- if (lastStorageInRpc) {
- int rpcsSeen = node.updateBlockReportContext(context);
- if (rpcsSeen >= context.getTotalRpcs()) {
- long leaseId = blockReportLeaseManager.removeLease(node);
- BlockManagerFaultInjector.getInstance().
- removeBlockReportLease(node, leaseId);
- List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
- if (zombies.isEmpty()) {
- LOG.debug("processReport 0x{}: no zombie storages found.",
- Long.toHexString(context.getReportId()));
- } else {
- for (DatanodeStorageInfo zombie : zombies) {
- removeZombieReplicas(context, zombie);
- }
- }
- node.clearBlockReportContext();
- } else {
- LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
- "report.", Long.toHexString(context.getReportId()),
- (context.getTotalRpcs() - rpcsSeen)
- );
- }
- }
- }
} finally {
endTime = Time.monotonicNow();
namesystem.writeUnlock();
@@ -2295,36 +2343,25 @@ public class BlockManager implements BlockStatsMXBean {
return !node.hasStaleStorages();
}

- private void removeZombieReplicas(BlockReportContext context,
- DatanodeStorageInfo zombie) {
- LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
- "longer exists on the DataNode.",
- Long.toHexString(context.getReportId()), zombie.getStorageID());
- assert(namesystem.hasWriteLock());
- Iterator<BlockInfo> iter = zombie.getBlockIterator();
- int prevBlocks = zombie.numBlocks();
- while (iter.hasNext()) {
- BlockInfo block = iter.next();
- // We assume that a block can be on only one storage in a DataNode.
- // That's why we pass in the DatanodeDescriptor rather than the
- // DatanodeStorageInfo.
- // TODO: remove this assumption in case we want to put a block on
- // more than one storage on a datanode (and because it's a difficult
- // assumption to really enforce)
- // DatanodeStorageInfo must be removed using the iterator to avoid
- // ConcurrentModificationException in the underlying storage
- iter.remove();
- removeStoredBlock(block, zombie.getDatanodeDescriptor());
- Block b = getBlockOnStorage(block, zombie);
- if (b != null) {
- invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b);
+ public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
+ final BlockReportContext context) throws IOException {
+ namesystem.writeLock();
+ DatanodeDescriptor node;
+ try {
+ node = datanodeManager.getDatanode(nodeID);
+ if (context != null) {
+ if (context.getTotalRpcs() == context.getCurRpc() + 1) {
+ long leaseId = this.getBlockReportLeaseManager().removeLease(node);
+ BlockManagerFaultInjector.getInstance().
+ removeBlockReportLease(node, leaseId);
+ }
+ LOG.debug("Processing RPC with index {} out of total {} RPCs in "
+ + "processReport 0x{}", context.getCurRpc(),
+ context.getTotalRpcs(), Long.toHexString(context.getReportId()));
}
+ } finally {
+ namesystem.writeUnlock();
}
- assert(zombie.numBlocks() == 0);
- LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
- "which no longer exists on the DataNode.",
- Long.toHexString(context.getReportId()), prevBlocks,
- zombie.getStorageID());
}

/**
@@ -3049,10 +3086,9 @@ public class BlockManager implements BlockStatsMXBean {

// handle low redundancy/extra redundancy
short fileRedundancy = getExpectedRedundancyNum(storedBlock);
- if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
+ if (!isNeededReconstruction(storedBlock, num, pendingNum)) {
neededReconstruction.remove(storedBlock, numCurrentReplica,
- num.readOnlyReplicas(),
- num.decommissionedAndDecommissioning(), fileRedundancy);
+ num.readOnlyReplicas(), num.outOfServiceReplicas(), fileRedundancy);
} else {
updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
}
@@ -3075,6 +3111,10 @@ public class BlockManager implements BlockStatsMXBean {
return storedBlock;
}

+ // If there is any maintenance replica, we don't have to restore
+ // the condition of live + maintenance == expected. We allow
+ // live + maintenance >= expected. The extra redundancy will be removed
+ // when the maintenance node changes to live.
private boolean shouldProcessExtraRedundancy(NumberReplicas num,
int expectedNum) {
final int numCurrent = num.liveReplicas();
@@ -3290,9 +3330,9 @@ public class BlockManager implements BlockStatsMXBean {
NumberReplicas num = countNodes(block);
final int numCurrentReplica = num.liveReplicas();
// add to low redundancy queue if need to be
- if (isNeededReconstruction(block, numCurrentReplica)) {
+ if (isNeededReconstruction(block, num)) {
if (neededReconstruction.add(block, numCurrentReplica,
- num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
+ num.readOnlyReplicas(), num.outOfServiceReplicas(),
expectedRedundancy)) {
return MisReplicationResult.UNDER_REPLICATED;
}
@@ -3325,9 +3365,9 @@ public class BlockManager implements BlockStatsMXBean {

// update neededReconstruction priority queues
b.setReplication(newRepl);
+ NumberReplicas num = countNodes(b);
updateNeededReconstructions(b, 0, newRepl - oldRepl);
-
- if (oldRepl > newRepl) {
+ if (shouldProcessExtraRedundancy(num, newRepl)) {
processExtraRedundancyBlock(b, newRepl, null, null);
}
}
@@ -3353,14 +3393,14 @@ public class BlockManager implements BlockStatsMXBean {
}
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (storage.areBlockContentsStale()) {
- LOG.trace("BLOCK* processOverReplicatedBlock: Postponing {}"
+ LOG.trace("BLOCK* processExtraRedundancyBlock: Postponing {}"
+ " since storage {} does not yet have up-to-date information.",
block, storage);
postponeBlock(block);
return;
}
if (!isExcess(cur, block)) {
- if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+ if (cur.isInService()) {
// exclude corrupt replicas
if (corruptNodes == null || !corruptNodes.contains(cur)) {
nonExcess.add(storage);
@@ -3801,7 +3841,7 @@ public class BlockManager implements BlockStatsMXBean {
return countNodes(b, false);
}

- private NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
+ NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
NumberReplicas numberReplicas = new NumberReplicas();
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
if (b.isStriped()) {
@@ -3832,6 +3872,12 @@ public class BlockManager implements BlockStatsMXBean {
s = StoredReplicaState.DECOMMISSIONING;
} else if (node.isDecommissioned()) {
s = StoredReplicaState.DECOMMISSIONED;
+ } else if (node.isMaintenance()) {
+ if (node.isInMaintenance() || !node.isAlive()) {
+ s = StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
+ } else {
+ s = StoredReplicaState.MAINTENANCE_FOR_READ;
+ }
} else if (isExcess(node, b)) {
s = StoredReplicaState.EXCESS;
} else {
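The maintenance branch above maps a replica to one of the two new states depending on admin state and liveness: only a live ENTERING_MAINTENANCE replica remains readable. A small hypothetical helper (not in the patch) makes the mapping explicit:

// Hypothetical helper mirroring the classification above; assumes
// node.isMaintenance() has already been checked by the caller.
static StoredReplicaState classifyMaintenanceReplica(DatanodeDescriptor node) {
  return (node.isInMaintenance() || !node.isAlive())
      ? StoredReplicaState.MAINTENANCE_NOT_FOR_READ
      : StoredReplicaState.MAINTENANCE_FOR_READ;
}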
@@ -3903,11 +3949,11 @@ public class BlockManager implements BlockStatsMXBean {
}

/**
- * On stopping decommission, check if the node has excess replicas.
+ * On putting the node in service, check if the node has excess replicas.
* If there are any excess replicas, call processExtraRedundancyBlock().
* Process extra redundancy blocks only when active NN is out of safe mode.
*/
- void processExtraRedundancyBlocksOnReCommission(
+ void processExtraRedundancyBlocksOnInService(
final DatanodeDescriptor srcNode) {
if (!isPopulatingReplQueues()) {
return;
@@ -3916,7 +3962,7 @@ public class BlockManager implements BlockStatsMXBean {
int numExtraRedundancy = 0;
while(it.hasNext()) {
final BlockInfo block = it.next();
- int expectedReplication = this.getRedundancy(block);
+ int expectedReplication = this.getExpectedRedundancyNum(block);
NumberReplicas num = countNodes(block);
if (shouldProcessExtraRedundancy(num, expectedReplication)) {
// extra redundancy block
@@ -3926,14 +3972,15 @@ public class BlockManager implements BlockStatsMXBean {
}
}
LOG.info("Invalidated " + numExtraRedundancy
- + " extra redundancy blocks on " + srcNode + " during recommissioning");
+ + " extra redundancy blocks on " + srcNode + " after it is in service");
}

/**
- * Returns whether a node can be safely decommissioned based on its
- * liveness. Dead nodes cannot always be safely decommissioned.
+ * Returns whether a node can be safely decommissioned or put in maintenance,
+ * based on its liveness. Dead nodes cannot always be safely decommissioned
+ * or put in maintenance.
*/
- boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
+ boolean isNodeHealthyForDecommissionOrMaintenance(DatanodeDescriptor node) {
if (!node.checkBlockReportReceived()) {
LOG.info("Node {} hasn't sent its first block report.", node);
return false;
@@ -3947,17 +3994,18 @@ public class BlockManager implements BlockStatsMXBean {
if (pendingReconstructionBlocksCount == 0 &&
lowRedundancyBlocksCount == 0) {
LOG.info("Node {} is dead and there are no low redundancy" +
- " blocks or blocks pending reconstruction. Safe to decommission.",
- node);
+ " blocks or blocks pending reconstruction. Safe to decommission or" +
+ " put in maintenance.", node);
return true;
}

LOG.warn("Node {} is dead " +
- "while decommission is in progress. Cannot be safely " +
- "decommissioned since there is risk of reduced " +
- "data durability or data loss. Either restart the failed node or" +
- " force decommissioning by removing, calling refreshNodes, " +
- "then re-adding to the excludes files.", node);
+ "while in {}. Cannot be safely " +
+ "decommissioned or be in maintenance since there is risk of reduced " +
+ "data durability or data loss. Either restart the failed node or " +
+ "force decommissioning or maintenance by removing, calling " +
+ "refreshNodes, then re-adding to the excludes or host config files.",
+ node, node.getAdminState());
return false;
}

@@ -4025,17 +4073,16 @@ public class BlockManager implements BlockStatsMXBean {
}
NumberReplicas repl = countNodes(block);
int pendingNum = pendingReconstruction.getNumReplicas(block);
- int curExpectedReplicas = getRedundancy(block);
- if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
- curExpectedReplicas)) {
+ int curExpectedReplicas = getExpectedRedundancyNum(block);
+ if (!hasEnoughEffectiveReplicas(block, repl, pendingNum)) {
neededReconstruction.update(block, repl.liveReplicas() + pendingNum,
- repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+ repl.readOnlyReplicas(), repl.outOfServiceReplicas(),
curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
} else {
int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
- repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
+ repl.outOfServiceReplicas(), oldExpectedReplicas);
}
} finally {
namesystem.writeUnlock();
@@ -4053,24 +4100,15 @@ public class BlockManager implements BlockStatsMXBean {
short expected = getExpectedRedundancyNum(block);
final NumberReplicas n = countNodes(block);
final int pending = pendingReconstruction.getNumReplicas(block);
- if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
+ if (!hasEnoughEffectiveReplicas(block, n, pending)) {
neededReconstruction.add(block, n.liveReplicas() + pending,
- n.readOnlyReplicas(),
- n.decommissionedAndDecommissioning(), expected);
+ n.readOnlyReplicas(), n.outOfServiceReplicas(), expected);
} else if (shouldProcessExtraRedundancy(n, expected)) {
processExtraRedundancyBlock(block, expected, null, null);
}
}
}

- /**
- * @return 0 if the block is not found;
- * otherwise, return the replication factor of the block.
- */
- private int getRedundancy(BlockInfo block) {
- return getExpectedRedundancyNum(block);
- }
-
/**
* Get blocks to invalidate for <i>nodeId</i>
* in {@link #invalidateBlocks}.
@@ -4123,6 +4161,8 @@ public class BlockManager implements BlockStatsMXBean {
.getNodes(storedBlock);
for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+ // Nodes under maintenance should be counted as valid replicas from
+ // the rack policy's point of view.
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
&& ((corruptNodes == null) || !corruptNodes.contains(cur))) {
liveNodes.add(cur);
@@ -4137,14 +4177,36 @@ public class BlockManager implements BlockStatsMXBean {
.isPlacementPolicySatisfied();
}

+ boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock,
+ NumberReplicas numberReplicas) {
+ return storedBlock.isComplete() && (numberReplicas.liveReplicas() <
+ getMinMaintenanceStorageNum(storedBlock) ||
+ !isPlacementPolicySatisfied(storedBlock));
+ }
+
+ boolean isNeededReconstruction(BlockInfo storedBlock,
+ NumberReplicas numberReplicas) {
+ return isNeededReconstruction(storedBlock, numberReplicas, 0);
+ }
+
/**
* A block needs reconstruction if the number of redundancies is less than
* expected or if it does not have enough racks.
*/
- boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
- int expected = getExpectedRedundancyNum(storedBlock);
- return storedBlock.isComplete()
- && (current < expected || !isPlacementPolicySatisfied(storedBlock));
+ boolean isNeededReconstruction(BlockInfo storedBlock,
+ NumberReplicas numberReplicas, int pending) {
+ return storedBlock.isComplete() &&
+ !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
+ }
+
+ // Exclude maintenance replicas, but make sure the block keeps the minimal
+ // number of live replicas required for maintenance.
+ public short getExpectedLiveRedundancyNum(BlockInfo block,
+ NumberReplicas numberReplicas) {
+ final short expectedRedundancy = getExpectedRedundancyNum(block);
+ return (short)Math.max(expectedRedundancy -
+ numberReplicas.maintenanceReplicas(),
+ getMinMaintenanceStorageNum(block));
+ }

public short getExpectedRedundancyNum(BlockInfo block) {
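A final hedged example of the live-redundancy floor for erasure-coded blocks, assuming a full RS-6-3 block group where getRealDataBlockNum() is 6: with expected redundancy 9 and two internal blocks on maintenance nodes, the requirement is max(9 - 2, 6) = 7 live internal blocks.

// Illustrative arithmetic only; mirrors getExpectedLiveRedundancyNum()
// for a striped block whose maintenance floor is getRealDataBlockNum().
int expected = 9;           // 6 data + 3 parity internal blocks
int maintenance = 2;        // internal blocks on maintenance nodes
int maintenanceFloor = 6;   // getRealDataBlockNum() for a full block group
int requiredLive = Math.max(expected - maintenance, maintenanceFloor); // == 7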