浏览代码

HDFS-927 DFSInputStream retries too many times for new block locations

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@908204 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 15 年之前
父节点
当前提交
0022db3630

+ 1 - 0
.eclipse.templates/.classpath

@@ -29,6 +29,7 @@
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jets3t-0.6.1.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/junit-3.8.1.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/log4j-1.2.15.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/mockito-all-1.8.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/oro-2.0.8.jar"/>
   	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jetty-6.1.14.jar"/>
   	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jetty-util-6.1.14.jar"/>

+ 5 - 1
ivy.xml

@@ -256,6 +256,10 @@
       rev="${slf4j-log4j12.version}"
       conf="common->master">
     </dependency>
-    </dependencies>
+    <dependency org="org.mockito"
+      name="mockito-all"
+      rev="${mockito-all.version}"
+      conf="common->master"/>
+</dependencies>
   
 </ivy-module>

+ 2 - 0
ivy/libraries.properties

@@ -57,6 +57,8 @@ kfs.version=0.1
 log4j.version=1.2.15
 lucene-core.version=2.3.1
 
+mockito-all.version=1.8.0
+
 oro.version=2.0.8
 
 rats-lib.version=0.5.1

+ 21 - 3
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -186,9 +186,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
-    this.maxBlockAcquireFailures = 
-                          conf.getInt("dfs.client.max.block.acquire.failures",
-                                      MAX_BLOCK_ACQUIRE_FAILURES);
+    this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
     
     try {
       this.ugi = UnixUserGroupInformation.login(conf, true);
@@ -218,6 +216,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     }
   }
 
+  static int getMaxBlockAcquireFailures(Configuration conf) {
+    return conf.getInt("dfs.client.max.block.acquire.failures",
+                       MAX_BLOCK_ACQUIRE_FAILURES);
+  }
+
   private void checkOpen() throws IOException {
     if (!clientRunning) {
       IOException result = new IOException("Filesystem closed");
@@ -1450,6 +1453,18 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private Block currentBlock = null;
     private long pos = 0;
     private long blockEnd = -1;
+
+    /**
+     * This variable tracks the number of failures since the start of the
+     * most recent user-facing operation. That is to say, it should be reset
+     * whenever the user makes a call on this stream, and if at any point
+     * during the retry logic, the failure count exceeds a threshold,
+     * the errors will be thrown back to the operation.
+     *
+     * Specifically this counts the number of times the client has gone
+     * back to the namenode to get a new list of block locations, and is
+     * capped at maxBlockAcquireFailures
+     */
     private int failures = 0;
 
     /* XXX Use of ConcurrentHashMap is temp fix. Need to fix 
@@ -1742,6 +1757,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       if (closed) {
         throw new IOException("Stream closed");
       }
+      failures = 0;
+
       if (pos < getFileLength()) {
         int retries = 2;
         while (retries > 0) {
@@ -1885,6 +1902,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       if (closed) {
         throw new IOException("Stream closed");
       }
+      failures = 0;
       long filelen = getFileLength();
       if ((position < 0) || (position >= filelen)) {
         return -1;

+ 55 - 10
src/test/org/apache/hadoop/hdfs/TestCrcCorruption.java

@@ -23,11 +23,13 @@ import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
 import java.util.Random;
 import junit.framework.*;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 
 /**
  * A JUnit test for corrupted file handling.
@@ -57,17 +59,7 @@ import org.apache.hadoop.fs.Path;
  *     replica was created from the non-corrupted replica.
  */
 public class TestCrcCorruption extends TestCase {
-  
-  public TestCrcCorruption(String testName) {
-    super(testName);
-  }
-
-  protected void setUp() throws Exception {
-  }
 
-  protected void tearDown() throws Exception {
-  }
-  
   /** 
    * check if DFS can handle corrupted CRC blocks
    */
@@ -222,4 +214,57 @@ public class TestCrcCorruption extends TestCase {
     DFSTestUtil util2 = new DFSTestUtil("TestCrcCorruption", 40, 3, 400);
     thistest(conf2, util2);
   }
+
+  /**
+   * Make a single-DN cluster, corrupt a block, and make sure
+   * there's no infinite loop, but rather it eventually
+   * reports the exception to the client.
+   */
+  public void testEntirelyCorruptFileOneNode() throws Exception {
+    doTestEntirelyCorruptFile(1);
+  }
+
+  /**
+   * Same thing with multiple datanodes - historically, this has
+   * behaved differently from the above.
+   *
+   * This test usually completes in around 15 seconds - if it
+   * times out, this suggests that the client is retrying
+   * indefinitely.
+   */
+  public void testEntirelyCorruptFileThreeNodes() throws Exception {
+    doTestEntirelyCorruptFile(3);
+  }
+
+  private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
+    long fileSize = 4096;
+    Path file = new Path("/testFile");
+
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication", numDataNodes);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
+      DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+
+      String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
+      cluster.corruptBlockOnDataNodes(block);
+
+      try {
+        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
+                          true);
+        fail("Didn't get exception");
+      } catch (IOException ioe) {
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+    
+  }
 }

+ 127 - 2
src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -20,22 +20,28 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.server.common.*;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.AccessControlException;
 
 import junit.framework.TestCase;
-
+import static org.mockito.Mockito.*;
+import org.mockito.stubbing.Answer;
+import org.mockito.invocation.InvocationOnMock;
 
 /**
  * These tests make sure that DFSClient retries fetching data from DFS
@@ -234,5 +240,124 @@ public class TestDFSClientRetries extends TestCase {
            e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
     }
   }
-  
+
+  /**
+   * This tests that DFSInputStream failures are counted for a given read
+   * operation, and not over the lifetime of the stream. It is a regression
+   * test for HDFS-127.
+   */
+  public void testFailuresArePerOperation() throws Exception
+  {
+    long fileSize = 4096;
+    Path file = new Path("/testFile");
+
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+    int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
+    assertTrue(maxBlockAcquires > 0);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+
+      DFSTestUtil.createFile(fs, file, fileSize, (short)1, 12345L /*seed*/);
+
+      // If the client will retry maxBlockAcquires times, then if we fail
+      // any more than that number of times, the operation should entirely
+      // fail.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires + 1))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      try {
+        IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+                          true);
+        fail("Didn't get exception");
+      } catch (IOException ioe) {
+        DFSClient.LOG.info("Got expected exception", ioe);
+      }
+
+      // If we fail exactly that many times, then it should succeed.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+                        true);
+
+      DFSClient.LOG.info("Starting test case for failure reset");
+
+      // Now the tricky case - if we fail a few times on one read, then succeed,
+      // then fail some more on another read, it shouldn't fail.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      DFSInputStream is = client.open(file.toString());
+      byte buf[] = new byte[10];
+      IOUtils.readFully(is, buf, 0, buf.length);
+
+      DFSClient.LOG.info("First read successful after some failures.");
+
+      // Further reads at this point will succeed since it has the good block locations.
+      // So, force the block locations on this stream to be refreshed from bad info.
+      // When reading again, it should start from a fresh failure count, since
+      // we're starting a new operation on the user level.
+      doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+        .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+      is.openInfo();
+      // Seek to beginning forces a reopen of the BlockReader - otherwise it'll
+      // just keep reading on the existing stream and the fact that we've poisoned
+      // the block info won't do anything.
+      is.seek(0);
+      IOUtils.readFully(is, buf, 0, buf.length);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Mock Answer implementation of NN.getBlockLocations that will return
+   * a poisoned block list a certain number of times before returning
+   * a proper one.
+   */
+  private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
+    private int failuresLeft;
+    private NameNode realNN;
+
+    public FailNTimesAnswer(NameNode realNN, int timesToFail) {
+      failuresLeft = timesToFail;
+      this.realNN = realNN;
+    }
+
+    public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
+      Object args[] = invocation.getArguments();
+      LocatedBlocks realAnswer = realNN.getBlockLocations(
+        (String)args[0],
+        (Long)args[1],
+        (Long)args[2]);
+
+      if (failuresLeft-- > 0) {
+        NameNode.LOG.info("FailNTimesAnswer injecting failure.");
+        return makeBadBlockList(realAnswer);
+      }
+      NameNode.LOG.info("FailNTimesAnswer no longer failing.");
+      return realAnswer;
+    }
+
+    private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
+      LocatedBlock goodLocatedBlock = goodBlockList.get(0);
+      LocatedBlock badLocatedBlock = new LocatedBlock(
+        goodLocatedBlock.getBlock(),
+        new DatanodeInfo[] {
+          new DatanodeInfo(new DatanodeID("255.255.255.255:234"))
+        },
+        goodLocatedBlock.getStartOffset(),
+        false);
+
+
+      List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
+      badBlocks.add(badLocatedBlock);
+      return new LocatedBlocks(goodBlockList.getFileLength(), badBlocks, false);
+    }
+  }
 }