Browse Source

HDFS-6243. HA NameNode transition to active or shutdown may leave lingering image transfer thread. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1587410 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 11 years ago
parent
commit
01af3a3177

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -343,6 +343,9 @@ Release 2.5.0 - UNRELEASED
 
     HDFS-6238. TestDirectoryScanner leaks file descriptors. (cnauroth)
 
+    HDFS-6243. HA NameNode transition to active or shutdown may leave lingering
+    image transfer thread. (cnauroth)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 37 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
@@ -193,14 +194,32 @@ public class TransferFsImage {
    * @param storage the storage directory to transfer the image from
    * @param nnf the NameNodeFile type of the image
    * @param txid the transaction ID of the image to be uploaded
+   * @throws IOException if there is an I/O error
    */
   public static void uploadImageFromStorage(URL fsName, Configuration conf,
       NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
-    
+    uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
+  }
+
+  /**
+   * Requests that the NameNode download an image from this node.  Allows for
+   * optional external cancelation.
+   *
+   * @param fsName the http address for the remote NN
+   * @param conf Configuration
+   * @param storage the storage directory to transfer the image from
+   * @param nnf the NameNodeFile type of the image
+   * @param txid the transaction ID of the image to be uploaded
+   * @param canceler optional canceler to check for abort of upload
+   * @throws IOException if there is an I/O error or cancellation
+   */
+  public static void uploadImageFromStorage(URL fsName, Configuration conf,
+      NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler)
+      throws IOException {
     URL url = new URL(fsName, ImageServlet.PATH_SPEC);
     long startTime = Time.monotonicNow();
     try {
-      uploadImage(url, conf, storage, nnf, txid);
+      uploadImage(url, conf, storage, nnf, txid, canceler);
     } catch (HttpPutFailedException e) {
       if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
         // this is OK - this means that a previous attempt to upload
@@ -223,7 +242,8 @@ public class TransferFsImage {
    * Uploads the imagefile using HTTP PUT method
    */
   private static void uploadImage(URL url, Configuration conf,
-      NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
+      NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler)
+      throws IOException {
 
     File imageFile = storage.findImageFile(nnf, txId);
     if (imageFile == null) {
@@ -267,7 +287,7 @@ public class TransferFsImage {
       ImageServlet.setVerificationHeadersForPut(connection, imageFile);
 
       // Write the file to output stream.
-      writeFileToPutRequest(conf, connection, imageFile);
+      writeFileToPutRequest(conf, connection, imageFile, canceler);
 
       int responseCode = connection.getResponseCode();
       if (responseCode != HttpURLConnection.HTTP_OK) {
@@ -286,7 +306,7 @@ public class TransferFsImage {
   }
 
   private static void writeFileToPutRequest(Configuration conf,
-      HttpURLConnection connection, File imageFile)
+      HttpURLConnection connection, File imageFile, Canceler canceler)
       throws FileNotFoundException, IOException {
     connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
     connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
@@ -294,7 +314,7 @@ public class TransferFsImage {
     FileInputStream input = new FileInputStream(imageFile);
     try {
       copyFileToStream(output, imageFile, input,
-          ImageServlet.getThrottler(conf));
+          ImageServlet.getThrottler(conf), canceler);
     } finally {
       IOUtils.closeStream(input);
       IOUtils.closeStream(output);
@@ -308,6 +328,12 @@ public class TransferFsImage {
   public static void copyFileToStream(OutputStream out, File localfile,
       FileInputStream infile, DataTransferThrottler throttler)
     throws IOException {
+    copyFileToStream(out, localfile, infile, throttler, null);
+  }
+
+  private static void copyFileToStream(OutputStream out, File localfile,
+      FileInputStream infile, DataTransferThrottler throttler,
+      Canceler canceler) throws IOException {
     byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
     try {
       CheckpointFaultInjector.getInstance()
@@ -324,6 +350,10 @@ public class TransferFsImage {
       }
       int num = 1;
       while (num > 0) {
+        if (canceler != null && canceler.isCancelled()) {
+          throw new SaveNamespaceCancelledException(
+            canceler.getCancellationReason());
+        }
         num = infile.read(buf);
         if (num <= 0) {
           break;
@@ -337,7 +367,7 @@ public class TransferFsImage {
         
         out.write(buf, 0, num);
         if (throttler != null) {
-          throttler.throttle(num);
+          throttler.throttle(num, canceler);
         }
       }
     } finally {

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java

@@ -196,13 +196,18 @@ public class StandbyCheckpointer {
       @Override
       public Void call() throws IOException {
         TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
-            namesystem.getFSImage().getStorage(), imageType, txid);
+            namesystem.getFSImage().getStorage(), imageType, txid, canceler);
         return null;
       }
     });
     executor.shutdown();
     try {
       upload.get();
+    } catch (InterruptedException e) {
+      // The background thread may be blocked waiting in the throttler, so
+      // interrupt it.
+      upload.cancel(true);
+      throw e;
     } catch (ExecutionException e) {
       throw new IOException("Exception during image upload: " + e.getMessage(),
           e.getCause());

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java

@@ -81,6 +81,19 @@ public class DataTransferThrottler {
    *     number of bytes sent/received since last time throttle was called
    */
   public synchronized void throttle(long numOfBytes) {
+    throttle(numOfBytes, null);
+  }
+
+  /** Given the numOfBytes sent/received since last time throttle was called,
+   * make the current thread sleep if I/O rate is too fast
+   * compared to the given bandwidth.  Allows for optional external cancelation.
+   *
+   * @param numOfBytes
+   *     number of bytes sent/received since last time throttle was called
+   * @param canceler
+   *     optional canceler to check for abort of throttle
+   */
+  public synchronized void throttle(long numOfBytes, Canceler canceler) {
     if ( numOfBytes <= 0 ) {
       return;
     }
@@ -89,6 +102,9 @@ public class DataTransferThrottler {
     bytesAlreadyUsed += numOfBytes;
 
     while (curReserve <= 0) {
+      if (canceler != null && canceler.isCancelled()) {
+        return;
+      }
       long now = monotonicNow();
       long curPeriodEnd = curPeriodStart + period;
 

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java

@@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.net.URI;
 import java.net.URL;
 import java.util.List;
@@ -59,6 +62,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -270,6 +274,29 @@ public class TestStandbyCheckpoints {
     HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(104));
     cluster.transitionToStandby(0);
     cluster.transitionToActive(1);
+
+    // Wait to make sure background TransferFsImageUpload thread was cancelled.
+    // This needs to be done before the next test in the suite starts, so that a
+    // file descriptor is not held open during the next cluster init.
+    cluster.shutdown();
+    cluster = null;
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+        ThreadInfo[] threads = threadBean.getThreadInfo(
+          threadBean.getAllThreadIds(), 1);
+        for (ThreadInfo thread: threads) {
+          if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, 1000, 30000);
+
+    // Assert that former active did not accept the canceled checkpoint file.
+    assertEquals(0, nn0.getFSImage().getMostRecentCheckpointTxId());
   }
   
   /**