@@ -40,7 +40,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -70,7 +69,6 @@ import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -111,7 +109,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 
@@ -126,7 +123,6 @@ import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.VersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -312,11 +308,6 @@ public class BlockManager implements BlockStatsMXBean {
   private int replQueueResetToHeadThreshold;
   private int replQueueCallsSinceReset = 0;
 
-  /** How often to check and the limit for the storageinfo efficiency. */
-  private final long storageInfoDefragmentInterval;
-  private final long storageInfoDefragmentTimeout;
-  private final double storageInfoDefragmentRatio;
-
   /**
    * Mapping: Block -> { BlockCollection, datanodes, self ref }
    * Updated only in response to client-sent information.
@@ -331,10 +322,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
* {@link #redundancyThread} has run at least one full iteration.
|
|
* {@link #redundancyThread} has run at least one full iteration.
|
|
*/
|
|
*/
|
|
private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
|
|
private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
|
|
- /** StorageInfoDefragmenter thread. */
|
|
|
|
- private final Daemon storageInfoDefragmenterThread =
|
|
|
|
- new Daemon(new StorageInfoDefragmenter());
|
|
|
|
-
|
|
|
|
/** Block report thread for handling async reports. */
|
|
/** Block report thread for handling async reports. */
|
|
private final BlockReportProcessingThread blockReportThread;
|
|
private final BlockReportProcessingThread blockReportThread;
|
|
|
|
|
|
@@ -543,19 +530,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
|
|
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
|
|
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
|
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
- this.storageInfoDefragmentInterval =
|
|
|
|
- conf.getLong(
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);
|
|
|
|
- this.storageInfoDefragmentTimeout =
|
|
|
|
- conf.getLong(
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);
|
|
|
|
- this.storageInfoDefragmentRatio =
|
|
|
|
- conf.getDouble(
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,
|
|
|
|
- DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);
|
|
|
|
-
|
|
|
|
this.encryptDataTransfer =
|
|
this.encryptDataTransfer =
|
|
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
|
|
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
|
|
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
|
|
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
|
|
@@ -740,8 +714,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
datanodeManager.activate(conf);
|
|
datanodeManager.activate(conf);
|
|
this.redundancyThread.setName("RedundancyMonitor");
|
|
this.redundancyThread.setName("RedundancyMonitor");
|
|
this.redundancyThread.start();
|
|
this.redundancyThread.start();
|
|
- storageInfoDefragmenterThread.setName("StorageInfoMonitor");
|
|
|
|
- storageInfoDefragmenterThread.start();
|
|
|
|
this.blockReportThread.start();
|
|
this.blockReportThread.start();
|
|
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
|
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
|
bmSafeMode.activate(blockTotal);
|
|
bmSafeMode.activate(blockTotal);
|
|
@@ -754,10 +726,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
bmSafeMode.close();
|
|
bmSafeMode.close();
|
|
try {
|
|
try {
|
|
redundancyThread.interrupt();
|
|
redundancyThread.interrupt();
|
|
- storageInfoDefragmenterThread.interrupt();
|
|
|
|
blockReportThread.interrupt();
|
|
blockReportThread.interrupt();
|
|
redundancyThread.join(3000);
|
|
redundancyThread.join(3000);
|
|
- storageInfoDefragmenterThread.join(3000);
|
|
|
|
blockReportThread.join(3000);
|
|
blockReportThread.join(3000);
|
|
} catch (InterruptedException ie) {
|
|
} catch (InterruptedException ie) {
|
|
}
|
|
}
|
|
@@ -1655,18 +1625,9 @@ public class BlockManager implements BlockStatsMXBean {
|
|
/** Remove the blocks associated to the given datanode. */
|
|
/** Remove the blocks associated to the given datanode. */
|
|
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
|
|
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
|
|
providedStorageMap.removeDatanode(node);
|
|
providedStorageMap.removeDatanode(node);
|
|
- for (DatanodeStorageInfo storage : node.getStorageInfos()) {
|
|
|
|
- final Iterator<BlockInfo> it = storage.getBlockIterator();
|
|
|
|
- //add the BlockInfos to a new collection as the
|
|
|
|
- //returned iterator is not modifiable.
|
|
|
|
- Collection<BlockInfo> toRemove = new ArrayList<>();
|
|
|
|
- while (it.hasNext()) {
|
|
|
|
- toRemove.add(it.next());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- for (BlockInfo b : toRemove) {
|
|
|
|
- removeStoredBlock(b, node);
|
|
|
|
- }
|
|
|
|
|
|
+ final Iterator<BlockInfo> it = node.getBlockIterator();
|
|
|
|
+ while(it.hasNext()) {
|
|
|
|
+ removeStoredBlock(it.next(), node);
|
|
}
|
|
}
|
|
// Remove all pending DN messages referencing this DN.
|
|
// Remove all pending DN messages referencing this DN.
|
|
pendingDNMessages.removeAllMessagesForDatanode(node);
|
|
pendingDNMessages.removeAllMessagesForDatanode(node);
|
|
@@ -1680,11 +1641,8 @@ public class BlockManager implements BlockStatsMXBean {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
|
|
final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
- Collection<BlockInfo> toRemove = new ArrayList<>();
|
|
|
|
- while (it.hasNext()) {
|
|
|
|
- toRemove.add(it.next());
|
|
|
|
- }
|
|
|
|
- for (BlockInfo block : toRemove) {
|
|
|
|
|
|
+ while(it.hasNext()) {
|
|
|
|
+ BlockInfo block = it.next();
|
|
removeStoredBlock(block, node);
|
|
removeStoredBlock(block, node);
|
|
final Block b = getBlockOnStorage(block, storageInfo);
|
|
final Block b = getBlockOnStorage(block, storageInfo);
|
|
if (b != null) {
|
|
if (b != null) {
|
|
@@ -1848,7 +1806,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// stale storage due to failover or any other reason.
|
|
// stale storage due to failover or any other reason.
|
|
corruptReplicas.removeFromCorruptReplicasMap(b.getStored(), node);
|
|
corruptReplicas.removeFromCorruptReplicasMap(b.getStored(), node);
|
|
BlockInfoStriped blk = (BlockInfoStriped) getStoredBlock(b.getStored());
|
|
BlockInfoStriped blk = (BlockInfoStriped) getStoredBlock(b.getStored());
|
|
- blk.removeStorage(storageInfo);
|
|
|
|
|
|
+ storageInfo.removeBlock(blk);
|
|
}
|
|
}
|
|
// the block is over-replicated so invalidate the replicas immediately
|
|
// the block is over-replicated so invalidate the replicas immediately
|
|
invalidateBlock(b, node, numberOfReplicas);
|
|
invalidateBlock(b, node, numberOfReplicas);
|
|
@@ -2744,7 +2702,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// Block reports for provided storage are not
|
|
// Block reports for provided storage are not
|
|
// maintained by DN heartbeats
|
|
// maintained by DN heartbeats
|
|
if (!StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
|
|
if (!StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
|
|
- invalidatedBlocks = processReport(storageInfo, newReport, context);
|
|
|
|
|
|
+ invalidatedBlocks = processReport(storageInfo, newReport);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
storageInfo.receivedBlockReport();
|
|
storageInfo.receivedBlockReport();
|
|
@@ -2838,47 +2796,18 @@ public class BlockManager implements BlockStatsMXBean {
|
|
|
|
|
|
Collection<Block> processReport(
|
|
Collection<Block> processReport(
|
|
final DatanodeStorageInfo storageInfo,
|
|
final DatanodeStorageInfo storageInfo,
|
|
- final BlockListAsLongs report,
|
|
|
|
- BlockReportContext context) throws IOException {
|
|
|
|
|
|
+ final BlockListAsLongs report) throws IOException {
|
|
// Normal case:
|
|
// Normal case:
|
|
// Modify the (block-->datanode) map, according to the difference
|
|
// Modify the (block-->datanode) map, according to the difference
|
|
// between the old and new block report.
|
|
// between the old and new block report.
|
|
//
|
|
//
|
|
- Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
|
|
|
|
- Collection<BlockInfo> toRemove = new TreeSet<>();
|
|
|
|
- Collection<Block> toInvalidate = new LinkedList<>();
|
|
|
|
- Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
|
|
|
|
- Collection<StatefulBlockInfo> toUC = new LinkedList<>();
|
|
|
|
-
|
|
|
|
- boolean sorted = false;
|
|
|
|
- String strBlockReportId = "";
|
|
|
|
- if (context != null) {
|
|
|
|
- sorted = context.isSorted();
|
|
|
|
- strBlockReportId = Long.toHexString(context.getReportId());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Iterable<BlockReportReplica> sortedReport;
|
|
|
|
- if (!sorted) {
|
|
|
|
- blockLog.warn("BLOCK* processReport 0x{}: Report from the DataNode ({}) "
|
|
|
|
- + "is unsorted. This will cause overhead on the NameNode "
|
|
|
|
- + "which needs to sort the Full BR. Please update the "
|
|
|
|
- + "DataNode to the same version of Hadoop HDFS as the "
|
|
|
|
- + "NameNode ({}).",
|
|
|
|
- strBlockReportId,
|
|
|
|
- storageInfo.getDatanodeDescriptor().getDatanodeUuid(),
|
|
|
|
- VersionInfo.getVersion());
|
|
|
|
- Set<BlockReportReplica> set = new FoldedTreeSet<>();
|
|
|
|
- for (BlockReportReplica iblk : report) {
|
|
|
|
- set.add(new BlockReportReplica(iblk));
|
|
|
|
- }
|
|
|
|
- sortedReport = set;
|
|
|
|
- } else {
|
|
|
|
- sortedReport = report;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- reportDiffSorted(storageInfo, sortedReport,
|
|
|
|
- toAdd, toRemove, toInvalidate, toCorrupt, toUC);
|
|
|
|
-
|
|
|
|
|
|
+ Collection<BlockInfoToAdd> toAdd = new ArrayList<>();
|
|
|
|
+ Collection<BlockInfo> toRemove = new HashSet<>();
|
|
|
|
+ Collection<Block> toInvalidate = new ArrayList<>();
|
|
|
|
+ Collection<BlockToMarkCorrupt> toCorrupt = new ArrayList<>();
|
|
|
|
+ Collection<StatefulBlockInfo> toUC = new ArrayList<>();
|
|
|
|
+ reportDiff(storageInfo, report,
|
|
|
|
+ toAdd, toRemove, toInvalidate, toCorrupt, toUC);
|
|
|
|
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
// Process the blocks on each queue
|
|
// Process the blocks on each queue
|
|
@@ -2895,8 +2824,8 @@ public class BlockManager implements BlockStatsMXBean {
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
-      blockLog.info("BLOCK* processReport 0x{}: logged info for {} of {} " +
-          "reported.", strBlockReportId, maxNumBlocksToLog, numBlocksLogged);
+      blockLog.info("BLOCK* processReport: logged info for {} of {} " +
+          "reported.", maxNumBlocksToLog, numBlocksLogged);
     }
     for (Block b : toInvalidate) {
       addToInvalidates(b, node);
@@ -3028,106 +2957,127 @@
     }
   }
 
-  private void reportDiffSorted(DatanodeStorageInfo storageInfo,
-      Iterable<BlockReportReplica> newReport,
+  private void reportDiff(DatanodeStorageInfo storageInfo,
+      BlockListAsLongs newReport,
       Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
       Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
       Collection<Block> toInvalidate, // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
      Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    // The blocks must be sorted and the storagenodes blocks must be sorted
-    Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
+    // place a delimiter in the list which separates blocks
+    // that have been reported from those that have not
     DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
-    BlockInfo storageBlock = null;
-
-    for (BlockReportReplica replica : newReport) {
-
-      long replicaID = replica.getBlockId();
-      if (BlockIdManager.isStripedBlockID(replicaID)
-          && (!hasNonEcBlockUsingStripedID ||
-              !blocksMap.containsBlock(replica))) {
-        replicaID = BlockIdManager.convertToStripedID(replicaID);
-      }
-
-      ReplicaState reportedState = replica.getState();
-
-      LOG.debug("Reported block {} on {} size {} replicaState = {}",
-          replica, dn, replica.getNumBytes(), reportedState);
-
-      if (shouldPostponeBlocksFromFuture
-          && isGenStampInFuture(replica)) {
-        queueReportedBlock(storageInfo, replica, reportedState,
-            QUEUE_REASON_FUTURE_GENSTAMP);
-        continue;
-      }
-
-      if (storageBlock == null && storageBlocksIterator.hasNext()) {
-        storageBlock = storageBlocksIterator.next();
-      }
-
-      do {
-        int cmp;
-        if (storageBlock == null ||
-            (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
-          // Check if block is available in NN but not yet on this storage
-          BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
-          if (nnBlock != null) {
-            reportDiffSortedInner(storageInfo, replica, reportedState,
-                nnBlock, toAdd, toCorrupt, toUC);
-          } else {
-            // Replica not found anywhere so it should be invalidated
-            toInvalidate.add(new Block(replica));
-          }
-          break;
-        } else if (cmp == 0) {
-          // Replica matched current storageblock
-          reportDiffSortedInner(storageInfo, replica, reportedState,
-              storageBlock, toAdd, toCorrupt, toUC);
-          storageBlock = null;
-        } else {
-          // replica has higher ID than storedBlock
-          // Remove all stored blocks with IDs lower than replica
-          do {
-            toRemove.add(storageBlock);
-            storageBlock = storageBlocksIterator.hasNext()
-                ? storageBlocksIterator.next() : null;
-          } while (storageBlock != null &&
-              Long.compare(replicaID, storageBlock.getBlockId()) > 0);
+    Block delimiterBlock = new Block();
+    BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
+        (short) 1);
+    AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
+    assert result == AddBlockResult.ADDED
+        : "Delimiting block cannot be present in the node";
+    int headIndex = 0; //currently the delimiter is in the head of the list
+    int curIndex;
+
+    if (newReport == null) {
+      newReport = BlockListAsLongs.EMPTY;
+    }
+    // scan the report and process newly reported blocks
+    for (BlockReportReplica iblk : newReport) {
+      ReplicaState iState = iblk.getState();
+      LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn,
+          iblk.getNumBytes(), iState);
+      BlockInfo storedBlock = processReportedBlock(storageInfo,
+          iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
+
+      // move block to the head of the list
+      if (storedBlock != null) {
+        curIndex = storedBlock.findStorageInfo(storageInfo);
+        if (curIndex >= 0) {
+          headIndex =
+              storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
         }
       }
-      } while (storageBlock != null);
+      }
     }
 
-    // Iterate any remaining blocks that have not been reported and remove them
-    while (storageBlocksIterator.hasNext()) {
-      toRemove.add(storageBlocksIterator.next());
+    // collect blocks that have not been reported
+    // all of them are next to the delimiter
+    Iterator<BlockInfo> it =
+        storageInfo.new BlockIterator(delimiter.getNext(0));
+    while (it.hasNext()) {
+      toRemove.add(it.next());
     }
+    storageInfo.removeBlock(delimiter);
   }
 
-  private void reportDiffSortedInner(
+  /**
+   * Process a block replica reported by the data-node.
+   * No side effects except adding to the passed-in Collections.
+   *
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid that is has the same generation stamp
+   * and length as recorded on the name-node, then the replica location should
+   * be added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * <li>If the reported replica is for a block currently marked "under
+   * construction" in the NN, then it should be added to the
+   * BlockUnderConstructionFeature's list of replicas.</li>
+   * </ol>
+   *
+   * @param storageInfo DatanodeStorageInfo that sent the report.
+   * @param block reported block replica
+   * @param reportedState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map)
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or generation stamp;
+   *        add to corrupt replicas
+   * @param toUC replicas of blocks currently under construction
+   * @return the up-to-date stored block, if it should be kept.
+   *         Otherwise, null.
+   */
+  private BlockInfo processReportedBlock(
       final DatanodeStorageInfo storageInfo,
-      final BlockReportReplica replica, final ReplicaState reportedState,
-      final BlockInfo storedBlock,
+      final Block block, final ReplicaState reportedState,
       final Collection<BlockInfoToAdd> toAdd,
+      final Collection<Block> toInvalidate,
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
 
-    assert replica != null;
-    assert storedBlock != null;
-
     DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
+    LOG.debug("Reported block {} on {} size {} replicaState = {}", block, dn,
+        block.getNumBytes(), reportedState);
+
+    if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
+      queueReportedBlock(storageInfo, block, reportedState,
+          QUEUE_REASON_FUTURE_GENSTAMP);
+      return null;
+    }
+
+    // find block by blockId
+    BlockInfo storedBlock = getStoredBlock(block);
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
     BlockUCState ucState = storedBlock.getBlockUCState();
 
     // Block is on the NN
     LOG.debug("In memory blockUCState = {}", ucState);
 
     // Ignore replicas already scheduled to be removed from the DN
-    if (invalidateBlocks.contains(dn, replica)) {
-      return;
+    if(invalidateBlocks.contains(dn, block)) {
+      return storedBlock;
     }
 
-    BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState,
-        storedBlock, ucState, dn);
+    BlockToMarkCorrupt c = checkReplicaCorrupt(
+        block, reportedState, storedBlock, ucState, dn);
     if (c != null) {
       if (shouldPostponeBlocksFromFuture) {
         // If the block is an out-of-date generation stamp or state,
@@ -3137,21 +3087,28 @@ public class BlockManager implements BlockStatsMXBean {
|
|
// comes from the IBR / FBR and hence what we should use to compare
|
|
// comes from the IBR / FBR and hence what we should use to compare
|
|
// against the memory state.
|
|
// against the memory state.
|
|
// See HDFS-6289 and HDFS-15422 for more context.
|
|
// See HDFS-6289 and HDFS-15422 for more context.
|
|
- queueReportedBlock(storageInfo, replica, reportedState,
|
|
|
|
|
|
+ queueReportedBlock(storageInfo, block, reportedState,
|
|
QUEUE_REASON_CORRUPT_STATE);
|
|
QUEUE_REASON_CORRUPT_STATE);
|
|
} else {
|
|
} else {
|
|
toCorrupt.add(c);
|
|
toCorrupt.add(c);
|
|
}
|
|
}
|
|
- } else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
|
|
|
- toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica),
|
|
|
|
- reportedState));
|
|
|
|
- } else if (reportedState == ReplicaState.FINALIZED &&
|
|
|
|
- (storedBlock.findStorageInfo(storageInfo) == -1 ||
|
|
|
|
- corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
|
|
|
|
- // Add replica if appropriate. If the replica was previously corrupt
|
|
|
|
- // but now okay, it might need to be updated.
|
|
|
|
- toAdd.add(new BlockInfoToAdd(storedBlock, new Block(replica)));
|
|
|
|
|
|
+ return storedBlock;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
|
|
|
+ toUC.add(new StatefulBlockInfo(storedBlock,
|
|
|
|
+ new Block(block), reportedState));
|
|
|
|
+ return storedBlock;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Add replica if appropriate. If the replica was previously corrupt
|
|
|
|
+ // but now okay, it might need to be updated.
|
|
|
|
+ if (reportedState == ReplicaState.FINALIZED
|
|
|
|
+ && (storedBlock.findStorageInfo(storageInfo) == -1 ||
|
|
|
|
+ corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
|
|
|
|
+ toAdd.add(new BlockInfoToAdd(storedBlock, new Block(block)));
|
|
|
|
+ }
|
|
|
|
+ return storedBlock;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -3394,7 +3351,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
|
|
|
|
// just add it
|
|
// just add it
|
|
- AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported);
|
|
|
|
|
|
+ AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
|
|
|
|
|
|
// Now check for completion of blocks and safe block count
|
|
// Now check for completion of blocks and safe block count
|
|
int numCurrentReplica = countLiveNodes(storedBlock);
|
|
int numCurrentReplica = countLiveNodes(storedBlock);
|
|
@@ -4120,6 +4077,12 @@ public class BlockManager implements BlockStatsMXBean {
|
|
DatanodeStorageInfo storageInfo, Block block,
|
|
DatanodeStorageInfo storageInfo, Block block,
|
|
ReplicaState reportedState, DatanodeDescriptor delHintNode)
|
|
ReplicaState reportedState, DatanodeDescriptor delHintNode)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ // blockReceived reports a finalized block
|
|
|
|
+ Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
|
|
|
|
+ Collection<Block> toInvalidate = new LinkedList<>();
|
|
|
|
+ Collection<BlockToMarkCorrupt> toCorrupt =
|
|
|
|
+ new LinkedList<>();
|
|
|
|
+ Collection<StatefulBlockInfo> toUC = new LinkedList<>();
|
|
|
|
|
|
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
|
|
|
|
@@ -4133,58 +4096,33 @@ public class BlockManager implements BlockStatsMXBean {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
- // find block by blockId
|
|
|
|
- BlockInfo storedBlock = getStoredBlock(block);
|
|
|
|
- if(storedBlock == null) {
|
|
|
|
- // If blocksMap does not contain reported block id,
|
|
|
|
- // the replica should be removed from the data-node.
|
|
|
|
- blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
|
|
|
|
- "belong to any file", block, node, block.getNumBytes());
|
|
|
|
- addToInvalidates(new Block(block), node);
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- BlockUCState ucState = storedBlock.getBlockUCState();
|
|
|
|
- // Block is on the NN
|
|
|
|
- LOG.debug("In memory blockUCState = {}", ucState);
|
|
|
|
|
|
+ processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
|
|
|
|
+ toCorrupt, toUC);
|
|
|
|
+ // the block is only in one of the to-do lists
|
|
|
|
+ // if it is in none then data-node already has it
|
|
|
|
+ assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt
|
|
|
|
+ .size() <= 1 : "The block should be only in one of the lists.";
|
|
|
|
|
|
- // Ignore replicas already scheduled to be removed from the DN
|
|
|
|
- if(invalidateBlocks.contains(node, block)) {
|
|
|
|
- return true;
|
|
|
|
|
|
+ for (StatefulBlockInfo b : toUC) {
|
|
|
|
+ addStoredBlockUnderConstruction(b, storageInfo);
|
|
}
|
|
}
|
|
-
|
|
|
|
- BlockToMarkCorrupt c = checkReplicaCorrupt(
|
|
|
|
- block, reportedState, storedBlock, ucState, node);
|
|
|
|
- if (c != null) {
|
|
|
|
- if (shouldPostponeBlocksFromFuture) {
|
|
|
|
- // If the block is an out-of-date generation stamp or state,
|
|
|
|
- // but we're the standby, we shouldn't treat it as corrupt,
|
|
|
|
- // but instead just queue it for later processing.
|
|
|
|
- // Storing the reported block for later processing, as that is what
|
|
|
|
- // comes from the IBR / FBR and hence what we should use to compare
|
|
|
|
- // against the memory state.
|
|
|
|
- // See HDFS-6289 and HDFS-15422 for more context.
|
|
|
|
- queueReportedBlock(storageInfo, block, reportedState,
|
|
|
|
- QUEUE_REASON_CORRUPT_STATE);
|
|
|
|
- } else {
|
|
|
|
- markBlockAsCorrupt(c, storageInfo, node);
|
|
|
|
- }
|
|
|
|
- return true;
|
|
|
|
|
|
+ long numBlocksLogged = 0;
|
|
|
|
+ for (BlockInfoToAdd b : toAdd) {
|
|
|
|
+ addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
|
|
|
|
+ numBlocksLogged < maxNumBlocksToLog);
|
|
|
|
+ numBlocksLogged++;
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
|
|
|
- addStoredBlockUnderConstruction(
|
|
|
|
- new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
|
|
|
|
- storageInfo);
|
|
|
|
- return true;
|
|
|
|
|
|
+ if (numBlocksLogged > maxNumBlocksToLog) {
|
|
|
|
+ blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.",
|
|
|
|
+ maxNumBlocksToLog, numBlocksLogged);
|
|
}
|
|
}
|
|
-
|
|
|
|
- // Add replica if appropriate. If the replica was previously corrupt
|
|
|
|
- // but now okay, it might need to be updated.
|
|
|
|
- if (reportedState == ReplicaState.FINALIZED
|
|
|
|
- && (storedBlock.findStorageInfo(storageInfo) == -1 ||
|
|
|
|
- corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
|
|
|
|
- addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
|
|
|
|
|
|
+ for (Block b : toInvalidate) {
|
|
|
|
+ blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
|
|
|
|
+ "belong to any file", b, node, b.getNumBytes());
|
|
|
|
+ addToInvalidates(b, node);
|
|
|
|
+ }
|
|
|
|
+ for (BlockToMarkCorrupt b : toCorrupt) {
|
|
|
|
+ markBlockAsCorrupt(b, storageInfo, node);
|
|
}
|
|
}
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
@@ -4884,91 +4822,6 @@ public class BlockManager implements BlockStatsMXBean {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Runnable that monitors the fragmentation of the StorageInfo TreeSet and
|
|
|
|
- * compacts it when it falls under a certain threshold.
|
|
|
|
- */
|
|
|
|
- private class StorageInfoDefragmenter implements Runnable {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void run() {
|
|
|
|
- while (namesystem.isRunning()) {
|
|
|
|
- try {
|
|
|
|
- // Check storage efficiency only when active NN is out of safe mode.
|
|
|
|
- if (isPopulatingReplQueues()) {
|
|
|
|
- scanAndCompactStorages();
|
|
|
|
- }
|
|
|
|
- Thread.sleep(storageInfoDefragmentInterval);
|
|
|
|
- } catch (Throwable t) {
|
|
|
|
- if (!namesystem.isRunning()) {
|
|
|
|
- LOG.info("Stopping thread.");
|
|
|
|
- if (!(t instanceof InterruptedException)) {
|
|
|
|
- LOG.info("Received an exception while shutting down.", t);
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
- } else if (!checkNSRunning && t instanceof InterruptedException) {
|
|
|
|
- LOG.info("Stopping for testing.");
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- LOG.error("Thread received Runtime exception.", t);
|
|
|
|
- terminate(1, t);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void scanAndCompactStorages() throws InterruptedException {
|
|
|
|
- ArrayList<String> datanodesAndStorages = new ArrayList<>();
|
|
|
|
- for (DatanodeDescriptor node
|
|
|
|
- : datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) {
|
|
|
|
- for (DatanodeStorageInfo storage : node.getStorageInfos()) {
|
|
|
|
- try {
|
|
|
|
- namesystem.readLock();
|
|
|
|
- double ratio = storage.treeSetFillRatio();
|
|
|
|
- if (ratio < storageInfoDefragmentRatio) {
|
|
|
|
- datanodesAndStorages.add(node.getDatanodeUuid());
|
|
|
|
- datanodesAndStorages.add(storage.getStorageID());
|
|
|
|
- }
|
|
|
|
- LOG.debug("StorageInfo TreeSet fill ratio {} : {}{}",
|
|
|
|
- storage.getStorageID(), ratio,
|
|
|
|
- (ratio < storageInfoDefragmentRatio)
|
|
|
|
- ? " (queued for defragmentation)" : "");
|
|
|
|
- } finally {
|
|
|
|
- namesystem.readUnlock();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if (!datanodesAndStorages.isEmpty()) {
|
|
|
|
- for (int i = 0; i < datanodesAndStorages.size(); i += 2) {
|
|
|
|
- namesystem.writeLock();
|
|
|
|
- try {
|
|
|
|
- final DatanodeDescriptor dn = datanodeManager.
|
|
|
|
- getDatanode(datanodesAndStorages.get(i));
|
|
|
|
- if (dn == null) {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- final DatanodeStorageInfo storage = dn.
|
|
|
|
- getStorageInfo(datanodesAndStorages.get(i + 1));
|
|
|
|
- if (storage != null) {
|
|
|
|
- boolean aborted =
|
|
|
|
- !storage.treeSetCompact(storageInfoDefragmentTimeout);
|
|
|
|
- if (aborted) {
|
|
|
|
- // Compaction timed out, reset iterator to continue with
|
|
|
|
- // the same storage next iteration.
|
|
|
|
- i -= 2;
|
|
|
|
- }
|
|
|
|
- LOG.info("StorageInfo TreeSet defragmented {} : {}{}",
|
|
|
|
- storage.getStorageID(), storage.treeSetFillRatio(),
|
|
|
|
- aborted ? " (aborted)" : "");
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- namesystem.writeUnlock();
|
|
|
|
- }
|
|
|
|
- // Wait between each iteration
|
|
|
|
- Thread.sleep(1000);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* Compute block replication and block invalidation work that can be scheduled
|
|
* Compute block replication and block invalidation work that can be scheduled
|