|
@@ -591,8 +591,8 @@ public class BlockManager {
|
|
|
* of replicas reported from data-nodes.
|
|
|
*/
|
|
|
private static boolean commitBlock(
|
|
|
- final BlockInfoContiguousUnderConstruction block, final Block commitBlock)
|
|
|
- throws IOException {
|
|
|
+ final BlockInfoContiguousUnderConstruction block,
|
|
|
+ final Block commitBlock) throws IOException {
|
|
|
if (block.getBlockUCState() == BlockUCState.COMMITTED)
|
|
|
return false;
|
|
|
assert block.getNumBytes() <= commitBlock.getNumBytes() :
|
|
@@ -623,7 +623,7 @@ public class BlockManager {
|
|
|
return false; // already completed (e.g. by syncBlock)
|
|
|
|
|
|
final boolean b = commitBlock(
|
|
|
- (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
|
|
|
+ (BlockInfoContiguousUnderConstruction)lastBlock, commitBlock);
|
|
|
if(countNodes(lastBlock).liveReplicas() >= minReplication)
|
|
|
completeBlock(bc, bc.numBlocks()-1, false);
|
|
|
return b;
|
|
@@ -636,15 +636,16 @@ public class BlockManager {
|
|
|
* @throws IOException if the block does not have at least a minimal number
|
|
|
* of replicas reported from data-nodes.
|
|
|
*/
|
|
|
- private BlockInfoContiguous completeBlock(final BlockCollection bc,
|
|
|
+ private BlockInfo completeBlock(final BlockCollection bc,
|
|
|
final int blkIndex, boolean force) throws IOException {
|
|
|
if(blkIndex < 0)
|
|
|
return null;
|
|
|
BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
|
|
|
- if(curBlock.isComplete())
|
|
|
+ if (curBlock.isComplete())
|
|
|
return curBlock;
|
|
|
+ // TODO: support BlockInfoStripedUC
|
|
|
BlockInfoContiguousUnderConstruction ucBlock =
|
|
|
- (BlockInfoContiguousUnderConstruction) curBlock;
|
|
|
+ (BlockInfoContiguousUnderConstruction)curBlock;
|
|
|
int numNodes = ucBlock.numNodes();
|
|
|
if (!force && numNodes < minReplication)
|
|
|
throw new IOException("Cannot complete block: " +
|
|
@@ -670,13 +671,15 @@ public class BlockManager {
|
|
|
return blocksMap.replaceBlock(completeBlock);
|
|
|
}
|
|
|
|
|
|
- private BlockInfoContiguous completeBlock(final BlockCollection bc,
|
|
|
- final BlockInfoContiguous block, boolean force) throws IOException {
|
|
|
+ // TODO: support BlockInfoStrippedUC
|
|
|
+ private BlockInfo completeBlock(final BlockCollection bc,
|
|
|
+ final BlockInfo block, boolean force) throws IOException {
|
|
|
BlockInfoContiguous[] fileBlocks = bc.getBlocks();
|
|
|
- for(int idx = 0; idx < fileBlocks.length; idx++)
|
|
|
- if(fileBlocks[idx] == block) {
|
|
|
+ for (int idx = 0; idx < fileBlocks.length; idx++) {
|
|
|
+ if (fileBlocks[idx] == block) {
|
|
|
return completeBlock(bc, idx, force);
|
|
|
}
|
|
|
+ }
|
|
|
return block;
|
|
|
}
|
|
|
|
|
@@ -685,7 +688,7 @@ public class BlockManager {
|
|
|
* regardless of whether enough replicas are present. This is necessary
|
|
|
* when tailing edit logs as a Standby.
|
|
|
*/
|
|
|
- public BlockInfoContiguous forceCompleteBlock(final BlockCollection bc,
|
|
|
+ public BlockInfo forceCompleteBlock(final BlockCollection bc,
|
|
|
final BlockInfoContiguousUnderConstruction block) throws IOException {
|
|
|
block.commitBlock(block);
|
|
|
return completeBlock(bc, block, true);
|
|
@@ -717,8 +720,8 @@ public class BlockManager {
|
|
|
|
|
|
DatanodeStorageInfo[] targets = getStorages(oldBlock);
|
|
|
|
|
|
- BlockInfoContiguousUnderConstruction ucBlock =
|
|
|
- bc.setLastBlock(oldBlock, targets);
|
|
|
+ BlockInfoContiguousUnderConstruction ucBlock = bc.setLastBlock(oldBlock,
|
|
|
+ targets);
|
|
|
blocksMap.replaceBlock(ucBlock);
|
|
|
|
|
|
// Remove block from replication queue.
|
|
@@ -1018,7 +1021,7 @@ public class BlockManager {
|
|
|
if(numBlocks == 0) {
|
|
|
return new BlocksWithLocations(new BlockWithLocations[0]);
|
|
|
}
|
|
|
- Iterator<BlockInfoContiguous> iter = node.getBlockIterator();
|
|
|
+ Iterator<BlockInfo> iter = node.getBlockIterator();
|
|
|
int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
|
|
|
// skip blocks
|
|
|
for(int i=0; i<startBlock; i++) {
|
|
@@ -1026,7 +1029,7 @@ public class BlockManager {
|
|
|
}
|
|
|
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
|
|
|
long totalSize = 0;
|
|
|
- BlockInfoContiguous curBlock;
|
|
|
+ BlockInfo curBlock;
|
|
|
while(totalSize<size && iter.hasNext()) {
|
|
|
curBlock = iter.next();
|
|
|
if(!curBlock.isComplete()) continue;
|
|
@@ -1125,7 +1128,8 @@ public class BlockManager {
|
|
|
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
|
|
|
final DatanodeInfo dn, String storageID, String reason) throws IOException {
|
|
|
assert namesystem.hasWriteLock();
|
|
|
- final BlockInfoContiguous storedBlock = getStoredBlock(blk.getLocalBlock());
|
|
|
+ final Block reportedBlock = blk.getLocalBlock();
|
|
|
+ final BlockInfo storedBlock = getStoredBlock(reportedBlock);
|
|
|
if (storedBlock == null) {
|
|
|
// Check if the replica is in the blockMap, if not
|
|
|
// ignore the request for now. This could happen when BlockScanner
|
|
@@ -1142,7 +1146,7 @@ public class BlockManager {
|
|
|
+ ") does not exist");
|
|
|
}
|
|
|
|
|
|
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
|
|
|
+ markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
|
|
|
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
|
|
|
storageID == null ? null : node.getStorageInfo(storageID),
|
|
|
node);
|
|
@@ -1168,7 +1172,7 @@ public class BlockManager {
|
|
|
|
|
|
// Add replica to the data-node if it is not already there
|
|
|
if (storageInfo != null) {
|
|
|
- storageInfo.addBlock(b.stored);
|
|
|
+ storageInfo.addBlock(b.stored, b.reportedBlock);
|
|
|
}
|
|
|
|
|
|
// Add this replica to corruptReplicas Map
|
|
@@ -1713,41 +1717,55 @@ public class BlockManager {
|
|
|
this.reportedState = reportedState;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private static class BlockInfoToAdd {
|
|
|
+ final BlockInfo stored;
|
|
|
+ final Block reported;
|
|
|
+
|
|
|
+ BlockInfoToAdd(BlockInfo stored, Block reported) {
|
|
|
+ this.stored = stored;
|
|
|
+ this.reported = reported;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
|
|
|
* list of blocks that should be considered corrupt due to a block report.
|
|
|
*/
|
|
|
private static class BlockToMarkCorrupt {
|
|
|
/** The corrupted block in a datanode. */
|
|
|
- final BlockInfoContiguous corrupted;
|
|
|
+ final BlockInfo corrupted;
|
|
|
/** The corresponding block stored in the BlockManager. */
|
|
|
- final BlockInfoContiguous stored;
|
|
|
+ final BlockInfo stored;
|
|
|
+ /** The block reported from a datanode */
|
|
|
+ final Block reportedBlock;
|
|
|
/** The reason to mark corrupt. */
|
|
|
final String reason;
|
|
|
/** The reason code to be stored */
|
|
|
final Reason reasonCode;
|
|
|
|
|
|
- BlockToMarkCorrupt(BlockInfoContiguous corrupted,
|
|
|
- BlockInfoContiguous stored, String reason,
|
|
|
- Reason reasonCode) {
|
|
|
+ BlockToMarkCorrupt(Block reported, BlockInfo corrupted,
|
|
|
+ BlockInfo stored, String reason, Reason reasonCode) {
|
|
|
+ Preconditions.checkNotNull(reported, "reported is null");
|
|
|
Preconditions.checkNotNull(corrupted, "corrupted is null");
|
|
|
Preconditions.checkNotNull(stored, "stored is null");
|
|
|
|
|
|
+ this.reportedBlock = reported;
|
|
|
this.corrupted = corrupted;
|
|
|
this.stored = stored;
|
|
|
this.reason = reason;
|
|
|
this.reasonCode = reasonCode;
|
|
|
}
|
|
|
|
|
|
- BlockToMarkCorrupt(BlockInfoContiguous stored, String reason,
|
|
|
+ BlockToMarkCorrupt(Block reported, BlockInfo stored, String reason,
|
|
|
Reason reasonCode) {
|
|
|
- this(stored, stored, reason, reasonCode);
|
|
|
+ this(reported, stored, stored, reason, reasonCode);
|
|
|
}
|
|
|
|
|
|
- BlockToMarkCorrupt(BlockInfoContiguous stored, long gs, String reason,
|
|
|
- Reason reasonCode) {
|
|
|
- this(new BlockInfoContiguous(stored), stored, reason, reasonCode);
|
|
|
+ BlockToMarkCorrupt(Block reported, BlockInfo stored, long gs,
|
|
|
+ String reason, Reason reasonCode) {
|
|
|
+ this(reported, BlockInfo.copyOf(stored), stored, reason,
|
|
|
+ reasonCode);
|
|
|
//the corrupted block in datanode has a different generation stamp
|
|
|
corrupted.setGenerationStamp(gs);
|
|
|
}
|
|
@@ -1872,7 +1890,7 @@ public class BlockManager {
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- BlockInfoContiguous bi = getStoredBlock(b);
|
|
|
+ BlockInfo bi = getStoredBlock(b);
|
|
|
if (bi == null) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
|
|
@@ -1912,7 +1930,7 @@ public class BlockManager {
|
|
|
// Modify the (block-->datanode) map, according to the difference
|
|
|
// between the old and new block report.
|
|
|
//
|
|
|
- Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
|
|
|
+ Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
|
|
|
Collection<Block> toRemove = new TreeSet<Block>();
|
|
|
Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
|
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
|
@@ -1929,8 +1947,9 @@ public class BlockManager {
|
|
|
removeStoredBlock(b, node);
|
|
|
}
|
|
|
int numBlocksLogged = 0;
|
|
|
- for (BlockInfoContiguous b : toAdd) {
|
|
|
- addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
|
|
|
+ for (BlockInfoToAdd b : toAdd) {
|
|
|
+ addStoredBlock(b.stored, b.reported, storageInfo, null,
|
|
|
+ numBlocksLogged < maxNumBlocksToLog);
|
|
|
numBlocksLogged++;
|
|
|
}
|
|
|
if (numBlocksLogged > maxNumBlocksToLog) {
|
|
@@ -1977,7 +1996,7 @@ public class BlockManager {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- BlockInfoContiguous storedBlock = getStoredBlock(iblk);
|
|
|
+ BlockInfo storedBlock = getStoredBlock(iblk);
|
|
|
// If block does not belong to any file, we are done.
|
|
|
if (storedBlock == null) continue;
|
|
|
|
|
@@ -2000,7 +2019,7 @@ public class BlockManager {
|
|
|
|
|
|
// If block is under construction, add this replica to its list
|
|
|
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
|
|
- ((BlockInfoContiguousUnderConstruction)storedBlock)
|
|
|
+ ((BlockInfoContiguousUnderConstruction) storedBlock)
|
|
|
.addReplicaIfNotPresent(storageInfo, iblk, reportedState);
|
|
|
// OpenFileBlocks only inside snapshots also will be added to safemode
|
|
|
// threshold. So we need to update such blocks to safemode
|
|
@@ -2015,14 +2034,14 @@ public class BlockManager {
|
|
|
}
|
|
|
//add replica if appropriate
|
|
|
if (reportedState == ReplicaState.FINALIZED) {
|
|
|
- addStoredBlockImmediate(storedBlock, storageInfo);
|
|
|
+ addStoredBlockImmediate(storedBlock, iblk, storageInfo);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void reportDiff(DatanodeStorageInfo storageInfo,
|
|
|
BlockListAsLongs newReport,
|
|
|
- Collection<BlockInfoContiguous> toAdd, // add to DatanodeDescriptor
|
|
|
+ Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
|
|
|
Collection<Block> toRemove, // remove from DatanodeDescriptor
|
|
|
Collection<Block> toInvalidate, // should be removed from DN
|
|
|
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
|
|
@@ -2030,8 +2049,10 @@ public class BlockManager {
|
|
|
|
|
|
// place a delimiter in the list which separates blocks
|
|
|
// that have been reported from those that have not
|
|
|
- BlockInfoContiguous delimiter = new BlockInfoContiguous(new Block(), (short) 1);
|
|
|
- AddBlockResult result = storageInfo.addBlock(delimiter);
|
|
|
+ Block delimiterBlock = new Block();
|
|
|
+ BlockInfoContiguous delimiter = new BlockInfoContiguous(delimiterBlock,
|
|
|
+ (short) 1);
|
|
|
+ AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
|
|
|
assert result == AddBlockResult.ADDED
|
|
|
: "Delimiting block cannot be present in the node";
|
|
|
int headIndex = 0; //currently the delimiter is in the head of the list
|
|
@@ -2045,7 +2066,7 @@ public class BlockManager {
|
|
|
while(itBR.hasNext()) {
|
|
|
Block iblk = itBR.next();
|
|
|
ReplicaState iState = itBR.getCurrentReplicaState();
|
|
|
- BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
|
|
|
+ BlockInfo storedBlock = processReportedBlock(storageInfo,
|
|
|
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
|
|
|
|
|
|
// move block to the head of the list
|
|
@@ -2057,8 +2078,7 @@ public class BlockManager {
|
|
|
|
|
|
// collect blocks that have not been reported
|
|
|
// all of them are next to the delimiter
|
|
|
- Iterator<BlockInfoContiguous> it =
|
|
|
- storageInfo.new BlockIterator(delimiter.getNext(0));
|
|
|
+ Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
|
|
|
while(it.hasNext())
|
|
|
toRemove.add(it.next());
|
|
|
storageInfo.removeBlock(delimiter);
|
|
@@ -2095,10 +2115,10 @@ public class BlockManager {
|
|
|
* @return the up-to-date stored block, if it should be kept.
|
|
|
* Otherwise, null.
|
|
|
*/
|
|
|
- private BlockInfoContiguous processReportedBlock(
|
|
|
+ private BlockInfo processReportedBlock(
|
|
|
final DatanodeStorageInfo storageInfo,
|
|
|
final Block block, final ReplicaState reportedState,
|
|
|
- final Collection<BlockInfoContiguous> toAdd,
|
|
|
+ final Collection<BlockInfoToAdd> toAdd,
|
|
|
final Collection<Block> toInvalidate,
|
|
|
final Collection<BlockToMarkCorrupt> toCorrupt,
|
|
|
final Collection<StatefulBlockInfo> toUC) {
|
|
@@ -2119,7 +2139,7 @@ public class BlockManager {
|
|
|
}
|
|
|
|
|
|
// find block by blockId
|
|
|
- BlockInfoContiguous storedBlock = getStoredBlock(block);
|
|
|
+ BlockInfo storedBlock = getStoredBlock(block);
|
|
|
if(storedBlock == null) {
|
|
|
// If blocksMap does not contain reported block id,
|
|
|
// the replica should be removed from the data-node.
|
|
@@ -2173,7 +2193,7 @@ public class BlockManager {
|
|
|
if (reportedState == ReplicaState.FINALIZED
|
|
|
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
|
|
|
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
|
|
|
- toAdd.add(storedBlock);
|
|
|
+ toAdd.add(new BlockInfoToAdd(storedBlock, block));
|
|
|
}
|
|
|
return storedBlock;
|
|
|
}
|
|
@@ -2251,7 +2271,7 @@ public class BlockManager {
|
|
|
*/
|
|
|
private BlockToMarkCorrupt checkReplicaCorrupt(
|
|
|
Block reported, ReplicaState reportedState,
|
|
|
- BlockInfoContiguous storedBlock, BlockUCState ucState,
|
|
|
+ BlockInfo storedBlock, BlockUCState ucState,
|
|
|
DatanodeDescriptor dn) {
|
|
|
switch(reportedState) {
|
|
|
case FINALIZED:
|
|
@@ -2260,12 +2280,12 @@ public class BlockManager {
|
|
|
case COMMITTED:
|
|
|
if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
|
|
|
final long reportedGS = reported.getGenerationStamp();
|
|
|
- return new BlockToMarkCorrupt(storedBlock, reportedGS,
|
|
|
+ return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
|
|
|
"block is " + ucState + " and reported genstamp " + reportedGS
|
|
|
+ " does not match genstamp in block map "
|
|
|
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
|
|
|
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
|
|
|
- return new BlockToMarkCorrupt(storedBlock,
|
|
|
+ return new BlockToMarkCorrupt(reported, storedBlock,
|
|
|
"block is " + ucState + " and reported length " +
|
|
|
reported.getNumBytes() + " does not match " +
|
|
|
"length in block map " + storedBlock.getNumBytes(),
|
|
@@ -2276,8 +2296,8 @@ public class BlockManager {
|
|
|
case UNDER_CONSTRUCTION:
|
|
|
if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
|
|
|
final long reportedGS = reported.getGenerationStamp();
|
|
|
- return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
|
|
|
- + ucState + " and reported state " + reportedState
|
|
|
+ return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
|
|
|
+ "block is " + ucState + " and reported state " + reportedState
|
|
|
+ ", But reported genstamp " + reportedGS
|
|
|
+ " does not match genstamp in block map "
|
|
|
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
|
|
@@ -2292,7 +2312,7 @@ public class BlockManager {
|
|
|
return null; // not corrupt
|
|
|
} else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
|
|
|
final long reportedGS = reported.getGenerationStamp();
|
|
|
- return new BlockToMarkCorrupt(storedBlock, reportedGS,
|
|
|
+ return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
|
|
|
"reported " + reportedState + " replica with genstamp " + reportedGS
|
|
|
+ " does not match COMPLETE block's genstamp in block map "
|
|
|
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
|
|
@@ -2307,7 +2327,7 @@ public class BlockManager {
|
|
|
"complete with the same genstamp");
|
|
|
return null;
|
|
|
} else {
|
|
|
- return new BlockToMarkCorrupt(storedBlock,
|
|
|
+ return new BlockToMarkCorrupt(reported, storedBlock,
|
|
|
"reported replica has invalid state " + reportedState,
|
|
|
Reason.INVALID_STATE);
|
|
|
}
|
|
@@ -2320,11 +2340,12 @@ public class BlockManager {
|
|
|
" on " + dn + " size " + storedBlock.getNumBytes();
|
|
|
// log here at WARN level since this is really a broken HDFS invariant
|
|
|
LOG.warn(msg);
|
|
|
- return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
|
|
|
+ return new BlockToMarkCorrupt(reported, storedBlock, msg,
|
|
|
+ Reason.INVALID_STATE);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock,
|
|
|
+ private boolean isBlockUnderConstruction(BlockInfo storedBlock,
|
|
|
BlockUCState ucState, ReplicaState reportedState) {
|
|
|
switch(reportedState) {
|
|
|
case FINALIZED:
|
|
@@ -2353,7 +2374,7 @@ public class BlockManager {
|
|
|
|
|
|
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
|
|
|
!block.findDatanode(storageInfo.getDatanodeDescriptor())) {
|
|
|
- addStoredBlock(block, storageInfo, null, true);
|
|
|
+ addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2368,18 +2389,18 @@ public class BlockManager {
|
|
|
*
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private void addStoredBlockImmediate(BlockInfoContiguous storedBlock,
|
|
|
+ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
|
|
|
DatanodeStorageInfo storageInfo)
|
|
|
throws IOException {
|
|
|
assert (storedBlock != null && namesystem.hasWriteLock());
|
|
|
if (!namesystem.isInStartupSafeMode()
|
|
|
|| namesystem.isPopulatingReplQueues()) {
|
|
|
- addStoredBlock(storedBlock, storageInfo, null, false);
|
|
|
+ addStoredBlock(storedBlock, reported, storageInfo, null, false);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// just add it
|
|
|
- storageInfo.addBlock(storedBlock);
|
|
|
+ storageInfo.addBlock(storedBlock, reported);
|
|
|
|
|
|
// Now check for completion of blocks and safe block count
|
|
|
int numCurrentReplica = countLiveNodes(storedBlock);
|
|
@@ -2400,13 +2421,14 @@ public class BlockManager {
|
|
|
* needed replications if this takes care of the problem.
|
|
|
* @return the block that is stored in blockMap.
|
|
|
*/
|
|
|
- private Block addStoredBlock(final BlockInfoContiguous block,
|
|
|
+ private Block addStoredBlock(final BlockInfo block,
|
|
|
+ final Block reportedBlock,
|
|
|
DatanodeStorageInfo storageInfo,
|
|
|
DatanodeDescriptor delNodeHint,
|
|
|
boolean logEveryBlock)
|
|
|
throws IOException {
|
|
|
assert block != null && namesystem.hasWriteLock();
|
|
|
- BlockInfoContiguous storedBlock;
|
|
|
+ BlockInfo storedBlock;
|
|
|
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
|
if (block instanceof BlockInfoContiguousUnderConstruction) {
|
|
|
//refresh our copy in case the block got completed in another thread
|
|
@@ -2427,7 +2449,7 @@ public class BlockManager {
|
|
|
assert bc != null : "Block must belong to a file";
|
|
|
|
|
|
// add block to the datanode
|
|
|
- AddBlockResult result = storageInfo.addBlock(storedBlock);
|
|
|
+ AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
|
|
|
|
|
|
int curReplicaDelta;
|
|
|
if (result == AddBlockResult.ADDED) {
|
|
@@ -2502,13 +2524,13 @@ public class BlockManager {
|
|
|
storedBlock + "blockMap has " + numCorruptNodes +
|
|
|
" but corrupt replicas map has " + corruptReplicasCount);
|
|
|
}
|
|
|
- if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
|
|
|
- invalidateCorruptReplicas(storedBlock);
|
|
|
+ if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
|
|
|
+ invalidateCorruptReplicas(storedBlock, reportedBlock);
|
|
|
+ }
|
|
|
return storedBlock;
|
|
|
}
|
|
|
|
|
|
- private void logAddStoredBlock(BlockInfoContiguous storedBlock,
|
|
|
- DatanodeDescriptor node) {
|
|
|
+ private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
|
|
|
if (!blockLog.isInfoEnabled()) {
|
|
|
return;
|
|
|
}
|
|
@@ -2535,7 +2557,7 @@ public class BlockManager {
|
|
|
*
|
|
|
* @param blk Block whose corrupt replicas need to be invalidated
|
|
|
*/
|
|
|
- private void invalidateCorruptReplicas(BlockInfoContiguous blk) {
|
|
|
+ private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
|
|
|
Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
|
|
|
boolean removedFromBlocksMap = true;
|
|
|
if (nodes == null)
|
|
@@ -2545,7 +2567,7 @@ public class BlockManager {
|
|
|
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
|
|
|
for (DatanodeDescriptor node : nodesCopy) {
|
|
|
try {
|
|
|
- if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
|
|
|
+ if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
|
|
|
Reason.ANY), node)) {
|
|
|
removedFromBlocksMap = false;
|
|
|
}
|
|
@@ -2614,7 +2636,7 @@ public class BlockManager {
|
|
|
long nrInvalid = 0, nrOverReplicated = 0;
|
|
|
long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0;
|
|
|
long startTimeMisReplicatedScan = Time.now();
|
|
|
- Iterator<BlockInfoContiguous> blocksItr = blocksMap.getBlocks().iterator();
|
|
|
+ Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
|
|
|
long totalBlocks = blocksMap.size();
|
|
|
replicationQueuesInitProgress = 0;
|
|
|
long totalProcessed = 0;
|
|
@@ -2626,7 +2648,7 @@ public class BlockManager {
|
|
|
namesystem.writeLockInterruptibly();
|
|
|
try {
|
|
|
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
|
|
|
- BlockInfoContiguous block = blocksItr.next();
|
|
|
+ BlockInfo block = blocksItr.next();
|
|
|
MisReplicationResult res = processMisReplicatedBlock(block);
|
|
|
if (LOG.isTraceEnabled()) {
|
|
|
LOG.trace("block " + block + ": " + res);
|
|
@@ -2700,7 +2722,7 @@ public class BlockManager {
|
|
|
* appropriate queues if necessary, and returns a result code indicating
|
|
|
* what happened with it.
|
|
|
*/
|
|
|
- private MisReplicationResult processMisReplicatedBlock(BlockInfoContiguous block) {
|
|
|
+ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
|
|
|
BlockCollection bc = block.getBlockCollection();
|
|
|
if (bc == null) {
|
|
|
// block does not belong to any file
|
|
@@ -3029,14 +3051,14 @@ public class BlockManager {
|
|
|
ReplicaState reportedState, DatanodeDescriptor delHintNode)
|
|
|
throws IOException {
|
|
|
// blockReceived reports a finalized block
|
|
|
- Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
|
|
|
+ Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
|
|
|
Collection<Block> toInvalidate = new LinkedList<Block>();
|
|
|
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
|
|
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
|
|
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
|
|
|
|
|
- processReportedBlock(storageInfo, block, reportedState,
|
|
|
- toAdd, toInvalidate, toCorrupt, toUC);
|
|
|
+ processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
|
|
|
+ toCorrupt, toUC);
|
|
|
// the block is only in one of the to-do lists
|
|
|
// if it is in none then data-node already has it
|
|
|
assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
|
|
@@ -3046,8 +3068,9 @@ public class BlockManager {
|
|
|
addStoredBlockUnderConstruction(b, storageInfo);
|
|
|
}
|
|
|
long numBlocksLogged = 0;
|
|
|
- for (BlockInfoContiguous b : toAdd) {
|
|
|
- addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
|
|
|
+ for (BlockInfoToAdd b : toAdd) {
|
|
|
+ addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
|
|
|
+ numBlocksLogged < maxNumBlocksToLog);
|
|
|
numBlocksLogged++;
|
|
|
}
|
|
|
if (numBlocksLogged > maxNumBlocksToLog) {
|
|
@@ -3170,7 +3193,7 @@ public class BlockManager {
|
|
|
* @param b - the block being tested
|
|
|
* @return count of live nodes for this block
|
|
|
*/
|
|
|
- int countLiveNodes(BlockInfoContiguous b) {
|
|
|
+ int countLiveNodes(BlockInfo b) {
|
|
|
if (!namesystem.isInStartupSafeMode()) {
|
|
|
return countNodes(b).liveReplicas();
|
|
|
}
|
|
@@ -3325,7 +3348,7 @@ public class BlockManager {
|
|
|
return blocksMap.size();
|
|
|
}
|
|
|
|
|
|
- public DatanodeStorageInfo[] getStorages(BlockInfoContiguous block) {
|
|
|
+ public DatanodeStorageInfo[] getStorages(BlockInfo block) {
|
|
|
final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
|
|
|
int i = 0;
|
|
|
for(DatanodeStorageInfo s : blocksMap.getStorages(block)) {
|
|
@@ -3355,8 +3378,8 @@ public class BlockManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public BlockInfoContiguous getStoredBlock(Block block) {
|
|
|
- BlockInfoContiguous info = null;
|
|
|
+ public BlockInfo getStoredBlock(Block block) {
|
|
|
+ BlockInfo info = null;
|
|
|
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
|
|
|
info = blocksMap.getStoredBlock(
|
|
|
new Block(BlockIdManager.convertToGroupID(block.getBlockId())));
|
|
@@ -3513,7 +3536,8 @@ public class BlockManager {
|
|
|
|
|
|
public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block,
|
|
|
BlockCollection bc) {
|
|
|
- return blocksMap.addBlockCollection(block, bc);
|
|
|
+ // TODO
|
|
|
+ return (BlockInfoContiguous) blocksMap.addBlockCollection(block, bc);
|
|
|
}
|
|
|
|
|
|
public BlockCollection getBlockCollection(Block b) {
|
|
@@ -3721,7 +3745,7 @@ public class BlockManager {
|
|
|
|
|
|
/**
|
|
|
* A simple result enum for the result of
|
|
|
- * {@link BlockManager#processMisReplicatedBlock(BlockInfoContiguous)}.
|
|
|
+ * {@link BlockManager#processMisReplicatedBlock}.
|
|
|
*/
|
|
|
enum MisReplicationResult {
|
|
|
/** The block should be invalidated since it belongs to a deleted file. */
|