
HADOOP-3649. Fix bug in removing blocks from the corrupted block map. Contributed by Lohit Vijayarenu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@672976 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 17 years ago
parent
commit
0b244d761d
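
For orientation, the sketch below restates what this patch does to CorruptReplicasMap. It is a hedged, self-contained model, not the patched class: plain String stands in for Hadoop's Block and DatanodeDescriptor, and the class name is invented. The substantive change is that removeFromCorruptReplicasMap no longer consults blocksMap and returns early; the entry is dropped unconditionally, and the new numCorruptReplicas accessor exposes the per-block count.

import java.util.*;

// Illustration only: simplified model of CorruptReplicasMap after this patch.
class CorruptReplicasModel {
  private final Map<String, Collection<String>> corruptReplicasMap =
    new TreeMap<String, Collection<String>>();

  void addToCorruptReplicasMap(String blk, String node) {
    Collection<String> nodes = corruptReplicasMap.get(blk);
    if (nodes == null) {
      nodes = new TreeSet<String>();
      corruptReplicasMap.put(blk, nodes);
    }
    nodes.add(node);
  }

  // The patch removes the early "still in blocksMap" return;
  // the entry is now removed unconditionally, and callers decide when.
  void removeFromCorruptReplicasMap(String blk) {
    corruptReplicasMap.remove(blk);
  }

  // Counterpart of the new numCorruptReplicas(Block) accessor below.
  int numCorruptReplicas(String blk) {
    Collection<String> nodes = corruptReplicasMap.get(blk);
    return (nodes == null) ? 0 : nodes.size();
  }
}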

+ 4 - 1
CHANGES.txt

@@ -46,7 +46,7 @@ Trunk (unreleased changes)
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 
     singleton MessageDigester by an instance per Thread using 
-    ThreadLocal. (Ivn de Prado via omalley)
+    ThreadLocal. (Iván de Prado via omalley)
 
   BUG FIXES
 
@@ -720,6 +720,9 @@ Release 0.18.0 - Unreleased
 
     HADOOP-3572. SetQuotas usage interface has some minor bugs. (hairong)
 
+    HADOOP-3649. Fix bug in removing blocks from the corrupted block map.
+    (Lohit Vijayarenu via shv)
+
 Release 0.17.1 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 2 - 27
src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.dfs;
 import org.apache.hadoop.ipc.Server;
 
 import java.util.*;
-import java.io.IOException;
 
 /**
  * Stores information about all corrupt blocks in the File System.
@@ -71,9 +70,6 @@ class CorruptReplicasMap{
    * @param blk Block to be removed
    */
   void removeFromCorruptReplicasMap(Block blk) {
-    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
-    if (fsNamesystem.blocksMap.contains(blk))
-      return;
     if (corruptReplicasMap != null) {
       corruptReplicasMap.remove(blk);
       NameNode.getNameNodeMetrics().numBlocksCorrupted.set(
@@ -103,29 +99,8 @@ class CorruptReplicasMap{
     return ((nodes != null) && (nodes.contains(node)));
   }
 
-  /**
-   * Invalidate corrupt replicas
-   *
-   * @param blk Block whose corrupt replicas need to be invalidated
-   */
-  void invalidateCorruptReplicas(Block blk) {
-    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+  int numCorruptReplicas(Block blk) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
-    boolean gotException = false;
-    if (nodes == null)
-      return;
-    for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
-      DatanodeDescriptor node = it.next();
-      try {
-        fsNamesystem.invalidateBlock(blk, node);
-      } catch (IOException e) {
-        NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
-                                      "error in deleting bad block " + blk +
-                                      " on " + node + e);
-      }
-    }
-    // Remove the block from corruptReplicasMap if empty
-    if (!gotException)
-      removeFromCorruptReplicasMap(blk);
+    return (nodes == null) ? 0 : nodes.size();
   }
 }
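
Design note: with invalidateCorruptReplicas moved into FSNamesystem (next file), CorruptReplicasMap no longer reaches back into the namesystem via the static FSNamesystem.getFSNamesystem(), and the new numCorruptReplicas accessor is all that outside callers, including the new test below, need.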

+ 45 - 5
src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1345,8 +1345,12 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                    "block " + blk + " could not be marked " +
                                    "as corrupt as it does not exists in " +
                                    "blocksMap");
-    else 
+    else {
+      // Add this replica to corruptReplicas Map and 
+      // add the block to neededReplication 
       corruptReplicas.addToCorruptReplicasMap(blk, node);
+      updateNeededReplications(blk, 0, 1);
+    }
   }
 
   /**
@@ -2803,7 +2807,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
 
     // filter out containingNodes that are marked for decommission.
     NumberReplicas num = countNodes(storedBlock);
-    int numCurrentReplica = num.liveReplicas()
+    int numLiveReplicas = num.liveReplicas();
+    int numCurrentReplica = numLiveReplicas
       + pendingReplications.getNumReplicas(block);
 
     // check whether safe replication is reached for the block
@@ -2835,11 +2840,45 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     // If the file replication has reached desired value
     // we can remove any corrupt replicas the block may have
     int corruptReplicasCount = num.corruptReplicas();
-    if ((corruptReplicasCount > 0) && (numCurrentReplica == fileReplication))
-      corruptReplicas.invalidateCorruptReplicas(block);
+    if ((corruptReplicasCount > 0) && (numLiveReplicas == fileReplication)) 
+      invalidateCorruptReplicas(block);
     return block;
   }
 
+  /**
+   * Invalidate corrupt replicas.
+   * <p>
+   * This will remove the replicas from the block's location list,
+   * add them to {@link #recentInvalidateSets} so that they could be further
+   * deleted from the respective data-nodes,
+   * and remove the block from corruptReplicasMap.
+   * <p>
+   * This method should be called when the block has sufficient
+   * number of live replicas.
+   *
+   * @param blk Block whose corrupt replicas need to be invalidated
+   */
+  void invalidateCorruptReplicas(Block blk) {
+    Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
+    boolean gotException = false;
+    if (nodes == null)
+      return;
+    for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
+      DatanodeDescriptor node = it.next();
+      try {
+        invalidateBlock(blk, node);
+      } catch (IOException e) {
+        NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
+                                      "error in deleting bad block " + blk +
+                                      " on " + node + " " + e);
+        gotException = true;
+      }
+    }
+    // Remove the block from corruptReplicasMap
+    if (!gotException)
+      corruptReplicas.removeFromCorruptReplicasMap(blk);
+  }
+
   /**
    * For each block in the name-node verify whether it belongs to any file,
    * over or under replicated. Place it into the respective queue.
@@ -3067,7 +3106,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       }
     }
     // If block is removed from blocksMap, remove it from corruptReplicas
-    corruptReplicas.removeFromCorruptReplicasMap(block);
+    if (fileINode == null)
+      corruptReplicas.removeFromCorruptReplicasMap(block);
   }
 
   /**
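
In short, the guard changed by this hunk now compares live replicas alone, rather than live plus pending, against the file's replication factor before invalidating corrupt copies. A hedged sketch of the condition (the class and method names are invented; in the patch the check is inline):

class InvalidationGuard {
  // Illustration only: the changed invalidation guard.
  // Before the patch: corrupt > 0 && (live + pending) == fileReplication
  // After the patch:  corrupt > 0 && live == fileReplication
  static boolean shouldInvalidateCorruptReplicas(int corruptReplicasCount,
                                                 int numLiveReplicas,
                                                 int fileReplication) {
    return (corruptReplicasCount > 0) && (numLiveReplicas == fileReplication);
  }
}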

+ 143 - 2
src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java

@@ -27,7 +27,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.io.*;
 import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -142,9 +141,10 @@ public class TestDatanodeBlockScanner extends TestCase {
     cluster.shutdown();
   }
 
-  void corruptReplica(String blockName, int replica) throws IOException {
+  boolean corruptReplica(String blockName, int replica) throws IOException {
     Random random = new Random();
     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
+    boolean corrupted = false;
     for (int i=replica*2; i<replica*2+2; i++) {
       File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
                                blockName);
@@ -157,8 +157,10 @@ public class TestDatanodeBlockScanner extends TestCase {
         raFile.seek(rand);
         raFile.write(badString.getBytes());
         raFile.close();
+        corrupted = true;
       }
     }
+    return corrupted;
   }
 
   public void testBlockCorruptionPolicy() throws IOException {
@@ -241,4 +243,143 @@ public class TestDatanodeBlockScanner extends TestCase {
 
     cluster.shutdown();
   }
+  
+  /**
+   * testBlockCorruptionRecoveryPolicy.
+   * This tests recovery of corrupt replicas, first for one corrupt replica
+   * then for two. The test invokes blockCorruptionRecoveryPolicy which
+   * 1. Creates a block with desired number of replicas
+   * 2. Corrupts the desired number of replicas and restarts the datanodes
+   *    containing the corrupt replica. Additionally, we read the block
+   *    in case restarting does not report corrupt replicas.
+   *    Restarting or reading from the datanode would trigger reportBadBlocks 
+   *    to namenode.
+   *    NameNode adds the replica to corruptReplicasMap and the block
+   *    to neededReplication.
+   * 3. Test waits until all corrupt replicas are reported, meanwhile
+   *    re-replication brings the block back to a healthy state.
+   * 4. Test again waits until the block is reported with expected number
+   *    of good replicas.
+   */
+  public void testBlockCorruptionRecoveryPolicy() throws IOException {
+    // Test recovery of 1 corrupt replica
+    LOG.info("Testing corrupt replica recovery for one corrupt replica");
+    blockCorruptionRecoveryPolicy(4, (short)3, 1);
+
+    // Test recovery of 2 corrupt replicas
+    LOG.info("Testing corrupt replica recovery for two corrupt replicas");
+    blockCorruptionRecoveryPolicy(5, (short)3, 2);
+  }
+  
+  private void blockCorruptionRecoveryPolicy(int numDataNodes, 
+                                             short numReplicas,
+                                             int numCorruptReplicas) 
+                                             throws IOException {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.blockreport.intervalMsec", 30L);
+    conf.setLong("dfs.replication.interval", 30);
+    conf.setLong("dfs.heartbeat.interval", 30L);
+    conf.setBoolean("dfs.replication.considerLoad", false);
+    Random random = new Random();
+    FileSystem fs = null;
+    DFSClient dfsClient = null;
+    LocatedBlocks blocks = null;
+    int replicaCount = 0;
+    int rand = random.nextInt(numDataNodes);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
+    DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
+    Block blk = DFSTestUtil.getFirstBlock(fs, file1);
+    String block = blk.getBlockName();
+    
+    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
+                                        cluster.getNameNodePort()), conf);
+    blocks = dfsClient.namenode.
+               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    replicaCount = blocks.get(0).getLocations().length;
+
+    // Wait until block is replicated to numReplicas
+    while (replicaCount != numReplicas) {
+      try {
+        LOG.info("Looping until expected replicaCount of " + numReplicas +
+                  " is reached");
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+      blocks = dfsClient.namenode.
+                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+      replicaCount = blocks.get(0).getLocations().length;
+    }
+    assertFalse(blocks.get(0).isCorrupt());
+
+    // Corrupt numCorruptReplicas replicas of block 
+    int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
+    for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
+      if (corruptReplica(block, i)) 
+        corruptReplicasDNIDs[j++] = i;
+    }
+    
+    // Restart the datanodes containing corrupt replicas 
+    // so they would be reported to namenode and re-replicated
+    for (int i = 0; i < numCorruptReplicas; i++) 
+      cluster.restartDataNode(corruptReplicasDNIDs[i]);
+
+    // Loop until all corrupt replicas are reported
+    int corruptReplicaSize = cluster.getNameNode().namesystem.
+                              corruptReplicas.numCorruptReplicas(blk);
+    while (corruptReplicaSize != numCorruptReplicas) {
+      try {
+        IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
+                          conf, true);
+      } catch (IOException e) {
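+        // A read may fail while replicas are corrupt; the error is ignored
+        // because the read is only here to trigger a block report.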
+      }
+      try {
+        LOG.info("Looping until expected " + numCorruptReplicas + " are " +
+                 "reported. Currently reported: " + corruptReplicaSize);
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+      corruptReplicaSize = cluster.getNameNode().namesystem.
+                              corruptReplicas.numCorruptReplicas(blk);
+    }
+    
+    // Loop until the block recovers after replication
+    blocks = dfsClient.namenode.
+               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+    replicaCount = blocks.get(0).getLocations().length;
+    while (replicaCount != numReplicas) {
+      try {
+        LOG.info("Looping until block gets rereplicated to " + numReplicas);
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+      blocks = dfsClient.namenode.
+                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+      replicaCount = blocks.get(0).getLocations().length;
+    }
+
+    // Make sure the corrupt replica is invalidated and removed from
+    // corruptReplicasMap
+    corruptReplicaSize = cluster.getNameNode().namesystem.
+                          corruptReplicas.numCorruptReplicas(blk);
+    while (corruptReplicaSize != 0) {
+      try {
+        LOG.info("Looping until corrupt replica is invalidated");
+        Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+      corruptReplicaSize = cluster.getNameNode().namesystem.
+                            corruptReplicas.numCorruptReplicas(blk);
+      blocks = dfsClient.namenode.
+                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+      replicaCount = blocks.get(0).getLocations().length;
+    }
+    // Make sure block is healthy 
+    assertEquals(0, corruptReplicaSize);
+    assertEquals(numReplicas, replicaCount);
+    assertFalse(blocks.get(0).isCorrupt());
+    cluster.shutdown();
+  }
 }
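
A closing note on the test's structure: all synchronization is done with hand-rolled sleep-and-check loops. A hedged sketch of a helper that could factor out that pattern (WaitUtil, Condition, and waitFor are illustrative names, not part of this patch or of JUnit):

import java.io.IOException;

// Illustration only: a poll-until-true helper in the spirit of the loops above.
class WaitUtil {
  interface Condition {
    boolean met() throws IOException;
  }

  static void waitFor(Condition cond, long intervalMs) throws IOException {
    while (!cond.met()) {
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException ignore) {
        // Mirror the test: keep polling even when sleep is interrupted.
      }
    }
  }
}

Each loop above could then read as a single call, for example WaitUtil.waitFor(new WaitUtil.Condition() { public boolean met() throws IOException { return corruptReplicasReported() == numCorruptReplicas; } }, 1000), where corruptReplicasReported is a hypothetical wrapper around namesystem.corruptReplicas.numCorruptReplicas(blk).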