|
@@ -28,6 +28,7 @@ import java.net.URI;
|
|
import java.net.URISyntaxException;
|
|
import java.net.URISyntaxException;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Arrays;
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutionException;
|
|
@@ -62,9 +63,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnh
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
|
|
|
+import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException;
|
|
|
|
+import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizer;
|
|
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
|
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
|
import org.apache.hadoop.fs.permission.AclEntry;
|
|
import org.apache.hadoop.fs.permission.AclEntry;
|
|
import org.apache.hadoop.fs.permission.AclStatus;
|
|
import org.apache.hadoop.fs.permission.AclStatus;
|
|
|
|
+import org.apache.hadoop.fs.permission.FsAction;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -87,6 +91,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
|
|
|
|
private boolean delegationTokenEnabled = false;
|
|
private boolean delegationTokenEnabled = false;
|
|
private AbfsDelegationTokenManager delegationTokenManager;
|
|
private AbfsDelegationTokenManager delegationTokenManager;
|
|
|
|
+ private AbfsAuthorizer authorizer;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void initialize(URI uri, Configuration configuration)
|
|
public void initialize(URI uri, Configuration configuration)
|
|
@@ -132,6 +137,10 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
|
|
|
|
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
|
|
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
|
|
|
|
+
|
|
|
|
+ // Initialize ABFS authorizer
|
|
|
|
+ //
|
|
|
|
+ this.authorizer = abfsConfiguration.getAbfsAuthorizer();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -158,8 +167,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
|
|
public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
|
|
LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
|
|
LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.READ, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- InputStream inputStream = abfsStore.openFileForRead(makeQualified(path), statistics);
|
|
|
|
|
|
+ InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, statistics);
|
|
return new FSDataInputStream(inputStream);
|
|
return new FSDataInputStream(inputStream);
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
checkException(path, ex);
|
|
checkException(path, ex);
|
|
@@ -176,8 +188,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
overwrite,
|
|
overwrite,
|
|
blockSize);
|
|
blockSize);
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(f);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite,
|
|
|
|
|
|
+ OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
|
|
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
|
|
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
|
|
return new FSDataOutputStream(outputStream, statistics);
|
|
return new FSDataOutputStream(outputStream, statistics);
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
@@ -236,8 +251,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
f.toString(),
|
|
f.toString(),
|
|
bufferSize);
|
|
bufferSize);
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(f);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- OutputStream outputStream = abfsStore.openFileForWrite(makeQualified(f), false);
|
|
|
|
|
|
+ OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
|
|
return new FSDataOutputStream(outputStream, statistics);
|
|
return new FSDataOutputStream(outputStream, statistics);
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
checkException(f, ex);
|
|
checkException(f, ex);
|
|
@@ -267,7 +285,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
adjustedDst = new Path(dst, sourceFileName);
|
|
adjustedDst = new Path(dst, sourceFileName);
|
|
}
|
|
}
|
|
|
|
|
|
- abfsStore.rename(makeQualified(src), makeQualified(adjustedDst));
|
|
|
|
|
|
+ Path qualifiedSrcPath = makeQualified(src);
|
|
|
|
+ Path qualifiedDstPath = makeQualified(adjustedDst);
|
|
|
|
+ performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedSrcPath, qualifiedDstPath);
|
|
|
|
+
|
|
|
|
+ abfsStore.rename(qualifiedSrcPath, qualifiedDstPath);
|
|
return true;
|
|
return true;
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
checkException(
|
|
checkException(
|
|
@@ -289,6 +311,9 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
LOG.debug(
|
|
LOG.debug(
|
|
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive);
|
|
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive);
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(f);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
if (f.isRoot()) {
|
|
if (f.isRoot()) {
|
|
if (!recursive) {
|
|
if (!recursive) {
|
|
return false;
|
|
return false;
|
|
@@ -298,7 +323,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
|
|
|
|
try {
|
|
try {
|
|
- abfsStore.delete(makeQualified(f), recursive);
|
|
|
|
|
|
+ abfsStore.delete(qualifiedPath, recursive);
|
|
return true;
|
|
return true;
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND);
|
|
checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND);
|
|
@@ -312,8 +337,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
LOG.debug(
|
|
LOG.debug(
|
|
"AzureBlobFileSystem.listStatus path: {}", f.toString());
|
|
"AzureBlobFileSystem.listStatus path: {}", f.toString());
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(f);
|
|
|
|
+ performAbfsAuthCheck(FsAction.READ, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- FileStatus[] result = abfsStore.listStatus(makeQualified(f));
|
|
|
|
|
|
+ FileStatus[] result = abfsStore.listStatus(qualifiedPath);
|
|
return result;
|
|
return result;
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(f, ex);
|
|
checkException(f, ex);
|
|
@@ -332,8 +360,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(f);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- abfsStore.createDirectory(makeQualified(f), permission == null ? FsPermission.getDirDefault() : permission,
|
|
|
|
|
|
+ abfsStore.createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission,
|
|
FsPermission.getUMask(getConf()));
|
|
FsPermission.getUMask(getConf()));
|
|
return true;
|
|
return true;
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
@@ -357,8 +388,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
public FileStatus getFileStatus(final Path f) throws IOException {
|
|
public FileStatus getFileStatus(final Path f) throws IOException {
|
|
LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f);
|
|
LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f);
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(f);
|
|
|
|
+ performAbfsAuthCheck(FsAction.READ, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- return abfsStore.getFileStatus(makeQualified(f));
|
|
|
|
|
|
+ return abfsStore.getFileStatus(qualifiedPath);
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
} catch(AzureBlobFileSystemException ex) {
|
|
checkException(f, ex);
|
|
checkException(f, ex);
|
|
return null;
|
|
return null;
|
|
@@ -528,8 +562,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
throw new IllegalArgumentException("A valid owner or group must be specified.");
|
|
throw new IllegalArgumentException("A valid owner or group must be specified.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- abfsStore.setOwner(makeQualified(path),
|
|
|
|
|
|
+ abfsStore.setOwner(qualifiedPath,
|
|
owner,
|
|
owner,
|
|
group);
|
|
group);
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
@@ -556,8 +593,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
throw new IllegalArgumentException("The permission can't be null");
|
|
throw new IllegalArgumentException("The permission can't be null");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- abfsStore.setPermission(makeQualified(path),
|
|
|
|
|
|
+ abfsStore.setPermission(qualifiedPath,
|
|
permission);
|
|
permission);
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(path, ex);
|
|
checkException(path, ex);
|
|
@@ -589,8 +629,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
throw new IllegalArgumentException("The value of the aclSpec parameter is invalid.");
|
|
throw new IllegalArgumentException("The value of the aclSpec parameter is invalid.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- abfsStore.modifyAclEntries(makeQualified(path),
|
|
|
|
|
|
+ abfsStore.modifyAclEntries(qualifiedPath,
|
|
aclSpec);
|
|
aclSpec);
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(path, ex);
|
|
checkException(path, ex);
|
|
@@ -620,8 +663,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
throw new IllegalArgumentException("The aclSpec argument is invalid.");
|
|
throw new IllegalArgumentException("The aclSpec argument is invalid.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- abfsStore.removeAclEntries(makeQualified(path), aclSpec);
|
|
|
|
|
|
+ abfsStore.removeAclEntries(qualifiedPath, aclSpec);
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(path, ex);
|
|
checkException(path, ex);
|
|
}
|
|
}
|
|
@@ -643,8 +689,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
+ "hierarchical namespace enabled.");
|
|
+ "hierarchical namespace enabled.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- abfsStore.removeDefaultAcl(makeQualified(path));
|
|
|
|
|
|
+ abfsStore.removeDefaultAcl(qualifiedPath);
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(path, ex);
|
|
checkException(path, ex);
|
|
}
|
|
}
|
|
@@ -668,8 +717,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
+ "hierarchical namespace enabled.");
|
|
+ "hierarchical namespace enabled.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- abfsStore.removeAcl(makeQualified(path));
|
|
|
|
|
|
+ abfsStore.removeAcl(qualifiedPath);
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(path, ex);
|
|
checkException(path, ex);
|
|
}
|
|
}
|
|
@@ -700,8 +752,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
throw new IllegalArgumentException("The aclSpec argument is invalid.");
|
|
throw new IllegalArgumentException("The aclSpec argument is invalid.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- abfsStore.setAcl(makeQualified(path), aclSpec);
|
|
|
|
|
|
+ abfsStore.setAcl(qualifiedPath, aclSpec);
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(path, ex);
|
|
checkException(path, ex);
|
|
}
|
|
}
|
|
@@ -724,8 +779,11 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
+ "hierarchical namespace enabled.");
|
|
+ "hierarchical namespace enabled.");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Path qualifiedPath = makeQualified(path);
|
|
|
|
+ performAbfsAuthCheck(FsAction.READ, qualifiedPath);
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- return abfsStore.getAclStatus(makeQualified(path));
|
|
|
|
|
|
+ return abfsStore.getAclStatus(qualifiedPath);
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
} catch (AzureBlobFileSystemException ex) {
|
|
checkException(path, ex);
|
|
checkException(path, ex);
|
|
return null;
|
|
return null;
|
|
@@ -950,4 +1008,30 @@ public class AzureBlobFileSystem extends FileSystem {
|
|
boolean getIsNamespaceEnabeld() throws AzureBlobFileSystemException {
|
|
boolean getIsNamespaceEnabeld() throws AzureBlobFileSystemException {
|
|
return abfsStore.getIsNamespaceEnabled();
|
|
return abfsStore.getIsNamespaceEnabled();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Use ABFS authorizer to check if user is authorized to perform specific
|
|
|
|
+ * {@link FsAction} on specified {@link Path}s.
|
|
|
|
+ *
|
|
|
|
+ * @param action The {@link FsAction} being requested on the provided {@link Path}s.
|
|
|
|
+ * @param paths The absolute paths of the storage being accessed.
|
|
|
|
+ * @throws AbfsAuthorizationException on authorization failure.
|
|
|
|
+ * @throws IOException network problems or similar.
|
|
|
|
+ * @throws IllegalArgumentException if the required parameters are not provided.
|
|
|
|
+ */
|
|
|
|
+ private void performAbfsAuthCheck(FsAction action, Path... paths)
|
|
|
|
+ throws AbfsAuthorizationException, IOException {
|
|
|
|
+ if (authorizer == null) {
|
|
|
|
+ LOG.debug("ABFS authorizer is not initialized. No authorization check will be performed.");
|
|
|
|
+ } else {
|
|
|
|
+ Preconditions.checkArgument(paths.length > 0, "no paths supplied for authorization check");
|
|
|
|
+
|
|
|
|
+ LOG.debug("Auth check for action: {} on paths: {}", action.toString(), Arrays.toString(paths));
|
|
|
|
+ if (!authorizer.isAuthorized(action, paths)) {
|
|
|
|
+ throw new AbfsAuthorizationException(
|
|
|
|
+ "User is not authorized for action " + action.toString()
|
|
|
|
+ + " on paths: " + Arrays.toString(paths));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|