|
@@ -68,6 +68,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.security.ProviderUtils;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
|
@@ -170,16 +171,16 @@ public class S3AFileSystem extends FileSystem {
|
|
|
bucket = name.getHost();
|
|
|
|
|
|
ClientConfiguration awsConf = new ClientConfiguration();
|
|
|
- awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
|
|
+ awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
|
|
|
DEFAULT_MAXIMUM_CONNECTIONS));
|
|
|
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
|
|
|
DEFAULT_SECURE_CONNECTIONS);
|
|
|
awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
|
|
- awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
|
|
+ awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
|
|
|
DEFAULT_MAX_ERROR_RETRIES));
|
|
|
awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT,
|
|
|
DEFAULT_ESTABLISH_TIMEOUT));
|
|
|
- awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
|
|
+ awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
|
|
DEFAULT_SOCKET_TIMEOUT));
|
|
|
String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
|
|
if(!signerOverride.isEmpty()) {
|
|
@@ -321,9 +322,9 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
|
|
|
private void initMultipartUploads(Configuration conf) {
|
|
|
- boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
|
|
|
+ boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
|
|
|
DEFAULT_PURGE_EXISTING_MULTIPART);
|
|
|
- long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
|
|
|
+ long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
|
|
|
DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
|
|
|
|
|
|
if (purgeExistingMultipart) {
|
|
@@ -355,9 +356,11 @@ public class S3AFileSystem extends FileSystem {
|
|
|
accessKey = userInfo;
|
|
|
}
|
|
|
}
|
|
|
+ Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
|
|
|
+ conf, S3AFileSystem.class);
|
|
|
if (accessKey == null) {
|
|
|
try {
|
|
|
- final char[] key = conf.getPassword(ACCESS_KEY);
|
|
|
+ final char[] key = c.getPassword(ACCESS_KEY);
|
|
|
if (key != null) {
|
|
|
accessKey = (new String(key)).trim();
|
|
|
}
|
|
@@ -367,7 +370,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
if (secretKey == null) {
|
|
|
try {
|
|
|
- final char[] pass = conf.getPassword(SECRET_KEY);
|
|
|
+ final char[] pass = c.getPassword(SECRET_KEY);
|
|
|
if (pass != null) {
|
|
|
secretKey = (new String(pass)).trim();
|
|
|
}
|
|
@@ -448,7 +451,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
throw new FileNotFoundException("Can't open " + f + " because it is a directory");
|
|
|
}
|
|
|
|
|
|
- return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
|
|
|
+ return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
|
|
|
fileStatus.getLen(), s3, statistics));
|
|
|
}
|
|
|
|
|
@@ -483,7 +486,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
|
|
|
return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
|
|
|
- bucket, key, progress, cannedACL, statistics,
|
|
|
+ bucket, key, progress, cannedACL, statistics,
|
|
|
serverSideEncryptionAlgorithm), null);
|
|
|
}
|
|
|
|
|
@@ -494,7 +497,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
* @param progress for reporting progress if it is not null.
|
|
|
* @throws IOException indicating that append is not supported.
|
|
|
*/
|
|
|
- public FSDataOutputStream append(Path f, int bufferSize,
|
|
|
+ public FSDataOutputStream append(Path f, int bufferSize,
|
|
|
Progressable progress) throws IOException {
|
|
|
throw new IOException("Not supported");
|
|
|
}
|
|
@@ -504,8 +507,8 @@ public class S3AFileSystem extends FileSystem {
|
|
|
* Renames Path src to Path dst. Can take place on local fs
|
|
|
* or remote DFS.
|
|
|
*
|
|
|
- * Warning: S3 does not support renames. This method does a copy which can
|
|
|
- * take S3 some time to execute with large files and directories. Since
|
|
|
+ * Warning: S3 does not support renames. This method does a copy which can
|
|
|
+ * take S3 some time to execute with large files and directories. Since
|
|
|
* there is no Progressable passed in, this can time out jobs.
|
|
|
*
|
|
|
* Note: This implementation differs with other S3 drivers. Specifically:
|
|
@@ -618,7 +621,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
|
|
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete =
|
|
|
new ArrayList<>();
|
|
|
if (dstStatus != null && dstStatus.isEmptyDirectory()) {
|
|
|
// delete unnecessary fake directory.
|
|
@@ -724,7 +727,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
|
|
|
if (!recursive && !status.isEmptyDirectory()) {
|
|
|
- throw new IOException("Path is a folder: " + f +
|
|
|
+ throw new IOException("Path is a folder: " + f +
|
|
|
" and it is not an empty directory");
|
|
|
}
|
|
|
|
|
@@ -755,7 +758,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
//request.setDelimiter("/");
|
|
|
request.setMaxKeys(maxKeys);
|
|
|
|
|
|
- List<DeleteObjectsRequest.KeyVersion> keys =
|
|
|
+ List<DeleteObjectsRequest.KeyVersion> keys =
|
|
|
new ArrayList<>();
|
|
|
ObjectListing objects = s3.listObjects(request);
|
|
|
statistics.incrementReadOps(1);
|
|
@@ -859,7 +862,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
LOG.debug("Adding: fd: " + keyPath);
|
|
|
}
|
|
|
} else {
|
|
|
- result.add(new S3AFileStatus(summary.getSize(),
|
|
|
+ result.add(new S3AFileStatus(summary.getSize(),
|
|
|
dateToLong(summary.getLastModified()), keyPath,
|
|
|
getDefaultBlockSize(f.makeQualified(uri, workingDir))));
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -927,7 +930,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
* @param f path to create
|
|
|
* @param permission to apply to f
|
|
|
*/
|
|
|
- // TODO: If we have created an empty file at /foo/bar and we then call
|
|
|
+ // TODO: If we have created an empty file at /foo/bar and we then call
|
|
|
// mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
|
|
|
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -950,7 +953,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
FileStatus fileStatus = getFileStatus(fPart);
|
|
|
if (fileStatus.isFile()) {
|
|
|
throw new FileAlreadyExistsException(String.format(
|
|
|
- "Can't make directory for path '%s' since it is a file.",
|
|
|
+ "Can't make directory for path '%s' since it is a file.",
|
|
|
fPart));
|
|
|
}
|
|
|
} catch (FileNotFoundException fnfe) {
|
|
@@ -1056,9 +1059,9 @@ public class S3AFileSystem extends FileSystem {
|
|
|
if (!objects.getCommonPrefixes().isEmpty()
|
|
|
|| objects.getObjectSummaries().size() > 0) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Found path as directory (with /): " +
|
|
|
- objects.getCommonPrefixes().size() + "/" +
|
|
|
- objects.getObjectSummaries().size());
|
|
|
+ LOG.debug("Found path as directory (with /): " +
|
|
|
+ objects.getCommonPrefixes().size() + "/" +
|
|
|
+ objects.getObjectSummaries().size());
|
|
|
|
|
|
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
|
|
|
LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
|
|
@@ -1104,7 +1107,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
* @param dst path
|
|
|
*/
|
|
|
@Override
|
|
|
- public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
|
|
|
+ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
|
|
|
Path dst) throws IOException {
|
|
|
String key = pathToKey(dst);
|
|
|
|