Procházet zdrojové kódy

HADOOP-16004. ABFS: Convert 404 error response in AbfsInputStream and AbfsOutPutStream to FileNotFoundException.

Contributed by Da Zhou.

(cherry picked from commit 346c0c8aff0b206d45f34dbce4fcc81364115d95)
Da Zhou před 6 roky
rodič
revize
f122ae7279

+ 9 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -19,13 +19,16 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 
 /**
@@ -225,6 +228,12 @@ public class AbfsInputStream extends FSInputStream {
     try {
       op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
     } catch (AzureBlobFileSystemException ex) {
+      if (ex instanceof AbfsRestOperationException) {
+        AbfsRestOperationException ere = (AbfsRestOperationException) ex;
+        if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+          throw new FileNotFoundException(ere.getMessage());
+        }
+      }
       throw new IOException(ex);
     }
     long bytesRead = op.getResult().getBytesReceived();

+ 15 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.util.Locale;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -33,10 +35,11 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 
 /**
  * The BlobFsOutputStream for Rest AbfsClient.
@@ -290,6 +293,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
       try {
         writeOperation.task.get();
       } catch (Exception ex) {
+        if (ex.getCause() instanceof AbfsRestOperationException) {
+          if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+            throw new FileNotFoundException(ex.getMessage());
+          }
+        }
+
         if (ex.getCause() instanceof AzureBlobFileSystemException) {
           ex = (AzureBlobFileSystemException) ex.getCause();
         }
@@ -313,6 +322,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
     try {
       client.flush(path, offset, retainUncommitedData);
     } catch (AzureBlobFileSystemException ex) {
+      if (ex instanceof AbfsRestOperationException) {
+        if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+          throw new FileNotFoundException(ex.getMessage());
+        }
+      }
       throw new IOException(ex);
     }
     this.lastFlushOffset = offset;

+ 48 - 3
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
@@ -30,9 +31,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertArrayEquals;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Test end to end between ABFS client and ABFS server.
@@ -136,6 +135,52 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
     inputStream.close();
   }
 
+  @Test
+  public void testReadWithFileNotFoundException() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final Path testFilePath = new Path(methodName.getMethodName());
+    testWriteOneByteToFile(testFilePath);
+
+    FSDataInputStream inputStream = fs.open(testFilePath, TEST_DEFAULT_BUFFER_SIZE);
+    fs.delete(testFilePath, true);
+    assertFalse(fs.exists(testFilePath));
+
+    intercept(FileNotFoundException.class,
+            () -> inputStream.read(new byte[1]));
+  }
+
+  @Test
+  public void testWriteWithFileNotFoundException() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final Path testFilePath = new Path(methodName.getMethodName());
+
+    FSDataOutputStream stream = fs.create(testFilePath);
+    assertTrue(fs.exists(testFilePath));
+    stream.write(TEST_BYTE);
+
+    fs.delete(testFilePath, true);
+    assertFalse(fs.exists(testFilePath));
+
+    // trigger append call
+    intercept(FileNotFoundException.class,
+            () -> stream.close());
+  }
+
+  @Test
+  public void testFlushWithFileNotFoundException() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final Path testFilePath = new Path(methodName.getMethodName());
+
+    FSDataOutputStream stream = fs.create(testFilePath);
+    assertTrue(fs.exists(testFilePath));
+
+    fs.delete(testFilePath, true);
+    assertFalse(fs.exists(testFilePath));
+
+    intercept(FileNotFoundException.class,
+            () -> stream.close());
+  }
+
   private void testWriteOneByteToFile(Path testFilePath) throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
     try(FSDataOutputStream stream = fs.create(testFilePath)) {