瀏覽代碼

merge -r 1213536:1213537 Merging from trunk to branch-0.23 to fix HDFS-1765

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1441577 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 12 年之前
父節點
當前提交
3d31884755

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -24,6 +24,9 @@ Release 0.23.7 - UNRELEASED
   BUG FIXES
     HDFS-4288. NN accepts incremental BR as IBR in safemode (daryn via kihwal)
 
+    HDFS-1765. Block Replication should respect under-replication
+    block priority. (Uma Maheswara Rao G via eli)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 10 - 71
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -181,9 +181,6 @@ public class BlockManager {
   /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
 
-  /** Last block index used for replication work. */
-  private int replIndex = 0;
-
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
   
@@ -954,74 +951,16 @@ public class BlockManager {
    * @return number of blocks scheduled for replication during this iteration.
    */
   private int computeReplicationWork(int blocksToProcess) throws IOException {
-    // Choose the blocks to be replicated
-    List<List<Block>> blocksToReplicate =
-      chooseUnderReplicatedBlocks(blocksToProcess);
-
-    // replicate blocks
-    return computeReplicationWorkForBlocks(blocksToReplicate);
-  }
-
-  /**
-   * Get a list of block lists to be replicated The index of block lists
-   * represents the
-   *
-   * @param blocksToProcess
-   * @return Return a list of block lists to be replicated. The block list index
-   *         represents its replication priority.
-   */
-  private List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
-    // initialize data structure for the return value
-    List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(
-        UnderReplicatedBlocks.LEVEL);
-    for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
-      blocksToReplicate.add(new ArrayList<Block>());
-    }
+    List<List<Block>> blocksToReplicate = null;
     namesystem.writeLock();
     try {
-      synchronized (neededReplications) {
-        if (neededReplications.size() == 0) {
-          return blocksToReplicate;
-        }
-
-        // Go through all blocks that need replications.
-        UnderReplicatedBlocks.BlockIterator neededReplicationsIterator = 
-            neededReplications.iterator();
-        // skip to the first unprocessed block, which is at replIndex
-        for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
-          neededReplicationsIterator.next();
-        }
-        // # of blocks to process equals either twice the number of live
-        // data-nodes or the number of under-replicated blocks whichever is less
-        blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
-
-        for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
-          if (!neededReplicationsIterator.hasNext()) {
-            // start from the beginning
-            replIndex = 0;
-            blocksToProcess = Math.min(blocksToProcess, neededReplications
-                .size());
-            if (blkCnt >= blocksToProcess)
-              break;
-            neededReplicationsIterator = neededReplications.iterator();
-            assert neededReplicationsIterator.hasNext() : "neededReplications should not be empty.";
-          }
-
-          Block block = neededReplicationsIterator.next();
-          int priority = neededReplicationsIterator.getPriority();
-          if (priority < 0 || priority >= blocksToReplicate.size()) {
-            LOG.warn("Unexpected replication priority: "
-                + priority + " " + block);
-          } else {
-            blocksToReplicate.get(priority).add(block);
-          }
-        } // end for
-      } // end synchronized neededReplication
+      // Choose the blocks to be replicated
+      blocksToReplicate = neededReplications
+          .chooseUnderReplicatedBlocks(blocksToProcess);
     } finally {
       namesystem.writeUnlock();
     }
-
-    return blocksToReplicate;
+    return computeReplicationWorkForBlocks(blocksToReplicate);
   }
 
   /** Replicate a set of blocks
@@ -1050,7 +989,7 @@ public class BlockManager {
             // abandoned block or block reopened for append
             if(fileINode == null || fileINode.isUnderConstruction()) {
               neededReplications.remove(block, priority); // remove from neededReplications
-              replIndex--;
+              neededReplications.decrementReplicationIndex(priority);
               continue;
             }
 
@@ -1074,7 +1013,7 @@ public class BlockManager {
               if ( (pendingReplications.getNumReplicas(block) > 0) ||
                    (blockHasEnoughRacks(block)) ) {
                 neededReplications.remove(block, priority); // remove from neededReplications
-                replIndex--;
+                neededReplications.decrementReplicationIndex(priority);
                 blockLog.info("BLOCK* "
                     + "Removing block " + block
                     + " from neededReplications as it has enough replicas.");
@@ -1135,7 +1074,7 @@ public class BlockManager {
           if(fileINode == null || fileINode.isUnderConstruction()) {
             neededReplications.remove(block, priority); // remove from neededReplications
             rw.targets = null;
-            replIndex--;
+            neededReplications.decrementReplicationIndex(priority);
             continue;
           }
           requiredReplication = fileINode.getReplication();
@@ -1149,7 +1088,7 @@ public class BlockManager {
             if ( (pendingReplications.getNumReplicas(block) > 0) ||
                  (blockHasEnoughRacks(block)) ) {
               neededReplications.remove(block, priority); // remove from neededReplications
-              replIndex--;
+              neededReplications.decrementReplicationIndex(priority);
               rw.targets = null;
               NameNode.stateChangeLog.info("BLOCK* "
                   + "Removing block " + block
@@ -1187,7 +1126,7 @@ public class BlockManager {
           // remove from neededReplications
           if(numEffectiveReplicas + targets.length >= requiredReplication) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            replIndex--;
+            neededReplications.decrementReplicationIndex(priority);
           }
         }
       }

+ 80 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java

@@ -18,10 +18,12 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.Map;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
@@ -84,10 +86,14 @@ class UnderReplicatedBlocks implements Iterable<Block> {
   private List<LightWeightLinkedSet<Block>> priorityQueues
       = new ArrayList<LightWeightLinkedSet<Block>>();
 
+  /** Stores the replication index for each priority */
+  private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
+  
   /** Create an object. */
   UnderReplicatedBlocks() {
     for (int i = 0; i < LEVEL; i++) {
       priorityQueues.add(new LightWeightLinkedSet<Block>());
+      priorityToReplIdx.put(i, 0);
     }
   }
 
@@ -303,6 +309,70 @@ class UnderReplicatedBlocks implements Iterable<Block> {
       }
     }
   }
+  
+  /**
+   * Get a list of block lists to be replicated. The index of block lists
+   * represents its replication priority. Replication index will be tracked for
+   * each priority list separately in priorityToReplIdx map. Iterates through
+   * all priority lists and find the elements after replication index. Once the
+   * last priority lists reaches to end, all replication indexes will be set to
+   * 0 and start from 1st priority list to fulfill the blockToProces count.
+   * 
+   * @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
+   * @return Return a list of block lists to be replicated. The block list index
+   *         represents its replication priority.
+   */
+  public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
+      int blocksToProcess) {
+    // initialize data structure for the return value
+    List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
+    for (int i = 0; i < LEVEL; i++) {
+      blocksToReplicate.add(new ArrayList<Block>());
+    }
+
+    if (size() == 0) { // There are no blocks to collect.
+      return blocksToReplicate;
+    }
+    
+    int blockCount = 0;
+    for (int priority = 0; priority < LEVEL; priority++) { 
+      // Go through all blocks that need replications with current priority.
+      BlockIterator neededReplicationsIterator = iterator(priority);
+      Integer replIndex = priorityToReplIdx.get(priority);
+      
+      // skip to the first unprocessed block, which is at replIndex
+      for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
+        neededReplicationsIterator.next();
+      }
+
+      blocksToProcess = Math.min(blocksToProcess, size());
+      
+      if (blockCount == blocksToProcess) {
+        break;  // break if already expected blocks are obtained
+      }
+      
+      // Loop through all remaining blocks in the list.
+      while (blockCount < blocksToProcess
+          && neededReplicationsIterator.hasNext()) {
+        Block block = neededReplicationsIterator.next();
+        blocksToReplicate.get(priority).add(block);
+        replIndex++;
+        blockCount++;
+      }
+      
+      if (!neededReplicationsIterator.hasNext()
+          && neededReplicationsIterator.getPriority() == LEVEL - 1) {
+        // reset all priorities replication index to 0 because there is no
+        // recently added blocks in any list.
+        for (int i = 0; i < LEVEL; i++) {
+          priorityToReplIdx.put(i, 0);
+        }
+        break;
+      }
+      priorityToReplIdx.put(priority, replIndex); 
+    }
+    return blocksToReplicate;
+  }
 
   /** returns an iterator of all blocks in a given priority queue */
   synchronized BlockIterator iterator(int level) {
@@ -383,4 +453,14 @@ class UnderReplicatedBlocks implements Iterable<Block> {
       return level;
     }
   }
+
+  /**
+   * This method is to decrement the replication index for the given priority
+   * 
+   * @param priority  - int priority level
+   */
+  public void decrementReplicationIndex(int priority) {
+    Integer replIdx = priorityToReplIdx.get(priority);
+    priorityToReplIdx.put(priority, --replIdx); 
+  }
 }

+ 135 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -17,26 +17,32 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-
-import junit.framework.TestCase;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.junit.Test;
 
-public class TestReplicationPolicy extends TestCase {
+public class TestReplicationPolicy {
+  private Random random= DFSUtil.getRandom();
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 6;
   private static final Configuration CONF = new HdfsConfiguration();
@@ -90,6 +96,7 @@ public class TestReplicationPolicy extends TestCase {
    * the 1st is on dataNodes[0] and the 2nd is on a different rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget1() throws Exception {
     dataNodes[0].updateHeartbeat(
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 
@@ -150,6 +157,7 @@ public class TestReplicationPolicy extends TestCase {
    * should be placed on a third rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget2() throws Exception { 
     HashMap<Node, Node> excludedNodes;
     DatanodeDescriptor[] targets;
@@ -225,6 +233,7 @@ public class TestReplicationPolicy extends TestCase {
    * and the rest should be placed on the third rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget3() throws Exception {
     // make data node 0 to be not qualified to choose
     dataNodes[0].updateHeartbeat(
@@ -278,6 +287,7 @@ public class TestReplicationPolicy extends TestCase {
    * the 3rd replica should be placed on the same rack as the 1st replica,
    * @throws Exception
    */
+  @Test
   public void testChoooseTarget4() throws Exception {
     // make data node 0 & 1 to be not qualified to choose: not enough disk space
     for(int i=0; i<2; i++) {
@@ -325,6 +335,7 @@ public class TestReplicationPolicy extends TestCase {
    * the 3rd replica should be placed on the same rack as the 2nd replica,
    * @throws Exception
    */
+  @Test
   public void testChooseTarget5() throws Exception {
     DatanodeDescriptor[] targets;
     targets = replicator.chooseTarget(filename,
@@ -354,6 +365,7 @@ public class TestReplicationPolicy extends TestCase {
    * the 1st replica. The 3rd replica can be placed randomly.
    * @throws Exception
    */
+  @Test
   public void testRereplicate1() throws Exception {
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     chosenNodes.add(dataNodes[0]);    
@@ -388,6 +400,7 @@ public class TestReplicationPolicy extends TestCase {
    * the rest replicas can be placed randomly,
    * @throws Exception
    */
+  @Test
   public void testRereplicate2() throws Exception {
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     chosenNodes.add(dataNodes[0]);
@@ -417,6 +430,7 @@ public class TestReplicationPolicy extends TestCase {
    * the rest replicas can be placed randomly,
    * @throws Exception
    */
+  @Test
   public void testRereplicate3() throws Exception {
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     chosenNodes.add(dataNodes[0]);
@@ -450,4 +464,122 @@ public class TestReplicationPolicy extends TestCase {
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
   }
   
+  /**
+   * Test for the high priority blocks are processed before the low priority
+   * blocks.
+   */
+  @Test(timeout = 60000)
+  public void testReplicationWithPriority() throws Exception {
+    int DFS_NAMENODE_REPLICATION_INTERVAL = 1000;
+    int HIGH_PRIORITY = 0;
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).build();
+    try {
+      cluster.waitActive();
+      final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
+          .getNameNode().getNamesystem().getBlockManager().neededReplications;
+      for (int i = 0; i < 100; i++) {
+        // Adding the blocks directly to normal priority
+        neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
+      }
+      // Lets wait for the replication interval, to start process normal
+      // priority blocks
+      Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
+      
+      // Adding the block directly to high priority list
+      neededReplications.add(new Block(random.nextLong()), 1, 0, 3);
+      
+      // Lets wait for the replication interval
+      Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
+
+      // Check replication completed successfully. Need not wait till it process
+      // all the 100 normal blocks.
+      assertFalse("Not able to clear the element from high priority list",
+          neededReplications.iterator(HIGH_PRIORITY).hasNext());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Test for the ChooseUnderReplicatedBlocks are processed based on priority
+   */
+  @Test
+  public void testChooseUnderReplicatedBlocks() throws Exception {
+    UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
+
+    for (int i = 0; i < 5; i++) {
+      // Adding QUEUE_HIGHEST_PRIORITY block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
+
+      // Adding QUEUE_VERY_UNDER_REPLICATED block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
+
+      // Adding QUEUE_UNDER_REPLICATED block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
+
+      // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
+
+      // Adding QUEUE_WITH_CORRUPT_BLOCKS block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 0, 0, 3);
+    }
+
+    // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
+    // from
+    // QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
+    List<List<Block>> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
+    assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
+
+    // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from
+    // QUEUE_VERY_UNDER_REPLICATED, 5 blocks from QUEUE_UNDER_REPLICATED and 1
+    // block from QUEUE_REPLICAS_BADLY_DISTRIBUTED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
+    assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
+
+    // Adding QUEUE_HIGHEST_PRIORITY
+    underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
+
+    // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
+    // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
+    // and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5);
+
+    // Since it is reached to end of all lists,
+    // should start picking the blocks from start.
+    // Choose 7 blocks from UnderReplicatedBlocks. Then it should pick 6 blocks from
+    // QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(7);
+    assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0);
+  }
+  
+  /** asserts the chosen blocks with expected priority blocks */
+  private void assertTheChosenBlocks(
+      List<List<Block>> chosenBlocks, int firstPrioritySize,
+      int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
+      int fifthPrioritySize) {
+    assertEquals(
+        "Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
+        firstPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
+    assertEquals(
+        "Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks",
+        secondPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
+    assertEquals(
+        "Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
+        thirdPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
+    assertEquals(
+        "Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks",
+        fourthPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
+    assertEquals(
+        "Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
+        fifthPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
+  }
 }

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

@@ -149,9 +149,7 @@ public class TestNameNodeMetrics {
     fs.delete(file, true);
     filesTotal--; // reduce the filecount for deleted file
     
-    // Wait for more than DATANODE_COUNT replication intervals to ensure all 
-    // the blocks pending deletion are sent for deletion to the datanodes.
-    Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+    waitForDeletion();
     updateMetrics();
     rb = getMetrics(NS_METRICS);
     assertGauge("FilesTotal", filesTotal, rb);
@@ -182,7 +180,7 @@ public class TestNameNodeMetrics {
     assertGauge("PendingReplicationBlocks", 1L, rb);
     assertGauge("ScheduledReplicationBlocks", 1L, rb);
     fs.delete(file, true);
-    updateMetrics();
+    waitForDeletion();
     rb = getMetrics(NS_METRICS);
     assertGauge("CorruptBlocks", 0L, rb);
     assertGauge("PendingReplicationBlocks", 0L, rb);
@@ -221,9 +219,15 @@ public class TestNameNodeMetrics {
     assertGauge("UnderReplicatedBlocks", 1L, rb);
     assertGauge("MissingBlocks", 1L, rb);
     fs.delete(file, true);
-    updateMetrics();
+    waitForDeletion();
     assertGauge("UnderReplicatedBlocks", 0L, getMetrics(NS_METRICS));
   }
+
+  private void waitForDeletion() throws InterruptedException {
+    // Wait for more than DATANODE_COUNT replication intervals to ensure all
+    // the blocks pending deletion are sent for deletion to the datanodes.
+    Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+  }
   
   @Test
   public void testRenameMetrics() throws Exception {