|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.fs.azure;
|
|
|
|
|
|
import java.io.DataInputStream;
|
|
|
+import java.io.EOFException;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
|
|
|
import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FSExceptionMessages;
|
|
|
import org.apache.hadoop.fs.FSInputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -62,7 +64,6 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
-
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.codehaus.jackson.JsonNode;
|
|
@@ -74,9 +75,11 @@ import org.codehaus.jackson.map.ObjectMapper;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.microsoft.azure.storage.AccessCondition;
|
|
|
import com.microsoft.azure.storage.OperationContext;
|
|
|
+import com.microsoft.azure.storage.StorageErrorCode;
|
|
|
import com.microsoft.azure.storage.StorageException;
|
|
|
import com.microsoft.azure.storage.blob.CloudBlob;
|
|
|
-import com.microsoft.azure.storage.core.*;
|
|
|
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
|
|
|
/**
|
|
|
* A {@link FileSystem} for reading and writing files stored on <a
|
|
@@ -88,7 +91,6 @@ import com.microsoft.azure.storage.core.*;
|
|
|
@InterfaceStability.Stable
|
|
|
public class NativeAzureFileSystem extends FileSystem {
|
|
|
private static final int USER_WX_PERMISION = 0300;
|
|
|
-
|
|
|
/**
|
|
|
* A description of a folder rename operation, including the source and
|
|
|
* destination keys, and descriptions of the files in the source folder.
|
|
@@ -712,7 +714,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
* @returns int An integer corresponding to the byte read.
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized int read() throws IOException {
|
|
|
+ public synchronized int read() throws FileNotFoundException, IOException {
|
|
|
try {
|
|
|
int result = 0;
|
|
|
result = in.read();
|
|
@@ -726,13 +728,21 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
//
|
|
|
return result;
|
|
|
} catch(IOException e) {
|
|
|
- if (e.getCause() instanceof StorageException) {
|
|
|
- StorageException storageExcp = (StorageException) e.getCause();
|
|
|
+
|
|
|
+ Throwable innerException = checkForAzureStorageException(e);
|
|
|
+
|
|
|
+ if (innerException instanceof StorageException) {
|
|
|
+
|
|
|
LOG.error("Encountered Storage Exception for read on Blob : {}"
|
|
|
+ " Exception details: {} Error Code : {}",
|
|
|
- key, e.getMessage(), storageExcp.getErrorCode());
|
|
|
+ key, e, ((StorageException) innerException).getErrorCode());
|
|
|
+
|
|
|
+ if (isFileNotFoundException((StorageException) innerException)) {
|
|
|
+ throw new FileNotFoundException(String.format("%s is not found", key));
|
|
|
+ }
|
|
|
}
|
|
|
- throw e;
|
|
|
+
|
|
|
+ throw e;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -757,7 +767,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
* there is no more data because the end of stream is reached.
|
|
|
*/
|
|
|
@Override
|
|
|
- public synchronized int read(byte[] b, int off, int len) throws IOException {
|
|
|
+ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException {
|
|
|
try {
|
|
|
int result = 0;
|
|
|
result = in.read(b, off, len);
|
|
@@ -772,29 +782,56 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
// Return to the caller with the result.
|
|
|
return result;
|
|
|
} catch(IOException e) {
|
|
|
- if (e.getCause() instanceof StorageException) {
|
|
|
- StorageException storageExcp = (StorageException) e.getCause();
|
|
|
+
|
|
|
+ Throwable innerException = checkForAzureStorageException(e);
|
|
|
+
|
|
|
+ if (innerException instanceof StorageException) {
|
|
|
+
|
|
|
LOG.error("Encountered Storage Exception for read on Blob : {}"
|
|
|
+ " Exception details: {} Error Code : {}",
|
|
|
- key, e.getMessage(), storageExcp.getErrorCode());
|
|
|
+ key, e, ((StorageException) innerException).getErrorCode());
|
|
|
+
|
|
|
+ if (isFileNotFoundException((StorageException) innerException)) {
|
|
|
+ throw new FileNotFoundException(String.format("%s is not found", key));
|
|
|
+ }
|
|
|
}
|
|
|
- throw e;
|
|
|
+
|
|
|
+ throw e;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void close() throws IOException {
|
|
|
- in.close();
|
|
|
- closed = true;
|
|
|
+ public synchronized void close() throws IOException {
|
|
|
+ if (!closed) {
|
|
|
+ closed = true;
|
|
|
+ IOUtils.closeStream(in);
|
|
|
+ in = null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized void seek(long pos) throws IOException {
|
|
|
- in.close();
|
|
|
- in = store.retrieve(key);
|
|
|
- this.pos = in.skip(pos);
|
|
|
- LOG.debug("Seek to position {}. Bytes skipped {}", pos,
|
|
|
- this.pos);
|
|
|
+ public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException {
|
|
|
+ try {
|
|
|
+ checkNotClosed();
|
|
|
+ if (pos < 0) {
|
|
|
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
|
|
+ }
|
|
|
+ IOUtils.closeStream(in);
|
|
|
+ in = store.retrieve(key);
|
|
|
+ this.pos = in.skip(pos);
|
|
|
+ LOG.debug("Seek to position {}. Bytes skipped {}", pos,
|
|
|
+ this.pos);
|
|
|
+ } catch(IOException e) {
|
|
|
+
|
|
|
+ Throwable innerException = checkForAzureStorageException(e);
|
|
|
+
|
|
|
+ if (innerException instanceof StorageException
|
|
|
+ && isFileNotFoundException((StorageException) innerException)) {
|
|
|
+ throw new FileNotFoundException(String.format("%s is not found", key));
|
|
|
+ }
|
|
|
+
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -806,6 +843,50 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Helper method to recursively check if the cause of the exception is
|
|
|
+ * a Azure storage exception.
|
|
|
+ */
|
|
|
+ private Throwable checkForAzureStorageException(IOException e) {
|
|
|
+
|
|
|
+ Throwable innerException = e.getCause();
|
|
|
+
|
|
|
+ while (innerException != null
|
|
|
+ && !(innerException instanceof StorageException)) {
|
|
|
+ innerException = innerException.getCause();
|
|
|
+ }
|
|
|
+
|
|
|
+ return innerException;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Helper method to check if the AzureStorageException is
|
|
|
+ * because backing blob was not found.
|
|
|
+ */
|
|
|
+ private boolean isFileNotFoundException(StorageException e) {
|
|
|
+
|
|
|
+ String errorCode = ((StorageException) e).getErrorCode();
|
|
|
+ if (errorCode != null
|
|
|
+ && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
|
|
|
+ || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
|
|
|
+ || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
|
|
|
+ || errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Helper method to check if a stream is closed.
|
|
|
+ */
|
|
|
+ private void checkNotClosed() throws IOException {
|
|
|
+ if (closed) {
|
|
|
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private class NativeAzureFsOutputStream extends OutputStream {
|