@@ -37,6 +37,9 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.security.AccessTokenHandler;
@@ -248,13 +251,15 @@ public class BlockManager {
       Block commitBlock) throws IOException {
     if(commitBlock == null)
       return; // not committing, this is a block allocation retry
-    BlockInfoUnderConstruction lastBlock = fileINode.getLastBlock();
+    BlockInfo lastBlock = fileINode.getLastBlock();
     if(lastBlock == null)
       return; // no blocks in file yet
+    if(!lastBlock.isUnderConstruction())
+      return; // already completed (e.g. by syncBlock)
     assert lastBlock.getNumBytes() <= commitBlock.getNumBytes() :
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + lastBlock.getNumBytes();
-    lastBlock.commitBlock(commitBlock);
+    ((BlockInfoUnderConstruction)lastBlock).commitBlock(commitBlock);
   }
 
   /**
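Note on the hunk above: commitOrCompleteLastBlock() now returns early when the last block is no longer under construction, e.g. when a concurrent syncBlock has already completed it, and only then downcasts to BlockInfoUnderConstruction. The guard relies on BlockInfo.isUnderConstruction(), which is not part of this diff; a minimal sketch of what that check is assumed to mean in terms of the new BlockUCState:

    // Sketch only (assumption): a block is "under construction" until it reaches COMPLETE,
    // so a COMMITTED last block still passes this check and can be committed below.
    boolean isUnderConstruction() {
      return getBlockUCState() != BlockUCState.COMPLETE;
    }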
@@ -264,12 +269,13 @@ public class BlockManager {
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  void completeBlock(INodeFile fileINode, int blkIndex) throws IOException {
+  BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
+  throws IOException {
     if(blkIndex < 0)
-      return;
+      return null;
     BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
     if(!curBlock.isUnderConstruction())
-      return;
+      return curBlock;
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
     if(ucBlock.numNodes() < minReplication)
       throw new IOException("Cannot complete block: " +
@@ -278,7 +284,17 @@ public class BlockManager {
     // replace penultimate block in file
     fileINode.setBlock(blkIndex, completeBlock);
     // replace block in the blocksMap
-    blocksMap.replaceBlock(completeBlock);
+    return blocksMap.replaceBlock(completeBlock);
+  }
+
+  BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
+  throws IOException {
+    BlockInfo[] fileBlocks = fileINode.getBlocks();
+    for(int idx = 0; idx < fileBlocks.length; idx++)
+      if(fileBlocks[idx] == block) {
+        return completeBlock(fileINode, idx);
+      }
+    return block;
   }
 
   /**
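The new completeBlock(INodeFile, BlockInfo) overload above locates the block's index within the file and delegates to the index-based variant, returning the (possibly replaced) BlockInfo. A hedged usage sketch; blockManager stands in for the surrounding FSNamesystem's BlockManager instance and is not a name taken from this patch:

    // Hypothetical caller: complete a committed last block once enough replicas are in.
    BlockInfo last = fileINode.getLastBlock();
    if (last != null && last.getBlockUCState() == BlockUCState.COMMITTED) {
      last = blockManager.completeBlock(fileINode, last); // returns the completed BlockInfo
    }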
@@ -430,7 +446,7 @@ public class BlockManager {
       pendingDeletionBlocksCount++;
       if (log) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-            + b.getBlockName() + " to " + dn.getName());
+            + b + " to " + dn.getName());
       }
     }
   }
@@ -460,7 +476,7 @@ public class BlockManager {
     }
     if (datanodes.length() != 0) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-          + b.getBlockName() + " to " + datanodes.toString());
+          + b + " to " + datanodes.toString());
     }
   }
 
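The two addToInvalidates log statements above now print the Block itself rather than b.getBlockName(). Assuming Block.toString() appends the generation stamp to the block name (as elsewhere in HDFS of this vintage), the message now identifies the exact replica generation being scheduled for deletion. A small illustration; the printed formats are an assumption, not taken from this patch:

    Block b = new Block(4242L, 0L, 1001L);  // id, length, generation stamp
    System.out.println(b.getBlockName());   // e.g. blk_4242
    System.out.println(b);                  // e.g. blk_4242_1001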
@@ -943,7 +959,8 @@ public class BlockManager {
     Collection<Block> toAdd = new LinkedList<Block>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
-    node.reportDiff(blocksMap, report, toAdd, toRemove, toInvalidate);
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
 
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
@@ -957,6 +974,9 @@ public class BlockManager {
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -966,7 +986,8 @@ public class BlockManager {
    */
   private Block addStoredBlock(final Block block,
                                DatanodeDescriptor node,
-                               DatanodeDescriptor delNodeHint) {
+                               DatanodeDescriptor delNodeHint)
+  throws IOException {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
@@ -1081,11 +1102,12 @@ public class BlockManager {
     // check whether safe replication is reached for the block
     namesystem.incrementSafeBlockCount(numCurrentReplica);
 
-    //
-    // if file is being actively written to, then do not check
-    // replication-factor here. It will be checked when the file is closed.
-    //
+    // if file is under construction, then check whether the block
+    // can be completed
     if (fileINode.isUnderConstruction()) {
+      if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
+          numLiveReplicas >= minReplication)
+        storedBlock = completeBlock(fileINode, storedBlock);
       return storedBlock;
     }
 
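The addStoredBlock() change above encodes the completion rule for files under construction: a last block in the COMMITTED state is promoted to COMPLETE once minReplication live replicas have been reported. A minimal restatement of that condition, using only names that appear in this patch:

    // Sketch of the completion condition applied in the hunk above.
    boolean canComplete(BlockInfo storedBlock, int numLiveReplicas) {
      return storedBlock.getBlockUCState() == BlockUCState.COMMITTED
          && numLiveReplicas >= minReplication;
    }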
@@ -1311,7 +1333,30 @@ public class BlockManager {
     // Modify the blocks->datanode map and node's map.
     //
     pendingReplications.remove(block);
-    addStoredBlock(block, node, delHintNode);
+
+    // blockReceived reports a finalized block
+    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    node.processReportedBlock(this, block, ReplicaState.FINALIZED,
+                              toAdd, toInvalidate, toCorrupt);
+    // the block is only in one of the lists
+    // if it is in none then data-node already has it
+    assert toAdd.size() + toInvalidate.size() <= 1 :
+      "The block should be only in one of the lists.";
+
+    for (Block b : toAdd) {
+      addStoredBlock(b, node, delHintNode);
+    }
+    for (Block b : toInvalidate) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+          + b + " on " + node.getName() + " size " + b.getNumBytes()
+          + " does not belong to any file.");
+      addToInvalidates(b, node);
+    }
+    for (BlockInfo b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
   }
 
   /**
@@ -1409,6 +1454,14 @@ public class BlockManager {
     return blocksMap.getStoredBlock(block);
   }
 
+  /**
+   * Find the block by block ID.
+   */
+  BlockInfo findStoredBlock(long blockId) {
+    Block wildcardBlock = new Block(blockId, 0, GenerationStamp.WILDCARD_STAMP);
+    return blocksMap.getStoredBlock(wildcardBlock);
+  }
+
   /* updates a block in under replication queue */
   void updateNeededReplications(Block block, int curReplicasDelta,
                                 int expectedReplicasDelta) {
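Finally, findStoredBlock(long) above looks a block up by ID alone, passing GenerationStamp.WILDCARD_STAMP so the lookup does not depend on whatever generation stamp a data-node happened to report. A hypothetical usage sketch; reportedBlock and blockManager are illustrative names, not part of this patch:

    BlockInfo stored = blockManager.findStoredBlock(reportedBlock.getBlockId());
    if (stored == null) {
      // Unknown block ID: the replica belongs to no file and can be invalidated.
    } else if (stored.getGenerationStamp() != reportedBlock.getGenerationStamp()) {
      // Mismatched generation stamp: a candidate for the corrupt/invalidate handling above.
    }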