|
@@ -95,6 +95,7 @@ import org.apache.hadoop.fs.CreateFlag;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.Globber;
|
|
|
+import org.apache.hadoop.fs.impl.OpenFileParameters;
|
|
|
import org.apache.hadoop.fs.s3a.auth.SignerManager;
|
|
|
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
|
|
|
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
|
|
@@ -978,27 +979,30 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@Retries.RetryTranslated
|
|
|
public FSDataInputStream open(Path f, int bufferSize)
|
|
|
throws IOException {
|
|
|
- return open(f, Optional.empty());
|
|
|
+ return open(f, Optional.empty(), Optional.empty());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Opens an FSDataInputStream at the indicated Path.
|
|
|
- * @param path the file to open
|
|
|
+ * If providedStatus contains an S3AFileStatus, it is used
|
|
|
+ * and so a HEAD request to the store is avoided.
|
|
|
+ *
|
|
|
+ * @param file the file to open
|
|
|
* @param options configuration options if opened with the builder API.
|
|
|
+ * @param providedStatus optional file status.
|
|
|
* @throws IOException IO failure.
|
|
|
*/
|
|
|
@Retries.RetryTranslated
|
|
|
private FSDataInputStream open(
|
|
|
- final Path path,
|
|
|
- final Optional<Configuration> options)
|
|
|
+ final Path file,
|
|
|
+ final Optional<Configuration> options,
|
|
|
+ final Optional<S3AFileStatus> providedStatus)
|
|
|
throws IOException {
|
|
|
|
|
|
entryPoint(INVOCATION_OPEN);
|
|
|
- final S3AFileStatus fileStatus = (S3AFileStatus) getFileStatus(path);
|
|
|
- if (fileStatus.isDirectory()) {
|
|
|
- throw new FileNotFoundException("Can't open " + path
|
|
|
- + " because it is a directory");
|
|
|
- }
|
|
|
+ final Path path = qualify(file);
|
|
|
+ S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
|
|
|
+ providedStatus);
|
|
|
|
|
|
S3AReadOpContext readContext;
|
|
|
if (options.isPresent()) {
|
|
@@ -4303,22 +4307,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
* @param source path to source data
|
|
|
* @param expression select expression
|
|
|
* @param options request configuration from the builder.
|
|
|
+ * @param providedStatus any passed in status
|
|
|
* @return the stream of the results
|
|
|
* @throws IOException IO failure
|
|
|
*/
|
|
|
@Retries.RetryTranslated
|
|
|
private FSDataInputStream select(final Path source,
|
|
|
final String expression,
|
|
|
- final Configuration options)
|
|
|
+ final Configuration options,
|
|
|
+ final Optional<S3AFileStatus> providedStatus)
|
|
|
throws IOException {
|
|
|
entryPoint(OBJECT_SELECT_REQUESTS);
|
|
|
requireSelectSupport(source);
|
|
|
final Path path = makeQualified(source);
|
|
|
- // call getFileStatus(), which will look at S3Guard first,
|
|
|
- // so the operation will fail if it is not there or S3Guard believes it has
|
|
|
- // been deleted.
|
|
|
- // validation of the file status are delegated to the binding.
|
|
|
- final S3AFileStatus fileStatus = (S3AFileStatus) getFileStatus(path);
|
|
|
+ final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
|
|
|
+ providedStatus);
|
|
|
|
|
|
// readahead range can be dynamically set
|
|
|
long ra = options.getLong(READAHEAD_RANGE, readAhead);
|
|
@@ -4326,10 +4329,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy,
|
|
|
changeDetectionPolicy, ra);
|
|
|
|
|
|
- if (!fileStatus.isDirectory()) {
|
|
|
+ if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None
|
|
|
+ && fileStatus.getETag() != null) {
|
|
|
+ // if there is change detection, and the status includes at least an
|
|
|
+ // etag,
|
|
|
// check that the object metadata lines up with what is expected
|
|
|
// based on the object attributes (which may contain an eTag or
|
|
|
- // versionId) from S3Guard
|
|
|
+ // versionId).
|
|
|
+ // This is because the select API doesn't offer this.
|
|
|
+ // (note: this is trouble for version checking as we cannot force the old
|
|
|
+ // version in the final read; nor can we check the etag match)
|
|
|
ChangeTracker changeTracker =
|
|
|
new ChangeTracker(uri.toString(),
|
|
|
changeDetectionPolicy,
|
|
@@ -4364,12 +4373,42 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Extract the status from the optional parameter, querying
|
|
|
+ * S3Guard/S3 if it is absent.
|
|
|
+ * @param path path of the status
|
|
|
+ * @param optStatus optional status
|
|
|
+ * @return a file status; never that of a directory
|
|
|
+ * @throws FileNotFoundException if there is no normal file at that path
|
|
|
+ * @throws IOException IO failure
|
|
|
+ */
|
|
|
+ private S3AFileStatus extractOrFetchSimpleFileStatus(
|
|
|
+ final Path path, final Optional<S3AFileStatus> optStatus)
|
|
|
+ throws IOException {
|
|
|
+ S3AFileStatus fileStatus;
|
|
|
+ if (optStatus.isPresent()) {
|
|
|
+ fileStatus = optStatus.get();
|
|
|
+ } else {
|
|
|
+ // this looks at S3Guard and gets any type of status back,
|
|
|
+ // if it falls back to S3 it does a HEAD only.
|
|
|
+ // therefore: if there is no S3Guard and there is a dir, this
|
|
|
+ // will raise a FileNotFoundException
|
|
|
+ fileStatus = innerGetFileStatus(path, false,
|
|
|
+ StatusProbeEnum.HEAD_ONLY);
|
|
|
+ }
|
|
|
+ // we check here for the passed in status or the S3Guard value
|
|
|
+ // for being a directory
|
|
|
+ if (fileStatus.isDirectory()) {
|
|
|
+ throw new FileNotFoundException(path.toString() + " is a directory");
|
|
|
+ }
|
|
|
+ return fileStatus;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Initiate the open or select operation.
|
|
|
* This is invoked from both the FileSystem and FileContext APIs
|
|
|
- * @param path path to the file
|
|
|
- * @param mandatoryKeys set of options declared as mandatory.
|
|
|
- * @param options options set during the build sequence.
|
|
|
+ * @param rawPath path to the file
|
|
|
+ * @param parameters open file parameters from the builder.
|
|
|
* @return a future which will evaluate to the opened/selected file.
|
|
|
* @throws IOException failure to resolve the link.
|
|
|
* @throws PathIOException operation is a select request but S3 select is
|
|
@@ -4379,10 +4418,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
@Override
|
|
|
@Retries.RetryTranslated
|
|
|
public CompletableFuture<FSDataInputStream> openFileWithOptions(
|
|
|
- final Path path,
|
|
|
- final Set<String> mandatoryKeys,
|
|
|
- final Configuration options,
|
|
|
- final int bufferSize) throws IOException {
|
|
|
+ final Path rawPath,
|
|
|
+ final OpenFileParameters parameters) throws IOException {
|
|
|
+ final Path path = qualify(rawPath);
|
|
|
+ Configuration options = parameters.getOptions();
|
|
|
+ Set<String> mandatoryKeys = parameters.getMandatoryKeys();
|
|
|
String sql = options.get(SelectConstants.SELECT_SQL, null);
|
|
|
boolean isSelect = sql != null;
|
|
|
// choice of keys depends on open type
|
|
@@ -4397,20 +4437,46 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
|
InternalConstants.STANDARD_OPENFILE_KEYS,
|
|
|
"for " + path + " in non-select file I/O");
|
|
|
}
|
|
|
+ FileStatus providedStatus = parameters.getStatus();
|
|
|
+ S3AFileStatus fileStatus;
|
|
|
+ if (providedStatus != null) {
|
|
|
+ Preconditions.checkArgument(path.equals(providedStatus.getPath()),
|
|
|
+ "FileStatus parameter is not for the path %s: %s",
|
|
|
+ path, providedStatus);
|
|
|
+ if (providedStatus instanceof S3AFileStatus) {
|
|
|
+ // can use this status to skip our own probes,
|
|
|
+ // including etag and version.
|
|
|
+ LOG.debug("File was opened with a supplied S3AFileStatus;"
|
|
|
+ + " skipping getFileStatus call in open() operation: {}",
|
|
|
+ providedStatus);
|
|
|
+ fileStatus = (S3AFileStatus) providedStatus;
|
|
|
+ } else if (providedStatus instanceof S3ALocatedFileStatus) {
|
|
|
+ LOG.debug("File was opened with a supplied S3ALocatedFileStatus;"
|
|
|
+ + " skipping getFileStatus call in open() operation: {}",
|
|
|
+ providedStatus);
|
|
|
+ fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus();
|
|
|
+ } else {
|
|
|
+ LOG.debug("Ignoring file status {}", providedStatus);
|
|
|
+ fileStatus = null;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ fileStatus = null;
|
|
|
+ }
|
|
|
+ Optional<S3AFileStatus> ost = Optional.ofNullable(fileStatus);
|
|
|
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
|
|
|
if (!isSelect) {
|
|
|
// normal path.
|
|
|
unboundedThreadPool.submit(() ->
|
|
|
LambdaUtils.eval(result,
|
|
|
- () -> open(path, Optional.of(options))));
|
|
|
+ () -> open(path, Optional.of(options), ost)));
|
|
|
} else {
|
|
|
// it is a select statement.
|
|
|
- // fail fast if the method is not present
|
|
|
+ // fail fast if the operation is not available
|
|
|
requireSelectSupport(path);
|
|
|
// submit the query
|
|
|
unboundedThreadPool.submit(() ->
|
|
|
LambdaUtils.eval(result,
|
|
|
- () -> select(path, sql, options)));
|
|
|
+ () -> select(path, sql, options, ost)));
|
|
|
}
|
|
|
return result;
|
|
|
}
|