|
@@ -717,7 +717,10 @@ class FSNamesystem implements FSConstants {
|
|
|
// the blocks.
|
|
|
for (int i = 0; i < nrBlocks; i++) {
|
|
|
SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
|
|
|
- if (containingNodes.size() < pendingFile.getReplication()) {
|
|
|
+ // filter out containingNodes that are marked for decommission.
|
|
|
+ int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
+
|
|
|
+ if (numCurrentReplica < pendingFile.getReplication()) {
|
|
|
NameNode.stateChangeLog.debug(
|
|
|
"DIR* NameSystem.completeFile:"
|
|
|
+ pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()
|
|
@@ -1585,20 +1588,25 @@ class FSNamesystem implements FSConstants {
|
|
|
FSDirectory.INode fileINode = dir.getFileByBlock(block);
|
|
|
if( fileINode == null ) // block does not belong to any file
|
|
|
return;
|
|
|
+
|
|
|
+ // filter out containingNodes that are marked for decommission.
|
|
|
+ int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
+
|
|
|
// check whether safe replication is reached for the block
|
|
|
// only if it is a part of a files
|
|
|
- incrementSafeBlockCount( containingNodes.size() );
|
|
|
+ incrementSafeBlockCount( numCurrentReplica );
|
|
|
short fileReplication = fileINode.getReplication();
|
|
|
- if (containingNodes.size() >= fileReplication ) {
|
|
|
+ if (numCurrentReplica >= fileReplication ) {
|
|
|
neededReplications.remove(block);
|
|
|
pendingReplications.remove(block);
|
|
|
NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
|
|
|
- +block.getBlockName()+" has "+containingNodes.size()
|
|
|
+ +block.getBlockName()+" has "+ numCurrentReplica
|
|
|
+" replicas so is removed from neededReplications and pendingReplications" );
|
|
|
- } else {// containingNodes.size() < fileReplication
|
|
|
+
|
|
|
+ } else {// numCurrentReplica < fileReplication
|
|
|
neededReplications.add(block);
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: "
|
|
|
- +block.getBlockName()+" has only "+containingNodes.size()
|
|
|
+ +block.getBlockName()+" has only "+ numCurrentReplica
|
|
|
+" replicas so is added to neededReplications" );
|
|
|
}
|
|
|
|
|
@@ -1620,7 +1628,9 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor cur = it.next();
|
|
|
Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
|
|
|
if (excessBlocks == null || ! excessBlocks.contains(block)) {
|
|
|
+ if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
|
|
nonExcess.add(cur);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
chooseExcessReplicates(nonExcess, block, replication);
|
|
@@ -1811,6 +1821,145 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start decommissioning the specified datanodes. If a datanode is
|
|
|
+ * already being decommissioned, then this is a no-op.
|
|
|
+ */
|
|
|
+ public synchronized void startDecommission (String[] nodes)
|
|
|
+ throws IOException {
|
|
|
+ if (isInSafeMode()) {
|
|
|
+ throw new SafeModeException("Cannot decommission node ", safeMode);
|
|
|
+ }
|
|
|
+ boolean isError = false;
|
|
|
+ String badnodes = "";
|
|
|
+
|
|
|
+ synchronized (datanodeMap) {
|
|
|
+ for (int i = 0; i < nodes.length; i++) {
|
|
|
+ boolean found = false;
|
|
|
+ for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
|
|
|
+ it.hasNext(); ) {
|
|
|
+ DatanodeDescriptor node = it.next();
|
|
|
+
|
|
|
+ //
|
|
|
+ // If this is a node that we are interested in, set its admin state.
|
|
|
+ //
|
|
|
+ if (node.getName().equals(nodes[i]) ||
|
|
|
+ node.getHost().equals(nodes[i])) {
|
|
|
+ found = true;
|
|
|
+ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
|
|
+ LOG.info("Start Decommissioning node " + node.name);
|
|
|
+ node.startDecommission();
|
|
|
+ //
|
|
|
+ // all those blocks that resides on this node has to be
|
|
|
+ // replicated.
|
|
|
+ Block decommissionBlocks[] = node.getBlocks();
|
|
|
+ for (int j = 0; j < decommissionBlocks.length; j++) {
|
|
|
+ synchronized (neededReplications) {
|
|
|
+ neededReplications.add(decommissionBlocks[j]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //
|
|
|
+ // Record the fact that a specified node was not found
|
|
|
+ //
|
|
|
+ if (!found) {
|
|
|
+ badnodes += nodes[i] + " ";
|
|
|
+ isError = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (isError) {
|
|
|
+ throw new IOException("Nodes " + badnodes + " not found");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stop decommissioning the specified datanodes.
|
|
|
+ */
|
|
|
+ public synchronized void stopDecommission (String[] nodes)
|
|
|
+ throws IOException {
|
|
|
+ if (isInSafeMode()) {
|
|
|
+ throw new SafeModeException("Cannot decommission node ", safeMode);
|
|
|
+ }
|
|
|
+ boolean isError = false;
|
|
|
+ String badnodes = "";
|
|
|
+
|
|
|
+ synchronized (datanodeMap) {
|
|
|
+ for (int i = 0; i < nodes.length; i++) {
|
|
|
+ boolean found = false;
|
|
|
+ for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
|
|
|
+ it.hasNext(); ) {
|
|
|
+ DatanodeDescriptor node = it.next();
|
|
|
+
|
|
|
+ //
|
|
|
+ // If this is a node that we are interested in, set its admin state.
|
|
|
+ //
|
|
|
+ if (node.getName().equals(nodes[i]) ||
|
|
|
+ node.getHost().equals(nodes[i])) {
|
|
|
+ LOG.info("Stop Decommissioning node " + node.name);
|
|
|
+ found = true;
|
|
|
+ node.stopDecommission();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //
|
|
|
+ // Record the fact that a specified node was not found
|
|
|
+ //
|
|
|
+ if (!found) {
|
|
|
+ badnodes += nodes[i] + " ";
|
|
|
+ isError = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (isError) {
|
|
|
+ throw new IOException("Nodes " + badnodes + " not found");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return true if all specified nodes are decommissioned.
|
|
|
+ * Otherwise return false.
|
|
|
+ */
|
|
|
+ public synchronized boolean checkDecommissioned (String[] nodes)
|
|
|
+ throws IOException {
|
|
|
+ String badnodes = "";
|
|
|
+ boolean isError = false;
|
|
|
+
|
|
|
+ synchronized (datanodeMap) {
|
|
|
+ for (int i = 0; i < nodes.length; i++) {
|
|
|
+ boolean found = false;
|
|
|
+ for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
|
|
|
+ it.hasNext(); ) {
|
|
|
+ DatanodeDescriptor node = it.next();
|
|
|
+
|
|
|
+ //
|
|
|
+ // If this is a node that we are interested in, check its admin state.
|
|
|
+ //
|
|
|
+ if (node.getName().equals(nodes[i]) ||
|
|
|
+ node.getHost().equals(nodes[i])) {
|
|
|
+ found = true;
|
|
|
+ boolean isDecommissioned = checkDecommissionStateInternal(node);
|
|
|
+ if (!isDecommissioned) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!found) {
|
|
|
+ badnodes += nodes[i] + " ";
|
|
|
+ isError = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (isError) {
|
|
|
+ throw new IOException("Nodes " + badnodes + " not found");
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
*/
|
|
|
public DatanodeInfo getDataNodeInfo(String name) {
|
|
@@ -1896,6 +2045,72 @@ class FSNamesystem implements FSConstants {
|
|
|
return (Block[]) sendBlock.toArray(new Block[sendBlock.size()]);
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Counts the number of nodes in the given list. Skips over nodes
|
|
|
+ * that are marked for decommission.
|
|
|
+ */
|
|
|
+ private int countContainingNodes(Collection<DatanodeDescriptor> nodelist) {
|
|
|
+ int count = 0;
|
|
|
+ for (Iterator<DatanodeDescriptor> it = nodelist.iterator();
|
|
|
+ it.hasNext(); ) {
|
|
|
+ DatanodeDescriptor node = it.next();
|
|
|
+ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return count;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Return true if there are any blocks in neededReplication that
|
|
|
+ * reside on the specified node. Otherwise returns false.
|
|
|
+ */
|
|
|
+ private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
|
|
|
+ synchronized (neededReplications) {
|
|
|
+ for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();){
|
|
|
+ Block block = it.next();
|
|
|
+ Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
+ if (containingNodes.contains(srcNode)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Change, if appropriate, the admin state of a datanode to
|
|
|
+ * decommission completed. Return true if decommission is complete.
|
|
|
+ */
|
|
|
+ private boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
|
|
|
+ //
|
|
|
+ // Check to see if there are any blocks in the neededReplication
|
|
|
+ // data structure that has a replica on the node being decommissioned.
|
|
|
+ //
|
|
|
+ if (node.isDecommissionInProgress()) {
|
|
|
+ if (!isReplicationInProgress(node)) {
|
|
|
+ node.setDecommissioned();
|
|
|
+ LOG.info("Decommission complete for node " + node.name);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (node.isDecommissioned()) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Change, if appropriate, the admin state of a datanode to
|
|
|
+ * decommission completed.
|
|
|
+ */
|
|
|
+ public synchronized void checkDecommissionState(DatanodeID nodeReg) {
|
|
|
+ DatanodeDescriptor node = datanodeMap.get(nodeReg.getStorageID());
|
|
|
+ if (node == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ checkDecommissionStateInternal(node);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Return with a list of Block/DataNodeInfo sets, indicating
|
|
|
* where various Blocks should be copied, ASAP.
|
|
@@ -1924,6 +2139,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// replicate them.
|
|
|
//
|
|
|
List<Block> replicateBlocks = new ArrayList<Block>();
|
|
|
+ List<Integer> numCurrentReplicas = new ArrayList<Integer>();
|
|
|
List<DatanodeDescriptor[]> replicateTargetSets;
|
|
|
replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
|
|
|
for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
|
|
@@ -1943,17 +2159,23 @@ class FSNamesystem implements FSConstants {
|
|
|
Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
Collection<Block> excessBlocks = excessReplicateMap.get(
|
|
|
srcNode.getStorageID() );
|
|
|
+
|
|
|
// srcNode must contain the block, and the block must
|
|
|
// not be scheduled for removal on that node
|
|
|
if (containingNodes != null && containingNodes.contains(srcNode)
|
|
|
&& (excessBlocks == null || ! excessBlocks.contains(block))) {
|
|
|
+
|
|
|
+ // filter out containingNodes that are marked for decommission.
|
|
|
+ int numCurrentReplica = countContainingNodes(containingNodes);
|
|
|
+
|
|
|
DatanodeDescriptor targets[] = chooseTargets(
|
|
|
- Math.min( fileINode.getReplication() - containingNodes.size(),
|
|
|
+ Math.min( fileINode.getReplication() - numCurrentReplica,
|
|
|
this.maxReplicationStreams - xmitsInProgress),
|
|
|
containingNodes, null, blockSize);
|
|
|
if (targets.length > 0) {
|
|
|
// Build items to return
|
|
|
replicateBlocks.add(block);
|
|
|
+ numCurrentReplicas.add(new Integer(numCurrentReplica));
|
|
|
replicateTargetSets.add(targets);
|
|
|
scheduledXfers += targets.length;
|
|
|
}
|
|
@@ -1973,9 +2195,10 @@ class FSNamesystem implements FSConstants {
|
|
|
Block block = it.next();
|
|
|
DatanodeDescriptor targets[] =
|
|
|
(DatanodeDescriptor[]) replicateTargetSets.get(i);
|
|
|
+ int numCurrentReplica = numCurrentReplicas.get(i).intValue();
|
|
|
Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
|
|
|
|
|
|
- if (containingNodes.size() + targets.length >=
|
|
|
+ if (numCurrentReplica + targets.length >=
|
|
|
dir.getFileByBlock( block).getReplication() ) {
|
|
|
neededReplications.remove(block);
|
|
|
pendingReplications.add(block);
|
|
@@ -2060,7 +2283,8 @@ class FSNamesystem implements FSConstants {
|
|
|
it.hasNext();) {
|
|
|
DatanodeDescriptor node = it.next();
|
|
|
if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
|
|
|
- clientMachine.toString().equals(node.getHost())) {
|
|
|
+ clientMachine.toString().equals(node.getHost()) &&
|
|
|
+ !node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
|
|
if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
|
|
|
(node.getXceiverCount() <= (2.0 * avgLoad))) {
|
|
|
targets.add(node);
|
|
@@ -2084,6 +2308,7 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor node = heartbeats.get(idx);
|
|
|
if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
|
|
|
!targets.contains(node) &&
|
|
|
+ !node.isDecommissionInProgress() && !node.isDecommissioned() &&
|
|
|
(node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
|
|
|
(node.getXceiverCount() <= (2.0 * avgLoad))) {
|
|
|
target = node;
|
|
@@ -2100,6 +2325,7 @@ class FSNamesystem implements FSConstants {
|
|
|
DatanodeDescriptor node = heartbeats.get(idx);
|
|
|
if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
|
|
|
!targets.contains(node) &&
|
|
|
+ !node.isDecommissionInProgress() && !node.isDecommissioned() &&
|
|
|
node.getRemaining() >= blockSize) {
|
|
|
target = node;
|
|
|
break;
|