@@ -90,6 +90,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Globber;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
@@ -109,6 +110,7 @@ import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
 import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.MkdirOperation;
+import org.apache.hadoop.fs.s3a.impl.OpenFileSupport;
 import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.RenameOperation;
 import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
@@ -116,7 +118,6 @@ import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
-import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
 import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
 import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
 import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -169,6 +170,7 @@ import org.apache.hadoop.fs.s3a.select.SelectConstants;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
@@ -187,7 +189,8 @@ import org.apache.hadoop.util.functional.CallableRaisingIOE;
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
-import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Invoker.*;
@@ -298,7 +301,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /** Storage Statistics Bonded to the instrumentation. */
   private S3AStorageStatistics storageStatistics;
 
-  private long readAhead;
+  /**
+   * Default input policy; may be overridden in
+   * {@code openFile()}.
+   */
   private S3AInputPolicy inputPolicy;
   private ChangeDetectionPolicy changeDetectionPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -327,6 +333,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   private final ListingOperationCallbacks listingOperationCallbacks =
       new ListingOperationCallbacksImpl();
+
+  /**
+   * Helper for the openFile() method.
+   */
+  private OpenFileSupport openFileHelper;
+
   /**
    * Directory policy.
    */
@@ -465,9 +477,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
 
-      readAhead = longBytesOption(conf, READAHEAD_RANGE,
-          DEFAULT_READAHEAD_RANGE, 0);
-
       initThreadPools(conf);
 
       int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@@ -508,7 +517,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       doBucketProbing();
 
       inputPolicy = S3AInputPolicy.getPolicy(
-          conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
+          conf.getTrimmed(INPUT_FADVISE,
+              Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
+          S3AInputPolicy.Normal);
       LOG.debug("Input fadvise policy = {}", inputPolicy);
       changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
       LOG.debug("Change detection policy = {}", changeDetectionPolicy);
@@ -555,6 +566,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
               "page size out of range: %s", pageSize);
       listing = new Listing(listingOperationCallbacks, createStoreContext());
+      // now the open file logic
+      openFileHelper = new OpenFileSupport(
+          changeDetectionPolicy,
+          longBytesOption(conf, READAHEAD_RANGE,
+              DEFAULT_READAHEAD_RANGE, 0),
+          username,
+          intOption(conf, IO_FILE_BUFFER_SIZE_KEY,
+              IO_FILE_BUFFER_SIZE_DEFAULT, 0),
+          longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
+              DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
+          inputPolicy);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -1178,15 +1200,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     return fixBucketRegion(region);
   }
 
-  /**
-   * Returns the read ahead range value used by this filesystem.
-   * @return the readahead range
-   */
-  @VisibleForTesting
-  long getReadAheadRange() {
-    return readAhead;
-  }
-
   /**
    * Get the input policy for this FS instance.
    * @return the input policy
@@ -1268,13 +1281,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   /**
    * Change the input policy for this FS.
+   * This is now a no-op, retained in case some application
+   * or external test invokes it.
+   *
+   * @deprecated use openFile() options
    * @param inputPolicy new policy
    */
   @InterfaceStability.Unstable
+  @Deprecated
   public void setInputPolicy(S3AInputPolicy inputPolicy) {
-    Objects.requireNonNull(inputPolicy, "Null inputStrategy");
-    LOG.debug("Setting input strategy: {}", inputPolicy);
-    this.inputPolicy = inputPolicy;
+    LOG.warn("setInputPolicy is no longer supported");
   }
 
   /**
@@ -1392,64 +1408,46 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @Retries.RetryTranslated
   public FSDataInputStream open(Path f, int bufferSize)
       throws IOException {
-    return open(f, Optional.empty(), Optional.empty());
+    return executeOpen(qualify(f),
+        openFileHelper.openSimpleFile(bufferSize));
   }
 
   /**
    * Opens an FSDataInputStream at the indicated Path.
-   * if status contains an S3AFileStatus reference, it is used
-   * and so a HEAD request to the store is avoided.
-   *
-   * @param file the file to open
-   * @param options configuration options if opened with the builder API.
-   * @param providedStatus optional file status.
+   * The {@code fileInformation} parameter controls how the file
+   * is opened: whether it is a normal read or an S3 Select call,
+   * whether a HEAD request can be skipped, etc.
+   * @param path the file to open
+   * @param fileInformation information about the file to open
    * @throws IOException IO failure.
    */
-  @Retries.RetryTranslated
   @AuditEntryPoint
-  private FSDataInputStream open(
-      final Path file,
-      final Optional<Configuration> options,
-      final Optional<S3AFileStatus> providedStatus)
+  @Retries.RetryTranslated
+  private FSDataInputStream executeOpen(
+      final Path path,
+      final OpenFileSupport.OpenFileInformation fileInformation)
       throws IOException {
-
-    final Path path = qualify(file);
+    // create the input stream statistics before opening
+    // the file so that the time to prepare to open the file is included.
+    S3AInputStreamStatistics inputStreamStats =
+        statisticsContext.newInputStreamStatistics();
     // this span is passed into the stream.
     final AuditSpan auditSpan = entryPoint(INVOCATION_OPEN, path);
-    S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
-        providedStatus);
-
-    S3AReadOpContext readContext;
-    if (options.isPresent()) {
-      Configuration o = options.get();
-      // normal path. Open the file with the chosen seek policy, if different
-      // from the normal one.
-      // and readahead.
-      S3AInputPolicy policy = S3AInputPolicy.getPolicy(
-          o.get(INPUT_FADVISE, inputPolicy.toString()));
-      long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead);
-      // TODO support change detection policy from options?
-      readContext = createReadContext(
-          fileStatus,
-          policy,
-          changeDetectionPolicy,
-          readAheadRange2,
-          auditSpan);
-    } else {
-      readContext = createReadContext(
-          fileStatus,
-          inputPolicy,
-          changeDetectionPolicy,
-          readAhead,
-          auditSpan);
-    }
+    final S3AFileStatus fileStatus =
+        trackDuration(inputStreamStats,
+            ACTION_FILE_OPENED.getSymbol(), () ->
+                extractOrFetchSimpleFileStatus(path, fileInformation));
+    S3AReadOpContext readContext = createReadContext(
+        fileStatus,
+        auditSpan);
+    fileInformation.applyOptions(readContext);
     LOG.debug("Opening '{}'", readContext);
-
     return new FSDataInputStream(
         new S3AInputStream(
-            readContext,
-            createObjectAttributes(fileStatus),
-            createInputStreamCallbacks(auditSpan)));
+            readContext.build(),
+            createObjectAttributes(path, fileStatus),
+            createInputStreamCallbacks(auditSpan),
+            inputStreamStats));
   }
 
   /**
@@ -1503,34 +1501,40 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         return s3.getObject(request);
       }
     }
+
+    @Override
+    public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
+      CompletableFuture<T> result = new CompletableFuture<>();
+      unboundedThreadPool.submit(() ->
+          LambdaUtils.eval(result, () -> {
+            try (AuditSpan span = auditSpan.activate()) {
+              return operation.apply();
+            }
+          }));
+      return result;
+    }
   }
 
   /**
    * Create the read context for reading from the referenced file,
    * using FS state as well as the status.
    * @param fileStatus file status.
-   * @param seekPolicy input policy for this operation
-   * @param changePolicy change policy for this operation.
-   * @param readAheadRange readahead value.
    * @param auditSpan audit span.
    * @return a context for read and select operations.
    */
   @VisibleForTesting
   protected S3AReadOpContext createReadContext(
       final FileStatus fileStatus,
-      final S3AInputPolicy seekPolicy,
-      final ChangeDetectionPolicy changePolicy,
-      final long readAheadRange,
       final AuditSpan auditSpan) {
-    return new S3AReadOpContext(fileStatus.getPath(),
+    final S3AReadOpContext roc = new S3AReadOpContext(
+        fileStatus.getPath(),
         invoker,
         statistics,
         statisticsContext,
-        fileStatus,
-        seekPolicy,
-        changePolicy,
-        readAheadRange,
-        auditSpan);
+        fileStatus)
+        .withAuditSpan(auditSpan);
+    openFileHelper.applyDefaultOptions(roc);
+    return roc.build();
   }
 
   /**
@@ -1558,13 +1562,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
   /**
    * Create the attributes of an object for subsequent use.
+   * @param path path - this is used over the file status path.
    * @param fileStatus file status to build from.
    * @return attributes to use when building the query.
    */
   private S3ObjectAttributes createObjectAttributes(
+      final Path path,
       final S3AFileStatus fileStatus) {
     return createObjectAttributes(
-        fileStatus.getPath(),
+        path,
         fileStatus.getEtag(),
         fileStatus.getVersionId(),
         fileStatus.getLen());
@@ -1981,14 +1987,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     @Override
     public S3ObjectAttributes createObjectAttributes(
         final S3AFileStatus fileStatus) {
-      return S3AFileSystem.this.createObjectAttributes(fileStatus);
+      return S3AFileSystem.this.createObjectAttributes(
+          fileStatus.getPath(),
+          fileStatus);
     }
 
     @Override
     public S3AReadOpContext createReadContext(final FileStatus fileStatus) {
       return S3AFileSystem.this.createReadContext(fileStatus,
-          inputPolicy,
-          changeDetectionPolicy, readAhead,
           auditSpan);
     }
 
@@ -4085,9 +4091,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /**
    * Return the number of bytes that large input files should be optimally
    * be split into to minimize I/O time.
-   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
    */
-  @Deprecated
   public long getDefaultBlockSize() {
     return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
   }
@@ -4106,14 +4110,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         "S3AFileSystem{");
     sb.append("uri=").append(uri);
     sb.append(", workingDir=").append(workingDir);
-    sb.append(", inputPolicy=").append(inputPolicy);
     sb.append(", partSize=").append(partSize);
     sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
     sb.append(", maxKeys=").append(maxKeys);
     if (cannedACL != null) {
-      sb.append(", cannedACL=").append(cannedACL.toString());
+      sb.append(", cannedACL=").append(cannedACL);
+    }
+    if (openFileHelper != null) {
+      sb.append(", ").append(openFileHelper);
     }
-    sb.append(", readAhead=").append(readAhead);
     if (getConf() != null) {
       sb.append(", blockSize=").append(getDefaultBlockSize());
     }
@@ -4799,23 +4804,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   @Retries.RetryTranslated
   @AuditEntryPoint
   private FSDataInputStream select(final Path source,
-      final String expression,
       final Configuration options,
-      final Optional<S3AFileStatus> providedStatus)
+      final OpenFileSupport.OpenFileInformation fileInformation)
       throws IOException {
-    final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
     requireSelectSupport(source);
+    final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
     final Path path = makeQualified(source);
+    String expression = fileInformation.getSql();
     final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
-        providedStatus);
+        fileInformation);
 
     // readahead range can be dynamically set
-    long ra = options.getLong(READAHEAD_RANGE, readAhead);
-    S3ObjectAttributes objectAttributes = createObjectAttributes(fileStatus);
-    S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy,
-        changeDetectionPolicy, ra, auditSpan);
+    S3ObjectAttributes objectAttributes = createObjectAttributes(
+        path, fileStatus);
+    ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy();
+    S3AReadOpContext readContext = createReadContext(
+        fileStatus,
+        auditSpan);
+    fileInformation.applyOptions(readContext);
 
-    if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None
+    if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
         && fileStatus.getEtag() != null) {
       // if there is change detection, and the status includes at least an
       // etag,
@@ -4827,7 +4835,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // version in the final read; nor can we check the etag match)
       ChangeTracker changeTracker =
           new ChangeTracker(uri.toString(),
-              changeDetectionPolicy,
+              changePolicy,
               readContext.getS3AStatisticsContext()
                   .newInputStreamStatistics()
                   .getChangeTrackerStatistics(),
@@ -4865,38 +4873,36 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
-   * Extract the status from the optional parameter, querying
-   * S3 if it is absent.
-   * @param path path of the status
-   * @param optStatus optional status
+   * Get the file status of the source file.
+   * If a status is provided in the fileInformation parameter, return that;
+   * if not, issue a HEAD request, looking for a
+   * file only.
+   * @param path path of the file to open
+   * @param fileInformation information on the file to open
    * @return a file status
-   * @throws FileNotFoundException if there is no normal file at that path
+   * @throws FileNotFoundException if a HEAD request found no file
    * @throws IOException IO failure
    */
   private S3AFileStatus extractOrFetchSimpleFileStatus(
-      final Path path, final Optional<S3AFileStatus> optStatus)
+      final Path path,
+      final OpenFileSupport.OpenFileInformation fileInformation)
       throws IOException {
-    S3AFileStatus fileStatus;
-    if (optStatus.isPresent()) {
-      fileStatus = optStatus.get();
+    S3AFileStatus fileStatus = fileInformation.getStatus();
+    if (fileStatus == null) {
       // we check here for the passed in status
       // being a directory
-      if (fileStatus.isDirectory()) {
-        throw new FileNotFoundException(path.toString() + " is a directory");
-      }
-    } else {
-      // Executes a HEAD only.
-      // therefore: if there is is a dir marker, this
-      // will raise a FileNotFoundException
      fileStatus = innerGetFileStatus(path, false,
          StatusProbeEnum.HEAD_ONLY);
    }
+    if (fileStatus.isDirectory()) {
+      throw new FileNotFoundException(path.toString() + " is a directory");
+    }
 
    return fileStatus;
  }
 
  /**
-   * Initiate the open or select operation.
+   * Initiate the open() or select() operation.
    * This is invoked from both the FileSystem and FileContext APIs.
    * It's declared as an audit entry point but the span creation is pushed
    * down into the open/select methods it ultimately calls.
@@ -4915,54 +4921,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       final Path rawPath,
       final OpenFileParameters parameters) throws IOException {
     final Path path = qualify(rawPath);
-    Configuration options = parameters.getOptions();
-    Set<String> mandatoryKeys = parameters.getMandatoryKeys();
-    String sql = options.get(SelectConstants.SELECT_SQL, null);
-    boolean isSelect = sql != null;
-    // choice of keys depends on open type
-    if (isSelect) {
-      rejectUnknownMandatoryKeys(
-          mandatoryKeys,
-          InternalSelectConstants.SELECT_OPTIONS,
-          "for " + path + " in S3 Select operation");
-    } else {
-      rejectUnknownMandatoryKeys(
-          mandatoryKeys,
-          InternalConstants.STANDARD_OPENFILE_KEYS,
-          "for " + path + " in non-select file I/O");
-    }
-    FileStatus providedStatus = parameters.getStatus();
-    S3AFileStatus fileStatus;
-    if (providedStatus != null) {
-      Preconditions.checkArgument(path.equals(providedStatus.getPath()),
-          "FileStatus parameter is not for the path %s: %s",
-          path, providedStatus);
-      if (providedStatus instanceof S3AFileStatus) {
-        // can use this status to skip our own probes,
-        // including etag and version.
-        LOG.debug("File was opened with a supplied S3AFileStatus;"
-            + " skipping getFileStatus call in open() operation: {}",
-            providedStatus);
-        fileStatus = (S3AFileStatus) providedStatus;
-      } else if (providedStatus instanceof S3ALocatedFileStatus) {
-        LOG.debug("File was opened with a supplied S3ALocatedFileStatus;"
-            + " skipping getFileStatus call in open() operation: {}",
-            providedStatus);
-        fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus();
-      } else {
-        LOG.debug("Ignoring file status {}", providedStatus);
-        fileStatus = null;
-      }
-    } else {
-      fileStatus = null;
-    }
-    Optional<S3AFileStatus> ost = Optional.ofNullable(fileStatus);
+    OpenFileSupport.OpenFileInformation fileInformation =
+        openFileHelper.prepareToOpenFile(
+            path,
+            parameters,
+            getDefaultBlockSize());
     CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
-    if (!isSelect) {
+    if (!fileInformation.isS3Select()) {
       // normal path.
       unboundedThreadPool.submit(() ->
           LambdaUtils.eval(result,
-              () -> open(path, Optional.of(options), ost)));
+              () -> executeOpen(path, fileInformation)));
     } else {
       // it is a select statement.
       // fail fast if the operation is not available
@@ -4970,7 +4939,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // submit the query
       unboundedThreadPool.submit(() ->
           LambdaUtils.eval(result,
-              () -> select(path, sql, options, ost)));
+              () -> select(path, parameters.getOptions(), fileInformation)));
     }
     return result;
   }