
HADOOP-886: make page blob hflush sync on first call, with unit test

Eric Hanson 11 years ago
parent
commit
be21f9efb8

+ 48 - 39
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/azurenative/PageBlobOutputStream.java

@@ -8,6 +8,7 @@ import java.util.concurrent.*;
 
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.azurenative.StorageInterface.*;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -77,7 +78,6 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   /**
    * The task queue for writing to the server.
    */
-  // private final ArrayBlockingQueue<Runnable> ioQueue;
   private final LinkedBlockingQueue<Runnable> ioQueue;
   /**
    * The thread pool we're using for writing to the server. Note that the IO
@@ -87,6 +87,10 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
    */
   private final ThreadPoolExecutor ioThreadPool;
 
+  // The last task given to the ioThreadPool to execute, to allow
+  // waiting until it's done.
+  private WriteRequest lastQueuedTask;
+
   public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
 
   // Set the minimum page blob file size to 128MB, which is >> the default block size of 32MB.
@@ -109,6 +113,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     this.blob = blob;
     this.outBuffer = new ByteArrayOutputStream();
     this.opContext = opContext;
+    this.lastQueuedTask = null;
     // this.ioQueue = new ArrayBlockingQueue<Runnable>(OUTSTANDING_IO_CAPACITY);
     this.ioQueue = new LinkedBlockingQueue<Runnable>();
 
@@ -117,6 +122,8 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     this.ioThreadPool = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS,
         ioQueue);
 
+
+
     // Make page blob files have a size that is the greater of a
     // minimum size, or the value of fs.azure.page.blob.size from configuration.
     long pageBlobConfigSize = conf.getLong("fs.azure.page.blob.size", 0);
@@ -144,42 +151,37 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
    */
   @Override
   public void close() throws IOException {
-    log("Closing page blob output stream.");
-
+    LOG.debug("Closing page blob output stream.");
     flush();
     checkStreamState();
     ioThreadPool.shutdown();
     try {
-      log("Before awaitTermination");
-      log(ioThreadPool.toString());
+      LOG.debug(ioThreadPool.toString());
       if (!ioThreadPool.awaitTermination(10, TimeUnit.MINUTES)) {
-        log("Timed out after 10 minutes");
+        LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
         logAllStackTraces();
-        log(ioThreadPool.toString());
+        LOG.debug(ioThreadPool.toString());
         throw new IOException("Timed out waiting for IO requests to finish");
       }
-      log("After awaitTermination");
     } catch (InterruptedException e) {
+      LOG.debug("Caught InterruptedException");
+
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
     }
 
     this.lastError = new IOException("Stream is already closed.");
   }
 
-  // Shorthand for logging, and to allow easy switching to INFO level
-  // for unit testing.
-  private void log(String s) {
-    LOG.debug(s);
-  }
-
   // Log the stacks of all threads.
   private void logAllStackTraces() {
     Map liveThreads = Thread.getAllStackTraces();
     for (Iterator i = liveThreads.keySet().iterator(); i.hasNext(); ) {
       Thread key = (Thread) i.next();
-      log("Thread " + key.getName());
+      LOG.debug("Thread " + key.getName());
       StackTraceElement[] trace = (StackTraceElement[]) liveThreads.get(key);
       for (int j = 0; j < trace.length; j++) {
-        log("\tat " + trace[j]);
+        LOG.debug("\tat " + trace[j]);
       }
     }
   }
@@ -202,7 +204,9 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     @Override
     public void run() {
       try {
+        LOG.debug("before runInternal()");
         runInternal();
+        LOG.debug("after runInternal()");
       } finally {
         doneSignal.countDown();
       }
@@ -292,16 +296,22 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     private void writePayloadToServer(byte[] rawPayload) {
       final ByteArrayInputStream wrapperStream =
                   new ByteArrayInputStream(rawPayload);
+      LOG.debug("writing payload of " + rawPayload.length + " bytes to Azure page blob");
       try {
+        long start = System.currentTimeMillis();
         blob.uploadPages(wrapperStream, currentBlobOffset, rawPayload.length,
             withMD5Checking(), PageBlobOutputStream.this.opContext);
+        long end = System.currentTimeMillis();
+        LOG.trace("Azure uploadPages time for " + rawPayload.length + " bytes = " + (end - start));
       } catch (IOException ex) {
+        LOG.debug(ExceptionUtils.getStackTrace(ex));
         lastError = ex;
       } catch (StorageException ex) {
-       lastError = new IOException(ex);
+        LOG.debug(ExceptionUtils.getStackTrace(ex));
+        lastError = new IOException(ex);
       }
       if (lastError != null) {
-        log("Caught error in PageBlobOutputStream#writePayloadToServer()");
+        LOG.debug("Caught error in PageBlobOutputStream#writePayloadToServer()");
       }
     }
   }
@@ -310,7 +320,8 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     if (outBuffer.size() == 0) {
       return;
     }
-    ioThreadPool.execute(new WriteRequest(outBuffer.toByteArray()));
+    lastQueuedTask = new WriteRequest(outBuffer.toByteArray());
+    ioThreadPool.execute(lastQueuedTask);
     outBuffer = new ByteArrayOutputStream();
   }
 
@@ -418,35 +429,34 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
     }
   }
 
-  // Force all data in the output stream to be written to Azure storage.
-  // Wait to return until this is complete.
+  /**
+   * Force all data in the output stream to be written to Azure storage.
+   * Wait to return until this is complete.
+   */
   @Override
-  public void hsync() throws IOException {
+  public synchronized void hsync() throws IOException {
     LOG.debug("Entering PageBlobOutputStream#hsync().");
+    long start = System.currentTimeMillis();
   	flush();
     LOG.debug(ioThreadPool.toString());
-  	Runnable[] ioQueueSnapshot = ioQueue.toArray(new Runnable[0]);
-  	LOG.debug("IO queue snapshot length: " + ioQueueSnapshot.length);
-  	if (ioQueueSnapshot.length == 0) {
-  	  return;
-  	}
-  	WriteRequest lastRequest =
-  	  (WriteRequest) ioQueueSnapshot[ioQueueSnapshot.length - 1];
-  	try {
-  		lastRequest.waitTillDone();
-  	} catch (InterruptedException e) {
-
-  	  // Yield, we've been interrupted.
-  	}
-    LOG.debug("Leaving PageBlobOutputStream#hsync().");
+    try {
+      if (lastQueuedTask != null) {
+        lastQueuedTask.waitTillDone();
+      }
+    } catch (InterruptedException e1) {
+
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    }
+    LOG.debug("Leaving PageBlobOutputStream#hsync(). Total hsync duration = "
+  	  + (System.currentTimeMillis() - start) + " msec.");
   }
 
   @Override
 
   public void hflush() throws IOException {
-    LOG.debug("PageBlobOutputStream#hflush()");
 
-    // HBase relies on hflush() to force data to storage, so call hsync,
+    // hflush is required to force data to storage, so call hsync,
     // which does that.
     hsync();
   }
@@ -455,7 +465,6 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   public void sync() throws IOException {
 
     // Sync has been deprecated in favor of hflush.
-    LOG.debug("PageBlobOutputStream#sync()");
     hflush();
   }
 

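A note on the fix above: the old hsync() snapshotted ioQueue and returned immediately when the snapshot was empty, but a request that the single IO thread had already dequeued no longer appears in the queue, so a first-call hflush() could return before any data reached storage. Waiting on lastQueuedTask closes that gap, and because the ThreadPoolExecutor is created with core and maximum pool size 1, requests complete in FIFO order, so waiting on the last queued request implies every earlier one has finished. The diff shows doneSignal.countDown() in WriteRequest#run(), so waitTillDone() is presumably a latch await. A minimal sketch of that pattern, assuming a CountDownLatch (class body and names beyond those visible in the diff are illustrative, not the actual Hadoop source):

```java
import java.util.concurrent.CountDownLatch;

// Sketch of the completion-wait pattern implied by the diff. A
// single-threaded executor runs these requests in FIFO order, so waiting
// on the most recently queued request waits for all earlier ones too.
class WriteRequestSketch implements Runnable {
  private final CountDownLatch doneSignal = new CountDownLatch(1);
  private final byte[] payload;

  WriteRequestSketch(byte[] payload) {
    this.payload = payload;
  }

  @Override
  public void run() {
    try {
      // upload payload to the page blob here
    } finally {
      doneSignal.countDown();  // signal completion even if the upload failed
    }
  }

  // Blocks until run() has completed, mirroring waitTillDone() in the diff.
  void waitTillDone() throws InterruptedException {
    doneSignal.await();
  }
}
```
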
+ 70 - 4
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/azurenative/TestReadAndSeekPageBlobAfterWrite.java

@@ -27,11 +27,14 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureException;
+import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +44,7 @@ import org.junit.Test;
  * or just a part of it.
  */
 public class TestReadAndSeekPageBlobAfterWrite {
+  private static final Log LOG = LogFactory.getLog(TestReadAndSeekPageBlobAfterWrite.class);
 
   private FileSystem fs;
   private AzureBlobStorageTestAccount testAccount;
@@ -51,7 +55,7 @@ public class TestReadAndSeekPageBlobAfterWrite {
 
   // Size of data on page (excluding header)
   private static final int PAGE_DATA_SIZE = PAGE_SIZE - PageBlobFormatHelpers.PAGE_HEADER_SIZE;
-  private static final int MAX_BYTES = (10000000 / PAGE_SIZE) * PAGE_SIZE; // maximum bytes in a file that we'll test
+  private static final int MAX_BYTES = 33554432; // maximum bytes in a file that we'll test
   private static final int MAX_PAGES = MAX_BYTES / PAGE_SIZE; // maximum number of pages we'll test
   private Random rand = new Random();
 
@@ -72,6 +76,9 @@ public class TestReadAndSeekPageBlobAfterWrite {
     }
     assumeNotNull(testAccount);
 
+    // Make sure we are using an integral number of pages.
+    assertEquals(0, MAX_BYTES % PAGE_SIZE);
+
     // load an in-memory array of random data
     randomData = new byte[PAGE_SIZE * MAX_PAGES];
     rand.nextBytes(randomData);
@@ -236,20 +243,62 @@ public class TestReadAndSeekPageBlobAfterWrite {
   // hflush/hsync.
   @Test
   public void testManySmallWritesWithHFlush() throws IOException {
-    final int NUM_WRITES = 50;
-    final int RECORD_LENGTH = 100;
-    final int SYNC_INTERVAL = 20;
+    writeAndReadOneFile(50, 100, 20);
+  }
+
+  /**
+   * Write a total of numWrites * recordLength data to a file, read it back,
+   * and check to make sure what was read is the same as what was written.
+   * The syncInterval is the number of writes after which to call hflush to
+   * force the data to storage.
+   */
+  private void writeAndReadOneFile(int numWrites, int recordLength, int syncInterval) throws IOException {
+    final int NUM_WRITES = numWrites;
+    final int RECORD_LENGTH = recordLength;
+    final int SYNC_INTERVAL = syncInterval;
+
+    // A lower bound on the minimum time we think it will take to do
+    // a write to Azure storage.
+    final long MINIMUM_EXPECTED_TIME = 20;
+    LOG.info("Writing " + NUM_WRITES * RECORD_LENGTH + " bytes to " + PATH.getName());
     FSDataOutputStream output = fs.create(PATH);
+    int writesSinceHFlush = 0;
     try {
+
+      // Do a flush and hflush to exercise case for empty write queue in PageBlobOutputStream,
+      // to test concurrent execution gates.
+      output.flush();
+      output.hflush();
       for (int i = 0; i < NUM_WRITES; i++) {
         output.write(randomData, i * RECORD_LENGTH, RECORD_LENGTH);
+        writesSinceHFlush++;
         output.flush();
         if ((i % SYNC_INTERVAL) == 0) {
+          long start = Time.monotonicNow();
           output.hflush();
+          writesSinceHFlush = 0;
+          long end = Time.monotonicNow();
+
+          // A true, round-trip synchronous flush to Azure must take
+          // a significant amount of time or we are not syncing to storage correctly.
+          LOG.debug("hflush duration = " + (end - start) + " msec.");
+          assertTrue(String.format(
+            "hflush duration of %d, less than minimum expected of %d",
+            end - start, MINIMUM_EXPECTED_TIME),
+            end - start >= MINIMUM_EXPECTED_TIME);
         }
       }
     } finally {
+      long start = Time.monotonicNow();
       output.close();
+      long end = Time.monotonicNow();
+      LOG.debug("close duration = " + (end - start) + " msec.");
+      if (writesSinceHFlush > 0) {
+        assertTrue(String.format(
+            "close duration with >= 1 pending write is %d, less than minimum expected of %d",
+            end - start, MINIMUM_EXPECTED_TIME),
+            end - start >= MINIMUM_EXPECTED_TIME);
+        }
     }
 
     // Read the data back and check it.
@@ -263,5 +312,22 @@ public class TestReadAndSeekPageBlobAfterWrite {
     } finally {
       stream.close();
     }
+
+    // delete the file
+    fs.delete(PATH, false);
+  }
+
+  // Test writing to a large file repeatedly as a stress test.
+  // Set the repetitions to a larger number for manual testing
+  // for a longer stress run.
+  @Test
+  public void testLargeFileStress() throws IOException {
+    int numWrites = 32;
+    int recordSize = 1024 * 1024;
+    int syncInterval = 10;
+    int repetitions = 1;
+    for (int i = 0; i < repetitions; i++) {
+      writeAndReadOneFile(numWrites, recordSize, syncInterval);
+    }
+  }
 }
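
For context, the durability contract that the timing assertions above are probing looks like this from a caller's perspective. A hypothetical usage sketch (the class, method, path, and record are illustrative, not part of the commit):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HFlushExample {
  // Once hflush() returns, the bytes written so far should be durable in
  // Azure storage, including on the very first call, which is the case
  // this commit fixes.
  public static void writeDurably(FileSystem fs, Path path, byte[] record)
      throws IOException {
    FSDataOutputStream out = fs.create(path);
    try {
      out.write(record);
      out.hflush();  // blocks until the data round-trips to Azure
    } finally {
      out.close();   // close also waits for any still-pending writes
    }
  }
}
```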