Просмотр исходного кода

HDFS-278. HDFS Outputstream close does not hang forever. (dhruba)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@793469 13f79535-47bb-0310-9956-ffa450edef68
Dhruba Borthakur 16 лет назад
Родитель
Сommit
f58775d5c6

+ 2 - 0
CHANGES.txt

@@ -32,6 +32,8 @@ Trunk (unreleased changes)
     HDFS-204. Add a new metrics FilesInGetListingOps to the Namenode.
     HDFS-204. Add a new metrics FilesInGetListingOps to the Namenode.
     (Jitendra Nath Pandey via szetszwo)
     (Jitendra Nath Pandey via szetszwo)
 
 
+    HDFS-278. HDFS Outputstream close does not hang forever. (dhruba)
+
   BUG FIXES
   BUG FIXES
     HDFS-76. Better error message to users when commands fail because of 
     HDFS-76. Better error message to users when commands fail because of 
     lack of quota. Allow quota to be set even if the limit is lower than
     lack of quota. Allow quota to be set even if the limit is lower than

+ 65 - 5
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -85,8 +85,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   final int writePacketSize;
   final int writePacketSize;
   private final FileSystem.Statistics stats;
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
   private int maxBlockAcquireFailures;
-    
- 
+  private final int hdfsTimeout;    // timeout value for a DFS operation.
+
+  /**
+   * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
+   * lock on leasechecker, followed by lock on an individual DFSOutputStream.
+   */
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
     return createNamenode(NameNode.getAddress(conf), conf);
   }
   }
@@ -169,7 +173,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     this.maxBlockAcquireFailures = 
     this.maxBlockAcquireFailures = 
                           conf.getInt("dfs.client.max.block.acquire.failures",
                           conf.getInt("dfs.client.max.block.acquire.failures",
                                       MAX_BLOCK_ACQUIRE_FAILURES);
                                       MAX_BLOCK_ACQUIRE_FAILURES);
-    
+    // The hdfsTimeout is currently the same as the ipc timeout 
+    this.hdfsTimeout = Client.getTimeout(conf);
+
     try {
     try {
       this.ugi = UnixUserGroupInformation.login(conf, true);
       this.ugi = UnixUserGroupInformation.login(conf, true);
     } catch (LoginException e) {
     } catch (LoginException e) {
@@ -989,6 +995,25 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       }
       }
     }
     }
 
 
+    /**
+     * Abort all open files. Release resources held. Ignore all errors.
+     */
+    synchronized void abort() {
+      clientRunning = false;
+      while (!pendingCreates.isEmpty()) {
+        String src = pendingCreates.firstKey();
+        DFSOutputStream out = (DFSOutputStream)pendingCreates.remove(src);
+        if (out != null) {
+          try {
+            out.abort();
+          } catch (IOException ie) {
+            LOG.error("Exception aborting file " + src+ ": ", ie);
+          }
+        }
+      }
+      RPC.stopProxy(rpcNamenode); // close connections to the namenode
+    }
+
     private void renew() throws IOException {
     private void renew() throws IOException {
       synchronized(this) {
       synchronized(this) {
         if (pendingCreates.isEmpty()) {
         if (pendingCreates.isEmpty()) {
@@ -1004,13 +1029,25 @@ public class DFSClient implements FSConstants, java.io.Closeable {
      */
      */
     public void run() {
     public void run() {
       long lastRenewed = 0;
       long lastRenewed = 0;
+      int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
+      if (hdfsTimeout > 0) {
+        renewal = Math.min(renewal, hdfsTimeout/2);
+      }
       while (clientRunning && !Thread.interrupted()) {
       while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
+        if (System.currentTimeMillis() - lastRenewed > renewal) {
           try {
           try {
             renew();
             renew();
             lastRenewed = System.currentTimeMillis();
             lastRenewed = System.currentTimeMillis();
+          } catch (SocketTimeoutException ie) {
+            LOG.warn("Problem renewing lease for " + clientName +
+                     " for a period of " + (hdfsTimeout/1000) +
+                     " seconds. Shutting down HDFS client...", ie);
+            abort();
+            break;
           } catch (IOException ie) {
           } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName, ie);
+            LOG.warn("Problem renewing lease for " + clientName +
+                     " for a period of " + (hdfsTimeout/1000) +
+                     " seconds. Will retry shortly...", ie);
           }
           }
         }
         }
 
 
@@ -3141,6 +3178,19 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       }
       }
     }
     }
 
 
+    /**
+     * Aborts this output stream and releases any system 
+     * resources associated with this stream.
+     */
+    synchronized void abort() throws IOException {
+      if (closed) {
+        return;
+      }
+      streamer.setLastException(new IOException("Lease timeout of " +
+                               (hdfsTimeout/1000) + " seconds expired."));
+      closeThreads();
+    }
+ 
     // shutdown datastreamer and responseprocessor threads.
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
     private void closeThreads() throws IOException {
       try {
       try {
@@ -3204,6 +3254,16 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       while (!fileComplete) {
       while (!fileComplete) {
         fileComplete = namenode.complete(src, clientName);
         fileComplete = namenode.complete(src, clientName);
         if (!fileComplete) {
         if (!fileComplete) {
+          if (!clientRunning ||
+                (hdfsTimeout > 0 &&
+                 localstart + hdfsTimeout < System.currentTimeMillis())) {
+              String msg = "Unable to close file because dfsclient " +
+                            " was unable to contact the HDFS servers." +
+                            " clientRunning " + clientRunning +
+                            " hdfsTimeout " + hdfsTimeout;
+              LOG.info(msg);
+              throw new IOException(msg);
+          }
           try {
           try {
             Thread.sleep(400);
             Thread.sleep(400);
             if (System.currentTimeMillis() - localstart > 5000) {
             if (System.currentTimeMillis() - localstart > 5000) {

+ 47 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -842,6 +842,53 @@ public class TestFileCreation extends junit.framework.TestCase {
       dfs.close();
       dfs.close();
     } finally {
     } finally {
       System.out.println("testFsClose successful");
       System.out.println("testFsClose successful");
+      cluster.shutdown();
+    }
+  }
+
+  // test closing file after cluster is shutdown
+  public void testFsCloseAfterClusterShutdown() throws IOException {
+    System.out.println("test testFsCloseAfterClusterShutdown start");
+    final int DATANODE_NUM = 3;
+
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication.min", 3);
+    conf.setBoolean("ipc.client.ping", false); // hdfs timeout is default 60 seconds
+    conf.setInt("ipc.ping.interval", 10000); // hdfs timeout is now 10 second
+
+    // create cluster
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem dfs = null;
+    try {
+      cluster.waitActive();
+      dfs = (DistributedFileSystem)cluster.getFileSystem();
+
+      // create a new file.
+      final String f = DIR + "dhrubashutdown";
+      final Path fpath = new Path(f);
+      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
+      out.write("something_dhruba".getBytes());
+      out.sync();    // ensure that block is allocated
+
+      // shutdown last datanode in pipeline.
+      cluster.stopDataNode(2);
+
+      // close file. Since we have set the minReplcatio to 3 but have killed one
+      // of the three datanodes, the close call will loop until the hdfsTimeout is
+      // encountered.
+      boolean hasException = false;
+      try {
+        out.close();
+        System.out.println("testFsCloseAfterClusterShutdown: Error here");
+      } catch (IOException e) {
+        hasException = true;
+      }
+      assertTrue("Failed to close file after cluster shutdown", hasException);
+    } finally {
+      System.out.println("testFsCloseAfterClusterShutdown successful");
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
     }
   }
   }
 }
 }