@@ -203,8 +203,11 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   private long decommissionRecheckInterval;
   // default block size of a file
   private long defaultBlockSize = 0;
-  private int replIndex = 0; // last datanode used for replication work
-  static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
+
+  /**
+   * Last block index used for replication work.
+   */
+  private int replIndex = 0;
 
   public static FSNamesystem fsNamesystemObject;
   private String localMachine;
@@ -392,10 +395,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                             "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
     this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
       10 * heartbeatInterval;
-    this.replicationRecheckInterval = 3 * 1000; // 3 second
-    this.replicationRecheckInterval = conf.getInt("dfs.replication.interval", 3) * 1000;
-    this.decommissionRecheckInterval = conf.getInt("dfs.namenode.decommission.interval",
-                                                   5 * 60) * 1000;
+    this.replicationRecheckInterval =
+      conf.getInt("dfs.replication.interval", 3) * 1000L;
+    this.decommissionRecheckInterval =
+      conf.getInt("dfs.namenode.decommission.interval", 5 * 60) * 1000L;
     this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
     this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit,
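The replacement lines above widen the seconds-to-milliseconds conversion with a long literal (1000L), so the multiplication is performed in long rather than int. A minimal standalone sketch (not part of the patch) of the overflow this avoids for large configured intervals:

    public class IntervalOverflowDemo {
      public static void main(String[] args) {
        int intervalSeconds = 30 * 24 * 60 * 60;  // 30 days, still fits in an int
        long wrong = intervalSeconds * 1000;      // int multiply wraps, then widens
        long right = intervalSeconds * 1000L;     // long multiply, no overflow
        System.out.println(wrong);                // prints -1702967296
        System.out.println(right);                // prints 2592000000
      }
    }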
@@ -474,18 +477,14 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     synchronized (neededReplications) {
       out.println("Metasave: Blocks waiting for replication: " +
                   neededReplications.size());
-      if (neededReplications.size() > 0) {
-        for (Iterator<Block> it = neededReplications.iterator();
-             it.hasNext();) {
-          Block block = it.next();
-          out.print(block);
-          for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
-               jt.hasNext();) {
-            DatanodeDescriptor node = jt.next();
-            out.print(" " + node + " : ");
-          }
-          out.println("");
+      for (Block block : neededReplications) {
+        out.print(block);
+        for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
+             jt.hasNext();) {
+          DatanodeDescriptor node = jt.next();
+          out.print(" " + node + " : ");
         }
+        out.println("");
       }
     }
 
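The rewrite above drops the redundant size() > 0 guard (iterating an empty collection is simply a no-op) and switches to the enhanced for loop, which presupposes that neededReplications implements Iterable<Block>. A minimal sketch of the equivalence, with illustrative names:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class ForEachDemo {
      public static void main(String[] args) {
        List<String> blocks = Arrays.asList("blk_1", "blk_2");
        // desugared form: explicit iterator, as in the removed lines
        for (Iterator<String> it = blocks.iterator(); it.hasNext();) {
          System.out.println(it.next());
        }
        // enhanced form: identical behavior, including on an empty list
        for (String b : blocks) {
          System.out.println(b);
        }
      }
    }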
@@ -2212,6 +2211,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * Periodically calls computeReplicationWork().
    */
   class ReplicationMonitor implements Runnable {
+    static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
+    static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
     public void run() {
       while (fsRunning) {
         try {
@@ -2219,6 +2220,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
           processPendingReplications();
           Thread.sleep(replicationRecheckInterval);
         } catch (InterruptedException ie) {
+          LOG.warn("ReplicationMonitor thread received InterruptedException. " + ie);
+          break;
         } catch (IOException ie) {
           LOG.warn("ReplicationMonitor thread received exception. " + ie);
         } catch (Throwable t) {
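Before this change an interrupt fell through to the broader catch clauses and the loop kept running; the new clause logs and breaks so the monitor can be shut down by interrupting its thread. A self-contained sketch of that stop-by-interrupt pattern (class and field names are illustrative, not from the patch):

    public class StoppableMonitor implements Runnable {
      public void run() {
        while (true) {
          try {
            // ... periodic work would go here ...
            Thread.sleep(3000);
          } catch (InterruptedException ie) {
            // A caller invoked thread.interrupt(); exit the loop promptly
            // instead of swallowing the exception and looping forever.
            System.out.println("monitor interrupted, shutting down");
            break;
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new StoppableMonitor());
        t.start();
        Thread.sleep(100);
        t.interrupt();  // wakes the sleep with InterruptedException
        t.join();
      }
    }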
@@ -2229,81 +2232,276 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       }
     }
 
+  /////////////////////////////////////////////////////////
+  //
+  // These methods are called by the Namenode system, to see
+  // if there is any work for registered datanodes.
+  //
+  /////////////////////////////////////////////////////////
   /**
-   * Look at a few datanodes and compute any replication work that
-   * can be scheduled on them. The datanode will be infomed of this
-   * work at the next heartbeat.
+   * Compute block replication and block invalidation work
+   * that can be scheduled on data-nodes.
+   * The datanode will be informed of this work at the next heartbeat.
+   *
+   * @return number of blocks scheduled for replication or removal.
    */
-  void computeDatanodeWork() throws IOException {
-    int numiter = 0;
-    int foundwork = 0;
-    int hsize = 0;
-    int lastReplIndex = -1;
+  int computeDatanodeWork() throws IOException {
+    int workFound = 0;
+    int blocksToProcess = 0;
+    int nodesToProcess = 0;
+    synchronized(heartbeats) {
+      blocksToProcess = (int)(heartbeats.size()
+          * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
+      nodesToProcess = (int)Math.ceil((double)heartbeats.size()
+          * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
+    }
 
-    while (true) {
-      DatanodeDescriptor node = null;
+    workFound = computeReplicationWork(blocksToProcess);
+    if(workFound == 0)
+      workFound = computeInvalidateWork(nodesToProcess);
+    return workFound;
+  }
 
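With the constants declared in ReplicationMonitor, one pass of computeDatanodeWork schedules at most twice as many block replications as there are live datanodes, and invalidation touches at most 32% of the nodes (rounded up). A worked sketch of the arithmetic on a hypothetical 100-node cluster, using the same constants:

    public class ThrottleMath {
      static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
      static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;

      public static void main(String[] args) {
        int liveNodes = 100; // hypothetical heartbeats.size()
        int blocksToProcess =
            (int) (liveNodes * REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
        int nodesToProcess = (int) Math.ceil(
            (double) liveNodes * INVALIDATE_WORK_PCT_PER_ITERATION / 100);
        System.out.println(blocksToProcess); // 200 replications max per pass
        System.out.println(nodesToProcess);  // 32 nodes invalidated max per pass
      }
    }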
-      //
-      // pick the datanode that was the last one in the
-      // previous invocation of this method.
-      //
-      synchronized (heartbeats) {
-        hsize = heartbeats.size();
-        if (numiter++ >= hsize) {
-          // no change in replIndex.
-          if (lastReplIndex >= 0) {
-            //next time, start after where the last replication was scheduled
-            replIndex = lastReplIndex;
-          }
-          break;
-        }
-        if (replIndex >= hsize) {
+  private int computeInvalidateWork(int nodesToProcess) {
+    int blockCnt = 0;
+    for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
+      int work = invalidateWorkForOneNode();
+      if(work == 0)
+        break;
+      blockCnt += work;
+    }
+    return blockCnt;
+  }
+
+  /**
+   * Scan blocks in {@link #neededReplications} and assign replication
+   * work to data-nodes they belong to.
+   *
+   * The number of blocks to process equals either twice the number of live
+   * data-nodes or the number of under-replicated blocks, whichever is less.
+   *
+   * @return number of blocks scheduled for replication during this iteration.
+   */
+  private synchronized int computeReplicationWork(
+                                  int blocksToProcess) throws IOException {
+    int scheduledReplicationCount = 0;
+    // blocks should not be replicated or removed if safe mode is on
+    if (isInSafeMode())
+      return scheduledReplicationCount;
+
+    synchronized(neededReplications) {
+      // # of blocks to process equals either twice the number of live
+      // data-nodes or the number of under-replicated blocks whichever is less
+      blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
+      if(blocksToProcess == 0)
+        return scheduledReplicationCount;
+
+      // Go through all blocks that need replications.
+      // Select source and target nodes for replication.
+      Iterator<Block> neededReplicationsIterator = neededReplications.iterator();
+      // skip to the first unprocessed block, which is at replIndex
+      for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
+        neededReplicationsIterator.next();
+      }
+      // process blocks
+      for(int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
+        if( ! neededReplicationsIterator.hasNext()) {
+          // start from the beginning
           replIndex = 0;
+          blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
+          if(blkCnt >= blocksToProcess)
+            break;
+          neededReplicationsIterator = neededReplications.iterator();
+          assert neededReplicationsIterator.hasNext() :
+            "neededReplications should not be empty.";
         }
-        node = heartbeats.get(replIndex);
-        replIndex++;
-      }
 
-      //
-      // Is there replication work to be computed for this datanode?
-      //
-      int precomputed = node.getNumberOfBlocksToBeReplicated();
-      int needed = this.maxReplicationStreams - precomputed;
-      boolean doReplication = false;
-      boolean doInvalidation = false;
-      if (needed > 0) {
-        //
-        // Compute replication work and store work into the datanode
-        //
-        Object replsets[] = pendingTransfers(node, needed);
-        if (replsets != null) {
-          doReplication = true;
-          addBlocksToBeReplicated(node, (Block[])replsets[0],
-                                  (DatanodeDescriptor[][])replsets[1]);
-          lastReplIndex = replIndex;
+        Block block = neededReplicationsIterator.next();
+
+        // block should belong to a file
+        INodeFile fileINode = blocksMap.getINode(block);
+        if(fileINode == null) { // abandoned block
+          neededReplicationsIterator.remove(); // remove from neededReplications
+          replIndex--;
+          continue;
         }
-      }
-      if (!doReplication) {
-        //
-        // Determine if block deletion is pending for this datanode
-        //
-        Block blocklist[] = blocksToInvalidate(node);
-        if (blocklist != null) {
-          doInvalidation = true;
-          addBlocksToBeInvalidated(node, blocklist);
+        int requiredReplication = fileINode.getReplication();
+
+        // get a source data-node
+        List<DatanodeDescriptor> containingNodes =
+          new ArrayList<DatanodeDescriptor>();
+        NumberReplicas numReplicas = new NumberReplicas();
+        DatanodeDescriptor srcNode =
+          chooseSourceDatanode(block, containingNodes, numReplicas);
+        if(srcNode == null) // block can not be replicated from any node
+          continue;
+
+        // do not schedule more if enough replicas are already pending
+        int numEffectiveReplicas = numReplicas.liveReplicas() +
+                                pendingReplications.getNumReplicas(block);
+        if(numEffectiveReplicas >= requiredReplication) {
+          neededReplicationsIterator.remove(); // remove from neededReplications
+          replIndex--;
+          NameNode.stateChangeLog.info("BLOCK* "
+              + "Removing block " + block.getBlockName()
+              + " from neededReplications as it has enough replicas.");
+          continue;
         }
-      }
-      if (doReplication || doInvalidation) {
-        //
-        // If we have already computed work for a predefined
-        // number of datanodes in this iteration, then relax
-        //
-        if (foundwork > ((hsize * REPL_WORK_PER_ITERATION)/100)) {
-          break;
+
+        // choose replication targets
+        int maxTargets =
+          maxReplicationStreams - srcNode.getNumberOfBlocksToBeReplicated();
+        assert maxTargets > 0 : "Datanode " + srcNode.getName()
+              + " should not have been selected as a source for replication.";
+        DatanodeDescriptor targets[] = replicator.chooseTarget(
+            Math.min(requiredReplication - numEffectiveReplicas, maxTargets),
+            srcNode, containingNodes, null, block.getNumBytes());
+        if(targets.length == 0)
+          continue;
+        // Add block to the list of blocks to be replicated
+        srcNode.addBlockToBeReplicated(block, targets);
+        scheduledReplicationCount++;
+
+        // Move the block-replication into a "pending" state.
+        // The reason we use 'pending' is so we can retry
+        // replications that fail after an appropriate amount of time.
+        if(numEffectiveReplicas + targets.length >= requiredReplication) {
+          neededReplicationsIterator.remove(); // remove from neededReplications
+          replIndex--;
+          pendingReplications.add(block, targets.length);
+          NameNode.stateChangeLog.debug(
+              "BLOCK* block " + block.getBlockName()
+              + " is moved from neededReplications to pendingReplications");
         }
-        foundwork++;
-      }
+        if (NameNode.stateChangeLog.isInfoEnabled()) {
+          StringBuffer targetList = new StringBuffer("datanode(s)");
+          for (int k = 0; k < targets.length; k++) {
+            targetList.append(' ');
+            targetList.append(targets[k].getName());
+          }
+          NameNode.stateChangeLog.info(
+              "BLOCK* ask "
+              + srcNode.getName() + " to replicate "
+              + block.getBlockName() + " to " + targetList);
+          NameNode.stateChangeLog.debug(
+              "BLOCK* neededReplications = " + neededReplications.size()
+              + " pendingReplications = " + pendingReplications.size());
+        }
+      }
+    }
+    return scheduledReplicationCount;
+  }
+
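replIndex survives across calls, so each invocation resumes scanning neededReplications where the previous one stopped, wrapping to the front when the iterator runs out; the decrements after each remove() keep the index pointing at the next unprocessed block. A simplified sketch of this resumable round-robin scan (illustrative names, a plain List standing in for the under-replicated block queue):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinScan {
      private int index = 0; // plays the role of replIndex

      int scan(List<String> items, int quota) {
        int processed = 0;
        quota = Math.min(quota, items.size());
        for (int n = 0; n < quota; n++, index++) {
          if (index >= items.size())
            index = 0; // wrap to the beginning, as the patch does
          System.out.println("processing " + items.get(index));
          processed++;
        }
        return processed;
      }

      public static void main(String[] args) {
        RoundRobinScan s = new RoundRobinScan();
        List<String> blocks = new ArrayList<String>(
            Arrays.asList("blk_1", "blk_2", "blk_3"));
        s.scan(blocks, 2); // blk_1, blk_2
        s.scan(blocks, 2); // blk_3, then wraps around to blk_1
      }
    }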
+  /**
+   * Parse the data-nodes the block belongs to and choose one,
+   * which will be the replication source.
+   *
+   * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
+   * since the former do not have write traffic and hence are less busy.
+   * We do not use already decommissioned nodes as a source.
+   * Otherwise we choose a random node among those that did not reach their
+   * replication limit.
+   *
+   * In addition, form a list of all nodes containing the block
+   * and calculate its replication numbers.
+   */
+  private DatanodeDescriptor chooseSourceDatanode(
+                                    Block block,
+                                    List<DatanodeDescriptor> containingNodes,
+                                    NumberReplicas numReplicas) {
+    containingNodes.clear();
+    DatanodeDescriptor srcNode = null;
+    int live = 0;
+    int decommissioned = 0;
+    Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+    while(it.hasNext()) {
+      DatanodeDescriptor node = it.next();
+      if(!node.isDecommissionInProgress() && !node.isDecommissioned())
+        live++;
+      else
+        decommissioned++;
+      containingNodes.add(node);
+      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+        continue; // already reached replication limit
+      // the block must not be scheduled for removal on srcNode
+      Collection<Block> excessBlocks =
+        excessReplicateMap.get(node.getStorageID());
+      if(excessBlocks != null && excessBlocks.contains(block))
+        continue;
+      // never use already decommissioned nodes
+      if(node.isDecommissioned())
+        continue;
+      // we prefer nodes that are in DECOMMISSION_INPROGRESS state
+      if(node.isDecommissionInProgress() || srcNode == null) {
+        srcNode = node;
+        continue;
+      }
+      if(srcNode.isDecommissionInProgress())
+        continue;
+      // switch to a different node randomly
+      // this is to prevent deterministically selecting the same node even
+      // if the node failed to replicate the block on previous iterations
+      if(r.nextBoolean())
+        srcNode = node;
+    }
+    if(numReplicas != null)
+      numReplicas.initialize(live, decommissioned);
+    return srcNode;
+  }
+
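The coin flip at the end of the loop replaces the current candidate with probability 1/2 at every eligible node, so repeated passes do not keep hitting a source that failed before. Note this is not uniform selection: later nodes in iteration order are favored. An illustrative comparison with reservoir sampling, which would be uniform (this sketch is not from the patch):

    import java.util.Random;

    public class SourcePick {
      public static void main(String[] args) {
        Random r = new Random();
        String[] nodes = {"dn1", "dn2", "dn3", "dn4"};

        // Coin flip, as in chooseSourceDatanode: later nodes are favored
        // (the last node wins with probability 1/2).
        String coinFlipPick = null;
        for (String n : nodes) {
          if (coinFlipPick == null || r.nextBoolean())
            coinFlipPick = n;
        }

        // Reservoir sampling: each node ends up chosen with probability 1/4.
        String uniformPick = null;
        int seen = 0;
        for (String n : nodes) {
          seen++;
          if (r.nextInt(seen) == 0)
            uniformPick = n;
        }
        System.out.println(coinFlipPick + " / " + uniformPick);
      }
    }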
+  /**
+   * Get blocks to invalidate for the first node
+   * in {@link #recentInvalidateSets}.
+   *
+   * @return number of blocks scheduled for removal during this iteration.
+   */
+  private synchronized int invalidateWorkForOneNode() {
+    // blocks should not be replicated or removed if safe mode is on
+    if (isInSafeMode())
+      return 0;
+    if(recentInvalidateSets.isEmpty())
+      return 0;
+    // get blocks to invalidate for the first node
+    String firstNodeId = recentInvalidateSets.keySet().iterator().next();
+    assert firstNodeId != null;
+    DatanodeDescriptor dn = datanodeMap.get(firstNodeId);
+    Collection<Block> invalidateSet = recentInvalidateSets.remove(firstNodeId);
+
+    if(invalidateSet == null || dn == null)
+      return 0;
+
+    ArrayList<Block> blocksToInvalidate =
+      new ArrayList<Block>(blockInvalidateLimit);
+
+    // # blocks that can be sent in one message is limited
+    Iterator<Block> it = invalidateSet.iterator();
+    for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
+        blkCount++) {
+      blocksToInvalidate.add(it.next());
+      it.remove();
     }
+
+    // If we could not send everything in this message, reinsert this item
+    // into the collection.
+    if(it.hasNext())
+      recentInvalidateSets.put(firstNodeId, invalidateSet);
+
+    dn.addBlocksToBeInvalidated(blocksToInvalidate);
+
+    if(NameNode.stateChangeLog.isInfoEnabled()) {
+      StringBuffer blockList = new StringBuffer();
+      for(Block blk : blocksToInvalidate) {
+        blockList.append(' ');
+        blockList.append(blk.getBlockName());
+      }
+      NameNode.stateChangeLog.info("BLOCK* ask "
+          + dn.getName() + " to delete " + blockList);
+    }
+    return blocksToInvalidate.size();
+  }
+
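The method drains at most blockInvalidateLimit blocks per call and reinserts the remainder, so one oversized invalidate set cannot monopolize a heartbeat reply. A generic sketch of this drain-and-requeue chunking, with hypothetical names:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    public class ChunkedDrain {
      static List<Integer> drain(Queue<Integer> pending, int limit) {
        List<Integer> batch = new ArrayList<Integer>(limit);
        while (batch.size() < limit && !pending.isEmpty()) {
          batch.add(pending.poll());
        }
        return batch; // anything still in 'pending' waits for the next call
      }

      public static void main(String[] args) {
        Queue<Integer> pending = new ArrayDeque<Integer>();
        for (int i = 0; i < 7; i++) pending.add(i);
        System.out.println(drain(pending, 3)); // [0, 1, 2]
        System.out.println(drain(pending, 3)); // [3, 4, 5]
        System.out.println(drain(pending, 3)); // [6]
      }
    }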
+  void setNodeReplicationLimit(int limit) {
+    this.maxReplicationStreams = limit;
   }
 
   /**
@@ -2325,36 +2523,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     }
   }
 
-  /**
-   * Add more replication work for this datanode.
-   */
-  synchronized void addBlocksToBeReplicated(DatanodeDescriptor node,
-                                            Block[] blocklist,
-                                            DatanodeDescriptor[][] targets)
-    throws IOException {
-    //
-    // Find the datanode with the FSNamesystem lock held.
-    //
-    DatanodeDescriptor n = getDatanode(node);
-    if (n != null) {
-      n.addBlocksToBeReplicated(blocklist, targets);
-    }
-  }
-
-  /**
-   * Add more block invalidation work for this datanode.
-   */
-  synchronized void addBlocksToBeInvalidated(DatanodeDescriptor node,
-                                             Block[] blocklist) throws IOException {
-    //
-    // Find the datanode with the FSNamesystem lock held.
-    //
-    DatanodeDescriptor n = getDatanode(node);
-    if (n != null) {
-      n.addBlocksToBeInvalidated(blocklist);
-    }
-  }
-
   /**
    * remove a datanode descriptor
    * @param nodeID datanode ID
@@ -3125,78 +3293,23 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   short getMinReplication()     { return (short)minReplication; }
   short getDefaultReplication() { return (short)defaultReplication; }
 
-  /////////////////////////////////////////////////////////
-  //
-  // These methods are called by the Namenode system, to see
-  // if there is any work for a given datanode.
-  //
-  /////////////////////////////////////////////////////////
-
-  /**
-   * Check if there are any recently-deleted blocks a datanode should remove.
-   */
-  public synchronized Block[] blocksToInvalidate(DatanodeID nodeID) {
-    // Ask datanodes to perform block delete
-    // only if safe mode is off.
-    if (isInSafeMode())
-      return null;
-
-    Collection<Block> invalidateSet = recentInvalidateSets.remove(
-                                                                  nodeID.getStorageID());
-
-    if (invalidateSet == null) {
-      return null;
-    }
-
-    Iterator<Block> it = null;
-    int sendNum = invalidateSet.size();
-    ArrayList<Block> sendBlock = new ArrayList<Block>(sendNum);
-
-    //
-    // calculate the number of blocks that we send in one message
-    //
-    sendNum = Math.min(sendNum, blockInvalidateLimit);
-
-    //
-    // Copy the first chunk into sendBlock
-    //
-    for (it = invalidateSet.iterator(); sendNum > 0; sendNum--) {
-      assert(it.hasNext());
-      sendBlock.add(it.next());
-      it.remove();
-    }
-
-    //
-    // If we could not send everything in this message, reinsert this item
-    // into the collection.
-    //
-    if (it.hasNext()) {
-      recentInvalidateSets.put(nodeID.getStorageID(), invalidateSet);
-    }
-
-    if (NameNode.stateChangeLog.isInfoEnabled()) {
-      StringBuffer blockList = new StringBuffer();
-      for (int i = 0; i < sendBlock.size(); i++) {
-        blockList.append(' ');
-        Block block = sendBlock.get(i);
-        blockList.append(block.getBlockName());
-      }
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.blockToInvalidate: "
-                                   +"ask "+nodeID.getName()+" to delete " + blockList);
-    }
-    return sendBlock.toArray(new Block[sendBlock.size()]);
-  }
-
-
   /**
    * A immutable object that stores the number of live replicas and
    * the number of decommissined Replicas.
    */
-  static class NumberReplicas {
+  private static class NumberReplicas {
     private int liveReplicas;
     private int decommissionedReplicas;
 
+    NumberReplicas() {
+      initialize(0, 0);
+    }
+
     NumberReplicas(int live, int decommissioned) {
+      initialize(live, decommissioned);
+    }
+
+    void initialize(int live, int decommissioned) {
       liveReplicas = live;
       decommissionedReplicas = decommissioned;
     }
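The no-argument constructor plus initialize() lets chooseSourceDatanode fill in a caller-supplied NumberReplicas rather than allocating and returning a second object, replacing the NumberReplicas[] out-array used by the removed containingNodeList(). A minimal sketch of that mutable-holder pattern (illustrative types, not from the patch):

    public class OutParamDemo {
      static class Counts {
        int live, decommissioned;
        void initialize(int live, int decommissioned) {
          this.live = live;
          this.decommissioned = decommissioned;
        }
      }

      static String choose(String[] nodes, Counts out) {
        out.initialize(nodes.length, 0); // report counts through the holder
        return nodes.length > 0 ? nodes[0] : null;
      }

      public static void main(String[] args) {
        Counts counts = new Counts();
        String src = choose(new String[] {"dn1", "dn2"}, counts);
        System.out.println(src + " live=" + counts.live);
      }
    }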
@@ -3235,32 +3348,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     return countNodes(blocksMap.nodeIterator(b));
   }
 
-  /**
-   * Returns a newly allocated list of all nodes. Returns a count of
-   * live and decommissioned nodes.
-   */
-  ArrayList<DatanodeDescriptor> containingNodeList(Block b, NumberReplicas[] numReplicas) {
-    ArrayList<DatanodeDescriptor> nodeList =
-      new ArrayList<DatanodeDescriptor>();
-    int count = 0;
-    int live = 0;
-    for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
-        it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-        live++;
-      }
-      else {
-        count++;
-      }
-      nodeList.add(node);
-    }
-    if (numReplicas != null) {
-      numReplicas[0] = new NumberReplicas(live, count);
-    }
-    return nodeList;
-  }
-
   /**
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
@@ -3318,140 +3405,6 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     return false;
   }
 
-  /**
-   * Return with a list of Block/DataNodeInfo sets, indicating
-   * where various Blocks should be copied, ASAP.
-   *
-   * The Array that we return consists of two objects:
-   * The 1st elt is an array of Blocks.
-   * The 2nd elt is a 2D array of DatanodeDescriptor objs, identifying the
-   * target sequence for the Block at the appropriate index.
-   *
-   */
-  public synchronized Object[] pendingTransfers(DatanodeID srcNode,
-                                                int needed) {
-    // Ask datanodes to perform block replication
-    // only if safe mode is off.
-    if (isInSafeMode())
-      return null;
-
-    synchronized (neededReplications) {
-      Object results[] = null;
-
-      if (neededReplications.size() > 0) {
-        //
-        // Go through all blocks that need replications. See if any
-        // are present at the current node. If so, ask the node to
-        // replicate them.
-        //
-        List<Block> replicateBlocks = new ArrayList<Block>();
-        List<NumberReplicas> numCurrentReplicas = new ArrayList<NumberReplicas>();
-        List<DatanodeDescriptor[]> replicateTargetSets;
-        replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
-        NumberReplicas[] allReplicas = new NumberReplicas[1];
-        for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
-          if (needed <= 0) {
-            break;
-          }
-          Block block = it.next();
-          long blockSize = block.getNumBytes();
-          INodeFile fileINode = blocksMap.getINode(block);
-          if (fileINode == null) { // block does not belong to any file
-            it.remove();
-          } else {
-            List<DatanodeDescriptor> containingNodes =
-              containingNodeList(block, allReplicas);
-            Collection<Block> excessBlocks = excessReplicateMap.get(
-                                                                    srcNode.getStorageID());
-
-            // srcNode must contain the block, and the block must
-            // not be scheduled for removal on that node
-            if (containingNodes.contains(srcNode)
-                && (excessBlocks == null || !excessBlocks.contains(block))) {
-              int numCurrentReplica = allReplicas[0].liveReplicas() +
-                pendingReplications.getNumReplicas(block);
-              NumberReplicas repl = new NumberReplicas(numCurrentReplica,
-                                                       allReplicas[0].decommissionedReplicas());
-              if (numCurrentReplica >= fileINode.getReplication()) {
-                it.remove();
-              } else {
-                DatanodeDescriptor targets[] = replicator.chooseTarget(
-                    Math.min(fileINode.getReplication() - numCurrentReplica,
-                             needed),
-                    datanodeMap.get(srcNode.getStorageID()),
-                    containingNodes, null, blockSize);
-                if (targets.length > 0) {
-                  // Build items to return
-                  replicateBlocks.add(block);
-                  numCurrentReplicas.add(repl);
-                  replicateTargetSets.add(targets);
-                  needed -= targets.length;
-                }
-              }
-            }
-          }
-        }
-
-        //
-        // Move the block-replication into a "pending" state.
-        // The reason we use 'pending' is so we can retry
-        // replications that fail after an appropriate amount of time.
-        // (REMIND - mjc - this timer is not yet implemented.)
-        //
-        if (replicateBlocks.size() > 0) {
-          int i = 0;
-          for (Iterator<Block> it = replicateBlocks.iterator(); it.hasNext(); i++) {
-            Block block = it.next();
-            DatanodeDescriptor targets[] = replicateTargetSets.get(i);
-            int numCurrentReplica = numCurrentReplicas.get(i).liveReplicas();
-            int numExpectedReplica = blocksMap.getINode(block).getReplication();
-            if (numCurrentReplica + targets.length >= numExpectedReplica) {
-              neededReplications.remove(
-                                        block,
-                                        numCurrentReplica,
-                                        numCurrentReplicas.get(i).decommissionedReplicas(),
-                                        numExpectedReplica);
-              pendingReplications.add(block, targets.length);
-              NameNode.stateChangeLog.debug(
-                  "BLOCK* NameSystem.pendingTransfer: "
-                  + block.getBlockName()
-                  + " is removed from neededReplications to pendingReplications");
-            }
-
-            if (NameNode.stateChangeLog.isInfoEnabled()) {
-              StringBuffer targetList = new StringBuffer("datanode(s)");
-              for (int k = 0; k < targets.length; k++) {
-                targetList.append(' ');
-                targetList.append(targets[k].getName());
-              }
-              NameNode.stateChangeLog.info(
-                  "BLOCK* NameSystem.pendingTransfer: " + "ask "
-                  + srcNode.getName() + " to replicate "
-                  + block.getBlockName() + " to " + targetList);
-              NameNode.stateChangeLog.debug(
-                  "BLOCK* neededReplications = " + neededReplications.size()
-                  + " pendingReplications = " + pendingReplications.size());
-            }
-          }
-
-          //
-          // Build returned objects from above lists
-          //
-          DatanodeDescriptor targetMatrix[][] =
-            new DatanodeDescriptor[replicateTargetSets.size()][];
-          for (i = 0; i < targetMatrix.length; i++) {
-            targetMatrix[i] = replicateTargetSets.get(i);
-          }
-
-          results = new Object[2];
-          results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]);
-          results[1] = targetMatrix;
-        }
-      }
-      return results;
-    }
-  }
-
   /**
    * Keeps track of which datanodes are allowed to connect to the namenode.
    */