
HDFS-3157. Fix a bug in the case that the generation stamps of the stored block in a namenode and the reported block from a datanode do not match. Contributed by Ashish Singhi

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1356086 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 13 years ago
Parent
Commit
28e8151ad3
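
Why the fix needs two views of the block (a reviewer note, not part of the commit): the namenode's block map and the datanode's on-disk replica can end up with different generation stamps, and a delete request must name the genstamp the datanode actually has on disk. A minimal, self-contained sketch with hypothetical values, using plain longs in place of the real Block/BlockInfo types:

    // Hypothetical values: the block map was bumped to genstamp 1002 while
    // a stale replica on disk still carries genstamp 1001. An invalidation
    // issued against 1002 would not match the on-disk file.
    public class GenStampMismatchSketch {
        public static void main(String[] args) {
            long storedGS   = 1002L; // genstamp in the namenode's block map
            long reportedGS = 1001L; // genstamp of the replica the datanode reported
            if (storedGS != reportedGS) {
                System.out.println("mark corrupt: reported genstamp " + reportedGS
                    + " does not match stored genstamp " + storedGS);
            }
        }
    }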

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -381,6 +381,13 @@ Branch-2 ( Unreleased changes )
     HDFS-3559. DFSTestUtil: use Builder class to construct DFSTestUtil
     instances. (Colin Patrick McCabe via atm)
 
+    HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
+    (szetszwo)
+
+    HDFS-3157. Fix a bug in the case that the generation stamps of the stored
+    block in a namenode and the reported block from a datanode do not match.
+    (Ashish Singhi via szetszwo)
+
   BREAKDOWN OF HDFS-3042 SUBTASKS
 
     HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
@@ -399,9 +406,6 @@ Branch-2 ( Unreleased changes )
 
     HDFS-3428. Move DelegationTokenRenewer to common (tucu)
 
-    HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
-    (szetszwo)
-
     HDFS-3491. HttpFs does not set permissions correctly (tucu)
 
     HDFS-3580. incompatible types; no instance(s) of type variable(s) V exist 

+ 76 - 58
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -928,78 +929,71 @@ public class BlockManager {
           + blk + " not found.");
       return;
     }
-    markBlockAsCorrupt(storedBlock, dn, reason);
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
   }
 
-  private void markBlockAsCorrupt(BlockInfo storedBlock,
-                                  DatanodeInfo dn,
-                                  String reason) throws IOException {
-    assert storedBlock != null : "storedBlock should not be null";
+  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+                                  DatanodeInfo dn) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot mark block " + 
-                            storedBlock.getBlockName() +
-                            " as corrupt because datanode " + dn +
-                            " does not exist. ");
+      throw new IOException("Cannot mark " + b
+          + " as corrupt because datanode " + dn + " does not exist");
     }
 
-    BlockCollection bc = storedBlock.getBlockCollection();
+    BlockCollection bc = b.corrupted.getBlockCollection();
     if (bc == null) {
-      NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " +
-                                   "block " + storedBlock +
-                                   " could not be marked as corrupt as it" +
-                                   " does not belong to any file");
-      addToInvalidates(storedBlock, node);
+      NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " + b
+          + " cannot be marked as corrupt as it does not belong to any file");
+      addToInvalidates(b.corrupted, node);
       return;
     } 
 
     // Add replica to the data-node if it is not already there
-    node.addBlock(storedBlock);
+    node.addBlock(b.stored);
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(storedBlock, node, reason);
-    if (countNodes(storedBlock).liveReplicas() >= bc.getReplication()) {
+    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
+    if (countNodes(b.stored).liveReplicas() >= bc.getReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
-      invalidateBlock(storedBlock, node);
+      invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {
       // add the block to neededReplication
-      updateNeededReplications(storedBlock, -1, 0);
+      updateNeededReplications(b.stored, -1, 0);
     }
   }
 
   /**
    * Invalidates the given block on the given datanode.
    */
-  private void invalidateBlock(Block blk, DatanodeInfo dn)
-      throws IOException {
-    NameNode.stateChangeLog.info("BLOCK* invalidateBlock: "
-                                 + blk + " on " + dn);
+  private void invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
+      ) throws IOException {
+    NameNode.stateChangeLog.info("BLOCK* invalidateBlock: " + b + " on " + dn);
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot invalidate block " + blk
+      throw new IOException("Cannot invalidate " + b
           + " because datanode " + dn + " does not exist.");
     }
 
     // Check how many copies we have of the block
-    NumberReplicas nr = countNodes(blk);
+    NumberReplicas nr = countNodes(b.stored);
     if (nr.replicasOnStaleNodes() > 0) {
       NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
-          "invalidation of block " + blk + " on " + dn + " because " +
+          "invalidation of " + b + " on " + dn + " because " +
           nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
           "with potentially out-of-date block reports.");
-      postponeBlock(blk);
+      postponeBlock(b.corrupted);
 
     } else if (nr.liveReplicas() >= 1) {
       // If we have at least one copy on a live node, then we can delete it.
-      addToInvalidates(blk, dn);
-      removeStoredBlock(blk, node);
+      addToInvalidates(b.corrupted, dn);
+      removeStoredBlock(b.stored, node);
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: "
-            + blk + " on " + dn + " listed for deletion.");
+            + b + " on " + dn + " listed for deletion.");
       }
     } else {
-      NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + blk + " on "
-          + dn + " is the only copy and was not deleted.");
+      NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + b
+          + " on " + dn + " is the only copy and was not deleted.");
     }
   }
 
@@ -1406,14 +1400,37 @@ public class BlockManager {
    * list of blocks that should be considered corrupt due to a block report.
    */
   private static class BlockToMarkCorrupt {
-    final BlockInfo blockInfo;
+    /** The corrupted block in a datanode. */
+    final BlockInfo corrupted;
+    /** The corresponding block stored in the BlockManager. */
+    final BlockInfo stored;
+    /** The reason to mark corrupt. */
     final String reason;
     
-    BlockToMarkCorrupt(BlockInfo blockInfo, String reason) {
-      super();
-      this.blockInfo = blockInfo;
+    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+      Preconditions.checkNotNull(corrupted, "corrupted is null");
+      Preconditions.checkNotNull(stored, "stored is null");
+
+      this.corrupted = corrupted;
+      this.stored = stored;
       this.reason = reason;
     }
+
+    BlockToMarkCorrupt(BlockInfo stored, String reason) {
+      this(stored, stored, reason);
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
+      this(new BlockInfo(stored), stored, reason);
+      //the corrupted block in datanode has a different generation stamp
+      corrupted.setGenerationStamp(gs);
+    }
+
+    @Override
+    public String toString() {
+      return corrupted + "("
+          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
+    }
   }
 
   /**
@@ -1534,7 +1551,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b.blockInfo, node, b.reason);
+      markBlockAsCorrupt(b, node);
     }
   }
 
@@ -1584,7 +1601,7 @@ public class BlockManager {
           queueReportedBlock(node, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(c.blockInfo, node, c.reason);
+          markBlockAsCorrupt(c, node);
         }
         continue;
       }
@@ -1805,7 +1822,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     assert pendingDNMessages.count() == 0;
   }
 
-  /*
+  /**
    * The next two methods test the various cases under which we must conclude
    * the replica is corrupt, or under construction.  These are laid out
    * as switch statements, on the theory that it is easier to understand
@@ -1815,7 +1832,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
    * @return a BlockToMarkCorrupt object, or null if the replica is not corrupt
    */
   private BlockToMarkCorrupt checkReplicaCorrupt(
-      Block iblk, ReplicaState reportedState, 
+      Block reported, ReplicaState reportedState, 
       BlockInfo storedBlock, BlockUCState ucState, 
       DatanodeDescriptor dn) {
     switch(reportedState) {
@@ -1823,15 +1840,16 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       switch(ucState) {
       case COMPLETE:
       case COMMITTED:
-        if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
-          return new BlockToMarkCorrupt(storedBlock,
-              "block is " + ucState + " and reported genstamp " +
-              iblk.getGenerationStamp() + " does not match " +
-              "genstamp in block map " + storedBlock.getGenerationStamp());
-        } else if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
+        if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
+          final long reportedGS = reported.getGenerationStamp();
+          return new BlockToMarkCorrupt(storedBlock, reportedGS,
+              "block is " + ucState + " and reported genstamp " + reportedGS
+              + " does not match genstamp in block map "
+              + storedBlock.getGenerationStamp());
+        } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
           return new BlockToMarkCorrupt(storedBlock,
               "block is " + ucState + " and reported length " +
-              iblk.getNumBytes() + " does not match " +
+              reported.getNumBytes() + " does not match " +
               "length in block map " + storedBlock.getNumBytes());
         } else {
           return null; // not corrupt
@@ -1843,11 +1861,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     case RWR:
       if (!storedBlock.isComplete()) {
         return null; // not corrupt
-      } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
-        return new BlockToMarkCorrupt(storedBlock,
-            "reported " + reportedState + " replica with genstamp " +
-            iblk.getGenerationStamp() + " does not match COMPLETE block's " +
-            "genstamp in block map " + storedBlock.getGenerationStamp());
+      } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
+        final long reportedGS = reported.getGenerationStamp();
+        return new BlockToMarkCorrupt(storedBlock, reportedGS,
+            "reported " + reportedState + " replica with genstamp " + reportedGS
+            + " does not match COMPLETE block's genstamp in block map "
+            + storedBlock.getGenerationStamp());
       } else { // COMPLETE block, same genstamp
         if (reportedState == ReplicaState.RBW) {
           // If it's a RBW report for a COMPLETE block, it may just be that
@@ -1869,8 +1888,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       String msg = "Unexpected replica state " + reportedState
       + " for block: " + storedBlock + 
       " on " + dn + " size " + storedBlock.getNumBytes();
-      // log here at WARN level since this is really a broken HDFS
-      // invariant
+      // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
       return new BlockToMarkCorrupt(storedBlock, msg);
     }
@@ -2073,7 +2091,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(Block blk) {
+  private void invalidateCorruptReplicas(BlockInfo blk) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean gotException = false;
     if (nodes == null)
@@ -2083,7 +2101,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        invalidateBlock(blk, node);
+        invalidateBlock(new BlockToMarkCorrupt(blk, null), node);
       } catch (IOException e) {
         NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
                                       "error in deleting bad block " + blk +
@@ -2471,7 +2489,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b.blockInfo, node, b.reason);
+      markBlockAsCorrupt(b, node);
     }
   }
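
The heart of the patch is the reworked BlockToMarkCorrupt above: it carries both the stored block and a copy stamped with the datanode's reported genstamp, so the corrupt-replicas map and invalidation operate on "corrupted" while block-map operations operate on "stored". A compilable stand-in for the three-argument constructor (SimpleBlock is a hypothetical substitute for the internal BlockInfo type):

    // Stand-in types for illustration only; the real constructor copies a
    // BlockInfo and overrides its generation stamp the same way.
    class SimpleBlock {
        long id;
        long genStamp;
        SimpleBlock(long id, long genStamp) { this.id = id; this.genStamp = genStamp; }
        SimpleBlock(SimpleBlock other) { this(other.id, other.genStamp); }
    }

    final class BlockToMarkCorruptSketch {
        final SimpleBlock corrupted; // the replica as it exists on the datanode
        final SimpleBlock stored;    // the block as recorded in the namenode's map
        final String reason;

        BlockToMarkCorruptSketch(SimpleBlock stored, long reportedGS, String reason) {
            this.stored = stored;
            this.corrupted = new SimpleBlock(stored); // copy the stored block...
            this.corrupted.genStamp = reportedGS;     // ...then set the reported genstamp
            this.reason = reason;
        }
    }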
 

+ 127 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test when RBW block is removed. Invalidation of the corrupted block happens
+ * and then the under replicated block gets replicated to the datanode.
+ */
+public class TestRBWBlockInvalidation {
+  private static NumberReplicas countReplicas(final FSNamesystem namesystem,
+      ExtendedBlock block) {
+    return namesystem.getBlockManager().countNodes(block.getLocalBlock());
+  }
+
+  /**
+   * Test when a block's replica is removed from RBW folder in one of the
+   * datanode, namenode should ask to invalidate that corrupted block and
+   * schedule replication for one more replica for that under replicated block.
+   */
+  @Test
+  public void testBlockInvalidationWhenRBWReplicaMissedInDN()
+      throws IOException, InterruptedException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 300);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    FSDataOutputStream out = null;
+    try {
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      FileSystem fs = cluster.getFileSystem();
+      Path testPath = new Path(MiniDFSCluster.getBaseDirectory(), "foo1");
+      out = fs.create(testPath, (short) 2);
+      out.writeBytes("HDFS-3157: " + testPath);
+      out.hsync();
+      cluster.startDataNodes(conf, 1, true, null, null, null);
+      String bpid = namesystem.getBlockPoolId();
+      ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, testPath);
+      Block block = blk.getLocalBlock();
+      DataNode dn = cluster.getDataNodes().get(0);
+
+      // Delete partial block and its meta information from the RBW folder
+      // of first datanode.
+      File blockFile = DataNodeTestUtils.getBlockFile(dn, bpid, block);
+      File metaFile = DataNodeTestUtils.getMetaFile(dn, bpid, block);
+      assertTrue("Could not delete the block file from the RBW folder",
+          blockFile.delete());
+      assertTrue("Could not delete the block meta file from the RBW folder",
+          metaFile.delete());
+
+      out.close();
+
+      // Check datanode has reported the corrupt block.
+      boolean isCorruptReported = false;
+      while (!isCorruptReported) {
+        if (countReplicas(namesystem, blk).corruptReplicas() > 0) {
+          isCorruptReported = true;
+        }
+        Thread.sleep(100);
+      }
+      assertEquals("There should be 1 replica in the corruptReplicasMap", 1,
+          countReplicas(namesystem, blk).corruptReplicas());
+
+      // Check the block has got replicated to another datanode.
+      blk = DFSTestUtil.getFirstBlock(fs, testPath);
+      boolean isReplicated = false;
+      while (!isReplicated) {
+        if (countReplicas(namesystem, blk).liveReplicas() > 1) {
+          isReplicated = true;
+        }
+        Thread.sleep(100);
+      }
+      assertEquals("There should be two live replicas", 2, countReplicas(
+          namesystem, blk).liveReplicas());
+
+      // sleep for 1 second, so that by this time datanode reports the corrupt
+      // block after a live replica of block got replicated.
+      Thread.sleep(1000);
+
+      // Check that there is no corrupt block in the corruptReplicasMap.
+      assertEquals("There should not be any replica in the corruptReplicasMap",
+          0, countReplicas(namesystem, blk).corruptReplicas());
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+      cluster.shutdown();
+    }
+  }
+}
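
The test above polls in unbounded while-loops, so if replication never happens it hangs rather than fails. A bounded variant of the same wait pattern (an illustrative helper, not part of the commit; Hadoop's test utilities provide a similar GenericTestUtils.waitFor) might look like:

    import java.util.concurrent.Callable;

    class WaitUtil {
        // Poll check every 100 ms, as the test does, but give up after
        // timeoutMs instead of spinning forever.
        static void waitFor(Callable<Boolean> check, long timeoutMs)
                throws Exception {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (!check.call()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new AssertionError(
                        "condition not met within " + timeoutMs + " ms");
                }
                Thread.sleep(100);
            }
        }
    }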

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -137,6 +137,11 @@ public class DataNodeTestUtils {
     return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
   }
 
+  public static File getMetaFile(DataNode dn, String bpid, Block b)
+      throws IOException {
+    return FsDatasetTestUtil.getMetaFile(dn.getFSDataset(), bpid, b);
+  }
+  
   public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
       ) throws IOException {
     return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java

@@ -37,6 +37,12 @@ public class FsDatasetTestUtil {
     return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
   }
 
+  public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
+      throws IOException {
+    return FsDatasetUtil.getMetaFile(getBlockFile(fsd, bpid, b), b
+        .getGenerationStamp());
+  }
+  
   public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
       ExtendedBlock block, int numLinks) throws IOException {
     final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
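
The new getMetaFile helpers let the test delete a replica's checksum file alongside its block file. A sketch of the naming convention they resolve (assumption: HDFS stores the checksum file next to the block file as <blockFileName>_<genStamp>.meta, e.g. blk_7162739548153522810_1001.meta; metaFileFor below is a hypothetical helper, not the FsDatasetUtil API):

    import java.io.File;

    class MetaFileSketch {
        // Hypothetical helper mirroring the assumed meta-file naming convention.
        static File metaFileFor(File blockFile, long genStamp) {
            return new File(blockFile.getParentFile(),
                blockFile.getName() + "_" + genStamp + ".meta");
        }
    }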