HADOOP-16785. Improve wasb and abfs resilience on double close() calls.

This hardens the wasb and abfs output streams' resilience to being invoked
in/after close().

wasb:
  Explicitly raise IOEs on operations invoked after close,
  rather than implicitly raise NPEs.
  This ensures that invocations which catch and swallow IOEs will perform as
  expected.
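
  A minimal sketch of the caller pattern this protects (hypothetical code,
  not part of this patch): callers that deliberately catch and ignore
  IOExceptions on a possibly-closed stream previously saw an unchecked
  NullPointerException escape instead.

    // "out" is a wasb output stream that has already been closed
    try {
      out.write(data);
    } catch (IOException expected) {
      // closed-stream failures are expected and swallowed here
    }
    // before this change, write() dereferenced a null field and threw
    // NullPointerException, which bypasses the catch block above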

abfs:
  When rethrowing an IOException in the close() call, explicitly wrap it
  with a new instance of the same subclass.
  This is needed to handle failures in try-with-resources clauses, where
  any exception in close() is added as a suppressed exception to the one
  thrown in the try {} clause
  *and you cannot attach the same exception to itself*

Contributed by Steve Loughran.

Change-Id: Ic44b494ff5da332b47d6c198ceb67b965d34dd1b

+ 1 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java

@@ -406,7 +406,7 @@ public final class LambdaTestUtils {
       throws Exception {
     try {
       eval.call();
-      throw new AssertionError("Expected an exception");
+      throw new AssertionError("Expected an exception of type " + clazz);
     } catch (Throwable e) {
       if (clazz.isAssignableFrom(e.getClass())) {
         return (E)e;

+ 14 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java

@@ -1083,6 +1083,7 @@ public class NativeAzureFileSystem extends FileSystem {
      */
     @Override
     public void write(int b) throws IOException {
+      checkOpen();
       try {
         out.write(b);
       } catch(IOException e) {
@@ -1106,6 +1107,7 @@ public class NativeAzureFileSystem extends FileSystem {
      */
     @Override
     public void write(byte[] b) throws IOException {
+      checkOpen();
       try {
         out.write(b);
       } catch(IOException e) {
@@ -1136,6 +1138,7 @@ public class NativeAzureFileSystem extends FileSystem {
      */
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
+      checkOpen();
       try {
         out.write(b, off, len);
       } catch(IOException e) {
@@ -1198,6 +1201,17 @@ public class NativeAzureFileSystem extends FileSystem {
     private void restoreKey() throws IOException {
       store.rename(getEncodedKey(), getKey());
     }
+
+    /**
+     * Check for the stream being open.
+     * @throws IOException if the stream is closed.
+     */
+    private void checkOpen() throws IOException {
+      if (out == null) {
+        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      }
+    }
+
   }
 
   private URI uri;

+ 8 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

@@ -43,6 +43,8 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 
+import static org.apache.hadoop.io.IOUtils.wrapException;
+
 /**
  * The BlobFsOutputStream for Rest AbfsClient.
  */
@@ -246,6 +248,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     try {
       flushInternal(true);
       threadExecutor.shutdown();
+    } catch (IOException e) {
+      // Problems surface in try-with-resources clauses if
+      // the exception thrown in a close == the one already thrown
+      // -so we wrap any exception with a new one.
+      // See HADOOP-16785
+      throw wrapException(path, e.getMessage(), e);
     } finally {
       lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
       buffer = null;

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java

@@ -52,7 +52,7 @@ public abstract class AbstractWasbTestBase extends AbstractWasbTestWithTimeout
   @Before
   public void setUp() throws Exception {
     AzureBlobStorageTestAccount account = createTestAccount();
-    assumeNotNull(account);
+    assumeNotNull("test account", account);
     bindToTestAccount(account);
   }
 

+ 20 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azure;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -30,7 +31,9 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.junit.After;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.FSExceptionMessages.STREAM_IS_CLOSED;
 import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Single threaded exception handling.
@@ -265,6 +268,23 @@ public class ITestFileSystemOperationExceptionHandling
     inputStream = fs.open(testPath);
   }
 
+  /**
+   * Attempts to write to the azure stream after it is closed will raise
+   * an IOException.
+   */
+  @Test
+  public void testWriteAfterClose() throws Throwable {
+    FSDataOutputStream out = fs.create(testPath);
+    out.close();
+    intercept(IOException.class, STREAM_IS_CLOSED,
+        () -> out.write('a'));
+    intercept(IOException.class, STREAM_IS_CLOSED,
+        () -> out.write(new byte[]{'a'}));
+    out.hsync();
+    out.flush();
+    out.close();
+  }
+
   @After
   public void tearDown() throws Exception {
     if (inputStream != null) {

+ 51 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.EnumSet;
 
 import org.junit.Test;
@@ -27,8 +28,10 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Test create operation.
@@ -104,4 +107,52 @@ public class ITestAzureBlobFileSystemCreate extends
         .close();
     assertIsFile(fs, testFile);
   }
+
+  /**
+   * Attempts to use the ABFS stream after it is closed.
+   */
+  @Test
+  public void testWriteAfterClose() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+    FSDataOutputStream out = fs.create(testPath);
+    out.close();
+    intercept(IOException.class, () -> out.write('a'));
+    intercept(IOException.class, () -> out.write(new byte[]{'a'}));
+    // hsync is not ignored on a closed stream
+    // out.hsync();
+    out.flush();
+    out.close();
+  }
+
+  /**
+   * Attempts to double close an ABFS output stream from within a
+   * FilterOutputStream.
+   * That class handles a double failure on close badly if the second
+   * exception rethrows the first.
+   */
+  @Test
+  public void testTryWithResources() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path testPath = new Path(TEST_FOLDER_PATH, TEST_CHILD_FILE);
+    try (FSDataOutputStream out = fs.create(testPath)) {
+      out.write('1');
+      out.hsync();
+      // this will cause the next write to failAll
+      fs.delete(testPath, false);
+      out.write('2');
+      out.hsync();
+      fail("Expected a failure");
+    } catch (FileNotFoundException fnfe) {
+      // the exception raised in close() must be in the caught exception's
+      // suppressed list
+      Throwable[] suppressed = fnfe.getSuppressed();
+      assertEquals("suppressed count", 1, suppressed.length);
+      Throwable inner = suppressed[0];
+      if (!(inner instanceof IOException)) {
+        throw inner;
+      }
+      GenericTestUtils.assertExceptionContains(fnfe.getMessage(), inner);
+    }
+  }
 }