|
@@ -25,12 +25,9 @@ import java.io.FileNotFoundException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.io.OutputStream;
|
|
import java.io.OutputStream;
|
|
-import java.net.InetAddress;
|
|
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.net.URISyntaxException;
|
|
import java.net.URISyntaxException;
|
|
-import java.net.URL;
|
|
|
|
import java.nio.charset.Charset;
|
|
import java.nio.charset.Charset;
|
|
-import java.security.PrivilegedExceptionAction;
|
|
|
|
import java.text.SimpleDateFormat;
|
|
import java.text.SimpleDateFormat;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Date;
|
|
import java.util.Date;
|
|
@@ -64,15 +61,10 @@ import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
|
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
|
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
|
|
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
|
|
-import org.apache.hadoop.fs.azure.security.Constants;
|
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
-import org.apache.hadoop.security.token.Token;
|
|
|
|
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
|
|
|
-import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
|
|
|
-import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
import org.apache.hadoop.util.Progressable;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
@@ -1114,39 +1106,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
// A counter to create unique (within-process) names for my metrics sources.
|
|
// A counter to create unique (within-process) names for my metrics sources.
|
|
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
|
|
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
|
|
private boolean appendSupportEnabled = false;
|
|
private boolean appendSupportEnabled = false;
|
|
- private DelegationTokenAuthenticatedURL authURL;
|
|
|
|
- private DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
|
|
|
|
- private String credServiceUrl;
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Configuration key to enable authorization support in WASB.
|
|
|
|
- */
|
|
|
|
- public static final String KEY_AZURE_AUTHORIZATION =
|
|
|
|
- "fs.azure.authorization";
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Default value for the authorization support in WASB.
|
|
|
|
- */
|
|
|
|
- private static final boolean DEFAULT_AZURE_AUTHORIZATION = false;
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Flag controlling authorization support in WASB.
|
|
|
|
- */
|
|
|
|
- private boolean azureAuthorization = false;
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Flag controlling Kerberos support in WASB.
|
|
|
|
- */
|
|
|
|
- private boolean kerberosSupportEnabled = false;
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Authorizer to use when authorization support is enabled in
|
|
|
|
- * WASB.
|
|
|
|
- */
|
|
|
|
- private WasbAuthorizerInterface authorizer = null;
|
|
|
|
-
|
|
|
|
- private String delegationToken = null;
|
|
|
|
-
|
|
|
|
|
|
+
|
|
public NativeAzureFileSystem() {
|
|
public NativeAzureFileSystem() {
|
|
// set store in initialize()
|
|
// set store in initialize()
|
|
}
|
|
}
|
|
@@ -1277,31 +1237,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
// Initialize thread counts from user configuration
|
|
// Initialize thread counts from user configuration
|
|
deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
|
|
deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
|
|
renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);
|
|
renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);
|
|
-
|
|
|
|
- this.azureAuthorization = conf.getBoolean(KEY_AZURE_AUTHORIZATION,
|
|
|
|
- DEFAULT_AZURE_AUTHORIZATION);
|
|
|
|
- this.kerberosSupportEnabled = conf.getBoolean(
|
|
|
|
- Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
|
|
|
-
|
|
|
|
- if (this.azureAuthorization) {
|
|
|
|
-
|
|
|
|
- this.authorizer =
|
|
|
|
- new RemoteWasbAuthorizerImpl();
|
|
|
|
- authorizer.init(conf);
|
|
|
|
- }
|
|
|
|
- if (UserGroupInformation.isSecurityEnabled() && kerberosSupportEnabled) {
|
|
|
|
- DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
|
|
|
|
- authURL = new DelegationTokenAuthenticatedURL(authenticator);
|
|
|
|
- credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL, String
|
|
|
|
- .format("http://%s:%s",
|
|
|
|
- InetAddress.getLocalHost().getCanonicalHostName(),
|
|
|
|
- Constants.DEFAULT_CRED_SERVICE_PORT));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @VisibleForTesting
|
|
|
|
- public void updateWasbAuthorizer(WasbAuthorizerInterface authorizer) {
|
|
|
|
- this.authorizer = authorizer;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private NativeFileSystemStore createDefaultStore(Configuration conf) {
|
|
private NativeFileSystemStore createDefaultStore(Configuration conf) {
|
|
@@ -1415,15 +1350,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
return store;
|
|
return store;
|
|
}
|
|
}
|
|
|
|
|
|
- private void performAuthCheck(String path, String accessType,
|
|
|
|
- String operation) throws WasbAuthorizationException, IOException {
|
|
|
|
-
|
|
|
|
- if (azureAuthorization && !this.authorizer.authorize(path, accessType)) {
|
|
|
|
- throw new WasbAuthorizationException(operation
|
|
|
|
- + " operation for Path : " + path + " not allowed");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Gets the metrics source for this file system.
|
|
* Gets the metrics source for this file system.
|
|
* This is mainly here for unit testing purposes.
|
|
* This is mainly here for unit testing purposes.
|
|
@@ -1446,10 +1372,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
LOG.debug("Opening file: {} for append", f);
|
|
LOG.debug("Opening file: {} for append", f);
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.WRITE.toString(), "append");
|
|
|
|
-
|
|
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
FileMetadata meta = null;
|
|
FileMetadata meta = null;
|
|
try {
|
|
try {
|
|
@@ -1650,10 +1572,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.WRITE.toString(), "create");
|
|
|
|
-
|
|
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
|
|
|
|
FileMetadata existingMetadata = store.retrieveMetadata(key);
|
|
FileMetadata existingMetadata = store.retrieveMetadata(key);
|
|
@@ -1776,10 +1694,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
LOG.debug("Deleting file: {}", f.toString());
|
|
LOG.debug("Deleting file: {}", f.toString());
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.EXECUTE.toString(), "delete");
|
|
|
|
-
|
|
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
|
|
|
|
// Capture the metadata for the path.
|
|
// Capture the metadata for the path.
|
|
@@ -2050,10 +1964,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
|
|
|
|
// Capture the absolute path and the path to key.
|
|
// Capture the absolute path and the path to key.
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.EXECUTE.toString(), "getFileStatus");
|
|
|
|
-
|
|
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
if (key.length() == 0) { // root always exists
|
|
if (key.length() == 0) { // root always exists
|
|
return newDirectory(null, absolutePath);
|
|
return newDirectory(null, absolutePath);
|
|
@@ -2152,10 +2062,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
LOG.debug("Listing status for {}", f.toString());
|
|
LOG.debug("Listing status for {}", f.toString());
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.EXECUTE.toString(), "list");
|
|
|
|
-
|
|
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
Set<FileStatus> status = new TreeSet<FileStatus>();
|
|
Set<FileStatus> status = new TreeSet<FileStatus>();
|
|
FileMetadata meta = null;
|
|
FileMetadata meta = null;
|
|
@@ -2378,10 +2284,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
}
|
|
}
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.EXECUTE.toString(), "mkdirs");
|
|
|
|
-
|
|
|
|
PermissionStatus permissionStatus = null;
|
|
PermissionStatus permissionStatus = null;
|
|
if(noUmask) {
|
|
if(noUmask) {
|
|
// ensure owner still has wx permissions at the minimum
|
|
// ensure owner still has wx permissions at the minimum
|
|
@@ -2435,10 +2337,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
LOG.debug("Opening file: {}", f.toString());
|
|
LOG.debug("Opening file: {}", f.toString());
|
|
|
|
|
|
Path absolutePath = makeAbsolute(f);
|
|
Path absolutePath = makeAbsolute(f);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.READ.toString(), "read");
|
|
|
|
-
|
|
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
FileMetadata meta = null;
|
|
FileMetadata meta = null;
|
|
try {
|
|
try {
|
|
@@ -2495,12 +2393,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
+ " through WASB that has colons in the name");
|
|
+ " through WASB that has colons in the name");
|
|
}
|
|
}
|
|
|
|
|
|
- Path absolutePath = makeAbsolute(src);
|
|
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.EXECUTE.toString(), "rename");
|
|
|
|
-
|
|
|
|
- String srcKey = pathToKey(absolutePath);
|
|
|
|
|
|
+ String srcKey = pathToKey(makeAbsolute(src));
|
|
|
|
|
|
if (srcKey.length() == 0) {
|
|
if (srcKey.length() == 0) {
|
|
// Cannot rename root of file system
|
|
// Cannot rename root of file system
|
|
@@ -2802,10 +2695,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
@Override
|
|
@Override
|
|
public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
|
|
public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
|
|
Path absolutePath = makeAbsolute(p);
|
|
Path absolutePath = makeAbsolute(p);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.EXECUTE.toString(), "setPermission");
|
|
|
|
-
|
|
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
FileMetadata metadata = null;
|
|
FileMetadata metadata = null;
|
|
try {
|
|
try {
|
|
@@ -2844,10 +2733,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
public void setOwner(Path p, String username, String groupname)
|
|
public void setOwner(Path p, String username, String groupname)
|
|
throws IOException {
|
|
throws IOException {
|
|
Path absolutePath = makeAbsolute(p);
|
|
Path absolutePath = makeAbsolute(p);
|
|
-
|
|
|
|
- performAuthCheck(absolutePath.toString(),
|
|
|
|
- WasbAuthorizationOperations.EXECUTE.toString(), "setOwner");
|
|
|
|
-
|
|
|
|
String key = pathToKey(absolutePath);
|
|
String key = pathToKey(absolutePath);
|
|
FileMetadata metadata = null;
|
|
FileMetadata metadata = null;
|
|
|
|
|
|
@@ -2910,42 +2795,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|
isClosed = true;
|
|
isClosed = true;
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public Token<?> getDelegationToken(final String renewer) throws IOException {
|
|
|
|
- if(kerberosSupportEnabled) {
|
|
|
|
- try {
|
|
|
|
- final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
- UserGroupInformation connectUgi = ugi.getRealUser();
|
|
|
|
- final UserGroupInformation proxyUser = connectUgi;
|
|
|
|
- if (connectUgi == null) {
|
|
|
|
- connectUgi = ugi;
|
|
|
|
- }
|
|
|
|
- if(!connectUgi.hasKerberosCredentials()){
|
|
|
|
- connectUgi = UserGroupInformation.getLoginUser();
|
|
|
|
- }
|
|
|
|
- connectUgi.checkTGTAndReloginFromKeytab();
|
|
|
|
- return connectUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
|
|
|
- @Override
|
|
|
|
- public Token<?> run() throws Exception {
|
|
|
|
- return authURL.getDelegationToken(new URL(credServiceUrl
|
|
|
|
- + Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
|
|
|
|
- authToken, renewer, (proxyUser != null)? ugi.getShortUserName(): null);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- } catch (Exception ex) {
|
|
|
|
- LOG.error("Error in fetching the delegation token from remote service",
|
|
|
|
- ex);
|
|
|
|
- if (ex instanceof IOException) {
|
|
|
|
- throw (IOException) ex;
|
|
|
|
- } else {
|
|
|
|
- throw new IOException(ex);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
- return super.getDelegationToken(renewer);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* A handler that defines what to do with blobs whose upload was
|
|
* A handler that defines what to do with blobs whose upload was
|
|
* interrupted.
|
|
* interrupted.
|