|
@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.Seekable;
|
|
|
import org.apache.hadoop.fs.StreamCapabilities;
|
|
|
import org.apache.hadoop.fs.Syncable;
|
|
|
+import org.apache.hadoop.fs.XAttrSetFlag;
|
|
|
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
|
|
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
|
|
|
import org.apache.hadoop.fs.azure.security.Constants;
|
|
@@ -184,7 +185,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
} catch (IOException e) {
|
|
|
this.committed = false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (!this.committed) {
|
|
|
LOG.error("Deleting corruped rename pending file {} \n {}",
|
|
|
redoFile, contents);
|
|
@@ -507,7 +508,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
/**
|
|
|
* Recover from a folder rename failure by redoing the intended work,
|
|
|
* as recorded in the -RenamePending.json file.
|
|
|
- *
|
|
|
+ *
|
|
|
* @throws IOException Thrown when fail to redo.
|
|
|
*/
|
|
|
public void redo() throws IOException {
|
|
@@ -675,7 +676,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
/**
|
|
|
* The time span in seconds before which we consider a temp blob to be
|
|
|
* dangling (not being actively uploaded to) and up for reclamation.
|
|
|
- *
|
|
|
+ *
|
|
|
* So e.g. if this is 60, then any temporary blobs more than a minute old
|
|
|
* would be considered dangling.
|
|
|
*/
|
|
@@ -1083,12 +1084,13 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
* write is that one byte is written to the output stream. The byte to be
|
|
|
* written is the eight low-order bits of the argument b. The 24 high-order
|
|
|
* bits of b are ignored.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param b
|
|
|
* 32-bit integer of block of 4 bytes
|
|
|
*/
|
|
|
@Override
|
|
|
public void write(int b) throws IOException {
|
|
|
+ checkOpen();
|
|
|
try {
|
|
|
out.write(b);
|
|
|
} catch(IOException e) {
|
|
@@ -1106,12 +1108,13 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
* Writes b.length bytes from the specified byte array to this output
|
|
|
* stream. The general contract for write(b) is that it should have exactly
|
|
|
* the same effect as the call write(b, 0, b.length).
|
|
|
- *
|
|
|
+ *
|
|
|
* @param b
|
|
|
* Block of bytes to be written to the output stream.
|
|
|
*/
|
|
|
@Override
|
|
|
public void write(byte[] b) throws IOException {
|
|
|
+ checkOpen();
|
|
|
try {
|
|
|
out.write(b);
|
|
|
} catch(IOException e) {
|
|
@@ -1132,7 +1135,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
* are written to the output stream in order; element <code>b[off]</code>
|
|
|
* is the first byte written and <code>b[off+len-1]</code> is the last
|
|
|
* byte written by this operation.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param b
|
|
|
* Byte array to be written.
|
|
|
* @param off
|
|
@@ -1142,6 +1145,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
*/
|
|
|
@Override
|
|
|
public void write(byte[] b, int off, int len) throws IOException {
|
|
|
+ checkOpen();
|
|
|
try {
|
|
|
out.write(b, off, len);
|
|
|
} catch(IOException e) {
|
|
@@ -1157,7 +1161,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
|
|
|
/**
|
|
|
* Get the blob name.
|
|
|
- *
|
|
|
+ *
|
|
|
* @return String Blob name.
|
|
|
*/
|
|
|
public String getKey() {
|
|
@@ -1166,7 +1170,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
|
|
|
/**
|
|
|
* Set the blob name.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param key
|
|
|
* Blob name.
|
|
|
*/
|
|
@@ -1176,7 +1180,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
|
|
|
/**
|
|
|
* Get the blob name.
|
|
|
- *
|
|
|
+ *
|
|
|
* @return String Blob name.
|
|
|
*/
|
|
|
public String getEncodedKey() {
|
|
@@ -1185,7 +1189,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
|
|
|
/**
|
|
|
* Set the blob name.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param anEncodedKey
|
|
|
* Blob name.
|
|
|
*/
|
|
@@ -1204,6 +1208,17 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
private void restoreKey() throws IOException {
|
|
|
store.rename(getEncodedKey(), getKey());
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check for the stream being open.
|
|
|
+ * @throws IOException if the stream is closed.
|
|
|
+ */
|
|
|
+ private void checkOpen() throws IOException {
|
|
|
+ if (out == null) {
|
|
|
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private URI uri;
|
|
@@ -3555,6 +3570,76 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Set the value of an attribute for a path.
|
|
|
+ *
|
|
|
+ * @param path The path on which to set the attribute
|
|
|
+ * @param xAttrName The attribute to set
|
|
|
+ * @param value The byte value of the attribute to set (encoded in utf-8)
|
|
|
+ * @param flag The mode in which to set the attribute
|
|
|
+ * @throws IOException If there was an issue setting the attribute on Azure
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void setXAttr(Path path, String xAttrName, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
|
|
|
+ Path absolutePath = makeAbsolute(path);
|
|
|
+ performAuthCheck(absolutePath, WasbAuthorizationOperations.WRITE, "setXAttr", absolutePath);
|
|
|
+
|
|
|
+ String key = pathToKey(absolutePath);
|
|
|
+ FileMetadata metadata;
|
|
|
+ try {
|
|
|
+ metadata = store.retrieveMetadata(key);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
|
|
+ if (innerException instanceof StorageException
|
|
|
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
|
|
+ throw new FileNotFoundException("File " + path + " doesn't exists.");
|
|
|
+ }
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (metadata == null) {
|
|
|
+ throw new FileNotFoundException("File doesn't exist: " + path);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean xAttrExists = store.retrieveAttribute(key, xAttrName) != null;
|
|
|
+ XAttrSetFlag.validate(xAttrName, xAttrExists, flag);
|
|
|
+ store.storeAttribute(key, xAttrName, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the value of an attribute for a path.
|
|
|
+ *
|
|
|
+ * @param path The path on which to get the attribute
|
|
|
+ * @param xAttrName The attribute to get
|
|
|
+ * @return The bytes of the attribute's value (encoded in utf-8)
|
|
|
+ * or null if the attribute does not exist
|
|
|
+ * @throws IOException If there was an issue getting the attribute from Azure
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public byte[] getXAttr(Path path, String xAttrName) throws IOException {
|
|
|
+ Path absolutePath = makeAbsolute(path);
|
|
|
+ performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "getXAttr", absolutePath);
|
|
|
+
|
|
|
+ String key = pathToKey(absolutePath);
|
|
|
+ FileMetadata metadata;
|
|
|
+ try {
|
|
|
+ metadata = store.retrieveMetadata(key);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
|
|
+ if (innerException instanceof StorageException
|
|
|
+ && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
|
|
+ throw new FileNotFoundException("File " + path + " doesn't exists.");
|
|
|
+ }
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (metadata == null) {
|
|
|
+ throw new FileNotFoundException("File doesn't exist: " + path);
|
|
|
+ }
|
|
|
+
|
|
|
+ return store.retrieveAttribute(key, xAttrName);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Is the user allowed?
|
|
|
* <ol>
|
|
@@ -3728,7 +3813,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
/**
|
|
|
* Implements recover and delete (-move and -delete) behaviors for handling
|
|
|
* dangling files (blobs whose upload was interrupted).
|
|
|
- *
|
|
|
+ *
|
|
|
* @param root
|
|
|
* The root path to check from.
|
|
|
* @param handler
|
|
@@ -3770,7 +3855,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
* the data to a temporary blob, but for some reason we crashed in the middle
|
|
|
* of the upload and left them there. If any are found, we move them to the
|
|
|
* destination given.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param root
|
|
|
* The root path to consider.
|
|
|
* @param destination
|
|
@@ -3790,7 +3875,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
* meaning that they are place-holder blobs that we created while we upload
|
|
|
* the data to a temporary blob, but for some reason we crashed in the middle
|
|
|
* of the upload and left them there. If any are found, we delete them.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param root
|
|
|
* The root path to consider.
|
|
|
* @throws IOException Thrown when fail to delete.
|
|
@@ -3812,7 +3897,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
* Encode the key with a random prefix for load balancing in Azure storage.
|
|
|
* Upload data to a random temporary file then do storage side renaming to
|
|
|
* recover the original key.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param aKey a key to be encoded.
|
|
|
* @return Encoded version of the original key.
|
|
|
*/
|