|
@@ -15,7 +15,7 @@
|
|
* See the License for the specific language governing permissions and
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
* limitations under the License.
|
|
*/
|
|
*/
|
|
-package org.apache.hadoop.hdfs.server.blockmanagement;
|
|
|
|
|
|
+package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.PrintWriter;
|
|
import java.io.PrintWriter;
|
|
@@ -39,14 +39,10 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
|
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
-import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
|
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
|
|
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.INode;
|
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
|
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Keeps information related to the blocks stored in the Hadoop cluster.
|
|
* Keeps information related to the blocks stored in the Hadoop cluster.
|
|
@@ -61,43 +57,18 @@ public class BlockManager {
|
|
|
|
|
|
private final FSNamesystem namesystem;
|
|
private final FSNamesystem namesystem;
|
|
|
|
|
|
- private volatile long pendingReplicationBlocksCount = 0L;
|
|
|
|
- private volatile long corruptReplicaBlocksCount = 0L;
|
|
|
|
- private volatile long underReplicatedBlocksCount = 0L;
|
|
|
|
- public volatile long scheduledReplicationBlocksCount = 0L;
|
|
|
|
- private volatile long excessBlocksCount = 0L;
|
|
|
|
- private volatile long pendingDeletionBlocksCount = 0L;
|
|
|
|
|
|
+ volatile long pendingReplicationBlocksCount = 0L;
|
|
|
|
+ volatile long corruptReplicaBlocksCount = 0L;
|
|
|
|
+ volatile long underReplicatedBlocksCount = 0L;
|
|
|
|
+ volatile long scheduledReplicationBlocksCount = 0L;
|
|
|
|
+ volatile long excessBlocksCount = 0L;
|
|
|
|
+ volatile long pendingDeletionBlocksCount = 0L;
|
|
|
|
|
|
- /** Used by metrics */
|
|
|
|
- public long getPendingReplicationBlocksCount() {
|
|
|
|
- return pendingReplicationBlocksCount;
|
|
|
|
- }
|
|
|
|
- /** Used by metrics */
|
|
|
|
- public long getUnderReplicatedBlocksCount() {
|
|
|
|
- return underReplicatedBlocksCount;
|
|
|
|
- }
|
|
|
|
- /** Used by metrics */
|
|
|
|
- public long getCorruptReplicaBlocksCount() {
|
|
|
|
- return corruptReplicaBlocksCount;
|
|
|
|
- }
|
|
|
|
- /** Used by metrics */
|
|
|
|
- public long getScheduledReplicationBlocksCount() {
|
|
|
|
- return scheduledReplicationBlocksCount;
|
|
|
|
- }
|
|
|
|
- /** Used by metrics */
|
|
|
|
- public long getPendingDeletionBlocksCount() {
|
|
|
|
- return pendingDeletionBlocksCount;
|
|
|
|
- }
|
|
|
|
- /** Used by metrics */
|
|
|
|
- public long getExcessBlocksCount() {
|
|
|
|
- return excessBlocksCount;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Mapping: Block -> { INode, datanodes, self ref }
|
|
|
|
- * Updated only in response to client-sent information.
|
|
|
|
- */
|
|
|
|
- public final BlocksMap blocksMap;
|
|
|
|
|
|
+ //
|
|
|
|
+ // Mapping: Block -> { INode, datanodes, self ref }
|
|
|
|
+ // Updated only in response to client-sent information.
|
|
|
|
+ //
|
|
|
|
+ final BlocksMap blocksMap;
|
|
|
|
|
|
//
|
|
//
|
|
// Store blocks-->datanodedescriptor(s) map of corrupt replicas
|
|
// Store blocks-->datanodedescriptor(s) map of corrupt replicas
|
|
@@ -119,24 +90,24 @@ public class BlockManager {
|
|
// eventually remove these extras.
|
|
// eventually remove these extras.
|
|
// Mapping: StorageID -> TreeSet<Block>
|
|
// Mapping: StorageID -> TreeSet<Block>
|
|
//
|
|
//
|
|
- public final Map<String, Collection<Block>> excessReplicateMap =
|
|
|
|
|
|
+ Map<String, Collection<Block>> excessReplicateMap =
|
|
new TreeMap<String, Collection<Block>>();
|
|
new TreeMap<String, Collection<Block>>();
|
|
|
|
|
|
//
|
|
//
|
|
// Store set of Blocks that need to be replicated 1 or more times.
|
|
// Store set of Blocks that need to be replicated 1 or more times.
|
|
// We also store pending replication-orders.
|
|
// We also store pending replication-orders.
|
|
//
|
|
//
|
|
- public UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
|
|
|
|
|
|
+ UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
|
|
private PendingReplicationBlocks pendingReplications;
|
|
private PendingReplicationBlocks pendingReplications;
|
|
|
|
|
|
// The maximum number of replicas allowed for a block
|
|
// The maximum number of replicas allowed for a block
|
|
- public int maxReplication;
|
|
|
|
|
|
+ int maxReplication;
|
|
// How many outgoing replication streams a given node should have at one time
|
|
// How many outgoing replication streams a given node should have at one time
|
|
- public int maxReplicationStreams;
|
|
|
|
|
|
+ int maxReplicationStreams;
|
|
// Minimum copies needed or else write is disallowed
|
|
// Minimum copies needed or else write is disallowed
|
|
- public int minReplication;
|
|
|
|
|
|
+ int minReplication;
|
|
// Default number of replicas
|
|
// Default number of replicas
|
|
- public int defaultReplication;
|
|
|
|
|
|
+ int defaultReplication;
|
|
// How many entries are returned by getCorruptInodes()
|
|
// How many entries are returned by getCorruptInodes()
|
|
int maxCorruptFilesReturned;
|
|
int maxCorruptFilesReturned;
|
|
|
|
|
|
@@ -150,9 +121,9 @@ public class BlockManager {
|
|
Random r = new Random();
|
|
Random r = new Random();
|
|
|
|
|
|
// for block replicas placement
|
|
// for block replicas placement
|
|
- public BlockPlacementPolicy replicator;
|
|
|
|
|
|
+ BlockPlacementPolicy replicator;
|
|
|
|
|
|
- public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
|
|
|
|
|
|
+ BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
|
|
this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
|
|
this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -207,16 +178,16 @@ public class BlockManager {
|
|
FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
|
|
FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
|
|
}
|
|
}
|
|
|
|
|
|
- public void activate() {
|
|
|
|
|
|
+ void activate() {
|
|
pendingReplications.start();
|
|
pendingReplications.start();
|
|
}
|
|
}
|
|
|
|
|
|
- public void close() {
|
|
|
|
|
|
+ void close() {
|
|
if (pendingReplications != null) pendingReplications.stop();
|
|
if (pendingReplications != null) pendingReplications.stop();
|
|
blocksMap.close();
|
|
blocksMap.close();
|
|
}
|
|
}
|
|
|
|
|
|
- public void metaSave(PrintWriter out) {
|
|
|
|
|
|
+ void metaSave(PrintWriter out) {
|
|
//
|
|
//
|
|
// Dump contents of neededReplication
|
|
// Dump contents of neededReplication
|
|
//
|
|
//
|
|
@@ -278,7 +249,7 @@ public class BlockManager {
|
|
* @param block
|
|
* @param block
|
|
* @return true if the block has minimum replicas
|
|
* @return true if the block has minimum replicas
|
|
*/
|
|
*/
|
|
- public boolean checkMinReplication(Block block) {
|
|
|
|
|
|
+ boolean checkMinReplication(Block block) {
|
|
return (countNodes(block).liveReplicas() >= minReplication);
|
|
return (countNodes(block).liveReplicas() >= minReplication);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -326,7 +297,7 @@ public class BlockManager {
|
|
* @throws IOException if the block does not have at least a minimal number
|
|
* @throws IOException if the block does not have at least a minimal number
|
|
* of replicas reported from data-nodes.
|
|
* of replicas reported from data-nodes.
|
|
*/
|
|
*/
|
|
- public void commitOrCompleteLastBlock(INodeFileUnderConstruction fileINode,
|
|
|
|
|
|
+ void commitOrCompleteLastBlock(INodeFileUnderConstruction fileINode,
|
|
Block commitBlock) throws IOException {
|
|
Block commitBlock) throws IOException {
|
|
|
|
|
|
if(commitBlock == null)
|
|
if(commitBlock == null)
|
|
@@ -391,7 +362,7 @@ public class BlockManager {
|
|
* @param fileINode file
|
|
* @param fileINode file
|
|
* @return the last block locations if the block is partial or null otherwise
|
|
* @return the last block locations if the block is partial or null otherwise
|
|
*/
|
|
*/
|
|
- public LocatedBlock convertLastBlockToUnderConstruction(
|
|
|
|
|
|
+ LocatedBlock convertLastBlockToUnderConstruction(
|
|
INodeFileUnderConstruction fileINode) throws IOException {
|
|
INodeFileUnderConstruction fileINode) throws IOException {
|
|
BlockInfo oldBlock = fileINode.getLastBlock();
|
|
BlockInfo oldBlock = fileINode.getLastBlock();
|
|
if(oldBlock == null ||
|
|
if(oldBlock == null ||
|
|
@@ -422,7 +393,7 @@ public class BlockManager {
|
|
/**
|
|
/**
|
|
* Get all valid locations of the block
|
|
* Get all valid locations of the block
|
|
*/
|
|
*/
|
|
- public ArrayList<String> getValidLocations(Block block) {
|
|
|
|
|
|
+ ArrayList<String> getValidLocations(Block block) {
|
|
ArrayList<String> machineSet =
|
|
ArrayList<String> machineSet =
|
|
new ArrayList<String>(blocksMap.numNodes(block));
|
|
new ArrayList<String>(blocksMap.numNodes(block));
|
|
for(Iterator<DatanodeDescriptor> it =
|
|
for(Iterator<DatanodeDescriptor> it =
|
|
@@ -436,7 +407,7 @@ public class BlockManager {
|
|
return machineSet;
|
|
return machineSet;
|
|
}
|
|
}
|
|
|
|
|
|
- public List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
|
|
|
|
|
|
+ List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
|
|
long length, int nrBlocksToReturn) throws IOException {
|
|
long length, int nrBlocksToReturn) throws IOException {
|
|
int curBlk = 0;
|
|
int curBlk = 0;
|
|
long curPos = 0, blkSize = 0;
|
|
long curPos = 0, blkSize = 0;
|
|
@@ -465,15 +436,11 @@ public class BlockManager {
|
|
return results;
|
|
return results;
|
|
}
|
|
}
|
|
|
|
|
|
- /** @return a LocatedBlock for the given block */
|
|
|
|
- public LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
|
|
|
|
|
|
+ /** @param needBlockToken
|
|
|
|
+ * @return a LocatedBlock for the given block */
|
|
|
|
+ LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
|
|
) throws IOException {
|
|
) throws IOException {
|
|
- if (blk instanceof BlockInfoUnderConstruction) {
|
|
|
|
- if (blk.isComplete()) {
|
|
|
|
- throw new IOException(
|
|
|
|
- "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
|
|
|
|
- + ", blk=" + blk);
|
|
|
|
- }
|
|
|
|
|
|
+ if (!blk.isComplete()) {
|
|
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
|
|
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
|
|
final DatanodeDescriptor[] locations = uc.getExpectedLocations();
|
|
final DatanodeDescriptor[] locations = uc.getExpectedLocations();
|
|
return namesystem.createLocatedBlock(uc, locations, pos, false);
|
|
return namesystem.createLocatedBlock(uc, locations, pos, false);
|
|
@@ -509,7 +476,7 @@ public class BlockManager {
|
|
* Check whether the replication parameter is within the range
|
|
* Check whether the replication parameter is within the range
|
|
* determined by system configuration.
|
|
* determined by system configuration.
|
|
*/
|
|
*/
|
|
- public void verifyReplication(String src,
|
|
|
|
|
|
+ void verifyReplication(String src,
|
|
short replication,
|
|
short replication,
|
|
String clientName) throws IOException {
|
|
String clientName) throws IOException {
|
|
|
|
|
|
@@ -577,7 +544,7 @@ public class BlockManager {
|
|
* @param b block
|
|
* @param b block
|
|
* @param dn datanode
|
|
* @param dn datanode
|
|
*/
|
|
*/
|
|
- public void addToInvalidates(Block b, DatanodeInfo dn) {
|
|
|
|
|
|
+ void addToInvalidates(Block b, DatanodeInfo dn) {
|
|
addToInvalidates(b, dn, true);
|
|
addToInvalidates(b, dn, true);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -618,7 +585,7 @@ public class BlockManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void findAndMarkBlockAsCorrupt(Block blk,
|
|
|
|
|
|
+ void findAndMarkBlockAsCorrupt(Block blk,
|
|
DatanodeInfo dn) throws IOException {
|
|
DatanodeInfo dn) throws IOException {
|
|
BlockInfo storedBlock = getStoredBlock(blk);
|
|
BlockInfo storedBlock = getStoredBlock(blk);
|
|
if (storedBlock == null) {
|
|
if (storedBlock == null) {
|
|
@@ -701,14 +668,14 @@ public class BlockManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void updateState() {
|
|
|
|
|
|
+ void updateState() {
|
|
pendingReplicationBlocksCount = pendingReplications.size();
|
|
pendingReplicationBlocksCount = pendingReplications.size();
|
|
underReplicatedBlocksCount = neededReplications.size();
|
|
underReplicatedBlocksCount = neededReplications.size();
|
|
corruptReplicaBlocksCount = corruptReplicas.size();
|
|
corruptReplicaBlocksCount = corruptReplicas.size();
|
|
}
|
|
}
|
|
|
|
|
|
/** Return number of under-replicated but not missing blocks */
|
|
/** Return number of under-replicated but not missing blocks */
|
|
- public int getUnderReplicatedNotMissingBlocks() {
|
|
|
|
|
|
+ int getUnderReplicatedNotMissingBlocks() {
|
|
return neededReplications.getUnderReplicatedBlockCount();
|
|
return neededReplications.getUnderReplicatedBlockCount();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -717,7 +684,7 @@ public class BlockManager {
|
|
* @param nodesToProcess number of datanodes to schedule deletion work
|
|
* @param nodesToProcess number of datanodes to schedule deletion work
|
|
* @return total number of block for deletion
|
|
* @return total number of block for deletion
|
|
*/
|
|
*/
|
|
- public int computeInvalidateWork(int nodesToProcess) {
|
|
|
|
|
|
+ int computeInvalidateWork(int nodesToProcess) {
|
|
int numOfNodes = recentInvalidateSets.size();
|
|
int numOfNodes = recentInvalidateSets.size();
|
|
nodesToProcess = Math.min(numOfNodes, nodesToProcess);
|
|
nodesToProcess = Math.min(numOfNodes, nodesToProcess);
|
|
|
|
|
|
@@ -757,7 +724,7 @@ public class BlockManager {
|
|
*
|
|
*
|
|
* @return number of blocks scheduled for replication during this iteration.
|
|
* @return number of blocks scheduled for replication during this iteration.
|
|
*/
|
|
*/
|
|
- public int computeReplicationWork(int blocksToProcess) throws IOException {
|
|
|
|
|
|
+ int computeReplicationWork(int blocksToProcess) throws IOException {
|
|
// Choose the blocks to be replicated
|
|
// Choose the blocks to be replicated
|
|
List<List<Block>> blocksToReplicate =
|
|
List<List<Block>> blocksToReplicate =
|
|
chooseUnderReplicatedBlocks(blocksToProcess);
|
|
chooseUnderReplicatedBlocks(blocksToProcess);
|
|
@@ -1064,7 +1031,7 @@ public class BlockManager {
|
|
* If there were any replication requests that timed out, reap them
|
|
* If there were any replication requests that timed out, reap them
|
|
* and put them back into the neededReplication queue
|
|
* and put them back into the neededReplication queue
|
|
*/
|
|
*/
|
|
- public void processPendingReplications() {
|
|
|
|
|
|
+ void processPendingReplications() {
|
|
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
|
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
|
|
if (timedOutItems != null) {
|
|
if (timedOutItems != null) {
|
|
namesystem.writeLock();
|
|
namesystem.writeLock();
|
|
@@ -1497,7 +1464,7 @@ public class BlockManager {
|
|
short fileReplication = fileINode.getReplication();
|
|
short fileReplication = fileINode.getReplication();
|
|
if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
|
|
if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
|
|
neededReplications.remove(storedBlock, numCurrentReplica,
|
|
neededReplications.remove(storedBlock, numCurrentReplica,
|
|
- num.decommissionedReplicas(), fileReplication);
|
|
|
|
|
|
+ num.decommissionedReplicas, fileReplication);
|
|
} else {
|
|
} else {
|
|
updateNeededReplications(storedBlock, curReplicaDelta, 0);
|
|
updateNeededReplications(storedBlock, curReplicaDelta, 0);
|
|
}
|
|
}
|
|
@@ -1558,7 +1525,7 @@ public class BlockManager {
|
|
* For each block in the name-node verify whether it belongs to any file,
|
|
* For each block in the name-node verify whether it belongs to any file,
|
|
* over or under replicated. Place it into the respective queue.
|
|
* over or under replicated. Place it into the respective queue.
|
|
*/
|
|
*/
|
|
- public void processMisReplicatedBlocks() {
|
|
|
|
|
|
+ void processMisReplicatedBlocks() {
|
|
long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
|
|
long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
|
|
namesystem.writeLock();
|
|
namesystem.writeLock();
|
|
try {
|
|
try {
|
|
@@ -1603,7 +1570,7 @@ public class BlockManager {
|
|
* If there are any extras, call chooseExcessReplicates() to
|
|
* If there are any extras, call chooseExcessReplicates() to
|
|
* mark them in the excessReplicateMap.
|
|
* mark them in the excessReplicateMap.
|
|
*/
|
|
*/
|
|
- public void processOverReplicatedBlock(Block block, short replication,
|
|
|
|
|
|
+ void processOverReplicatedBlock(Block block, short replication,
|
|
DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
|
|
DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
if (addedNode == delNodeHint) {
|
|
if (addedNode == delNodeHint) {
|
|
@@ -1630,7 +1597,7 @@ public class BlockManager {
|
|
addedNode, delNodeHint, replicator);
|
|
addedNode, delNodeHint, replicator);
|
|
}
|
|
}
|
|
|
|
|
|
- public void addToExcessReplicate(DatanodeInfo dn, Block block) {
|
|
|
|
|
|
+ void addToExcessReplicate(DatanodeInfo dn, Block block) {
|
|
assert namesystem.hasWriteLock();
|
|
assert namesystem.hasWriteLock();
|
|
Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
|
|
Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
|
|
if (excessBlocks == null) {
|
|
if (excessBlocks == null) {
|
|
@@ -1651,7 +1618,7 @@ public class BlockManager {
|
|
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
|
|
* Modify (block-->datanode) map. Possibly generate replication tasks, if the
|
|
* removed block is still valid.
|
|
* removed block is still valid.
|
|
*/
|
|
*/
|
|
- public void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
|
|
|
|
|
+ void removeStoredBlock(Block block, DatanodeDescriptor node) {
|
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
|
|
+ block + " from " + node.getName());
|
|
+ block + " from " + node.getName());
|
|
@@ -1706,7 +1673,7 @@ public class BlockManager {
|
|
/**
|
|
/**
|
|
* The given node is reporting that it received a certain block.
|
|
* The given node is reporting that it received a certain block.
|
|
*/
|
|
*/
|
|
- public void addBlock(DatanodeDescriptor node, Block block, String delHint)
|
|
|
|
|
|
+ void addBlock(DatanodeDescriptor node, Block block, String delHint)
|
|
throws IOException {
|
|
throws IOException {
|
|
// decrement number of blocks scheduled to this datanode.
|
|
// decrement number of blocks scheduled to this datanode.
|
|
node.decBlocksScheduled();
|
|
node.decBlocksScheduled();
|
|
@@ -1759,7 +1726,7 @@ public class BlockManager {
|
|
/**
|
|
/**
|
|
* Return the number of nodes that are live and decommissioned.
|
|
* Return the number of nodes that are live and decommissioned.
|
|
*/
|
|
*/
|
|
- public NumberReplicas countNodes(Block b) {
|
|
|
|
|
|
+ NumberReplicas countNodes(Block b) {
|
|
int count = 0;
|
|
int count = 0;
|
|
int live = 0;
|
|
int live = 0;
|
|
int corrupt = 0;
|
|
int corrupt = 0;
|
|
@@ -1838,7 +1805,7 @@ public class BlockManager {
|
|
* Return true if there are any blocks on this node that have not
|
|
* Return true if there are any blocks on this node that have not
|
|
* yet reached their replication factor. Otherwise returns false.
|
|
* yet reached their replication factor. Otherwise returns false.
|
|
*/
|
|
*/
|
|
- public boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
|
|
|
|
|
|
+ boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
|
|
boolean status = false;
|
|
boolean status = false;
|
|
int underReplicatedBlocks = 0;
|
|
int underReplicatedBlocks = 0;
|
|
int decommissionOnlyReplicas = 0;
|
|
int decommissionOnlyReplicas = 0;
|
|
@@ -1888,11 +1855,11 @@ public class BlockManager {
|
|
return status;
|
|
return status;
|
|
}
|
|
}
|
|
|
|
|
|
- public int getActiveBlockCount() {
|
|
|
|
|
|
+ int getActiveBlockCount() {
|
|
return blocksMap.size() - (int)pendingDeletionBlocksCount;
|
|
return blocksMap.size() - (int)pendingDeletionBlocksCount;
|
|
}
|
|
}
|
|
|
|
|
|
- public DatanodeDescriptor[] getNodes(BlockInfo block) {
|
|
|
|
|
|
+ DatanodeDescriptor[] getNodes(BlockInfo block) {
|
|
DatanodeDescriptor[] nodes =
|
|
DatanodeDescriptor[] nodes =
|
|
new DatanodeDescriptor[block.numNodes()];
|
|
new DatanodeDescriptor[block.numNodes()];
|
|
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
|
|
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
|
|
@@ -1902,22 +1869,22 @@ public class BlockManager {
|
|
return nodes;
|
|
return nodes;
|
|
}
|
|
}
|
|
|
|
|
|
- public int getTotalBlocks() {
|
|
|
|
|
|
+ int getTotalBlocks() {
|
|
return blocksMap.size();
|
|
return blocksMap.size();
|
|
}
|
|
}
|
|
|
|
|
|
- public void removeBlock(Block block) {
|
|
|
|
|
|
+ void removeBlock(Block block) {
|
|
addToInvalidates(block);
|
|
addToInvalidates(block);
|
|
corruptReplicas.removeFromCorruptReplicasMap(block);
|
|
corruptReplicas.removeFromCorruptReplicasMap(block);
|
|
blocksMap.removeBlock(block);
|
|
blocksMap.removeBlock(block);
|
|
}
|
|
}
|
|
|
|
|
|
- public BlockInfo getStoredBlock(Block block) {
|
|
|
|
|
|
+ BlockInfo getStoredBlock(Block block) {
|
|
return blocksMap.getStoredBlock(block);
|
|
return blocksMap.getStoredBlock(block);
|
|
}
|
|
}
|
|
|
|
|
|
/* updates a block in under replication queue */
|
|
/* updates a block in under replication queue */
|
|
- public void updateNeededReplications(Block block, int curReplicasDelta,
|
|
|
|
|
|
+ void updateNeededReplications(Block block, int curReplicasDelta,
|
|
int expectedReplicasDelta) {
|
|
int expectedReplicasDelta) {
|
|
namesystem.writeLock();
|
|
namesystem.writeLock();
|
|
try {
|
|
try {
|
|
@@ -1938,13 +1905,13 @@ public class BlockManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void checkReplication(Block block, int numExpectedReplicas) {
|
|
|
|
|
|
+ void checkReplication(Block block, int numExpectedReplicas) {
|
|
// filter out containingNodes that are marked for decommission.
|
|
// filter out containingNodes that are marked for decommission.
|
|
NumberReplicas number = countNodes(block);
|
|
NumberReplicas number = countNodes(block);
|
|
if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) {
|
|
if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) {
|
|
neededReplications.add(block,
|
|
neededReplications.add(block,
|
|
number.liveReplicas(),
|
|
number.liveReplicas(),
|
|
- number.decommissionedReplicas(),
|
|
|
|
|
|
+ number.decommissionedReplicas,
|
|
numExpectedReplicas);
|
|
numExpectedReplicas);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1959,8 +1926,11 @@ public class BlockManager {
|
|
return fileINode.getReplication();
|
|
return fileINode.getReplication();
|
|
}
|
|
}
|
|
|
|
|
|
- /** Remove a datanode from the invalidatesSet */
|
|
|
|
- public void removeFromInvalidates(String storageID) {
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Remove a datanode from the invalidatesSet
|
|
|
|
+ * @param n datanode
|
|
|
|
+ */
|
|
|
|
+ void removeFromInvalidates(String storageID) {
|
|
Collection<Block> blocks = recentInvalidateSets.remove(storageID);
|
|
Collection<Block> blocks = recentInvalidateSets.remove(storageID);
|
|
if (blocks != null) {
|
|
if (blocks != null) {
|
|
pendingDeletionBlocksCount -= blocks.size();
|
|
pendingDeletionBlocksCount -= blocks.size();
|
|
@@ -2028,7 +1998,7 @@ public class BlockManager {
|
|
//Returns the number of racks over which a given block is replicated
|
|
//Returns the number of racks over which a given block is replicated
|
|
//decommissioning/decommissioned nodes are not counted. corrupt replicas
|
|
//decommissioning/decommissioned nodes are not counted. corrupt replicas
|
|
//are also ignored
|
|
//are also ignored
|
|
- public int getNumberOfRacks(Block b) {
|
|
|
|
|
|
+ int getNumberOfRacks(Block b) {
|
|
HashSet<String> rackSet = new HashSet<String>(0);
|
|
HashSet<String> rackSet = new HashSet<String>(0);
|
|
Collection<DatanodeDescriptor> corruptNodes =
|
|
Collection<DatanodeDescriptor> corruptNodes =
|
|
corruptReplicas.getNodes(b);
|
|
corruptReplicas.getNodes(b);
|
|
@@ -2086,32 +2056,32 @@ public class BlockManager {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public long getMissingBlocksCount() {
|
|
|
|
|
|
+ long getMissingBlocksCount() {
|
|
// not locking
|
|
// not locking
|
|
return this.neededReplications.getCorruptBlockSize();
|
|
return this.neededReplications.getCorruptBlockSize();
|
|
}
|
|
}
|
|
|
|
|
|
- public BlockInfo addINode(BlockInfo block, INodeFile iNode) {
|
|
|
|
|
|
+ BlockInfo addINode(BlockInfo block, INodeFile iNode) {
|
|
return blocksMap.addINode(block, iNode);
|
|
return blocksMap.addINode(block, iNode);
|
|
}
|
|
}
|
|
|
|
|
|
- public INodeFile getINode(Block b) {
|
|
|
|
|
|
+ INodeFile getINode(Block b) {
|
|
return blocksMap.getINode(b);
|
|
return blocksMap.getINode(b);
|
|
}
|
|
}
|
|
|
|
|
|
- public void removeFromCorruptReplicasMap(Block block) {
|
|
|
|
|
|
+ void removeFromCorruptReplicasMap(Block block) {
|
|
corruptReplicas.removeFromCorruptReplicasMap(block);
|
|
corruptReplicas.removeFromCorruptReplicasMap(block);
|
|
}
|
|
}
|
|
|
|
|
|
- public int numCorruptReplicas(Block block) {
|
|
|
|
|
|
+ int numCorruptReplicas(Block block) {
|
|
return corruptReplicas.numCorruptReplicas(block);
|
|
return corruptReplicas.numCorruptReplicas(block);
|
|
}
|
|
}
|
|
|
|
|
|
- public void removeBlockFromMap(Block block) {
|
|
|
|
|
|
+ void removeBlockFromMap(Block block) {
|
|
blocksMap.removeBlock(block);
|
|
blocksMap.removeBlock(block);
|
|
}
|
|
}
|
|
|
|
|
|
- public int getCapacity() {
|
|
|
|
|
|
+ int getCapacity() {
|
|
namesystem.readLock();
|
|
namesystem.readLock();
|
|
try {
|
|
try {
|
|
return blocksMap.getCapacity();
|
|
return blocksMap.getCapacity();
|
|
@@ -2134,7 +2104,7 @@ public class BlockManager {
|
|
* @return Up to numExpectedBlocks blocks from startingBlockId if it exists
|
|
* @return Up to numExpectedBlocks blocks from startingBlockId if it exists
|
|
*
|
|
*
|
|
*/
|
|
*/
|
|
- public long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
|
|
|
|
|
|
+ long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
|
|
Long startingBlockId) {
|
|
Long startingBlockId) {
|
|
return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
|
|
return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
|
|
startingBlockId);
|
|
startingBlockId);
|
|
@@ -2143,7 +2113,7 @@ public class BlockManager {
|
|
/**
|
|
/**
|
|
* Return an iterator over the set of blocks for which there are no replicas.
|
|
* Return an iterator over the set of blocks for which there are no replicas.
|
|
*/
|
|
*/
|
|
- public BlockIterator getCorruptReplicaBlockIterator() {
|
|
|
|
|
|
+ UnderReplicatedBlocks.BlockIterator getCorruptReplicaBlockIterator() {
|
|
return neededReplications
|
|
return neededReplications
|
|
.iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
|
.iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
|
}
|
|
}
|