Browse Source

HDFS-3493. Invalidate excess corrupted blocks as long as minimum replication is satisfied. Contributed by Juan Yu and Vinayakumar B.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1602293 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 11 years ago
parent
commit
c5de5f87ea

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -172,6 +172,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6395. Skip checking xattr limits for non-user-visible namespaces.
     (Yi Liu via wang).
 
+    HDFS-3493. Invalidate excess corrupted blocks as long as minimum
+    replication is satisfied. (Juan Yu and Vinayakumar B via wang)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

+ 22 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1096,8 +1096,9 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
-        Reason.CORRUPTION_REPORTED), dn, storageID);
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+        blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+        dn, storageID);
   }
 
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
@@ -1123,7 +1124,25 @@ public class BlockManager {
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
         b.reasonCode);
-    if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
+
+    NumberReplicas numberOfReplicas = countNodes(b.stored);
+    boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc
+        .getBlockReplication();
+    boolean minReplicationSatisfied =
+        numberOfReplicas.liveReplicas() >= minReplication;
+    boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
+        (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
+        bc.getBlockReplication();
+    boolean corruptedDuringWrite = minReplicationSatisfied &&
+        (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
+    // case 1: have enough number of live replicas
+    // case 2: corrupted replicas + live replicas > Replication factor
+    // case 3: Block is marked corrupt due to failure while writing. In this
+    //         case genstamp will be different than that of valid block.
+    // In all these cases we can delete the replica.
+    // In case of 3, rbw block will be deleted and valid block can be replicated
+    if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
+        || corruptedDuringWrite) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {

+ 64 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java

@@ -25,8 +25,10 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Random;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -463,4 +465,66 @@ public class TestReplication {
     }
     fs.delete(fileName, true);
   }
+
+  /**
+   * Test that blocks should get replicated if we have corrupted blocks and
+   * having good replicas at least equal or greater to minreplication
+   *
+   * Simulate rbw blocks by creating dummy copies, then a DN restart to detect
+   * those corrupted blocks asap.
+   */
+  @Test(timeout=30000)
+  public void testReplicationWhenBlockCorruption() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      conf.setLong(
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      FileSystem fs = cluster.getFileSystem();
+      FSDataOutputStream create = fs.create(new Path("/test"));
+      fs.setReplication(new Path("/test"), (short) 1);
+      create.write(new byte[1024]);
+      create.close();
+
+      List<File> nonParticipatedNodeDirs = new ArrayList<File>();
+      File participatedNodeDirs = null;
+      for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+        File storageDir = cluster.getInstanceStorageDir(i, 0);
+        String bpid = cluster.getNamesystem().getBlockPoolId();
+        File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+        if (data_dir.listFiles().length == 0) {
+          nonParticipatedNodeDirs.add(data_dir);
+        } else {
+          participatedNodeDirs = data_dir;
+        }
+      }
+
+      String blockFile = null;
+      File[] listFiles = participatedNodeDirs.listFiles();
+      for (File file : listFiles) {
+        if (file.getName().startsWith("blk_")
+            && !file.getName().endsWith("meta")) {
+          blockFile = file.getName();
+          for (File file1 : nonParticipatedNodeDirs) {
+            file1.mkdirs();
+            new File(file1, blockFile).createNewFile();
+            new File(file1, blockFile + "_1000.meta").createNewFile();
+          }
+          break;
+        }
+      }
+
+      fs.setReplication(new Path("/test"), (short) 3);
+      cluster.restartDataNodes(); // Lets detect all DNs about dummy copied
+      // blocks
+      cluster.waitActive();
+      cluster.triggerBlockReports();
+      DFSTestUtil.waitReplication(fs, new Path("/test"), (short) 3);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java

@@ -410,6 +410,7 @@ public abstract class BlockReportTestBase {
    * The second datanode is started in the cluster.
    * As soon as the replication process is completed test finds a block from
    * the second DN and sets its GS to be < of original one.
+   * this is the markBlockAsCorrupt case 3 so we expect one pending deletion
    * Block report is forced and the check for # of currupted blocks is performed.
    * Another block is chosen and its length is set to a lesser than original.
    * A check for another corrupted block is performed after yet another
@@ -436,20 +437,20 @@ public abstract class BlockReportTestBase {
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
-               cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
+               cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
     assertThat("Wrong number of PendingDeletion blocks",
-               cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
+               cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
     assertThat("Wrong number of PendingReplication blocks",
                cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
 
-    reports = getBlockReports(dn, poolId, true, true);
+    reports = getBlockReports(dn, poolId, false, true);
     sendBlockReports(dnR, poolId, reports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
-               cluster.getNamesystem().getCorruptReplicaBlocks(), is(2L));
+               cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
     assertThat("Wrong number of PendingDeletion blocks",
-               cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
+               cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
     assertThat("Wrong number of PendingReplication blocks",
                cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));