|
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.security.ProviderUtils;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
|
@@ -118,16 +119,16 @@ public class S3AFileSystem extends FileSystem {
|
|
|
bucket = name.getHost();
|
|
|
|
|
|
ClientConfiguration awsConf = new ClientConfiguration();
|
|
|
- awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
|
|
+ awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
|
|
DEFAULT_MAXIMUM_CONNECTIONS));
|
|
|
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
|
|
DEFAULT_SECURE_CONNECTIONS);
|
|
|
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
|
|
- awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
|
|
+ awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
|
|
DEFAULT_MAX_ERROR_RETRIES));
|
|
|
awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT,
|
|
|
DEFAULT_ESTABLISH_TIMEOUT));
|
|
|
- awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
|
|
+ awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
|
|
DEFAULT_SOCKET_TIMEOUT));
|
|
|
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
|
|
if(!signerOverride.isEmpty()) {
|
|
@@ -263,9 +264,9 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
|
|
|
private void initMultipartUploads(Configuration conf) {
|
|
|
- boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
|
|
|
+ boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
|
|
|
DEFAULT_PURGE_EXISTING_MULTIPART);
|
|
|
- long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
|
|
|
+ long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
|
|
|
DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
|
|
|
|
|
|
if (purgeExistingMultipart) {
|
|
@@ -297,9 +298,11 @@ public class S3AFileSystem extends FileSystem {
|
|
|
accessKey = userInfo;
|
|
|
}
|
|
|
}
|
|
|
+ Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
|
|
|
+ conf, S3AFileSystem.class);
|
|
|
if (accessKey == null) {
|
|
|
try {
|
|
|
- final char[] key = conf.getPassword(ACCESS_KEY);
|
|
|
+ final char[] key = c.getPassword(ACCESS_KEY);
|
|
|
if (key != null) {
|
|
|
accessKey = (new String(key)).trim();
|
|
|
}
|
|
@@ -309,7 +312,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
if (secretKey == null) {
|
|
|
try {
|
|
|
- final char[] pass = conf.getPassword(SECRET_KEY);
|
|
|
+ final char[] pass = c.getPassword(SECRET_KEY);
|
|
|
if (pass != null) {
|
|
|
secretKey = (new String(pass)).trim();
|
|
|
}
|
|
@@ -390,7 +393,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
throw new FileNotFoundException("Can't open " + f + " because it is a directory");
|
|
|
}
|
|
|
|
|
|
- return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
|
|
|
+ return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
|
|
|
fileStatus.getLen(), s3, statistics));
|
|
|
}
|
|
|
|
|
@@ -425,7 +428,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
|
|
|
return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
|
|
|
- bucket, key, progress, cannedACL, statistics,
|
|
|
+ bucket, key, progress, cannedACL, statistics,
|
|
|
serverSideEncryptionAlgorithm), null);
|
|
|
}
|
|
|
|
|
@@ -436,7 +439,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
* @param progress for reporting progress if it is not null.
|
|
|
* @throws IOException indicating that append is not supported.
|
|
|
*/
|
|
|
- public FSDataOutputStream append(Path f, int bufferSize,
|
|
|
+ public FSDataOutputStream append(Path f, int bufferSize,
|
|
|
Progressable progress) throws IOException {
|
|
|
throw new IOException("Not supported");
|
|
|
}
|
|
@@ -446,8 +449,8 @@ public class S3AFileSystem extends FileSystem {
|
|
|
* Renames Path src to Path dst. Can take place on local fs
|
|
|
* or remote DFS.
|
|
|
*
|
|
|
- * Warning: S3 does not support renames. This method does a copy which can
|
|
|
- * take S3 some time to execute with large files and directories. Since
|
|
|
+ * Warning: S3 does not support renames. This method does a copy which can
|
|
|
+ * take S3 some time to execute with large files and directories. Since
|
|
|
* there is no Progressable passed in, this can time out jobs.
|
|
|
*
|
|
|
* Note: This implementation differs with other S3 drivers. Specifically:
|
|
@@ -560,7 +563,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
|
|
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
|
|
new ArrayList<>();
|
|
|
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
|
|
|
// delete unnecessary fake directory.
|
|
@@ -666,7 +669,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
|
|
|
if (!recursive && !status.isEmptyDirectory()) {
|
|
|
- throw new IOException("Path is a folder: " + f +
|
|
|
+ throw new IOException("Path is a folder: " + f +
|
|
|
" and it is not an empty directory");
|
|
|
}
|
|
|
|
|
@@ -697,7 +700,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
//request.setDelimiter("/");
|
|
|
request.setMaxKeys(maxKeys);
|
|
|
|
|
|
- List<DeleteObjectsRequest.KeyVersion> keys =
|
|
|
+ List<DeleteObjectsRequest.KeyVersion> keys =
|
|
|
new ArrayList<>();
|
|
|
ObjectListing objects = s3.listObjects(request);
|
|
|
statistics.incrementReadOps(1);
|
|
@@ -801,7 +804,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
LOG.debug("Adding: fd: " + keyPath);
|
|
|
}
|
|
|
} else {
|
|
|
- result.add(new S3AFileStatus(summary.getSize(),
|
|
|
+ result.add(new S3AFileStatus(summary.getSize(),
|
|
|
dateToLong(summary.getLastModified()), keyPath,
|
|
|
getDefaultBlockSize(f.makeQualified(uri, workingDir))));
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -869,7 +872,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
* @param f path to create
|
|
|
* @param permission to apply to f
|
|
|
*/
|
|
|
- // TODO: If we have created an empty file at /foo/bar and we then call
|
|
|
+ // TODO: If we have created an empty file at /foo/bar and we then call
|
|
|
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
|
|
|
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -892,7 +895,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
FileStatus fileStatus = getFileStatus(fPart);
|
|
|
if (fileStatus.isFile()) {
|
|
|
throw new FileAlreadyExistsException(String.format(
|
|
|
- "Can't make directory for path '%s' since it is a file.",
|
|
|
+ "Can't make directory for path '%s' since it is a file.",
|
|
|
fPart));
|
|
|
}
|
|
|
} catch (FileNotFoundException fnfe) {
|
|
@@ -998,9 +1001,9 @@ public class S3AFileSystem extends FileSystem {
|
|
|
if (!objects.getCommonPrefixes().isEmpty()
|
|
|
|| objects.getObjectSummaries().size() > 0) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Found path as directory (with /): " +
|
|
|
- objects.getCommonPrefixes().size() + "/" +
|
|
|
- objects.getObjectSummaries().size());
|
|
|
+ LOG.debug("Found path as directory (with /): " +
|
|
|
+ objects.getCommonPrefixes().size() + "/" +
|
|
|
+ objects.getObjectSummaries().size());
|
|
|
|
|
|
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
|
|
|
LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
|
|
@@ -1046,7 +1049,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
* @param dst path
|
|
|
*/
|
|
|
@Override
|
|
|
- public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
|
|
|
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
|
|
|
Path dst) throws IOException {
|
|
|
String key = pathToKey(dst);
|
|
|
|