|
@@ -1082,6 +1082,7 @@ public class BlockManager {
|
|
* Mark the block belonging to datanode as corrupt
|
|
* Mark the block belonging to datanode as corrupt
|
|
* @param blk Block to be marked as corrupt
|
|
* @param blk Block to be marked as corrupt
|
|
* @param dn Datanode which holds the corrupt replica
|
|
* @param dn Datanode which holds the corrupt replica
|
|
|
|
+ * @param storageID if known, null otherwise.
|
|
* @param reason a textual reason why the block should be marked corrupt,
|
|
* @param reason a textual reason why the block should be marked corrupt,
|
|
* for logging purposes
|
|
* for logging purposes
|
|
*/
|
|
*/
|
|
@@ -1098,19 +1099,29 @@ public class BlockManager {
|
|
+ blk + " not found");
|
|
+ blk + " not found");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
|
|
|
|
- blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
|
|
|
|
- dn, storageID);
|
|
|
|
- }
|
|
|
|
|
|
|
|
- private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|
|
|
|
- DatanodeInfo dn, String storageID) throws IOException {
|
|
|
|
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
|
|
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
|
|
if (node == null) {
|
|
if (node == null) {
|
|
- throw new IOException("Cannot mark " + b
|
|
|
|
|
|
+ throw new IOException("Cannot mark " + blk
|
|
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
|
|
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
|
|
+ ") does not exist");
|
|
+ ") does not exist");
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
|
|
|
|
+ blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
|
|
|
|
+ storageID == null ? null : node.getStorageInfo(storageID),
|
|
|
|
+ node);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ *
|
|
|
|
+ * @param b
|
|
|
|
+ * @param storageInfo storage that contains the block, if known. null otherwise.
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|
|
|
|
+ DatanodeStorageInfo storageInfo,
|
|
|
|
+ DatanodeDescriptor node) throws IOException {
|
|
|
|
|
|
BlockCollection bc = b.corrupted.getBlockCollection();
|
|
BlockCollection bc = b.corrupted.getBlockCollection();
|
|
if (bc == null) {
|
|
if (bc == null) {
|
|
@@ -1121,7 +1132,9 @@ public class BlockManager {
|
|
}
|
|
}
|
|
|
|
|
|
// Add replica to the data-node if it is not already there
|
|
// Add replica to the data-node if it is not already there
|
|
- node.addBlock(storageID, b.stored);
|
|
|
|
|
|
+ if (storageInfo != null) {
|
|
|
|
+ storageInfo.addBlock(b.stored);
|
|
|
|
+ }
|
|
|
|
|
|
// Add this replica to corruptReplicas Map
|
|
// Add this replica to corruptReplicas Map
|
|
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
|
|
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
|
|
@@ -1460,7 +1473,7 @@ public class BlockManager {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
* if the number of targets < minimum replication.
|
|
* if the number of targets < minimum replication.
|
|
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
|
|
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
|
|
- * List, boolean, Set, long)
|
|
|
|
|
|
+ * List, boolean, Set, long, StorageType)
|
|
*/
|
|
*/
|
|
public DatanodeStorageInfo[] chooseTarget(final String src,
|
|
public DatanodeStorageInfo[] chooseTarget(final String src,
|
|
final int numOfReplicas, final DatanodeDescriptor client,
|
|
final int numOfReplicas, final DatanodeDescriptor client,
|
|
@@ -1697,7 +1710,7 @@ public class BlockManager {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public boolean processReport(final DatanodeID nodeID,
|
|
public boolean processReport(final DatanodeID nodeID,
|
|
- final DatanodeStorage storage, final String poolId,
|
|
|
|
|
|
+ final DatanodeStorage storage,
|
|
final BlockListAsLongs newReport) throws IOException {
|
|
final BlockListAsLongs newReport) throws IOException {
|
|
namesystem.writeLock();
|
|
namesystem.writeLock();
|
|
final long startTime = Time.now(); //after acquiring write lock
|
|
final long startTime = Time.now(); //after acquiring write lock
|
|
@@ -1729,9 +1742,9 @@ public class BlockManager {
|
|
if (storageInfo.numBlocks() == 0) {
|
|
if (storageInfo.numBlocks() == 0) {
|
|
// The first block report can be processed a lot more efficiently than
|
|
// The first block report can be processed a lot more efficiently than
|
|
// ordinary block reports. This shortens restart times.
|
|
// ordinary block reports. This shortens restart times.
|
|
- processFirstBlockReport(node, storage.getStorageID(), newReport);
|
|
|
|
|
|
+ processFirstBlockReport(storageInfo, newReport);
|
|
} else {
|
|
} else {
|
|
- processReport(node, storage, newReport);
|
|
|
|
|
|
+ processReport(storageInfo, newReport);
|
|
}
|
|
}
|
|
|
|
|
|
// Now that we have an up-to-date block report, we know that any
|
|
// Now that we have an up-to-date block report, we know that any
|
|
@@ -1793,9 +1806,8 @@ public class BlockManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void processReport(final DatanodeDescriptor node,
|
|
|
|
- final DatanodeStorage storage,
|
|
|
|
- final BlockListAsLongs report) throws IOException {
|
|
|
|
|
|
+ private void processReport(final DatanodeStorageInfo storageInfo,
|
|
|
|
+ final BlockListAsLongs report) throws IOException {
|
|
// Normal case:
|
|
// Normal case:
|
|
// Modify the (block-->datanode) map, according to the difference
|
|
// Modify the (block-->datanode) map, according to the difference
|
|
// between the old and new block report.
|
|
// between the old and new block report.
|
|
@@ -1805,19 +1817,20 @@ public class BlockManager {
|
|
Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
|
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
|
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
|
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
|
- reportDiff(node, storage, report,
|
|
|
|
|
|
+ reportDiff(storageInfo, report,
|
|
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
|
|
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
// Process the blocks on each queue
|
|
// Process the blocks on each queue
|
|
for (StatefulBlockInfo b : toUC) {
|
|
for (StatefulBlockInfo b : toUC) {
|
|
- addStoredBlockUnderConstruction(b, node, storage.getStorageID());
|
|
|
|
|
|
+ addStoredBlockUnderConstruction(b, storageInfo);
|
|
}
|
|
}
|
|
for (Block b : toRemove) {
|
|
for (Block b : toRemove) {
|
|
removeStoredBlock(b, node);
|
|
removeStoredBlock(b, node);
|
|
}
|
|
}
|
|
int numBlocksLogged = 0;
|
|
int numBlocksLogged = 0;
|
|
for (BlockInfo b : toAdd) {
|
|
for (BlockInfo b : toAdd) {
|
|
- addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
|
|
|
|
|
|
+ addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
|
|
numBlocksLogged++;
|
|
numBlocksLogged++;
|
|
}
|
|
}
|
|
if (numBlocksLogged > maxNumBlocksToLog) {
|
|
if (numBlocksLogged > maxNumBlocksToLog) {
|
|
@@ -1831,7 +1844,7 @@ public class BlockManager {
|
|
addToInvalidates(b, node);
|
|
addToInvalidates(b, node);
|
|
}
|
|
}
|
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
|
- markBlockAsCorrupt(b, node, storage.getStorageID());
|
|
|
|
|
|
+ markBlockAsCorrupt(b, storageInfo, node);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1842,16 +1855,16 @@ public class BlockManager {
|
|
* a toRemove list (since there won't be any). It also silently discards
|
|
* a toRemove list (since there won't be any). It also silently discards
|
|
* any invalid blocks, thereby deferring their processing until
|
|
* any invalid blocks, thereby deferring their processing until
|
|
* the next block report.
|
|
* the next block report.
|
|
- * @param node - DatanodeDescriptor of the node that sent the report
|
|
|
|
|
|
+ * @param storageInfo - DatanodeStorageInfo that sent the report
|
|
* @param report - the initial block report, to be processed
|
|
* @param report - the initial block report, to be processed
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- private void processFirstBlockReport(final DatanodeDescriptor node,
|
|
|
|
- final String storageID,
|
|
|
|
|
|
+ private void processFirstBlockReport(
|
|
|
|
+ final DatanodeStorageInfo storageInfo,
|
|
final BlockListAsLongs report) throws IOException {
|
|
final BlockListAsLongs report) throws IOException {
|
|
if (report == null) return;
|
|
if (report == null) return;
|
|
assert (namesystem.hasWriteLock());
|
|
assert (namesystem.hasWriteLock());
|
|
- assert (node.getStorageInfo(storageID).numBlocks() == 0);
|
|
|
|
|
|
+ assert (storageInfo.numBlocks() == 0);
|
|
BlockReportIterator itBR = report.getBlockReportIterator();
|
|
BlockReportIterator itBR = report.getBlockReportIterator();
|
|
|
|
|
|
while(itBR.hasNext()) {
|
|
while(itBR.hasNext()) {
|
|
@@ -1860,7 +1873,7 @@ public class BlockManager {
|
|
|
|
|
|
if (shouldPostponeBlocksFromFuture &&
|
|
if (shouldPostponeBlocksFromFuture &&
|
|
namesystem.isGenStampInFuture(iblk)) {
|
|
namesystem.isGenStampInFuture(iblk)) {
|
|
- queueReportedBlock(node, storageID, iblk, reportedState,
|
|
|
|
|
|
+ queueReportedBlock(storageInfo, iblk, reportedState,
|
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
@@ -1872,15 +1885,16 @@ public class BlockManager {
|
|
// If block is corrupt, mark it and continue to next block.
|
|
// If block is corrupt, mark it and continue to next block.
|
|
BlockUCState ucState = storedBlock.getBlockUCState();
|
|
BlockUCState ucState = storedBlock.getBlockUCState();
|
|
BlockToMarkCorrupt c = checkReplicaCorrupt(
|
|
BlockToMarkCorrupt c = checkReplicaCorrupt(
|
|
- iblk, reportedState, storedBlock, ucState, node);
|
|
|
|
|
|
+ iblk, reportedState, storedBlock, ucState,
|
|
|
|
+ storageInfo.getDatanodeDescriptor());
|
|
if (c != null) {
|
|
if (c != null) {
|
|
if (shouldPostponeBlocksFromFuture) {
|
|
if (shouldPostponeBlocksFromFuture) {
|
|
// In the Standby, we may receive a block report for a file that we
|
|
// In the Standby, we may receive a block report for a file that we
|
|
// just have an out-of-date gen-stamp or state for, for example.
|
|
// just have an out-of-date gen-stamp or state for, for example.
|
|
- queueReportedBlock(node, storageID, iblk, reportedState,
|
|
|
|
|
|
+ queueReportedBlock(storageInfo, iblk, reportedState,
|
|
QUEUE_REASON_CORRUPT_STATE);
|
|
QUEUE_REASON_CORRUPT_STATE);
|
|
} else {
|
|
} else {
|
|
- markBlockAsCorrupt(c, node, storageID);
|
|
|
|
|
|
+ markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
|
|
}
|
|
}
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
@@ -1888,7 +1902,7 @@ public class BlockManager {
|
|
// If block is under construction, add this replica to its list
|
|
// If block is under construction, add this replica to its list
|
|
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
|
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
|
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
|
|
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
|
|
- node.getStorageInfo(storageID), iblk, reportedState);
|
|
|
|
|
|
+ storageInfo, iblk, reportedState);
|
|
// OpenFileBlocks only inside snapshots also will be added to safemode
|
|
// OpenFileBlocks only inside snapshots also will be added to safemode
|
|
// threshold. So we need to update such blocks to safemode
|
|
// threshold. So we need to update such blocks to safemode
|
|
// refer HDFS-5283
|
|
// refer HDFS-5283
|
|
@@ -1901,12 +1915,12 @@ public class BlockManager {
|
|
}
|
|
}
|
|
//add replica if appropriate
|
|
//add replica if appropriate
|
|
if (reportedState == ReplicaState.FINALIZED) {
|
|
if (reportedState == ReplicaState.FINALIZED) {
|
|
- addStoredBlockImmediate(storedBlock, node, storageID);
|
|
|
|
|
|
+ addStoredBlockImmediate(storedBlock, storageInfo);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
|
|
|
|
|
|
+ private void reportDiff(DatanodeStorageInfo storageInfo,
|
|
BlockListAsLongs newReport,
|
|
BlockListAsLongs newReport,
|
|
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
|
|
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
|
|
Collection<Block> toRemove, // remove from DatanodeDescriptor
|
|
Collection<Block> toRemove, // remove from DatanodeDescriptor
|
|
@@ -1914,8 +1928,6 @@ public class BlockManager {
|
|
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
|
|
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
|
|
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
|
|
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
|
|
|
|
|
|
- final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
|
|
|
|
-
|
|
|
|
// place a delimiter in the list which separates blocks
|
|
// place a delimiter in the list which separates blocks
|
|
// that have been reported from those that have not
|
|
// that have been reported from those that have not
|
|
BlockInfo delimiter = new BlockInfo(new Block(), 1);
|
|
BlockInfo delimiter = new BlockInfo(new Block(), 1);
|
|
@@ -1932,7 +1944,7 @@ public class BlockManager {
|
|
while(itBR.hasNext()) {
|
|
while(itBR.hasNext()) {
|
|
Block iblk = itBR.next();
|
|
Block iblk = itBR.next();
|
|
ReplicaState iState = itBR.getCurrentReplicaState();
|
|
ReplicaState iState = itBR.getCurrentReplicaState();
|
|
- BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
|
|
|
|
|
|
+ BlockInfo storedBlock = processReportedBlock(storageInfo,
|
|
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
|
|
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
|
|
|
|
|
|
// move block to the head of the list
|
|
// move block to the head of the list
|
|
@@ -1969,7 +1981,7 @@ public class BlockManager {
|
|
* BlockInfoUnderConstruction's list of replicas.</li>
|
|
* BlockInfoUnderConstruction's list of replicas.</li>
|
|
* </ol>
|
|
* </ol>
|
|
*
|
|
*
|
|
- * @param dn descriptor for the datanode that made the report
|
|
|
|
|
|
+ * @param storageInfo DatanodeStorageInfo that sent the report.
|
|
* @param block reported block replica
|
|
* @param block reported block replica
|
|
* @param reportedState reported replica state
|
|
* @param reportedState reported replica state
|
|
* @param toAdd add to DatanodeDescriptor
|
|
* @param toAdd add to DatanodeDescriptor
|
|
@@ -1981,14 +1993,16 @@ public class BlockManager {
|
|
* @return the up-to-date stored block, if it should be kept.
|
|
* @return the up-to-date stored block, if it should be kept.
|
|
* Otherwise, null.
|
|
* Otherwise, null.
|
|
*/
|
|
*/
|
|
- private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
|
|
|
|
- final String storageID,
|
|
|
|
|
|
+ private BlockInfo processReportedBlock(
|
|
|
|
+ final DatanodeStorageInfo storageInfo,
|
|
final Block block, final ReplicaState reportedState,
|
|
final Block block, final ReplicaState reportedState,
|
|
final Collection<BlockInfo> toAdd,
|
|
final Collection<BlockInfo> toAdd,
|
|
final Collection<Block> toInvalidate,
|
|
final Collection<Block> toInvalidate,
|
|
final Collection<BlockToMarkCorrupt> toCorrupt,
|
|
final Collection<BlockToMarkCorrupt> toCorrupt,
|
|
final Collection<StatefulBlockInfo> toUC) {
|
|
final Collection<StatefulBlockInfo> toUC) {
|
|
|
|
|
|
|
|
+ DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
|
|
|
|
+
|
|
if(LOG.isDebugEnabled()) {
|
|
if(LOG.isDebugEnabled()) {
|
|
LOG.debug("Reported block " + block
|
|
LOG.debug("Reported block " + block
|
|
+ " on " + dn + " size " + block.getNumBytes()
|
|
+ " on " + dn + " size " + block.getNumBytes()
|
|
@@ -1997,7 +2011,7 @@ public class BlockManager {
|
|
|
|
|
|
if (shouldPostponeBlocksFromFuture &&
|
|
if (shouldPostponeBlocksFromFuture &&
|
|
namesystem.isGenStampInFuture(block)) {
|
|
namesystem.isGenStampInFuture(block)) {
|
|
- queueReportedBlock(dn, storageID, block, reportedState,
|
|
|
|
|
|
+ queueReportedBlock(storageInfo, block, reportedState,
|
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
|
QUEUE_REASON_FUTURE_GENSTAMP);
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -2037,7 +2051,7 @@ public class BlockManager {
|
|
// TODO: Pretty confident this should be s/storedBlock/block below,
|
|
// TODO: Pretty confident this should be s/storedBlock/block below,
|
|
// since we should be postponing the info of the reported block, not
|
|
// since we should be postponing the info of the reported block, not
|
|
// the stored block. See HDFS-6289 for more context.
|
|
// the stored block. See HDFS-6289 for more context.
|
|
- queueReportedBlock(dn, storageID, storedBlock, reportedState,
|
|
|
|
|
|
+ queueReportedBlock(storageInfo, storedBlock, reportedState,
|
|
QUEUE_REASON_CORRUPT_STATE);
|
|
QUEUE_REASON_CORRUPT_STATE);
|
|
} else {
|
|
} else {
|
|
toCorrupt.add(c);
|
|
toCorrupt.add(c);
|
|
@@ -2066,17 +2080,17 @@ public class BlockManager {
|
|
* standby node. @see PendingDataNodeMessages.
|
|
* standby node. @see PendingDataNodeMessages.
|
|
* @param reason a textual reason to report in the debug logs
|
|
* @param reason a textual reason to report in the debug logs
|
|
*/
|
|
*/
|
|
- private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
|
|
|
|
|
|
+ private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
|
|
ReplicaState reportedState, String reason) {
|
|
ReplicaState reportedState, String reason) {
|
|
assert shouldPostponeBlocksFromFuture;
|
|
assert shouldPostponeBlocksFromFuture;
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Queueing reported block " + block +
|
|
LOG.debug("Queueing reported block " + block +
|
|
" in state " + reportedState +
|
|
" in state " + reportedState +
|
|
- " from datanode " + dn + " for later processing " +
|
|
|
|
- "because " + reason + ".");
|
|
|
|
|
|
+ " from datanode " + storageInfo.getDatanodeDescriptor() +
|
|
|
|
+ " for later processing because " + reason + ".");
|
|
}
|
|
}
|
|
- pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
|
|
|
|
|
|
+ pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -2099,7 +2113,7 @@ public class BlockManager {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Processing previouly queued message " + rbi);
|
|
LOG.debug("Processing previouly queued message " + rbi);
|
|
}
|
|
}
|
|
- processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(),
|
|
|
|
|
|
+ processAndHandleReportedBlock(rbi.getStorageInfo(),
|
|
rbi.getBlock(), rbi.getReportedState(), null);
|
|
rbi.getBlock(), rbi.getReportedState(), null);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -2156,6 +2170,16 @@ public class BlockManager {
|
|
} else {
|
|
} else {
|
|
return null; // not corrupt
|
|
return null; // not corrupt
|
|
}
|
|
}
|
|
|
|
+ case UNDER_CONSTRUCTION:
|
|
|
|
+ if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
|
|
|
|
+ final long reportedGS = reported.getGenerationStamp();
|
|
|
|
+ return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
|
|
|
|
+ + ucState + " and reported state " + reportedState
|
|
|
|
+ + ", But reported genstamp " + reportedGS
|
|
|
|
+ + " does not match genstamp in block map "
|
|
|
|
+ + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
|
|
|
|
+ }
|
|
|
|
+ return null;
|
|
default:
|
|
default:
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -2219,19 +2243,20 @@ public class BlockManager {
|
|
}
|
|
}
|
|
|
|
|
|
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
|
|
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
|
|
- DatanodeDescriptor node, String storageID) throws IOException {
|
|
|
|
|
|
+ DatanodeStorageInfo storageInfo) throws IOException {
|
|
BlockInfoUnderConstruction block = ucBlock.storedBlock;
|
|
BlockInfoUnderConstruction block = ucBlock.storedBlock;
|
|
- block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
|
|
|
|
- ucBlock.reportedBlock, ucBlock.reportedState);
|
|
|
|
|
|
+ block.addReplicaIfNotPresent(
|
|
|
|
+ storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
|
|
|
|
|
|
- if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
|
|
|
|
- addStoredBlock(block, node, storageID, null, true);
|
|
|
|
|
|
+ if (ucBlock.reportedState == ReplicaState.FINALIZED &&
|
|
|
|
+ block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
|
|
|
|
+ addStoredBlock(block, storageInfo, null, true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Faster version of
|
|
* Faster version of
|
|
- * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
|
|
|
|
|
|
+ * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
|
|
* , intended for use with initial block report at startup. If not in startup
|
|
* , intended for use with initial block report at startup. If not in startup
|
|
* safe mode, will call standard addStoredBlock(). Assumes this method is
|
|
* safe mode, will call standard addStoredBlock(). Assumes this method is
|
|
* called "immediately" so there is no need to refresh the storedBlock from
|
|
* called "immediately" so there is no need to refresh the storedBlock from
|
|
@@ -2242,17 +2267,17 @@ public class BlockManager {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
private void addStoredBlockImmediate(BlockInfo storedBlock,
|
|
private void addStoredBlockImmediate(BlockInfo storedBlock,
|
|
- DatanodeDescriptor node, String storageID)
|
|
|
|
|
|
+ DatanodeStorageInfo storageInfo)
|
|
throws IOException {
|
|
throws IOException {
|
|
assert (storedBlock != null && namesystem.hasWriteLock());
|
|
assert (storedBlock != null && namesystem.hasWriteLock());
|
|
if (!namesystem.isInStartupSafeMode()
|
|
if (!namesystem.isInStartupSafeMode()
|
|
|| namesystem.isPopulatingReplQueues()) {
|
|
|| namesystem.isPopulatingReplQueues()) {
|
|
- addStoredBlock(storedBlock, node, storageID, null, false);
|
|
|
|
|
|
+ addStoredBlock(storedBlock, storageInfo, null, false);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
// just add it
|
|
// just add it
|
|
- node.addBlock(storageID, storedBlock);
|
|
|
|
|
|
+ storageInfo.addBlock(storedBlock);
|
|
|
|
|
|
// Now check for completion of blocks and safe block count
|
|
// Now check for completion of blocks and safe block count
|
|
int numCurrentReplica = countLiveNodes(storedBlock);
|
|
int numCurrentReplica = countLiveNodes(storedBlock);
|
|
@@ -2274,13 +2299,13 @@ public class BlockManager {
|
|
* @return the block that is stored in blockMap.
|
|
* @return the block that is stored in blockMap.
|
|
*/
|
|
*/
|
|
private Block addStoredBlock(final BlockInfo block,
|
|
private Block addStoredBlock(final BlockInfo block,
|
|
- DatanodeDescriptor node,
|
|
|
|
- String storageID,
|
|
|
|
|
|
+ DatanodeStorageInfo storageInfo,
|
|
DatanodeDescriptor delNodeHint,
|
|
DatanodeDescriptor delNodeHint,
|
|
boolean logEveryBlock)
|
|
boolean logEveryBlock)
|
|
throws IOException {
|
|
throws IOException {
|
|
assert block != null && namesystem.hasWriteLock();
|
|
assert block != null && namesystem.hasWriteLock();
|
|
BlockInfo storedBlock;
|
|
BlockInfo storedBlock;
|
|
|
|
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
if (block instanceof BlockInfoUnderConstruction) {
|
|
if (block instanceof BlockInfoUnderConstruction) {
|
|
//refresh our copy in case the block got completed in another thread
|
|
//refresh our copy in case the block got completed in another thread
|
|
storedBlock = blocksMap.getStoredBlock(block);
|
|
storedBlock = blocksMap.getStoredBlock(block);
|
|
@@ -2300,7 +2325,7 @@ public class BlockManager {
|
|
assert bc != null : "Block must belong to a file";
|
|
assert bc != null : "Block must belong to a file";
|
|
|
|
|
|
// add block to the datanode
|
|
// add block to the datanode
|
|
- boolean added = node.addBlock(storageID, storedBlock);
|
|
|
|
|
|
+ boolean added = storageInfo.addBlock(storedBlock);
|
|
|
|
|
|
int curReplicaDelta;
|
|
int curReplicaDelta;
|
|
if (added) {
|
|
if (added) {
|
|
@@ -2829,12 +2854,15 @@ public class BlockManager {
|
|
} else {
|
|
} else {
|
|
final String[] datanodeUuids = new String[locations.size()];
|
|
final String[] datanodeUuids = new String[locations.size()];
|
|
final String[] storageIDs = new String[datanodeUuids.length];
|
|
final String[] storageIDs = new String[datanodeUuids.length];
|
|
|
|
+ final StorageType[] storageTypes = new StorageType[datanodeUuids.length];
|
|
for(int i = 0; i < locations.size(); i++) {
|
|
for(int i = 0; i < locations.size(); i++) {
|
|
final DatanodeStorageInfo s = locations.get(i);
|
|
final DatanodeStorageInfo s = locations.get(i);
|
|
datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
|
|
datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
|
|
storageIDs[i] = s.getStorageID();
|
|
storageIDs[i] = s.getStorageID();
|
|
|
|
+ storageTypes[i] = s.getStorageType();
|
|
}
|
|
}
|
|
- results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
|
|
|
|
|
|
+ results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
|
|
|
|
+ storageTypes));
|
|
return block.getNumBytes();
|
|
return block.getNumBytes();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -2843,8 +2871,9 @@ public class BlockManager {
|
|
* The given node is reporting that it received a certain block.
|
|
* The given node is reporting that it received a certain block.
|
|
*/
|
|
*/
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
|
|
|
|
|
|
+ void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
// Decrement number of blocks scheduled to this datanode.
|
|
// Decrement number of blocks scheduled to this datanode.
|
|
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
|
|
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
|
|
// RECEIVED_BLOCK), we currently also decrease the approximate number.
|
|
// RECEIVED_BLOCK), we currently also decrease the approximate number.
|
|
@@ -2864,12 +2893,12 @@ public class BlockManager {
|
|
// Modify the blocks->datanode map and node's map.
|
|
// Modify the blocks->datanode map and node's map.
|
|
//
|
|
//
|
|
pendingReplications.decrement(block, node);
|
|
pendingReplications.decrement(block, node);
|
|
- processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
|
|
|
|
|
|
+ processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
|
|
delHintNode);
|
|
delHintNode);
|
|
}
|
|
}
|
|
|
|
|
|
- private void processAndHandleReportedBlock(DatanodeDescriptor node,
|
|
|
|
- String storageID, Block block,
|
|
|
|
|
|
+ private void processAndHandleReportedBlock(
|
|
|
|
+ DatanodeStorageInfo storageInfo, Block block,
|
|
ReplicaState reportedState, DatanodeDescriptor delHintNode)
|
|
ReplicaState reportedState, DatanodeDescriptor delHintNode)
|
|
throws IOException {
|
|
throws IOException {
|
|
// blockReceived reports a finalized block
|
|
// blockReceived reports a finalized block
|
|
@@ -2877,7 +2906,9 @@ public class BlockManager {
|
|
Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
|
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
|
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
|
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
|
- processReportedBlock(node, storageID, block, reportedState,
|
|
|
|
|
|
+ final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
|
|
+
|
|
|
|
+ processReportedBlock(storageInfo, block, reportedState,
|
|
toAdd, toInvalidate, toCorrupt, toUC);
|
|
toAdd, toInvalidate, toCorrupt, toUC);
|
|
// the block is only in one of the to-do lists
|
|
// the block is only in one of the to-do lists
|
|
// if it is in none then data-node already has it
|
|
// if it is in none then data-node already has it
|
|
@@ -2885,11 +2916,11 @@ public class BlockManager {
|
|
: "The block should be only in one of the lists.";
|
|
: "The block should be only in one of the lists.";
|
|
|
|
|
|
for (StatefulBlockInfo b : toUC) {
|
|
for (StatefulBlockInfo b : toUC) {
|
|
- addStoredBlockUnderConstruction(b, node, storageID);
|
|
|
|
|
|
+ addStoredBlockUnderConstruction(b, storageInfo);
|
|
}
|
|
}
|
|
long numBlocksLogged = 0;
|
|
long numBlocksLogged = 0;
|
|
for (BlockInfo b : toAdd) {
|
|
for (BlockInfo b : toAdd) {
|
|
- addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
|
|
|
|
|
|
+ addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
|
|
numBlocksLogged++;
|
|
numBlocksLogged++;
|
|
}
|
|
}
|
|
if (numBlocksLogged > maxNumBlocksToLog) {
|
|
if (numBlocksLogged > maxNumBlocksToLog) {
|
|
@@ -2903,7 +2934,7 @@ public class BlockManager {
|
|
addToInvalidates(b, node);
|
|
addToInvalidates(b, node);
|
|
}
|
|
}
|
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
|
- markBlockAsCorrupt(b, node, storageID);
|
|
|
|
|
|
+ markBlockAsCorrupt(b, storageInfo, node);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2930,13 +2961,15 @@ public class BlockManager {
|
|
"Got incremental block report from unregistered or dead node");
|
|
"Got incremental block report from unregistered or dead node");
|
|
}
|
|
}
|
|
|
|
|
|
- if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
|
|
|
|
|
|
+ DatanodeStorageInfo storageInfo =
|
|
|
|
+ node.getStorageInfo(srdb.getStorage().getStorageID());
|
|
|
|
+ if (storageInfo == null) {
|
|
// The DataNode is reporting an unknown storage. Usually the NN learns
|
|
// The DataNode is reporting an unknown storage. Usually the NN learns
|
|
// about new storages from heartbeats but during NN restart we may
|
|
// about new storages from heartbeats but during NN restart we may
|
|
// receive a block report or incremental report before the heartbeat.
|
|
// receive a block report or incremental report before the heartbeat.
|
|
// We must handle this for protocol compatibility. This issue was
|
|
// We must handle this for protocol compatibility. This issue was
|
|
// uncovered by HDFS-6094.
|
|
// uncovered by HDFS-6094.
|
|
- node.updateStorage(srdb.getStorage());
|
|
|
|
|
|
+ storageInfo = node.updateStorage(srdb.getStorage());
|
|
}
|
|
}
|
|
|
|
|
|
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
|
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
|
@@ -2946,14 +2979,13 @@ public class BlockManager {
|
|
deleted++;
|
|
deleted++;
|
|
break;
|
|
break;
|
|
case RECEIVED_BLOCK:
|
|
case RECEIVED_BLOCK:
|
|
- addBlock(node, srdb.getStorage().getStorageID(),
|
|
|
|
- rdbi.getBlock(), rdbi.getDelHints());
|
|
|
|
|
|
+ addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
|
|
received++;
|
|
received++;
|
|
break;
|
|
break;
|
|
case RECEIVING_BLOCK:
|
|
case RECEIVING_BLOCK:
|
|
receiving++;
|
|
receiving++;
|
|
- processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
|
|
|
|
- rdbi.getBlock(), ReplicaState.RBW, null);
|
|
|
|
|
|
+ processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
|
|
|
|
+ ReplicaState.RBW, null);
|
|
break;
|
|
break;
|
|
default:
|
|
default:
|
|
String msg =
|
|
String msg =
|