
HDFS-5438. Flaws in block report processing can cause data loss. Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1542058 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 11 years ago
parent
commit
8f6ec2d494

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -58,6 +58,8 @@ Release 0.23.10 - UNRELEASED
     HDFS-3970. Fix bug causing rollback of HDFS upgrade to result in bad
     VERSION file. (Vinay and Andrew Wang via atm)
 
+    HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java

@@ -42,4 +42,8 @@ public class DFSClientFaultInjector {
   public boolean uncorruptPacket() {
     return false;
   }
+
+  public boolean failPacket() {
+    return false;
+  }
 }
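
The new hook follows the same pattern as uncorruptPacket(): the production instance always answers false, and a test swaps in its own instance so that DFSOutputStream's ack-handling path throws on the last packet of a block, forcing a pipeline recovery. A minimal sketch of that override, assuming the class keeps an accessible no-arg constructor and the static instance field used elsewhere in this commit (the new test below achieves the same thing with a Mockito mock):

    // Install an injector that fails the last packet, then restore the default.
    DFSClientFaultInjector failLastPacket = new DFSClientFaultInjector() {
      @Override
      public boolean failPacket() {
        return true;   // DFSOutputStream will throw and enter pipeline recovery
      }
    };
    DFSClientFaultInjector saved = DFSClientFaultInjector.instance;
    DFSClientFaultInjector.instance = failLastPacket;
    try {
      // ... run the write under test ...
    } finally {
      DFSClientFaultInjector.instance = saved;   // never leave the hook installed
    }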

+ 33 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -129,6 +129,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   private long initialFileSize = 0; // at time of file open
   private Progressable progress;
   private short blockReplication; // replication factor of file
+  private boolean failPacket = false;
   
   private class Packet {
     long    seqno;               // sequencenumber of buffer in block
@@ -703,6 +704,16 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
                                     one.seqno + " but received " + seqno);
             }
             isLastPacketInBlock = one.lastPacketInBlock;
+
+            // Fail the packet write for testing in order to force a
+            // pipeline recovery.
+            if (DFSClientFaultInjector.get().failPacket() &&
+                isLastPacketInBlock) {
+              failPacket = true;
+              throw new IOException(
+                    "Failing the last packet for testing.");
+            }
+              
             // update bytesAcked
             block.setNumBytes(one.getLastByteOffsetBlock());
 
@@ -969,7 +980,18 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
         accessToken = lb.getBlockToken();
         
         // set up the pipeline again with the remaining nodes
-        success = createBlockOutputStream(nodes, newGS, isRecovery);
+        if (failPacket) { // for testing
+          success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+          failPacket = false;
+          try {
+            // Give DNs time to send in bad reports. In real situations,
+            // good reports should follow bad ones, if client committed
+            // with those nodes.
+            Thread.sleep(2000);
+          } catch (InterruptedException ie) {}
+        } else {
+          success = createBlockOutputStream(nodes, newGS, isRecovery);
+        }
       }
 
       if (success) {
@@ -1699,7 +1721,9 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   // be called during unit tests
   private void completeFile(ExtendedBlock last) throws IOException {
     long localstart = System.currentTimeMillis();
+    long localTimeout = 400;
     boolean fileComplete = false;
+    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
     while (!fileComplete) {
       fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
       if (!fileComplete) {
@@ -1714,7 +1738,14 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
             throw new IOException(msg);
         }
         try {
-          Thread.sleep(400);
+          Thread.sleep(localTimeout);
+          if (retries == 0) {
+            throw new IOException("Unable to close file because the last block"
+                + " does not have enough number of replicas.");
+          }
+          retries--;
+          localTimeout *= 2;
+
           if (System.currentTimeMillis() - localstart > 5000) {
             DFSClient.LOG.info("Could not complete file " + src + " retrying...");
           }
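
The completeFile() change replaces the old fixed 400 ms sleep with a bounded, exponentially growing backoff: the retry budget comes from the same setting as block-allocation retries (DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, set to 3 in the new test), and once it is exhausted the close fails instead of looping forever on a block that never reaches the required replica count. A standalone sketch of the loop's shape, with tryComplete standing in for the namenode.complete(...) call:

    import java.io.IOException;
    import java.util.function.BooleanSupplier;

    class CompleteFileBackoff {
      // Mirrors the new retry behaviour: sleep, check the budget, then back off.
      static void completeWithBackoff(BooleanSupplier tryComplete, int retries)
          throws IOException, InterruptedException {
        long timeout = 400;            // initial sleep, same as the old fixed delay
        while (!tryComplete.getAsBoolean()) {
          Thread.sleep(timeout);
          if (retries == 0) {
            throw new IOException("Unable to close file because the last block"
                + " does not have enough number of replicas.");
          }
          retries--;
          timeout *= 2;                // 400, 800, 1600, ... ms between attempts
        }
      }
    }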

+ 29 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java

@@ -205,6 +205,29 @@ public class BlockInfoUnderConstruction extends BlockInfo {
     return blockRecoveryId;
   }
 
+  /**
+   * Process the recorded replicas. When about to commit or finish the
+   * pipeline recovery sort out bad replicas.
+   * @param genStamp  The final generation stamp for the block.
+   */
+  public void setGenerationStampAndVerifyReplicas(long genStamp) {
+    if (replicas == null)
+      return;
+
+    // Remove the replicas with wrong gen stamp.
+    // The replica list is unchanged.
+    for (ReplicaUnderConstruction r : replicas) {
+      if (genStamp != r.getGenerationStamp()) {
+        r.getExpectedLocation().removeBlock(this);
+        NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+            + "from location: " + r);
+      }
+    }
+
+    // Set the generation stamp for the block.
+    setGenerationStamp(genStamp);
+  }
+
   /**
    * Commit block's length and generation stamp as reported by the client.
    * Set block state to {@link BlockUCState#COMMITTED}.
@@ -250,9 +273,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
   void addReplicaIfNotPresent(DatanodeDescriptor dn,
                      Block block,
                      ReplicaState rState) {
-    for(ReplicaUnderConstruction r : replicas)
-      if(r.getExpectedLocation() == dn)
+    for (ReplicaUnderConstruction r : replicas) {
+      if (r.getExpectedLocation() == dn) {
+        // Record the gen stamp from the report
+        r.setGenerationStamp(block.getGenerationStamp());
         return;
+      }
+    }
     replicas.add(new ReplicaUnderConstruction(block, dn, rState));
   }
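
Taken together, the two hunks above change how an under-construction block tracks its replicas: addReplicaIfNotPresent() now records the generation stamp each datanode actually reported, and setGenerationStampAndVerifyReplicas() later drops every expected location whose recorded stamp differs from the final stamp chosen when the block is committed or pipeline recovery finishes. A standalone sketch of that bookkeeping, with datanodes reduced to plain string ids for illustration:

    import java.util.*;

    class UcReplicaTracking {
      // Latest generation stamp reported by each datanode for this block.
      private final Map<String, Long> reportedGenStamp = new HashMap<>();

      // A block report (or RBW report) mentions this block on a datanode.
      void recordReplica(String datanode, long genStamp) {
        reportedGenStamp.put(datanode, genStamp);
      }

      // Commit with the final generation stamp; stale replicas stop being locations.
      Set<String> commitAndPruneStale(long finalGenStamp) {
        Set<String> stale = new HashSet<>();
        for (Map.Entry<String, Long> e : reportedGenStamp.entrySet()) {
          if (e.getValue() != finalGenStamp) {
            stale.add(e.getKey());     // replica predates the recovered pipeline
          }
        }
        reportedGenStamp.keySet().removeAll(stale);
        return stale;                  // the caller removes the block from these nodes
      }
    }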
 

+ 30 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
@@ -844,7 +845,7 @@ public class BlockManager {
             + blk + " not found.");
         return;
       }
-      markBlockAsCorrupt(storedBlock, dn, reason);
+      markBlockAsCorrupt(storedBlock, dn, reason, Reason.CORRUPTION_REPORTED);
     } finally {
       namesystem.writeUnlock();
     }
@@ -852,7 +853,8 @@ public class BlockManager {
 
   private void markBlockAsCorrupt(BlockInfo storedBlock,
                                   DatanodeInfo dn,
-                                  String reason) throws IOException {
+                                  String reason,
+                                  Reason reasonCode) throws IOException {
     assert storedBlock != null : "storedBlock should not be null";
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -876,7 +878,8 @@ public class BlockManager {
     node.addBlock(storedBlock);
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(storedBlock, node, reason);
+    corruptReplicas.addToCorruptReplicasMap(storedBlock, node, reason,
+        reasonCode);
     if (countNodes(storedBlock).liveReplicas() >= inode.getReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(storedBlock, node);
@@ -1339,11 +1342,13 @@ public class BlockManager {
   private static class BlockToMarkCorrupt {
     final BlockInfo blockInfo;
     final String reason;
+    final Reason reasonCode;
     
-    BlockToMarkCorrupt(BlockInfo blockInfo, String reason) {
+    BlockToMarkCorrupt(BlockInfo blockInfo, String reason, Reason reasonCode) {
       super();
       this.blockInfo = blockInfo;
       this.reason = reason;
+      this.reasonCode = reasonCode;
     }
   }
 
@@ -1425,7 +1430,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b.blockInfo, node, b.reason);
+      markBlockAsCorrupt(b.blockInfo, node, b.reason, b.reasonCode);
     }
   }
 
@@ -1459,7 +1464,7 @@ public class BlockManager {
       BlockToMarkCorrupt c = checkReplicaCorrupt(
           iblk, reportedState, storedBlock, ucState, node);
       if (c != null) {
-        markBlockAsCorrupt(c.blockInfo, node, c.reason);
+        markBlockAsCorrupt(c.blockInfo, node, c.reason, c.reasonCode);
         continue;
       }
       
@@ -1590,9 +1595,11 @@ public class BlockManager {
       return storedBlock;
     }
 
-    //add replica if appropriate
+    // Add replica if appropriate. If the replica was previously corrupt
+    // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
-        && storedBlock.findDatanode(dn) < 0) {
+        && (storedBlock.findDatanode(dn) < 0
+        || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
       toAdd.add(storedBlock);
     }
     return storedBlock;
@@ -1620,12 +1627,14 @@ public class BlockManager {
           return new BlockToMarkCorrupt(storedBlock,
               "block is " + ucState + " and reported genstamp " +
               iblk.getGenerationStamp() + " does not match " +
-              "genstamp in block map " + storedBlock.getGenerationStamp());
+              "genstamp in block map " + storedBlock.getGenerationStamp(),
+              Reason.GENSTAMP_MISMATCH);
         } else if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
           return new BlockToMarkCorrupt(storedBlock,
               "block is " + ucState + " and reported length " +
               iblk.getNumBytes() + " does not match " +
-              "length in block map " + storedBlock.getNumBytes());
+              "length in block map " + storedBlock.getNumBytes(),
+              Reason.SIZE_MISMATCH);
         } else {
           return null; // not corrupt
         }
@@ -1640,7 +1649,8 @@ public class BlockManager {
         return new BlockToMarkCorrupt(storedBlock,
             "reported " + reportedState + " replica with genstamp " +
             iblk.getGenerationStamp() + " does not match COMPLETE block's " +
-            "genstamp in block map " + storedBlock.getGenerationStamp());
+            "genstamp in block map " + storedBlock.getGenerationStamp(),
+            Reason.GENSTAMP_MISMATCH);
       } else { // COMPLETE block, same genstamp
         if (reportedState == ReplicaState.RBW) {
           // If it's a RBW report for a COMPLETE block, it may just be that
@@ -1653,7 +1663,8 @@ public class BlockManager {
           return null;
         } else {
           return new BlockToMarkCorrupt(storedBlock,
-              "reported replica has invalid state " + reportedState);
+              "reported replica has invalid state " + reportedState,
+              Reason.INVALID_STATE);
         }
       }
     case RUR:       // should not be reported
@@ -1665,7 +1676,7 @@ public class BlockManager {
       // log here at WARN level since this is really a broken HDFS
       // invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(storedBlock, msg);
+      return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
     }
   }
 
@@ -1781,6 +1792,11 @@ public class BlockManager {
             storedBlock + " size " + storedBlock.getNumBytes());
       }
     } else {
+      // if the same block is added again and the replica was corrupt
+      // previously because of a wrong gen stamp, remove it from the
+      // corrupt block list.
+      corruptReplicas.removeFromCorruptReplicasMap(block, node,
+          Reason.GENSTAMP_MISMATCH);
       curReplicaDelta = 0;
       blockLog.warn("BLOCK* addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
@@ -2219,7 +2235,7 @@ public class BlockManager {
       addToInvalidates(b, node);
     }
     for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b.blockInfo, node, b.reason);
+      markBlockAsCorrupt(b.blockInfo, node, b.reason, b.reasonCode);
     }
   }
 

+ 50 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java

@@ -36,8 +36,18 @@ import java.util.*;
 @InterfaceAudience.Private
 public class CorruptReplicasMap{
 
-  private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
-    new TreeMap<Block, Collection<DatanodeDescriptor>>();
+  /** The corruption reason code */
+  public static enum Reason {
+    NONE,                // not specified.
+    ANY,                 // wildcard reason
+    GENSTAMP_MISMATCH,   // mismatch in generation stamps
+    SIZE_MISMATCH,       // mismatch in sizes
+    INVALID_STATE,       // invalid state
+    CORRUPTION_REPORTED  // client or datanode reported the corruption
+  }
+
+  private SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
+    new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
   
   /**
    * Mark the block belonging to datanode as corrupt.
@@ -48,9 +58,22 @@ public class CorruptReplicasMap{
    */
   public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
       String reason) {
-    Collection<DatanodeDescriptor> nodes = getNodes(blk);
+    addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
+  }
+
+  /**
+   * Mark the block belonging to datanode as corrupt.
+   *
+   * @param blk Block to be added to CorruptReplicasMap
+   * @param dn DatanodeDescriptor which holds the corrupt replica
+   * @param reason a textual reason (for logging purposes)
+   * @param reasonCode the enum representation of the reason
+   */
+  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+      String reason, Reason reasonCode) {
+    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
     if (nodes == null) {
-      nodes = new TreeSet<DatanodeDescriptor>();
+      nodes = new HashMap<DatanodeDescriptor, Reason>();
       corruptReplicasMap.put(blk, nodes);
     }
     
@@ -61,8 +84,7 @@ public class CorruptReplicasMap{
       reasonText = "";
     }
     
-    if (!nodes.contains(dn)) {
-      nodes.add(dn);
+    if (!nodes.keySet().contains(dn)) {
       NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
                                    blk.getBlockName() +
                                    " added as corrupt on " + dn.getName() +
@@ -76,6 +98,8 @@ public class CorruptReplicasMap{
                                    " by " + Server.getRemoteIp() +
                                    reasonText);
     }
+    // Add the node or update the reason.
+    nodes.put(dn, reasonCode);
   }
 
   /**
@@ -97,10 +121,24 @@ public class CorruptReplicasMap{
              false if the replica is not in the map
    */ 
   boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
-    Collection<DatanodeDescriptor> datanodes = corruptReplicasMap.get(blk);
+    return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
+  }
+
+  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
+      Reason reason) {
+    Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
+    boolean removed = false;
     if (datanodes==null)
       return false;
-    if (datanodes.remove(datanode)) { // remove the replicas
+
+    // if reasons can be compared but don't match, return false.
+    Reason storedReason = datanodes.get(datanode);
+    if (reason != Reason.ANY && storedReason != null &&
+        reason != storedReason) {
+      return false;
+    }
+
+    if (datanodes.remove(datanode) != null) { // remove the replicas
       if (datanodes.isEmpty()) {
         // remove the block if there is no more corrupted replicas
         corruptReplicasMap.remove(blk);
@@ -118,7 +156,10 @@ public class CorruptReplicasMap{
    * @return collection of nodes. Null if does not exists
    */
   Collection<DatanodeDescriptor> getNodes(Block blk) {
-    return corruptReplicasMap.get(blk);
+    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+    if (nodes == null)
+      return null;
+    return nodes.keySet();
   }
 
   /**
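
The map now remembers why each replica was flagged, and removal can be restricted to a particular reason: a later, healthy report may clear a GENSTAMP_MISMATCH flag left over from pipeline recovery, but it must not clear a corruption that a client or datanode explicitly reported. A condensed sketch of those semantics (datanodes again reduced to string ids):

    import java.util.*;

    class ReasonGatedCorruptMap {
      enum Reason { NONE, ANY, GENSTAMP_MISMATCH, SIZE_MISMATCH, INVALID_STATE, CORRUPTION_REPORTED }

      private final Map<String, Reason> corrupt = new HashMap<>();

      void markCorrupt(String datanode, Reason reason) {
        corrupt.put(datanode, reason);     // add the node or update the reason
      }

      boolean unmark(String datanode, Reason reason) {
        Reason stored = corrupt.get(datanode);
        if (stored == null) {
          return false;                    // not flagged at all
        }
        if (reason != Reason.ANY && reason != stored) {
          return false;                    // reasons differ: keep the flag
        }
        corrupt.remove(datanode);
        return true;
      }
    }

This is what the BlockManager change above relies on: the redundant addStoredBlock() path passes Reason.GENSTAMP_MISMATCH, so only stale-genstamp flags are cleared, while the pre-existing single-argument removeFromCorruptReplicasMap() keeps its old behaviour by delegating with Reason.ANY.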

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3937,8 +3937,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     // Update old block with the new generation stamp and new length
-    blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
     blockinfo.setNumBytes(newBlock.getNumBytes());
+    blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
 
     // find the DatanodeDescriptor objects
     final DatanodeManager dm = getBlockManager().getDatanodeManager();

+ 55 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -27,8 +27,12 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.IOUtils;
 
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
 import org.junit.Test;
 import org.junit.Assert;
 
@@ -115,4 +119,55 @@ public class TestClientProtocolForPipelineRecovery {
       cluster.shutdown();
     }
   }
+
+  /** Test whether corrupt replicas are detected correctly during pipeline
+   * recoveries.
+   */
+  @Test
+  public void testPipelineRecoveryForLastBlock() throws IOException {
+    DFSClientFaultInjector faultInjector
+        = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
+    DFSClientFaultInjector.instance = faultInjector;
+    Configuration conf = new HdfsConfiguration();
+
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
+    MiniDFSCluster cluster = null;
+
+    try {
+      int numDataNodes = 3;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("dataprotocol1.dat");
+      Mockito.when(faultInjector.failPacket()).thenReturn(true);
+      try {
+        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      } catch (IOException e) {
+        // completeFile() should fail.
+        Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
+        return;
+      }
+
+      // At this point, NN let data corruption to happen. 
+      // Before failing test, try reading the file. It should fail.
+      FSDataInputStream in = fileSys.open(file);
+      try {
+        int c = in.read();
+        // Test will fail with BlockMissingException if NN does not update the
+        // replica state based on the latest report.
+      } catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
+        Assert.fail("Block is missing because the file was closed with"
+            + " corrupt replicas.");
+      }
+      Assert.fail("The file was closed with corrupt replicas, but read still"
+          + " works!");
+    } finally {
+      DFSClientFaultInjector.instance = oldInjector;
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java

@@ -92,6 +92,10 @@ public class TestCorruptFilesJsp  {
         in.close();
       }
 
+      try {
+        Thread.sleep(3000); // Wait for block reports. They shouldn't matter.
+      } catch (InterruptedException ie) {}
+
       // verify if all corrupt files were reported to NN
       badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
       assertTrue("Expecting 3 corrupt files, but got " + badFiles.size(),