Browse Source

HDFS-167. Fix a bug in DFSClient that caused infinite retries on write. Contributed by Bill Zeller

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@805158 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 16 years ago
parent
commit
1af8ddb29f

+ 3 - 0
CHANGES.txt

@@ -209,6 +209,9 @@ Release 0.20.1 - Unreleased
     MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact
     the JobTracker lock hierarchy wasn't maintained in some JobInProgress
     method calls. (Amar Kamat via ddas)
+
+    HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+    (Bill Zeller via szetszwo)
  
 Release 0.20.0 - 2009-04-15
 

+ 50 - 23
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -67,8 +67,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  public final ClientProtocol namenode;
-  final private ClientProtocol rpcNamenode;
+  public ClientProtocol namenode;
+  private ClientProtocol rpcNamenode;
   final UnixUserGroupInformation ugi;
   volatile boolean clientRunning = true;
   Random r = new Random();
@@ -155,6 +155,31 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
                    FileSystem.Statistics stats)
     throws IOException {
+    this(conf, stats);
+    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+    this.namenode = createNamenode(this.rpcNamenode);
+  }
+
+  /** 
+   * Create a new DFSClient connected to the given namenode
+   * and rpcNamenode objects.
+   * 
+   * This constructor was written to allow easy testing of the DFSClient class.
+   * End users will most likely want to use one of the other constructors.
+   */
+  public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
+                   Configuration conf, FileSystem.Statistics stats)
+    throws IOException {
+      this(conf, stats);
+      this.namenode = namenode;
+      this.rpcNamenode = rpcNamenode;
+  }
+
+  
+  private DFSClient(Configuration conf, FileSystem.Statistics stats)
+    throws IOException {      
+      
+      
     this.conf = conf;
     this.stats = stats;
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
@@ -174,9 +199,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       throw (IOException)(new IOException().initCause(e));
     }
 
-    this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
-    this.namenode = createNamenode(rpcNamenode);
-
     String taskId = conf.get("mapred.task.id");
     if (taskId != null) {
       this.clientName = "DFSClient_" + taskId; 
@@ -2870,7 +2892,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   
   
     private LocatedBlock locateFollowingBlock(long start
                                               ) throws IOException {     
-      int retries = 5;
+      int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
       long sleeptime = 400;
       while (true) {
         long localstart = System.currentTimeMillis();
@@ -2887,25 +2909,30 @@ public class DFSClient implements FSConstants, java.io.Closeable {
               throw ue; // no need to retry these exceptions
             }
             
-            if (--retries == 0 && 
-                !NotReplicatedYetException.class.getName().
+            if (NotReplicatedYetException.class.getName().
                 equals(e.getClassName())) {
-              throw e;
+
+                if (retries == 0) { 
+                  throw e;
+                } else {
+                  --retries;
+                  LOG.info(StringUtils.stringifyException(e));
+                  if (System.currentTimeMillis() - localstart > 5000) {
+                    LOG.info("Waiting for replication for "
+                        + (System.currentTimeMillis() - localstart) / 1000
+                        + " seconds");
+                  }
+                  try {
+                    LOG.warn("NotReplicatedYetException sleeping " + src
+                        + " retries left " + retries);
+                    Thread.sleep(sleeptime);
+                    sleeptime *= 2;
+                  } catch (InterruptedException ie) {
+                  }
+                }
             } else {
-              LOG.info(StringUtils.stringifyException(e));
-              if (System.currentTimeMillis() - localstart > 5000) {
-                LOG.info("Waiting for replication for " + 
-                         (System.currentTimeMillis() - localstart)/1000 + 
-                         " seconds");
-              }
-              try {
-                LOG.warn("NotReplicatedYetException sleeping " + src +
-                          " retries left " + retries);
-                Thread.sleep(sleeptime);
-                sleeptime *= 2;
-              } catch (InterruptedException ie) {
-              }
-            }                
+              throw e;
+            }
           }
         }
       } 

+ 141 - 3
src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -21,10 +21,18 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.server.common.*;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
 
 import junit.framework.TestCase;
 
@@ -34,6 +42,8 @@ import junit.framework.TestCase;
  * properly in case of errors.
  */
 public class TestDFSClientRetries extends TestCase {
+  public static final Log LOG =
+    LogFactory.getLog(TestDFSClientRetries.class.getName());
   
   // writes 'len' bytes of data to out.
   private static void writeData(OutputStream out, int len) throws IOException {
@@ -97,4 +107,132 @@ public class TestDFSClientRetries extends TestCase {
   }
   
   // more tests related to different failure cases can be added here.
+  
+  class TestNameNode implements ClientProtocol
+  {
+    int num_calls = 0;
+    
+    // The total number of calls that can be made to addBlock
+    // before an exception is thrown
+    int num_calls_allowed; 
+    public final String ADD_BLOCK_EXCEPTION = "Testing exception thrown from"
+                                             + "TestDFSClientRetries::"
+                                             + "TestNameNode::addBlock";
+    public final String RETRY_CONFIG
+          = "dfs.client.block.write.locateFollowingBlock.retries";
+          
+    public TestNameNode(Configuration conf) throws IOException
+    {
+      // +1 because the configuration value is the number of retries and
+      // the first call is not a retry (e.g., 2 retries == 3 total
+      // calls allowed)
+      this.num_calls_allowed = conf.getInt(RETRY_CONFIG, 5) + 1;
+    }
+
+    public long getProtocolVersion(String protocol, 
+                                     long clientVersion)
+    throws IOException
+    {
+      return versionID;
+    }
+
+    public LocatedBlock addBlock(String src, String clientName)
+    throws IOException
+    {
+      num_calls++;
+      if (num_calls > num_calls_allowed) { 
+        throw new IOException("addBlock called more times than "
+                              + RETRY_CONFIG
+                              + " allows.");
+      } else {
+          throw new RemoteException(NotReplicatedYetException.class.getName(),
+                                    ADD_BLOCK_EXCEPTION);
+      }
+    }
+    
+    
+    // The following methods are stub methods that are not needed by this mock class
+
+    public LocatedBlocks  getBlockLocations(String src, long offset, long length) throws IOException { return null; }
+
+    public void create(String src, FsPermission masked, String clientName, boolean overwrite, short replication, long blockSize) throws IOException {}
+
+    public LocatedBlock append(String src, String clientName) throws IOException { return null; }
+
+    public boolean setReplication(String src, short replication) throws IOException { return false; }
+
+    public void setPermission(String src, FsPermission permission) throws IOException {}
+
+    public void setOwner(String src, String username, String groupname) throws IOException {}
+
+    public void abandonBlock(Block b, String src, String holder) throws IOException {}
+
+    public boolean complete(String src, String clientName) throws IOException { return false; }
+
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
+
+    public boolean rename(String src, String dst) throws IOException { return false; }
+
+    public boolean delete(String src) throws IOException { return false; }
+
+    public boolean delete(String src, boolean recursive) throws IOException { return false; }
+
+    public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }
+
+    public FileStatus[] getListing(String src) throws IOException { return null; }
+
+    public void renewLease(String clientName) throws IOException {}
+
+    public long[] getStats() throws IOException { return null; }
+
+    public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType type) throws IOException { return null; }
+
+    public long getPreferredBlockSize(String filename) throws IOException { return 0; }
+
+    public boolean setSafeMode(FSConstants.SafeModeAction action) throws IOException { return false; }
+
+    public void saveNamespace() throws IOException {}
+
+    public boolean restoreFailedStorage(String arg) throws AccessControlException { return false; }
+
+    public void refreshNodes() throws IOException {}
+
+    public void finalizeUpgrade() throws IOException {}
+
+    public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException { return null; }
+
+    public void metaSave(String filename) throws IOException {}
+
+    public FileStatus getFileInfo(String src) throws IOException { return null; }
+
+    public ContentSummary getContentSummary(String path) throws IOException { return null; }
+
+    public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {}
+
+    public void fsync(String src, String client) throws IOException {}
+
+    public void setTimes(String src, long mtime, long atime) throws IOException {}
+
+  }
+  
+  public void testNotYetReplicatedErrors() throws IOException
+  {   
+    Configuration conf = new Configuration();
+    
+    // allow 1 retry (2 total calls)
+    conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
+        
+    TestNameNode tnn = new TestNameNode(conf);
+    DFSClient client = new DFSClient(tnn, tnn, conf, null);
+    OutputStream os = client.create("testfile", true);
+    os.write(20); // write one random byte
+    
+    try {
+      os.close();
+    } catch (Exception e) {
+      assertTrue("Retries are not being stopped correctly",
+           e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
+    }
+  }
+  
 }