
HDFS-1197. Blocks are considered "complete" prematurely after commitBlockSynchronization or DN restart. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1167408 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 13 years ago
parent
commit
cecfd47bbd

+ 3 - 0
CHANGES.txt

@@ -113,6 +113,9 @@ Release 0.20.205.0 - unreleased
     HDFS-1779. After NameNode restart , Clients can not read partial files even after 
     client invokes Sync. (Uma Maheswara Rao G via jitendra)
 
+    HDFS-1197. Blocks are considered "complete" prematurely after 
+    commitBlockSynchronization or DN restart. (Todd Lipcon via jitendra)
+
   IMPROVEMENTS
 
     MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

+ 3 - 3
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -3137,9 +3137,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         nodes = lastBlock.getLocations();
         errorIndex = -1;   // no errors yet.
         if (nodes.length < 1) {
-          throw new IOException("Unable to retrieve blocks locations " +
-                                " for last block " + block +
-                                "of file " + src);
+          throw new IOException("Unable to retrieve blocks locations" +
+                                " for append to last block " + block +
+                                " of file " + src);
         }
         // keep trying to setup a pipeline until you know all DNs are dead
         while (processDatanodeError(true, true)) {

+ 30 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -220,6 +220,12 @@ public class DataNode extends Configured
   boolean isBlockTokenEnabled;
   BlockTokenSecretManager blockTokenSecretManager;
   boolean isBlockTokenInitialized = false;
+
+  /**
+   * Testing hook that allows tests to delay the sending of blockReceived RPCs
+   * to the namenode. This can help find bugs in append.
+   */
+  int artificialBlockReceivedDelay = 0;
   
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
@@ -363,6 +369,10 @@ public class DataNode extends Configured
     // register datanode MXBean
     this.registerMXBean(conf); // register the MXBean for DataNode
     
+    // Allow configuration to delay block reports to find bugs
+    artificialBlockReceivedDelay = conf.getInt(
+        "dfs.datanode.artificialBlockReceivedDelay", 0);
+
     // find free port or use privileged port provide
     ServerSocket ss;
     if(secureResources == null) {
@@ -962,6 +972,7 @@ public class DataNode extends Configured
               receivedBlockList.wait(waitTime);
             } catch (InterruptedException ie) {
             }
+            delayBeforeBlockReceived();
           }
         } // synchronized
       } catch(RemoteException re) {
@@ -981,6 +992,25 @@ public class DataNode extends Configured
     } // while (shouldRun)
   } // offerService
 
+  /**
+   * When a block has been received, we can delay some period of time before
+   * reporting it to the NN, for the purpose of testing. This simulates
+   * the actual latency of blockReceived on a real network (where the client
+   * may be closer to the NN than the DNs).
+   */
+  private void delayBeforeBlockReceived() {
+    if (artificialBlockReceivedDelay > 0 && !receivedBlockList.isEmpty()) {
+      try {
+        long sleepFor = (long)R.nextInt(artificialBlockReceivedDelay);
+        LOG.debug("DataNode " + dnRegistration + " sleeping for " +
+                  "artificial delay: " + sleepFor + " ms");
+        Thread.sleep(sleepFor);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   /**
    * Process an array of datanode commands
    * 

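The delay is controlled by an internal, test-only configuration key rather than a documented setting. As a rough sketch (assuming only the Configuration and MiniDFSCluster APIs already used elsewhere in this patch), a test could enable it like this:

  // Hypothetical test setup; the key name comes from the patch above.
  Configuration conf = new Configuration();
  // Delay each blockReceived RPC by a random interval of up to 50 ms,
  // simulating a client that reaches the NN before the DNs do.
  conf.setInt("dfs.datanode.artificialBlockReceivedDelay", 50);
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
  cluster.waitActive();
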
+ 103 - 72
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2241,11 +2241,17 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
         DatanodeDescriptor node =
           datanodeMap.get(newtargets[i].getStorageID());
         if (node != null) {
-          node.addBlock(newblockinfo);
+          if (closeFile) {
+            // Only add the block to the node's block list when closing the
+            // file; otherwise the block is still under construction on that
+            // node. (In getAdditionalBlock, for example, we don't add the
+            // targets to the block map either.)
+            node.addBlock(newblockinfo);
+          }
           descriptorsList.add(node);
         } else {
-          LOG.warn("commitBlockSynchronization included a target DN " +
-            newtargets[i] + " which is not known to DN. Ignoring.");
+          LOG.error("commitBlockSynchronization included a target DN " +
+            newtargets[i] + " which is not known to NN. Ignoring.");
         }
       }
       if (!descriptorsList.isEmpty()) {
@@ -3464,41 +3470,82 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
       // then we need to do some special processing.
       storedBlock = blocksMap.getStoredBlockWithoutMatchingGS(block);
 
-      // If the block ID is valid, and it either (a) belongs to a file under
-      // construction, or (b) the reported genstamp is higher than what we
-      // know about, then we accept the block.
-      if (storedBlock != null && storedBlock.getINode() != null &&
-          (storedBlock.getGenerationStamp() <= block.getGenerationStamp() ||
-           storedBlock.getINode().isUnderConstruction())) {
-        NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-          + "addStoredBlock request received for " + block + " on "
-          + node.getName() + " size " + block.getNumBytes()
-          + " and it belongs to a file under construction. ");
-      } else {
-        storedBlock = null;
+      if (storedBlock == null) {
+        return rejectAddStoredBlock(
+          block, node,
+          "Block not in blockMap with any generation stamp");
+      }
+
+      INodeFile inode = storedBlock.getINode();
+      if (inode == null) {
+        return rejectAddStoredBlock(
+          block, node,
+          "Block does not correspond to any file");
+      }
+
+      boolean reportedOldGS = block.getGenerationStamp() < storedBlock.getGenerationStamp();
+      boolean reportedNewGS = block.getGenerationStamp() > storedBlock.getGenerationStamp();
+      boolean underConstruction = inode.isUnderConstruction();
+      boolean isLastBlock = inode.getLastBlock() != null &&
+        inode.getLastBlock().getBlockId() == block.getBlockId();
+
+      // A DN may report a stale generation stamp for the last block of an
+      // under-construction file; we just need to make sure it ends up in targets.
+      if (reportedOldGS && !(underConstruction && isLastBlock)) {
+        return rejectAddStoredBlock(
+          block, node,
+          "Reported block has old generation stamp but is not the last block of " +
+          "an under-construction file. (current generation is " +
+          storedBlock.getGenerationStamp() + ")");
+      }
+
+      // Don't add the block to the DN's block list when it is the in-progress
+      // last block and has an inconsistent generation stamp. Instead just add
+      // it to targets for recovery purposes; it will get added to the node
+      // when commitBlockSynchronization runs.
+      if (underConstruction && isLastBlock && (reportedOldGS || reportedNewGS)) {
+        NameNode.stateChangeLog.info(
+          "BLOCK* NameSystem.addStoredBlock: "
+          + "Targets updated: block " + block + " on " + node.getName() +
+          " is added as a target for block " + storedBlock + " with size " +
+          block.getNumBytes());
+        ((INodeFileUnderConstruction)inode).addTarget(node);
+        return block;
       }
     }
-    if(storedBlock == null || storedBlock.getINode() == null) {
-      // If this block does not belong to anyfile, then we are done.
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                   + "addStoredBlock request received for " 
-                                   + block + " on " + node.getName()
-                                   + " size " + block.getNumBytes()
-                                   + " But it does not belong to any file.");
-      addToInvalidates(block, node);
-      return block;
+
+    INodeFile fileINode = storedBlock.getINode();
+    if (fileINode == null) {
+      return rejectAddStoredBlock(
+        block, node,
+        "Block does not correspond to any file");
     }
-     
-    // add block to the data-node
-    boolean added = node.addBlock(storedBlock);
-    
     assert storedBlock != null : "Block must be stored by now";
 
+    // add block to the data-node
+    boolean added = node.addBlock(storedBlock);
+
+
+    // Is the block being reported the last block of an under-construction file?
+    boolean blockUnderConstruction = false;
+    if (fileINode.isUnderConstruction()) {
+      INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
+      Block last = fileINode.getLastBlock();
+      if (last == null) {
+        // This should never happen, but better to handle it properly than to throw
+        // an NPE below.
+        LOG.error("Null blocks for reported block=" + block + " stored=" + storedBlock +
+          " inode=" + fileINode);
+        return block;
+      }
+      blockUnderConstruction = last.equals(storedBlock);
+    }
+
+    // block == storedBlock when this addStoredBlock is the result of a block report
     if (block != storedBlock) {
       if (block.getNumBytes() >= 0) {
         long cursize = storedBlock.getNumBytes();
-        INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
-        boolean underConstruction = (file == null ? false : file.isUnderConstruction());
+        INodeFile file = storedBlock.getINode();
         if (cursize == 0) {
           storedBlock.setNumBytes(block.getNumBytes());
         } else if (cursize != block.getNumBytes()) {
@@ -3507,43 +3554,39 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
                    " current size is " + cursize +
                    " reported size is " + block.getNumBytes());
           try {
-            if (cursize > block.getNumBytes()) {
+            if (cursize > block.getNumBytes() && !blockUnderConstruction) {
               // new replica is smaller in size than existing block.
               // Mark the new replica as corrupt.
-              if (!underConstruction) {
-                LOG.warn("Mark new replica " + block + " from " + node.getName() + 
-                    "as corrupt because its length is shorter than existing ones");
-                markBlockAsCorrupt(block, node);
-              }
+              LOG.warn("Mark new replica " + block + " from " + node.getName() + 
+                  "as corrupt because its length is shorter than existing ones");
+              markBlockAsCorrupt(block, node);
             } else {
               // new replica is larger in size than existing block.
-              // Mark pre-existing replicas as corrupt.
-              int numNodes = blocksMap.numNodes(block);
-              int count = 0;
-              DatanodeDescriptor nodes[] = new DatanodeDescriptor[numNodes];
-              Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
-              for (; it != null && it.hasNext(); ) {
-                DatanodeDescriptor dd = it.next();
-                if (!dd.equals(node)) {
-                  nodes[count++] = dd;
+              if (!blockUnderConstruction) {
+                // Mark pre-existing replicas as corrupt.
+                int numNodes = blocksMap.numNodes(block);
+                int count = 0;
+                DatanodeDescriptor nodes[] = new DatanodeDescriptor[numNodes];
+                Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+                for (; it != null && it.hasNext();) {
+                  DatanodeDescriptor dd = it.next();
+                  if (!dd.equals(node)) {
+                    nodes[count++] = dd;
+                  }
+                }
+                for (int j = 0; j < count; j++) {
+                  LOG.warn("Mark existing replica "
+                      + block
+                      + " from "
+                      + node.getName()
+                      + " as corrupt because its length is shorter than the new one");
+                  markBlockAsCorrupt(block, nodes[j]);
                 }
-              }
-              for (int j = 0; j < count && !underConstruction; j++) {
-                LOG.warn("Mark existing replica " + block + " from " + node.getName() + 
-                " as corrupt because its length is shorter than the new one");
-                markBlockAsCorrupt(block, nodes[j]);
               }
               //
               // change the size of block in blocksMap
               //
-              storedBlock = blocksMap.getStoredBlock(block); //extra look up!
-              if (storedBlock == null) {
-                LOG.warn("Block " + block + 
-                   " reported from " + node.getName() + 
-                   " does not exist in blockMap. Surprise! Surprise!");
-              } else {
-                storedBlock.setNumBytes(block.getNumBytes());
-              }
+              storedBlock.setNumBytes(block.getNumBytes());
             }
           } catch (IOException e) {
             LOG.warn("Error in deleting bad block " + block + e);
@@ -3603,21 +3646,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
     // if file is being actively written to, then do not check 
     // replication-factor here. It will be checked when the file is closed.
     //
-    INodeFile fileINode = null;
-    fileINode = storedBlock.getINode();
-    if (fileINode.isUnderConstruction()) {
+    if (blockUnderConstruction) {
       INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
-      Block[] blocks = fileINode.getBlocks();
-      // If this is the last block of this
-      // file, then set targets. This enables lease recovery to occur.
-      // This is especially important after a restart of the NN.
-      Block last = blocks[blocks.length-1];
-      if (last.equals(storedBlock)) {
-        Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
-        for (int i = 0; it != null && it.hasNext(); i++) {
-          cons.addTarget(it.next());
-        }
-      }
+      cons.addTarget(node);
       return block;
     }
 

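Taken together, the rewritten addStoredBlock resolves a generation-stamp (GS) mismatch in one of three ways. A condensed restatement, where reject, addTargetOnly, and addToNodeBlockList are illustrative placeholders rather than methods in this patch:

  // Sketch of the acceptance rules above; helper names are hypothetical.
  boolean reportedOldGS = reportedGS < storedGS;
  boolean reportedNewGS = reportedGS > storedGS;
  if (reportedOldGS && !(underConstruction && isLastBlock)) {
    reject();                  // stale replica of a block that has moved on
  } else if (underConstruction && isLastBlock
             && (reportedOldGS || reportedNewGS)) {
    addTargetOnly(node);       // recovery candidate only; the node's block
                               // list is updated by commitBlockSynchronization
  } else {
    addToNodeBlockList(node);  // consistent GS: a normal replica
  }
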
+ 4 - 0
src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -252,6 +252,10 @@ public class MiniDFSCluster {
     conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
     conf.setInt("dfs.safemode.extension", 0);
     conf.setInt("dfs.namenode.decommission.interval", 3); // 3 second
+
+    // Set a small delay on blockReceived in the minicluster to approximate
+    // a real cluster a little better and suss out bugs.
+    conf.setInt("dfs.datanode.artificialBlockReceivedDelay", 5);
     
     // Format and clean out DataNode directories
     if (format) {

+ 4 - 5
src/test/org/apache/hadoop/hdfs/TestFileAppend2.java

@@ -66,11 +66,7 @@ public class TestFileAppend2 extends TestCase {
   int numberOfFiles = 50;
   int numThreads = 10;
   int numAppendsPerThread = 20;
-/***
-  int numberOfFiles = 1;
-  int numThreads = 1;
-  int numAppendsPerThread = 2000;
-****/
+  int artificialBlockReceivedDelay = 50;
   Workload[] workload = null;
   ArrayList<Path> testFiles = new ArrayList<Path>();
   volatile static boolean globalStatus = true;
@@ -376,11 +372,14 @@ public class TestFileAppend2 extends TestCase {
     conf.setInt("dfs.socket.timeout", 30000);
     conf.setInt("dfs.datanode.socket.write.timeout", 30000);
     conf.setInt("dfs.datanode.handler.count", 50);
+    conf.setInt("dfs.datanode.artificialBlockReceivedDelay",
+                artificialBlockReceivedDelay);
     conf.setBoolean("dfs.support.append", true);
 
     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, 
                                                 true, null);
     cluster.waitActive();
+
     FileSystem fs = cluster.getFileSystem();
 
     try {

+ 141 - 0
src/test/org/apache/hadoop/hdfs/TestFileAppend4.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSImageAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -110,6 +111,9 @@ public class TestFileAppend4 extends TestCase {
     // (for cluster.shutdown(); fs.close() idiom)
     conf.setInt("ipc.client.connect.max.retries", 1);
     conf.setInt("dfs.client.block.recovery.retries", 1);
+    // Delay blockReceived calls from DNs to be more similar to a real
+    // cluster. 10ms is enough so that the client often gets there first.
+    conf.setInt("dfs.datanode.artificialBlockReceivedDelay", 10);
   }
 
   @Override
@@ -480,10 +484,12 @@ public class TestFileAppend4 extends TestCase {
       LOG.info("START second instance.");
 
       recoverFile(fs1);
+      LOG.info("Recovered file");
       
       // the 2 DNs with the larger sequence number should win
       BlockLocation[] bl = fs1.getFileBlockLocations(
           fs1.getFileStatus(file1), 0, BLOCK_SIZE);
+      LOG.info("Checking blocks");
       assertTrue("Should have one block", bl.length == 1);
       assertTrue("Should have 2 replicas for that block, not " + 
                  bl[0].getNames().length, bl[0].getNames().length == 2);  
@@ -491,6 +497,7 @@ public class TestFileAppend4 extends TestCase {
       assertFileSize(fs1, BLOCK_SIZE*3/4);
       checkFile(fs1, BLOCK_SIZE*3/4);
 
+      LOG.info("Checking replication");
       // verify that, over time, the block has been replicated to 3 DN
       cluster.getNameNode().getNamesystem().restartReplicationWork();
       waitForBlockReplication(fs1, file1.toString(), 3, 20);
@@ -1080,7 +1087,141 @@ public class TestFileAppend4 extends TestCase {
       return invocation.callRealMethod();
     }
   }
+  
+  /**
+   * Test that a file is not considered complete when it only has in-progress
+   * blocks. This ensures that when a block is appended to, it is converted
+   * back into the right kind of "in progress" state.
+   */
+  public void testNotPrematurelyComplete() throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      int halfBlock = (int)BLOCK_SIZE/2;
+      short rep = 3; // replication
+      assertTrue(BLOCK_SIZE%4 == 0);
+
+      file1 = new Path("/delayedReceiveBlock");
+
+      // write 1/2 block & close
+      stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
+      AppendTestUtil.write(stm, 0, halfBlock);
+      stm.close();
+
+      NameNode nn = cluster.getNameNode();
+      LOG.info("======== Appending");
+      stm = fs1.append(file1);
+      LOG.info("======== Writing");
+      AppendTestUtil.write(stm, 0, halfBlock/2);
+      LOG.info("======== Checking progress");
+      assertFalse(NameNodeAdapter.checkFileProgress(nn.namesystem, "/delayedReceiveBlock", true));
+      LOG.info("======== Closing");
+      stm.close();
+
+    } finally {
+      LOG.info("======== Cleaning up");
+      fs1.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test that the restart of a DN and the subsequent pipeline recovery do not cause
+   * a file to become prematurely considered "complete" (i.e., that the block
+   * synchronization as part of pipeline recovery doesn't add the block to the
+   * nodes taking part in recovery).
+   */
+  public void testNotPrematurelyCompleteWithFailure() throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      int halfBlock = (int)BLOCK_SIZE/2;
+      short rep = 3; // replication
+      assertTrue(BLOCK_SIZE%4 == 0);
+
+      file1 = new Path("/delayedReceiveBlock");
+
+      // write 1/2 block & close
+      stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
+      AppendTestUtil.write(stm, 0, halfBlock);
+      stm.close();
 
+      NameNode nn = cluster.getNameNode();
+      LOG.info("======== Appending");
+      stm = fs1.append(file1);
+      LOG.info("======== Writing");
+      AppendTestUtil.write(stm, 0, halfBlock/4);
+
+      // restart one of the datanodes and wait for a few of its heartbeats
+      // so that it will report the recovered replica
+      MiniDFSCluster.DataNodeProperties dnprops = cluster.stopDataNode(0);
+      stm.sync();
+      assertTrue(cluster.restartDataNode(dnprops));
+      for (int i = 0; i < 2; i++) {
+        cluster.waitForDNHeartbeat(0, 3000);
+      }
+
+      AppendTestUtil.write(stm, 0, halfBlock/4);
+
+      LOG.info("======== Checking progress");
+      assertFalse(NameNodeAdapter.checkFileProgress(nn.namesystem, "/delayedReceiveBlock", true));
+      LOG.info("======== Closing");
+      stm.close();
+
+    } finally {
+      LOG.info("======== Cleaning up");
+      fs1.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test that the restart of a DN and the subsequent pipeline recovery do not cause
+   * a file to become prematurely considered "complete", when it's a fresh file
+   * with no .append() called.
+   */
+  public void testNotPrematurelyCompleteWithFailureNotReopened() throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    NameNode nn = cluster.getNameNode();
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      short rep = 3; // replication
+
+      file1 = new Path("/delayedReceiveBlock");
+
+      stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, 64*1024*1024);
+      LOG.info("======== Writing");
+      AppendTestUtil.write(stm, 0, 1024*1024);
+
+      LOG.info("======== Waiting for a block allocation");
+      waitForBlockReplication(fs1, "/delayedReceiveBlock", 0, 3000);
+
+      LOG.info("======== Checking not complete");
+      assertFalse(NameNodeAdapter.checkFileProgress(nn.namesystem, "/delayedReceiveBlock", true));
+
+      // Stop one of the DNs, don't restart
+      MiniDFSCluster.DataNodeProperties dnprops = cluster.stopDataNode(0);
+
+      // Write some more data
+      AppendTestUtil.write(stm, 0, 1024*1024);
+
+      // Make sure we don't see the file as complete
+      LOG.info("======== Checking progress");
+      assertFalse(NameNodeAdapter.checkFileProgress(nn.namesystem, "/delayedReceiveBlock", true));
+      LOG.info("======== Closing");
+      stm.close();
+
+    } finally {
+      LOG.info("======== Cleaning up");
+      fs1.close();
+      cluster.shutdown();
+    }
+  }
+
+  
   /**
    * Mockito answer helper that will throw an exception a given number
    * of times before eventually succeding.

+ 27 - 0
src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+import java.io.IOException;
+
+public abstract class NameNodeAdapter {
+  public static boolean checkFileProgress(FSNamesystem fsn, String path, boolean checkall) throws IOException {
+    INodeFile f = fsn.dir.getFileINode(path);
+    return fsn.checkFileProgress(f, checkall);
+  }
+}
+
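The adapter exists because checkFileProgress is not public in FSNamesystem; placing this shim in the same package lets tests elsewhere call it. Usage, as in the TestFileAppend4 cases above:

  // From a test: assert the file is not yet considered complete.
  NameNode nn = cluster.getNameNode();
  assertFalse(NameNodeAdapter.checkFileProgress(
      nn.namesystem, "/delayedReceiveBlock", true /* check all blocks */));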