|
@@ -1106,7 +1106,31 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
// A counter to create unique (within-process) names for my metrics sources.
|
|
// A counter to create unique (within-process) names for my metrics sources.
|
|
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
|
|
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
|
|
private boolean appendSupportEnabled = false;
|
|
private boolean appendSupportEnabled = false;
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Configuration key to enable authorization support in WASB.
|
|
|
|
+ */
|
|
|
|
+ public static final String KEY_AZURE_AUTHORIZATION =
|
|
|
|
+ "fs.azure.authorization";
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Default value for the authorization support in WASB.
|
|
|
|
+ */
|
|
|
|
+ private static final boolean DEFAULT_AZURE_AUTHORIZATION = false;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Flag controlling authorization support in WASB.
|
|
|
|
+ */
|
|
|
|
+ private boolean azureAuthorization = false;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Authorizer to use when authorization support is enabled in
|
|
|
|
+ * WASB.
|
|
|
|
+ */
|
|
|
|
+ private WasbAuthorizerInterface authorizer = null;
|
|
|
|
+
|
|
|
|
+ private String delegationToken = null;
|
|
|
|
+
|
|
public NativeAzureFileSystem() {
|
|
public NativeAzureFileSystem() {
|
|
// set store in initialize()
|
|
// set store in initialize()
|
|
}
|
|
}
|
|
@@ -1146,11 +1170,11 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
return baseName + number;
|
|
return baseName + number;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Checks if the given URI scheme is a scheme that's affiliated with the Azure
|
|
* Checks if the given URI scheme is a scheme that's affiliated with the Azure
|
|
* File System.
|
|
* File System.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @param scheme
|
|
* @param scheme
|
|
* The URI scheme.
|
|
* The URI scheme.
|
|
* @return true iff it's an Azure File System URI scheme.
|
|
* @return true iff it's an Azure File System URI scheme.
|
|
@@ -1167,7 +1191,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
/**
|
|
/**
|
|
* Puts in the authority of the default file system if it is a WASB file
|
|
* Puts in the authority of the default file system if it is a WASB file
|
|
* system and the given URI's authority is null.
|
|
* system and the given URI's authority is null.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @return The URI with reconstructed authority if necessary and possible.
|
|
* @return The URI with reconstructed authority if necessary and possible.
|
|
*/
|
|
*/
|
|
private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
|
|
private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
|
|
@@ -1237,6 +1261,24 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
// Initialize thread counts from user configuration
|
|
// Initialize thread counts from user configuration
|
|
deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
|
|
deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
|
|
renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);
|
|
renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);
|
|
|
|
+
|
|
|
|
+ boolean useSecureMode = conf.getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE,
|
|
|
|
+ AzureNativeFileSystemStore.DEFAULT_USE_SECURE_MODE);
|
|
|
|
+
|
|
|
|
+ this.azureAuthorization = useSecureMode &&
|
|
|
|
+ conf.getBoolean(KEY_AZURE_AUTHORIZATION, DEFAULT_AZURE_AUTHORIZATION);
|
|
|
|
+
|
|
|
|
+ if (this.azureAuthorization) {
|
|
|
|
+
|
|
|
|
+ this.authorizer =
|
|
|
|
+ new RemoteWasbAuthorizerImpl();
|
|
|
|
+ authorizer.init(conf);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public void updateWasbAuthorizer(WasbAuthorizerInterface authorizer) {
|
|
|
|
+ this.authorizer = authorizer;
|
|
}
|
|
}
|
|
|
|
|
|
private NativeFileSystemStore createDefaultStore(Configuration conf) {
|
|
private NativeFileSystemStore createDefaultStore(Configuration conf) {
|
|
@@ -1338,18 +1380,28 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
/**
|
|
/**
|
|
* For unit test purposes, retrieves the AzureNativeFileSystemStore store
|
|
* For unit test purposes, retrieves the AzureNativeFileSystemStore store
|
|
* backing this file system.
|
|
* backing this file system.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @return The store object.
|
|
* @return The store object.
|
|
*/
|
|
*/
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
public AzureNativeFileSystemStore getStore() {
|
|
public AzureNativeFileSystemStore getStore() {
|
|
return actualStore;
|
|
return actualStore;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
NativeFileSystemStore getStoreInterface() {
|
|
NativeFileSystemStore getStoreInterface() {
|
|
return store;
|
|
return store;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void performAuthCheck(String path, String accessType,
|
|
|
|
+ String operation) throws WasbAuthorizationException, IOException {
|
|
|
|
+
|
|
|
|
+ if (azureAuthorization && this.authorizer != null &&
|
|
|
|
+ !this.authorizer.authorize(path, accessType, delegationToken)) {
|
|
|
|
+ throw new WasbAuthorizationException(operation
|
|
|
|
+ + " operation for Path : " + path + " not allowed");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Gets the metrics source for this file system.
|
|
* Gets the metrics source for this file system.
|
|
* This is mainly here for unit testing purposes.
|
|
* This is mainly here for unit testing purposes.
|
|
@@ -1372,6 +1424,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
LOG.debug("Opening file: {} for append", f);
|
|
LOG.debug("Opening file: {} for append", f);
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.WRITE.toString(), "append");
|
|
|
|
+
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
FileMetadata meta = null;
|
|
FileMetadata meta = null;
|
|
try {
|
|
try {
|
|
@@ -1434,6 +1490,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
* Get a self-renewing lease on the specified file.
|
|
* Get a self-renewing lease on the specified file.
|
|
* @param path path whose lease to be renewed.
|
|
* @param path path whose lease to be renewed.
|
|
* @return Lease
|
|
* @return Lease
|
|
|
|
+ * @throws AzureException when not being able to acquire a lease on the path
|
|
*/
|
|
*/
|
|
public SelfRenewingLease acquireLease(Path path) throws AzureException {
|
|
public SelfRenewingLease acquireLease(Path path) throws AzureException {
|
|
String fullKey = pathToKey(makeAbsolute(path));
|
|
String fullKey = pathToKey(makeAbsolute(path));
|
|
@@ -1572,6 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.WRITE.toString(), "create");
|
|
|
|
+
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
|
|
|
|
FileMetadata existingMetadata = store.retrieveMetadata(key);
|
|
FileMetadata existingMetadata = store.retrieveMetadata(key);
|
|
@@ -1652,10 +1713,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
// Construct the data output stream from the buffered output stream.
|
|
// Construct the data output stream from the buffered output stream.
|
|
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
|
|
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
|
|
|
|
|
|
-
|
|
|
|
|
|
+
|
|
// Increment the counter
|
|
// Increment the counter
|
|
instrumentation.fileCreated();
|
|
instrumentation.fileCreated();
|
|
-
|
|
|
|
|
|
+
|
|
// Return data output stream to caller.
|
|
// Return data output stream to caller.
|
|
return fsOut;
|
|
return fsOut;
|
|
}
|
|
}
|
|
@@ -1694,6 +1755,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
LOG.debug("Deleting file: {}", f.toString());
|
|
LOG.debug("Deleting file: {}", f.toString());
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.EXECUTE.toString(), "delete");
|
|
|
|
+
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
|
|
|
|
// Capture the metadata for the path.
|
|
// Capture the metadata for the path.
|
|
@@ -1964,6 +2029,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
|
|
|
// Capture the absolute path and the path to key.
|
|
// Capture the absolute path and the path to key.
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.EXECUTE.toString(), "getFileStatus");
|
|
|
|
+
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
if (key.length() == 0) { // root always exists
|
|
if (key.length() == 0) { // root always exists
|
|
return newDirectory(null, absolutePath);
|
|
return newDirectory(null, absolutePath);
|
|
@@ -2062,6 +2131,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
LOG.debug("Listing status for {}", f.toString());
|
|
LOG.debug("Listing status for {}", f.toString());
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.EXECUTE.toString(), "list");
|
|
|
|
+
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
Set<FileStatus> status = new TreeSet<FileStatus>();
|
|
Set<FileStatus> status = new TreeSet<FileStatus>();
|
|
FileMetadata meta = null;
|
|
FileMetadata meta = null;
|
|
@@ -2228,7 +2301,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Applies the applicable UMASK's on the given permission.
|
|
* Applies the applicable UMASK's on the given permission.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @param permission
|
|
* @param permission
|
|
* The permission to mask.
|
|
* The permission to mask.
|
|
* @param applyMode
|
|
* @param applyMode
|
|
@@ -2250,7 +2323,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
/**
|
|
/**
|
|
* Creates the PermissionStatus object to use for the given permission, based
|
|
* Creates the PermissionStatus object to use for the given permission, based
|
|
* on the current user in context.
|
|
* on the current user in context.
|
|
- *
|
|
|
|
|
|
+ *
|
|
* @param permission
|
|
* @param permission
|
|
* The permission for the file.
|
|
* The permission for the file.
|
|
* @return The permission status object to use.
|
|
* @return The permission status object to use.
|
|
@@ -2284,6 +2357,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.EXECUTE.toString(), "mkdirs");
|
|
|
|
+
|
|
PermissionStatus permissionStatus = null;
|
|
PermissionStatus permissionStatus = null;
|
|
if(noUmask) {
|
|
if(noUmask) {
|
|
// ensure owner still has wx permissions at the minimum
|
|
// ensure owner still has wx permissions at the minimum
|
|
@@ -2337,6 +2414,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
LOG.debug("Opening file: {}", f.toString());
|
|
LOG.debug("Opening file: {}", f.toString());
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.READ.toString(), "read");
|
|
|
|
+
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
FileMetadata meta = null;
|
|
FileMetadata meta = null;
|
|
try {
|
|
try {
|
|
@@ -2393,7 +2474,12 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
+ " through WASB that has colons in the name");
|
|
+ " through WASB that has colons in the name");
|
|
}
|
|
}
|
|
|
|
|
|
- String srcKey = pathToKey(makeAbsolute(src));
|
|
|
|
|
|
+ Path absolutePath = makeAbsolute(src);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.EXECUTE.toString(), "rename");
|
|
|
|
+
|
|
|
|
+ String srcKey = pathToKey(absolutePath);
|
|
|
|
|
|
if (srcKey.length() == 0) {
|
|
if (srcKey.length() == 0) {
|
|
// Cannot rename root of file system
|
|
// Cannot rename root of file system
|
|
@@ -2695,6 +2781,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
|
|
public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
|
|
Path absolutePath = makeAbsolute(p);
|
|
Path absolutePath = makeAbsolute(p);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.EXECUTE.toString(), "setPermission");
|
|
|
|
+
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
FileMetadata metadata = null;
|
|
FileMetadata metadata = null;
|
|
try {
|
|
try {
|
|
@@ -2733,6 +2823,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
public void setOwner(Path p, String username, String groupname)
|
|
public void setOwner(Path p, String username, String groupname)
|
|
throws IOException {
|
|
throws IOException {
|
|
Path absolutePath = makeAbsolute(p);
|
|
Path absolutePath = makeAbsolute(p);
|
|
|
|
+
|
|
|
|
+ performAuthCheck(absolutePath.toString(),
|
|
|
|
+ WasbAuthorizationOperations.EXECUTE.toString(), "setOwner");
|
|
|
|
+
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
FileMetadata metadata = null;
|
|
FileMetadata metadata = null;
|
|
|
|
|