Explorar o código

HADOOP-1184. Fix HDFS decommissioning to complete when the only copy of a block is on a decomissioned node.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@535962 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting %!s(int64=18) %!d(string=hai) anos
pai
achega
580be427a5

+ 3 - 0
CHANGES.txt

@@ -329,6 +329,9 @@ Trunk (unreleased changes)
     that can fail before a job is aborted.  The default is zero.
     (Arun C Murthy via cutting)
 
+98. HADOOP-1184.  Fix HDFS decomissioning to complete when the only
+    copy of a block is on a decommissioned node. (Dhruba Borthakur via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 116 - 101
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -186,6 +186,8 @@ class FSNamesystem implements FSConstants {
   private long heartbeatExpireInterval;
   //replicationRecheckInterval is how often namenode checks for new replication work
   private long replicationRecheckInterval;
+  //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
+  private long decommissionRecheckInterval;
   static int replIndex = 0; // last datanode used for replication work
   static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
 
@@ -240,6 +242,9 @@ class FSNamesystem implements FSConstants {
     this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
       10 * heartbeatInterval;
     this.replicationRecheckInterval = 3 * 1000; //  3 second
+    this.decommissionRecheckInterval = conf.getInt(
+                                                   "dfs.namenode.decommission.interval",
+                                                   5 * 60 * 1000);
 
     this.localMachine = hostname;
     this.port = port;
@@ -407,10 +412,13 @@ class FSNamesystem implements FSConstants {
   /* updates a block in under replication queue */
   synchronized void updateNeededReplications(Block block,
                         int curReplicasDelta, int expectedReplicasDelta) {
-    int curReplicas = countContainingNodes( block );
+    NumberReplicas repl = countNodes(block);
     int curExpectedReplicas = getReplication(block);
-    neededReplications.update(block, curReplicas, curExpectedReplicas,
-                                     curReplicasDelta, expectedReplicasDelta);
+    neededReplications.update(block, 
+                              repl.liveReplicas(), 
+                              repl.decommissionedReplicas(),
+                              curExpectedReplicas,
+                              curReplicasDelta, expectedReplicasDelta);
   }
 
   /////////////////////////////////////////////////////////
@@ -863,10 +871,12 @@ class FSNamesystem implements FSConstants {
     int numExpectedReplicas = pendingFile.getReplication();
     for (int i = 0; i < nrBlocks; i++) {
       // filter out containingNodes that are marked for decommission.
-      int numCurrentReplica = countContainingNodes(pendingBlocks[i]);
-      if (numCurrentReplica < numExpectedReplicas) {
-        neededReplications.add(
-                               pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
+      NumberReplicas number = countNodes(pendingBlocks[i]);
+      if (number.liveReplicas() < numExpectedReplicas) {
+        neededReplications.add(pendingBlocks[i], 
+                               number.liveReplicas(), 
+                               number.decommissionedReplicas,
+                               numExpectedReplicas);
       }
     }
     return COMPLETE_SUCCESS;
@@ -976,9 +986,8 @@ class FSNamesystem implements FSConstants {
 
     // Check how many copies we have of the block.  If we have at least one
     // copy on a live node, then we can delete it. 
-    int count = countContainingNodes(blk);
-    if ((count > 1) || ((count == 1) && (dn.isDecommissionInProgress() || 
-                                         dn.isDecommissioned()))) {
+    int count = countNodes(blk).liveReplicas();
+    if (count > 1) {
       addToInvalidates(blk, dn);
       removeStoredBlock(blk, getDatanode(dn));
       NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
@@ -1737,14 +1746,7 @@ class FSNamesystem implements FSConstants {
           break;
         }
         foundwork++;
-      } else {
-        //
-        // See if the decommissioned node has finished moving all
-        // its datablocks to another replica. This is a loose
-        // heuristic to determine when a decommission is really over.
-        //
-        checkDecommissionState(node);
-      }
+      } 
     }
   }
 
@@ -1757,8 +1759,10 @@ class FSNamesystem implements FSConstants {
     if (timedOutItems != null) {
       synchronized (this) {
         for (int i = 0; i < timedOutItems.length; i++) {
+          NumberReplicas num = countNodes(timedOutItems[i]);
           neededReplications.add(timedOutItems[i], 
-                                 countContainingNodes(timedOutItems[i]),
+                                 num.liveReplicas(),
+                                 num.decommissionedReplicas(),
                                  getReplication(timedOutItems[i]));
         }
       }
@@ -2076,7 +2080,8 @@ class FSNamesystem implements FSConstants {
       return block;
         
     // filter out containingNodes that are marked for decommission.
-    int numCurrentReplica = countContainingNodes(block)
+    NumberReplicas num = countNodes(block);
+    int numCurrentReplica = num.liveReplicas()
       + pendingReplications.getNumReplicas(block);
         
     // check whether safe replication is reached for the block
@@ -2086,7 +2091,8 @@ class FSNamesystem implements FSConstants {
     // handle underReplication/overReplication
     short fileReplication = fileINode.getReplication();
     if (numCurrentReplica >= fileReplication) {
-      neededReplications.remove(block, numCurrentReplica, fileReplication);
+      neededReplications.remove(block, numCurrentReplica, 
+                                num.decommissionedReplicas, fileReplication);
     } else {
       updateNeededReplications(block, curReplicaDelta, 0);
     }
@@ -2342,46 +2348,6 @@ class FSNamesystem implements FSConstants {
     node.stopDecommission();
   }
 
-  /**
-   * Return true if all specified nodes are decommissioned.
-   * Otherwise return false.
-   */
-  public synchronized boolean checkDecommissioned (String[] nodes) 
-    throws IOException {
-    String badnodes = "";
-    boolean isError = false;
-
-    synchronized (datanodeMap) {
-      for (int i = 0; i < nodes.length; i++) {
-        boolean found = false;
-        for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-             it.hasNext();) {
-          DatanodeDescriptor node = it.next();
-
-          //
-          // If this is a node that we are interested in, check its admin state.
-          //
-          if (node.getName().equals(nodes[i]) || 
-              node.getHost().equals(nodes[i])) {
-            found = true;
-            boolean isDecommissioned = checkDecommissionStateInternal(node);
-            if (!isDecommissioned) {
-              return false;
-            }
-          }
-        }
-        if (!found) {
-          badnodes += nodes[i] + " ";
-          isError = true;
-        }
-      }
-    }
-    if (isError) {
-      throw new IOException("Nodes " + badnodes + " not found");
-    }
-    return true;
-  }
-
   /** 
    */
   public DatanodeInfo getDataNodeInfo(String name) {
@@ -2472,38 +2438,74 @@ class FSNamesystem implements FSConstants {
     return sendBlock.toArray(new Block[sendBlock.size()]);
   }
 
+
+  /**
+   * A immutable object that stores the number of live replicas and
+   * the number of decommissined Replicas.
+   */
+  static class NumberReplicas {
+    private int liveReplicas;
+    private int decommissionedReplicas;
+
+    NumberReplicas(int live, int decommissioned) {
+      liveReplicas = live;
+      decommissionedReplicas = decommissioned;
+    }
+
+    int liveReplicas() {
+      return liveReplicas;
+    }
+    int decommissionedReplicas() {
+      return decommissionedReplicas;
+    }
+  } 
+
   /*
-   * Counts the number of nodes in the given list. Skips over nodes
-   * that are marked for decommission.
+   * Counts the number of nodes in the given list into active and
+   * decommissioned counters.
    */
-  private int countContainingNodes(Iterator<DatanodeDescriptor> nodeIter) {
+  private NumberReplicas countNodes(Iterator<DatanodeDescriptor> nodeIter) {
     int count = 0;
-    while (nodeIter.hasNext()) {
+    int live = 0;
+    while ( nodeIter.hasNext() ) {
       DatanodeDescriptor node = nodeIter.next();
-      if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       }
+      else {
+        live++;
+      }
     }
-    return count;
+    return new NumberReplicas(live, count);
   }
-    
-  /** wrapper for countContainingNodes(Iterator). */
-  private int countContainingNodes(Block b) {
-    return countContainingNodes(blocksMap.nodeIterator(b));
+
+  /** return the number of nodes that are live and decommissioned. */
+  private NumberReplicas countNodes(Block b) {
+    return countNodes(blocksMap.nodeIterator(b));
   }
 
-  /** Reeturns a newly allocated list exluding the decommisioned nodes. */
-  ArrayList<DatanodeDescriptor> containingNodeList(Block b) {
-    ArrayList<DatanodeDescriptor> nonCommissionedNodeList = 
+  /** Returns a newly allocated list of all nodes. Returns a count of
+  * live and decommissioned nodes. */
+  ArrayList<DatanodeDescriptor> containingNodeList(Block b, NumberReplicas[] numReplicas) {
+    ArrayList<DatanodeDescriptor> nodeList = 
       new ArrayList<DatanodeDescriptor>();
+    int count = 0;
+    int live = 0;
     for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
         it.hasNext();) {
       DatanodeDescriptor node = it.next();
       if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-        nonCommissionedNodeList.add(node);
+        live++;
       }
+      else {
+        count++;
+      }
+      nodeList.add(node);
+    }
+    if (numReplicas != null) {
+      numReplicas[0] = new NumberReplicas(live, count);
     }
-    return nonCommissionedNodeList;
+    return nodeList;
   }
   /*
    * Return true if there are any blocks on this node that have not
@@ -2511,15 +2513,34 @@ class FSNamesystem implements FSConstants {
    */
   private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     Block decommissionBlocks[] = srcNode.getBlocks();
+    boolean status = false;
     for (int i = 0; i < decommissionBlocks.length; i++) {
       Block block = decommissionBlocks[i];
       FSDirectory.INode fileINode = blocksMap.getINode(block);
-      if (fileINode != null &&
-          fileINode.getReplication() > countContainingNodes(block)) {
-        return true;
+
+      if (fileINode != null) {
+        NumberReplicas num = countNodes(block);
+        int curReplicas = num.liveReplicas();
+        int curExpectedReplicas = getReplication(block);
+        if (curExpectedReplicas > curReplicas) {
+          status = true;
+          if (!neededReplications.contains(block) &&
+            pendingReplications.getNumReplicas(block) == 0) {
+            //
+            // These blocks have been reported from the datanode
+            // after the startDecommission method has been executed. These
+            // blocks were in flight when the decommission was started.
+            //
+            neededReplications.update(block, 
+                                      curReplicas,
+                                      num.decommissionedReplicas(),
+                                      curExpectedReplicas,
+                                      -1, 0);
+          }
+        }
       }
     }
-    return false;
+    return status;
   }
 
   /**
@@ -2528,7 +2549,7 @@ class FSNamesystem implements FSConstants {
    */
   private boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
     //
-    // Check to see if there are all blocks in this decommisioned
+    // Check to see if all blocks in this decommisioned
     // node has reached their target replication factor.
     //
     if (node.isDecommissionInProgress()) {
@@ -2543,18 +2564,6 @@ class FSNamesystem implements FSConstants {
     return false;
   }
 
-  /**
-   * Change, if appropriate, the admin state of a datanode to 
-   * decommission completed.
-   */
-  public synchronized void checkDecommissionState(DatanodeID nodeReg) {
-    DatanodeDescriptor node = datanodeMap.get(nodeReg.getStorageID());
-    if (node == null) {
-      return;
-    }
-    checkDecommissionStateInternal(node);
-  }
-
   /**
    * Return with a list of Block/DataNodeInfo sets, indicating
    * where various Blocks should be copied, ASAP.
@@ -2582,9 +2591,10 @@ class FSNamesystem implements FSConstants {
         // replicate them.
         //
         List<Block> replicateBlocks = new ArrayList<Block>();
-        List<Integer> numCurrentReplicas = new ArrayList<Integer>();
+        List<NumberReplicas> numCurrentReplicas = new ArrayList<NumberReplicas>();
         List<DatanodeDescriptor[]> replicateTargetSets;
         replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
+        NumberReplicas[] allReplicas = new NumberReplicas[1];
         for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
           if (needed <= 0) {
             break;
@@ -2596,7 +2606,7 @@ class FSNamesystem implements FSConstants {
             it.remove();
           } else {
             List<DatanodeDescriptor> containingNodes = 
-              containingNodeList(block);
+              containingNodeList(block, allReplicas);
             Collection<Block> excessBlocks = excessReplicateMap.get(
                                                                     srcNode.getStorageID());
 
@@ -2604,8 +2614,10 @@ class FSNamesystem implements FSConstants {
             // not be scheduled for removal on that node
             if (containingNodes.contains(srcNode)
                 && (excessBlocks == null || !excessBlocks.contains(block))) {
-              int numCurrentReplica = containingNodes.size() + 
+              int numCurrentReplica = allReplicas[0].liveReplicas() +
                 pendingReplications.getNumReplicas(block);
+              NumberReplicas repl = new NumberReplicas(numCurrentReplica,
+                                        allReplicas[0].decommissionedReplicas()); 
               if (numCurrentReplica >= fileINode.getReplication()) {
                 it.remove();
               } else {
@@ -2617,7 +2629,7 @@ class FSNamesystem implements FSConstants {
                 if (targets.length > 0) {
                   // Build items to return
                   replicateBlocks.add(block);
-                  numCurrentReplicas.add(new Integer(numCurrentReplica));
+                  numCurrentReplicas.add(repl);
                   replicateTargetSets.add(targets);
                   needed -= targets.length;
                 }
@@ -2638,11 +2650,14 @@ class FSNamesystem implements FSConstants {
             Block block = it.next();
             DatanodeDescriptor targets[] = 
               (DatanodeDescriptor[]) replicateTargetSets.get(i);
-            int numCurrentReplica = numCurrentReplicas.get(i).intValue();
+            int numCurrentReplica = numCurrentReplicas.get(i).liveReplicas();
             int numExpectedReplica = blocksMap.getINode(block).getReplication(); 
             if (numCurrentReplica + targets.length >= numExpectedReplica) {
               neededReplications.remove(
-                                        block, numCurrentReplica, numExpectedReplica);
+                                        block, 
+                                        numCurrentReplica, 
+                                        numCurrentReplicas.get(i).decommissionedReplicas(),
+                                        numExpectedReplica);
               pendingReplications.add(block, targets.length);
               NameNode.stateChangeLog.debug(
                                             "BLOCK* NameSystem.pendingTransfer: "
@@ -2797,7 +2812,7 @@ class FSNamesystem implements FSConstants {
           FSNamesystem.LOG.info(StringUtils.stringifyException(e));
         }
         try {
-          Thread.sleep(1000 * 60 * 5);
+          Thread.sleep(decommissionRecheckInterval);
         } catch (InterruptedException ie) {
         }
       }
@@ -3165,7 +3180,7 @@ class FSNamesystem implements FSConstants {
   void decrementSafeBlockCount(Block b) {
     if (safeMode == null) // mostly true
       return;
-    safeMode.decrementSafeBlockCount((short)countContainingNodes(b));
+    safeMode.decrementSafeBlockCount((short)countNodes(b).liveReplicas());
   }
 
   /**

+ 30 - 11
src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java

@@ -57,8 +57,17 @@ class UnderReplicatedBlocks {
    * @param expectedReplicas expected number of replicas of the block
    */
   private int getPriority(Block block, 
-                          int curReplicas, int expectedReplicas) {
-    if (curReplicas<=0 || curReplicas>=expectedReplicas) {
+                          int curReplicas, 
+                          int decommissionedReplicas,
+                          int expectedReplicas) {
+    if (curReplicas<0 || curReplicas>=expectedReplicas) {
+      return LEVEL; // no need to replicate
+    } else if(curReplicas==0) {
+      // If there are zero non-decommissioned replica but there are
+      // some decommissioned replicas, then assign them highest priority
+      if (decommissionedReplicas > 0) {
+        return 0;
+      }
       return LEVEL; // no need to replicate
     } else if(curReplicas==1) {
       return 0; // highest priority
@@ -75,12 +84,16 @@ class UnderReplicatedBlocks {
    * @param expectedReplicas expected number of replicas of the block
    */
   synchronized boolean add(
-                           Block block, int curReplicas, int expectedReplicas) {
-    if(curReplicas<=0 || expectedReplicas <= curReplicas) {
+                           Block block,
+                           int curReplicas, 
+                           int decomissionedReplicas,
+                           int expectedReplicas) {
+    if(curReplicas<0 || expectedReplicas <= curReplicas) {
       return false;
     }
-    int priLevel = getPriority(block, curReplicas, expectedReplicas);
-    if(priorityQueues.get(priLevel).add(block)) {
+    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
+                               expectedReplicas);
+    if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
       NameNode.stateChangeLog.debug(
                                     "BLOCK* NameSystem.UnderReplicationBlock.add:"
                                     + block.getBlockName()
@@ -95,8 +108,12 @@ class UnderReplicatedBlocks {
 
   /* remove a block from a under replication queue */
   synchronized boolean remove(Block block, 
-                              int oldReplicas, int oldExpectedReplicas) {
-    int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
+                              int oldReplicas, 
+                              int decommissionedReplicas,
+                              int oldExpectedReplicas) {
+    int priLevel = getPriority(block, oldReplicas, 
+                               decommissionedReplicas,
+                               oldExpectedReplicas);
     return remove(block, priLevel);
   }
       
@@ -124,12 +141,14 @@ class UnderReplicatedBlocks {
   }
       
   /* update the priority level of a block */
-  synchronized void update(Block block, int curReplicas, int curExpectedReplicas,
+  synchronized void update(Block block, int curReplicas, 
+                           int decommissionedReplicas,
+                           int curExpectedReplicas,
                            int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(block, curReplicas, curExpectedReplicas);
-    int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+    int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas);
+    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas);
     NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
                                   block +
                                   " curReplicas " + curReplicas +

+ 1 - 0
src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

@@ -115,6 +115,7 @@ public class MiniDFSCluster {
     }
     conf.setInt("dfs.replication", Math.min(3, numDataNodes));
     conf.setInt("dfs.safemode.extension", 0);
+    conf.setInt("dfs.namenode.decommission.interval", 3 * 1000); // 3 second
     
     // Format and clean out DataNode directories
     if (format) {

+ 26 - 17
src/test/org/apache/hadoop/dfs/TestDecommission.java

@@ -36,15 +36,14 @@ public class TestDecommission extends TestCase {
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int fileSize = 16384;
-  static final int numIterations = 2;
-  static final int numDatanodes = numIterations + 3;
+  static final int numDatanodes = 6;
 
 
   Random myrand = new Random();
   Path hostsFile;
   Path excludeFile;
 
-  ArrayList<String> decommissionedNodes = new ArrayList<String>(numIterations);
+  ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
 
   private enum NodeState {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
 
@@ -91,6 +90,19 @@ public class TestDecommission extends TestCase {
     }
   }
 
+  private void printFileLocations(FileSystem fileSys, Path name)
+  throws IOException {
+    String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+    for (int idx = 0; idx < locations.length; idx++) {
+      String[] loc = locations[idx];
+      System.out.print("Block[" + idx + "] : ");
+      for (int j = 0; j < loc.length; j++) {
+        System.out.print(loc[j] + " ");
+      }
+      System.out.println("");
+    }
+  }
+
   /**
    * For blocks that reside on the nodes that are down, verify that their
    * replication factor is 1 more than the specified one.
@@ -223,7 +235,7 @@ public class TestDecommission extends TestCase {
     boolean done = checkNodeState(filesys, node, state);
     while (!done) {
       System.out.println("Waiting for node " + node +
-                         " to change state...");
+                         " to change state to " + state);
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
@@ -260,24 +272,21 @@ public class TestDecommission extends TestCase {
     DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
 
     try {
-      for (int iteration = 0; iteration < numIterations; iteration++) {
+      for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
+        int replicas = numDatanodes - iteration - 1;
         //
         // Decommission one node. Verify that node is decommissioned.
-        // Verify that replication factor of file has increased from 3
-        // to 4. This means one replica is on decommissioned node.
         // 
-        Path file1 = new Path("smallblocktest.dat");
-        writeFile(fileSys, file1, 3);
-        checkFile(fileSys, file1, 3);
-
-        String downnode  = decommissionNode(client, fileSys, localFileSys);
-        waitNodeState(fileSys, downnode, NodeState.DECOMMISSION_INPROGRESS);
-        commissionNode(fileSys, localFileSys, downnode);
-        waitNodeState(fileSys, downnode, NodeState.NORMAL);
-        downnode  = decommissionNode(client, fileSys, localFileSys);
+        Path file1 = new Path("decommission.dat");
+        writeFile(fileSys, file1, replicas);
+        System.out.println("Created file decommission.dat with " +
+                           replicas + " replicas.");
+        checkFile(fileSys, file1, replicas);
+        printFileLocations(fileSys, file1);
+        String downnode = decommissionNode(client, fileSys, localFileSys);
         decommissionedNodes.add(downnode);
         waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED);
-        checkFile(fileSys, file1, 3, downnode);
+        checkFile(fileSys, file1, replicas, downnode);
         cleanupFile(fileSys, file1);
         cleanupFile(localFileSys, dir);
       }