
svn merge -c 1399965 Merging from trunk to branch-0.23 to fix HDFS-4072.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1453626 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee
commit a22506bea5

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -59,6 +59,9 @@ Release 0.23.7 - UNRELEASED
     HDFS-3266. DFSTestUtil#waitCorruptReplicas doesn't sleep between checks.
     (Madhukara Phatak via atm)
 
+    HDFS-4072. On file deletion remove corresponding blocks pending
+    replications. (Jing Zhao via suresh)
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 17 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1116,7 +1116,7 @@ public class BlockManager {
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
           // replications that fail after an appropriate amount of time.
-          pendingReplications.add(block, targets.length);
+          pendingReplications.increment(block, targets.length);
           if(NameNode.stateChangeLog.isDebugEnabled()) {
             NameNode.stateChangeLog.debug(
                 "BLOCK* block " + block
@@ -1162,8 +1162,11 @@ public class BlockManager {
 
   /**
    * Choose target datanodes according to the replication policy.
-   * @throws IOException if the number of targets < minimum replication.
-   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
+   * 
+   * @throws IOException
+   *           if the number of targets < minimum replication.
+   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
+   *      List, boolean, HashMap, long)
    */
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -1696,14 +1699,15 @@ public class BlockManager {
   }
   
   /**
-   * Faster version of {@link addStoredBlock()}, intended for use with 
-   * initial block report at startup.  If not in startup safe mode, will
-   * call standard addStoredBlock().
-   * Assumes this method is called "immediately" so there is no need to
-   * refresh the storedBlock from blocksMap.
-   * Doesn't handle underReplication/overReplication, or worry about
+   * Faster version of
+   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
+   * , intended for use with initial block report at startup. If not in startup
+   * safe mode, will call standard addStoredBlock(). Assumes this method is
+   * called "immediately" so there is no need to refresh the storedBlock from
+   * blocksMap. Doesn't handle underReplication/overReplication, or worry about
    * pendingReplications or corruptReplicas, because it's in startup safe mode.
    * Doesn't log every block, because there are typically millions of them.
+   * 
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
@@ -2185,7 +2189,7 @@ public class BlockManager {
     //
     // Modify the blocks->datanode map and node's map.
     //
-    pendingReplications.remove(block);
+    pendingReplications.decrement(block);
 
     // blockReceived reports a finalized block
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
@@ -2291,7 +2295,7 @@ public class BlockManager {
   }
 
   /** 
-   * Simpler, faster form of {@link countNodes()} that only returns the number
+   * Simpler, faster form of {@link #countNodes(Block)} that only returns the number
    * of live nodes.  If in startup safemode (or its 30-sec extension period),
    * then it gains speed by ignoring issues of excess replicas or nodes
    * that are decommissioned or in process of becoming decommissioned.
@@ -2440,6 +2444,8 @@ public class BlockManager {
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);
+    // Remove the block from pendingReplications
+    pendingReplications.remove(block);
   }
 
   public BlockInfo getStoredBlock(Block block) {
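Note on the BlockManager hunks above: the bookkeeping calls are renamed (add becomes increment, remove becomes decrement), and removeBlock now also clears the block's pending-replication record so that blocks of a deleted file no longer linger in pendingReplications. A self-contained sketch of that deletion ordering follows; the inner collections are trivial stand-ins chosen for illustration, not the real HDFS classes.

    // Sketch only: mirrors the cleanup order in BlockManager.removeBlock after this
    // commit. The sets stand in for the real BlockManager collections.
    import java.util.HashSet;
    import java.util.Set;

    class RemoveBlockSketch {
      private final Set<Long> invalidates = new HashSet<Long>();         // replicas to delete on datanodes
      private final Set<Long> corruptReplicas = new HashSet<Long>();     // blocks with corrupt replicas
      private final Set<Long> blocksMap = new HashSet<Long>();           // all known blocks
      private final Set<Long> pendingReplications = new HashSet<Long>(); // blocks with in-flight replication

      void removeBlock(long blockId) {
        invalidates.add(blockId);            // schedule replica deletion
        corruptReplicas.remove(blockId);     // clear corrupt-replica bookkeeping
        blocksMap.remove(blockId);           // drop the block from the block map
        pendingReplications.remove(blockId); // new in this commit: forget any in-flight replication
      }
    }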

+ 15 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java

@@ -72,7 +72,7 @@ class PendingReplicationBlocks {
   /**
    * Add a block to the list of pending Replications
    */
-  void add(Block block, int numReplicas) {
+  void increment(Block block, int numReplicas) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
@@ -89,7 +89,7 @@ class PendingReplicationBlocks {
    * Decrement the number of pending replication requests
    * for this block.
    */
-  void remove(Block block) {
+  void decrement(Block block) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
@@ -104,6 +104,19 @@ class PendingReplicationBlocks {
     }
   }
 
+  /**
+   * Remove the record about the given block from pendingReplications.
+   * @param block The given block whose pending replication requests need to be
+   *              removed
+   */
+  void remove(Block block) {
+    synchronized (pendingReplications) {
+      pendingReplications.remove(block);
+    }
+  }
+
+
+
   /**
    * The total number of blocks that are undergoing replication
    */
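The PendingReplicationBlocks changes above split the API into three operations: increment adds outstanding replica requests for a block, decrement records that one scheduled replica has completed, and the new remove drops the block's record outright (used when the file is deleted). A self-contained sketch of those semantics, using a plain map rather than the real PendingBlockInfo bookkeeping; the class and method shapes are illustrative assumptions, not the HDFS implementation.

    // Minimal sketch of increment/decrement/remove; not the actual HDFS class.
    import java.util.HashMap;
    import java.util.Map;

    class PendingReplicationSketch {
      // block id -> number of replicas still expected
      private final Map<Long, Integer> pending = new HashMap<Long, Integer>();

      synchronized void increment(long blockId, int numReplicas) {
        Integer n = pending.get(blockId);
        pending.put(blockId, (n == null ? 0 : n) + numReplicas);
      }

      synchronized void decrement(long blockId) {
        Integer n = pending.get(blockId);
        if (n == null) {
          return;                       // nothing pending for this block
        }
        if (n <= 1) {
          pending.remove(blockId);      // last expected replica has arrived
        } else {
          pending.put(blockId, n - 1);
        }
      }

      synchronized void remove(long blockId) {
        pending.remove(blockId);        // forget the block, e.g. its file was deleted
      }

      synchronized int size() {
        return pending.size();
      }

      synchronized int getNumReplicas(long blockId) {
        Integer n = pending.get(blockId);
        return n == null ? 0 : n;
      }
    }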

+ 8 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java

@@ -23,7 +23,9 @@ import java.lang.System;
 import org.apache.hadoop.hdfs.protocol.Block;
 
 /**
- * This class tests the internals of PendingReplicationBlocks.java
+ * This class tests the internals of PendingReplicationBlocks.java,
+ * as well as how PendingReplicationBlocks acts in BlockManager
+ *
  */
 public class TestPendingReplication extends TestCase {
   final static int TIMEOUT = 3;     // 3 seconds
@@ -38,7 +40,7 @@ public class TestPendingReplication extends TestCase {
     //
     for (int i = 0; i < 10; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.add(block, i);
+      pendingReplications.increment(block, i);
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -48,15 +50,15 @@ public class TestPendingReplication extends TestCase {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.remove(blk);             // removes one replica
+    pendingReplications.decrement(blk);             // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
-      pendingReplications.remove(blk);           // removes all replicas
+      pendingReplications.decrement(blk);           // removes all replicas
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.add(blk, 8);
+    pendingReplications.increment(blk, 8);
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -84,7 +86,7 @@ public class TestPendingReplication extends TestCase {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.add(block, i);
+      pendingReplications.increment(block, i);
     }
     assertTrue(pendingReplications.size() == 15);
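For reference, a usage run mirroring the bookkeeping in the test hunks above, written against the hypothetical PendingReplicationSketch class from the previous note (the expected values follow TestPendingReplication).

    // Requires the PendingReplicationSketch class sketched earlier.
    public class PendingSketchDemo {
      public static void main(String[] args) {
        PendingReplicationSketch pending = new PendingReplicationSketch();
        for (int i = 0; i < 10; i++) {
          pending.increment(i, i);                      // block i expects i more replicas
        }
        System.out.println(pending.size());             // 10

        pending.decrement(8);                           // one replica of block 8 reported
        System.out.println(pending.getNumReplicas(8));  // 7

        for (int i = 0; i < 7; i++) {
          pending.decrement(8);                         // remaining replicas reported
        }
        System.out.println(pending.size());             // 9

        pending.increment(8, 8);                        // re-schedule block 8
        System.out.println(pending.size());             // 10

        pending.remove(8);                              // file deleted: drop the record outright
        System.out.println(pending.size());             // 9
      }
    }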