@@ -27,12 +27,14 @@ import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -84,6 +86,8 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -91,8 +95,15 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
+import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
+import org.apache.hadoop.fs.s3a.impl.RenameOperation;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
@@ -203,6 +214,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private TransferManager transfers;
private ListeningExecutorService boundedThreadPool;
private ExecutorService unboundedThreadPool;
+ private int executorCapacity;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private static final Logger PROGRESS =
@@ -380,6 +392,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
" queue limit={}",
blockOutputBuffer, partSize, blockOutputActiveBlocks);
+ long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+ DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+ ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
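
Reviewer note: the TTL provider is now constructed before setMetadataStore(),
so the metastore can use it during its own initialization; the old placement
(removed further down, inside the hasMetadataStore() check) came too late.
Configuration.getTimeDuration() accepts suffixed values and converts to the
requested unit. A minimal sketch of that lookup, assuming only the Hadoop
Configuration API (the key string here is illustrative, not verified):

    Configuration conf = new Configuration(false);
    conf.set("fs.s3a.metadatastore.metadata.ttl", "15m"); // suffixed duration
    long ttlMillis = conf.getTimeDuration(
        "fs.s3a.metadatastore.metadata.ttl",
        15 * 60_000L,              // default, already in the target unit
        TimeUnit.MILLISECONDS);    // unit of the returned value
    // ttlMillis == 900000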

setMetadataStore(S3Guard.getMetadataStore(this));
allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
@@ -389,17 +404,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
getMetadataStore(), allowAuthoritative);
}
initMultipartUploads(conf);
- if (hasMetadataStore()) {
- long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
- DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
- ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
- }
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}

}

+ /**
+ * Initialize the thread pool.
+ * This must be re-invoked after replacing the S3Client during test
+ * runs.
+ * @param conf configuration.
+ */
private void initThreadPools(Configuration conf) {
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
if (maxThreads < 2) {
@@ -418,9 +434,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
unboundedThreadPool = new ThreadPoolExecutor(
maxThreads, Integer.MAX_VALUE,
keepAliveTime, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
+ new LinkedBlockingQueue<>(),
BlockingThreadPoolExecutorService.newDaemonThreadFactory(
"s3a-transfer-unbounded"));
+ executorCapacity = intOption(conf,
+ EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
}
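
Reviewer note: the "unbounded" pool above has maxThreads core threads, an
effectively infinite maximumPoolSize and an unbounded LinkedBlockingQueue.
A standard ThreadPoolExecutor subtlety applies: with an unbounded queue the
pool never grows beyond its core size, so the unbounded aspect is the queue
of pending work, not the thread count. A JDK-only sketch of the same
construction, with a hand-rolled daemon thread factory for illustration:

    // requires java.util.concurrent.*
    ThreadFactory daemonFactory = r -> {
      Thread t = new Thread(r, "s3a-transfer-unbounded");
      t.setDaemon(true);               // do not block JVM shutdown
      return t;
    };
    ExecutorService unbounded = new ThreadPoolExecutor(
        4, Integer.MAX_VALUE,          // core size; max is effectively inert
        60L, TimeUnit.SECONDS,         // keep-alive for threads above core
        new LinkedBlockingQueue<>(),   // unbounded queue: tasks wait here
        daemonFactory);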

/**
@@ -689,6 +707,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @return the region in which a bucket is located
* @throws IOException on any failure.
*/
+ @VisibleForTesting
@Retries.RetryTranslated
public String getBucketLocation(String bucketName) throws IOException {
return invoker.retry("getBucketLocation()", bucketName, true,
@@ -733,21 +752,29 @@

/**
* Demand create the directory allocator, then create a temporary file.
+ * This does not mark the file for deletion when a process exits.
* {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
- * @param pathStr prefix for the temporary file
- * @param size the size of the file that is going to be written
- * @param conf the Configuration object
- * @return a unique temporary file
- * @throws IOException IO problems
+ * @param pathStr prefix for the temporary file
+ * @param size the size of the file that is going to be written
+ * @param conf the Configuration object
+ * @return a unique temporary file
+ * @throws IOException IO problems
*/
- synchronized File createTmpFileForWrite(String pathStr, long size,
+ File createTmpFileForWrite(String pathStr, long size,
Configuration conf) throws IOException {
if (directoryAllocator == null) {
- String bufferDir = conf.get(BUFFER_DIR) != null
- ? BUFFER_DIR : HADOOP_TMP_DIR;
- directoryAllocator = new LocalDirAllocator(bufferDir);
+ synchronized (this) {
+ String bufferDir = conf.get(BUFFER_DIR) != null
+ ? BUFFER_DIR : HADOOP_TMP_DIR;
+ directoryAllocator = new LocalDirAllocator(bufferDir);
+ }
}
- return directoryAllocator.createTmpFileForWrite(pathStr, size, conf);
+ Path path = directoryAllocator.getLocalPathForWrite(pathStr,
+ size, conf);
+ File dir = new File(path.getParent().toUri().getPath());
+ String prefix = path.getName();
+ // create a temp file in this directory
+ return File.createTempFile(prefix, null, dir);
}
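
Reviewer note: the lazy initialization above tests directoryAllocator for
null outside the synchronized block but never re-checks it inside, so two
threads racing past the first check can each build an allocator; with a
non-volatile field the usual double-checked-locking visibility caveat also
applies. The canonical shape, as a generic sketch (LazyInit is illustrative,
not part of the patch):

    // Sketch: double-checked locking done in full - volatile field,
    // and the null check repeated under the lock.
    final class LazyInit<T> {
      private volatile T value;
      private final java.util.function.Supplier<T> factory;
      LazyInit(java.util.function.Supplier<T> factory) {
        this.factory = factory;
      }
      T get() {
        T result = value;              // single volatile read when warm
        if (result == null) {
          synchronized (this) {
            result = value;
            if (result == null) {      // re-check: another thread may have won
              result = factory.get();
              value = result;
            }
          }
        }
        return result;
      }
    }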

/**
@@ -929,11 +956,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return new FSDataInputStream(
new S3AInputStream(
readContext,
- createObjectAttributes(
- path,
- fileStatus.getETag(),
- fileStatus.getVersionId()),
- fileStatus.getLen(),
+ createObjectAttributes(fileStatus),
s3));
}

@@ -963,22 +986,40 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}

/**
- * Create the attributes of an object for a get/select request.
+ * Create the attributes of an object for subsequent use.
* @param f path of the request.
* @param eTag the eTag of the S3 object
* @param versionId S3 object version ID
+ * @param len length of the file
* @return attributes to use when building the query.
*/
private S3ObjectAttributes createObjectAttributes(
final Path f,
final String eTag,
- final String versionId) {
+ final String versionId,
+ final long len) {
return new S3ObjectAttributes(bucket,
+ f,
pathToKey(f),
getServerSideEncryptionAlgorithm(),
encryptionSecrets.getEncryptionKey(),
eTag,
- versionId);
+ versionId,
+ len);
+ }
+
+ /**
+ * Create the attributes of an object for subsequent use.
+ * @param fileStatus file status to build from.
+ * @return attributes to use when building the query.
+ */
+ private S3ObjectAttributes createObjectAttributes(
+ final S3AFileStatus fileStatus) {
+ return createObjectAttributes(
+ fileStatus.getPath(),
+ fileStatus.getETag(),
+ fileStatus.getVersionId(),
+ fileStatus.getLen());
}

/**
@@ -1117,9 +1158,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @throws IOException on IO failure
* @return true if rename is successful
*/
+ @Retries.RetryTranslated
public boolean rename(Path src, Path dst) throws IOException {
- try {
- return innerRename(src, dst);
+ try (DurationInfo ignored = new DurationInfo(LOG, false,
+ "rename(%s, %s)", src, dst)) {
+ long bytesCopied = innerRename(src, dst);
+ LOG.debug("Copied {} bytes", bytesCopied);
+ return true;
} catch (AmazonClientException e) {
throw translateException("rename(" + src + ", " + dst + ")", src, e);
} catch (RenameFailedException e) {
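
Reviewer note: rename() is now timed by an AutoCloseable that logs the
elapsed duration when the try-with-resources block closes. The shape of
that wrapper, reduced to a JDK-only sketch (Timed is a stand-in for
org.apache.hadoop.util.DurationInfo, whose boolean argument is assumed to
select the log level):

    // Sketch: note the start time on construction, log on close().
    final class Timed implements AutoCloseable {
      private final String name;
      private final long start = System.nanoTime();
      Timed(String name) { this.name = name; }
      @Override public void close() {
        System.out.printf("%s: %.3f ms%n", name,
            (System.nanoTime() - start) / 1e6);
      }
    }

    // usage, mirroring the patch:
    try (Timed ignored = new Timed("rename")) {
      // ... the timed operation ...
    }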
@@ -1132,33 +1177,22 @@
}

/**
- * The inner rename operation. See {@link #rename(Path, Path)} for
- * the description of the operation.
- * This operation throws an exception on any failure which needs to be
- * reported and downgraded to a failure.
- * Retries: retry translated, assuming all operations it is called do
- * so. For safely, consider catch and handle AmazonClientException
- * because this is such a complex method there's a risk it could surface.
- * @param source path to be renamed
- * @param dest new path after rename
+ * Validate the rename parameters and status of the filesystem;
+ * returns the source and any destination File Status.
+ * @param src qualified path to be renamed
+ * @param dst qualified path after rename
+ * @return the source and (possibly null) destination status entries.
* @throws RenameFailedException if some criteria for a state changing
* rename was not met. This means work didn't happen; it's not something
* which is reported upstream to the FileSystem APIs, for which the semantics
* of "false" are pretty vague.
* @throws FileNotFoundException there's no source file.
* @throws IOException on IO failure.
- * @throws AmazonClientException on failures inside the AWS SDK
*/
- @Retries.RetryMixed
- private boolean innerRename(Path source, Path dest)
- throws RenameFailedException, FileNotFoundException, IOException,
- AmazonClientException {
- Path src = qualify(source);
- Path dst = qualify(dest);
-
- LOG.debug("Rename path {} to {}", src, dst);
- entryPoint(INVOCATION_RENAME);
-
+ @Retries.RetryTranslated
+ private Pair<S3AFileStatus, S3AFileStatus> initiateRename(
+ final Path src,
+ final Path dst) throws IOException {
String srcKey = pathToKey(src);
String dstKey = pathToKey(dst);

@@ -1227,131 +1261,126 @@
}
}
}
+ return Pair.of(srcStatus, dstStatus);
+ }
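
Reviewer note: initiateRename() hands both file statuses back in one
commons-lang3 Pair, which innerRename() unpacks below with getLeft() and
getRight(); the right-hand side may legitimately be null when the
destination does not exist. A minimal sketch of the idiom (values are
placeholders):

    // Sketch: two related results without a dedicated holder class.
    Pair<String, String> statuses = Pair.of("srcStatus", null);
    String src = statuses.getLeft();    // always present
    String dst = statuses.getRight();   // may be null by contract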

- // If we have a MetadataStore, track deletions/creations.
- Collection<Path> srcPaths = null;
- List<PathMetadata> dstMetas = null;
- if (hasMetadataStore()) {
- srcPaths = new HashSet<>(); // srcPaths need fast look up before put
- dstMetas = new ArrayList<>();
- }
- // TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
- // TODO S3Guard: performance: mark destination dirs as authoritative
-
- // Ok! Time to start
- if (srcStatus.isFile()) {
- LOG.debug("rename: renaming file {} to {}", src, dst);
- long length = srcStatus.getLen();
- S3ObjectAttributes objectAttributes =
- createObjectAttributes(srcStatus.getPath(),
- srcStatus.getETag(), srcStatus.getVersionId());
- S3AReadOpContext readContext = createReadContext(srcStatus, inputPolicy,
- changeDetectionPolicy, readAhead);
- if (dstStatus != null && dstStatus.isDirectory()) {
- String newDstKey = maybeAddTrailingSlash(dstKey);
- String filename =
- srcKey.substring(pathToKey(src.getParent()).length()+1);
- newDstKey = newDstKey + filename;
- CopyResult copyResult = copyFile(srcKey, newDstKey, length,
- objectAttributes, readContext);
- S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
- keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst),
- username, copyResult.getETag(), copyResult.getVersionId());
- } else {
- CopyResult copyResult = copyFile(srcKey, dstKey, srcStatus.getLen(),
- objectAttributes, readContext);
- S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src, dst,
- length, getDefaultBlockSize(dst), username,
- copyResult.getETag(), copyResult.getVersionId());
- }
- innerDelete(srcStatus, false);
- } else {
- LOG.debug("rename: renaming directory {} to {}", src, dst);
+ /**
+ * The inner rename operation. See {@link #rename(Path, Path)} for
+ * the description of the operation.
+ * This operation throws an exception on any failure which needs to be
+ * reported and downgraded to a failure.
+ * Retries: retry translated, assuming all operations it calls do
+ * so. For safety, consider catching and handling AmazonClientException
+ * because this is such a complex method there's a risk it could surface.
+ * @param source path to be renamed
+ * @param dest new path after rename
+ * @throws RenameFailedException if some criteria for a state changing
+ * rename was not met. This means work didn't happen; it's not something
+ * which is reported upstream to the FileSystem APIs, for which the semantics
+ * of "false" are pretty vague.
+ * @return the number of bytes copied.
+ * @throws FileNotFoundException there's no source file.
+ * @throws IOException on IO failure.
+ * @throws AmazonClientException on failures inside the AWS SDK
+ */
+ @Retries.RetryMixed
+ private long innerRename(Path source, Path dest)
+ throws RenameFailedException, FileNotFoundException, IOException,
+ AmazonClientException {
+ Path src = qualify(source);
+ Path dst = qualify(dest);

- // This is a directory to directory copy
- dstKey = maybeAddTrailingSlash(dstKey);
- srcKey = maybeAddTrailingSlash(srcKey);
+ LOG.debug("Rename path {} to {}", src, dst);
+ entryPoint(INVOCATION_RENAME);

- //Verify dest is not a child of the source directory
- if (dstKey.startsWith(srcKey)) {
- throw new RenameFailedException(srcKey, dstKey,
- "cannot rename a directory to a subdirectory of itself ");
- }
+ String srcKey = pathToKey(src);
+ String dstKey = pathToKey(dst);

- List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
- if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
- // delete unnecessary fake directory.
- keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
- }
+ Pair<S3AFileStatus, S3AFileStatus> p = initiateRename(src, dst);

- Path parentPath = keyToQualifiedPath(srcKey);
- RemoteIterator<S3ALocatedFileStatus> iterator =
- listFilesAndEmptyDirectories(parentPath, true);
- while (iterator.hasNext()) {
- S3ALocatedFileStatus status = iterator.next();
- long length = status.getLen();
- String key = pathToKey(status.getPath());
- if (status.isDirectory() && !key.endsWith("/")) {
- key += "/";
- }
- keysToDelete
- .add(new DeleteObjectsRequest.KeyVersion(key));
- String newDstKey =
- dstKey + key.substring(srcKey.length());
- S3ObjectAttributes objectAttributes =
- createObjectAttributes(status.getPath(),
- status.getETag(), status.getVersionId());
- S3AReadOpContext readContext = createReadContext(status, inputPolicy,
- changeDetectionPolicy, readAhead);
- CopyResult copyResult = copyFile(key, newDstKey, length,
- objectAttributes, readContext);
-
- if (hasMetadataStore()) {
- // with a metadata store, the object entries need to be updated,
- // including, potentially, the ancestors
- Path childSrc = keyToQualifiedPath(key);
- Path childDst = keyToQualifiedPath(newDstKey);
- if (objectRepresentsDirectory(key, length)) {
- S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
- childDst, username);
- } else {
- S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
- childDst, length, getDefaultBlockSize(childDst), username,
- copyResult.getETag(), copyResult.getVersionId());
- }
- // Ancestor directories may not be listed, so we explicitly add them
- S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
- keyToQualifiedPath(srcKey), childSrc, childDst, username);
- }
+ // Initiate the rename.
+ // this will call back into this class via the rename callbacks
+ // and interact directly with any metastore.
+ RenameOperation renameOperation = new RenameOperation(
+ createStoreContext(),
+ src, srcKey, p.getLeft(),
+ dst, dstKey, p.getRight(),
+ new RenameOperationCallbacksImpl());
+ return renameOperation.executeRename();
+ }

- if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
- removeKeys(keysToDelete, true, false);
- }
- }
- if (!keysToDelete.isEmpty()) {
- removeKeys(keysToDelete, false, false);
- }
+ /**
+ * All the callbacks made by the rename operation of the filesystem.
+ * This separation allows the operation to be factored out while
+ * still avoiding knowledge of the S3AFileSystem implementation.
+ */
+ private class RenameOperationCallbacksImpl implements
+ RenameOperation.RenameOperationCallbacks {

- // We moved all the children, now move the top-level dir
- // Empty directory should have been added as the object summary
- if (hasMetadataStore()
- && srcPaths != null
- && !srcPaths.contains(src)) {
- LOG.debug("To move the non-empty top-level dir src={} and dst={}",
- src, dst);
- S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, src, dst,
- username);
- }
+ @Override
+ public S3ObjectAttributes createObjectAttributes(final Path path,
+ final String eTag,
+ final String versionId,
+ final long len) {
+ return S3AFileSystem.this.createObjectAttributes(path, eTag, versionId,
+ len);
}

- metadataStore.move(srcPaths, dstMetas, ttlTimeProvider);
+ @Override
+ public S3ObjectAttributes createObjectAttributes(final S3AFileStatus fileStatus) {
+ return S3AFileSystem.this.createObjectAttributes(fileStatus);
+ }

- if (!src.getParent().equals(dst.getParent())) {
- LOG.debug("source & dest parents are different; fix up dir markers");
- deleteUnnecessaryFakeDirectories(dst.getParent());
- maybeCreateFakeParentDirectory(src);
+ @Override
+ public S3AReadOpContext createReadContext(final FileStatus fileStatus) {
+ return S3AFileSystem.this.createReadContext(fileStatus,
+ inputPolicy,
+ changeDetectionPolicy, readAhead);
+ }
+
+ @Override
+ public void deleteObjectAtPath(final Path path,
+ final String key,
+ final boolean isFile)
+ throws IOException {
+ S3AFileSystem.this.deleteObjectAtPath(path, key, isFile);
+ }
+
+ @Override
+ @Retries.RetryTranslated
+ public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+ final Path path) throws IOException {
+ return S3AFileSystem.this.listFilesAndEmptyDirectories(path, true);
+ }
+
+ @Override
+ public CopyResult copyFile(final String srcKey,
+ final String destKey,
+ final S3ObjectAttributes srcAttributes,
+ final S3AReadOpContext readContext) throws IOException {
+ return S3AFileSystem.this.copyFile(srcKey, destKey,
+ srcAttributes.getLen(), srcAttributes, readContext);
+ }
+
+ @Override
+ public void removeKeys(final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ final boolean deleteFakeDir,
+ final List<Path> undeletedObjectsOnFailure)
+ throws MultiObjectDeleteException, AmazonClientException, IOException {
+ S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir,
+ undeletedObjectsOnFailure);
+ }
+
+ @Override
+ public void finishRename(final Path sourceRenamed, final Path destCreated)
+ throws IOException {
+ Path destParent = destCreated.getParent();
+ if (!sourceRenamed.getParent().equals(destParent)) {
+ LOG.debug("source & dest parents are different; fix up dir markers");
+ deleteUnnecessaryFakeDirectories(destParent);
+ maybeCreateFakeParentDirectory(sourceRenamed);
+ }
}
- return true;
}
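
Reviewer note: RenameOperationCallbacksImpl is the seam that lets
RenameOperation live in the impl package without a compile-time dependency
on S3AFileSystem internals. The pattern in miniature (all names below are
illustrative, not the real interface):

    // Sketch: the extracted operation sees only a narrow callback
    // interface; the filesystem supplies an inner class that delegates
    // back to its own, possibly private, methods.
    interface StoreCallbacks {
      void deleteKey(String key);
    }

    class BulkDelete {                      // the extracted operation
      private final StoreCallbacks callbacks;
      BulkDelete(StoreCallbacks callbacks) { this.callbacks = callbacks; }
      void execute(java.util.List<String> keys) {
        keys.forEach(callbacks::deleteKey);
      }
    }

    class Store {                           // plays the S3AFileSystem role
      private void reallyDelete(String key) { /* talk to the store */ }
      private class CallbacksImpl implements StoreCallbacks {
        @Override public void deleteKey(String key) { reallyDelete(key); }
      }
      void deleteAll(java.util.List<String> keys) {
        new BulkDelete(new CallbacksImpl()).execute(keys);
      }
    }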

/**
@@ -1380,6 +1409,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
public ObjectMetadata getObjectMetadata(Path path,
ChangeTracker changeTracker, Invoker changeInvoker, String operation)
throws IOException {
+ checkNotClosed();
return once("getObjectMetadata", path.toString(),
() ->
// this always does a full HEAD to the object
@@ -1609,16 +1639,19 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
incrementReadOperations();
incrementStatistic(OBJECT_LIST_REQUESTS);
validateListArguments(request);
- return invoker.retryUntranslated(
- request.toString(),
- true,
- () -> {
- if (useListV1) {
- return S3ListResult.v1(s3.listObjects(request.getV1()));
- } else {
- return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
- }
- });
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, false, "LIST")) {
+ return invoker.retryUntranslated(
+ request.toString(),
+ true,
+ () -> {
+ if (useListV1) {
+ return S3ListResult.v1(s3.listObjects(request.getV1()));
+ } else {
+ return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
+ }
+ });
+ }
}

/**
@@ -1646,20 +1679,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
S3ListResult prevResult) throws IOException {
incrementReadOperations();
validateListArguments(request);
- return invoker.retryUntranslated(
- request.toString(),
- true,
- () -> {
- incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS);
- if (useListV1) {
- return S3ListResult.v1(
- s3.listNextBatchOfObjects(prevResult.getV1()));
- } else {
- request.getV2().setContinuationToken(prevResult.getV2()
- .getNextContinuationToken());
- return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
- }
- });
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, false, "LIST (continued)")) {
+ return invoker.retryUntranslated(
+ request.toString(),
+ true,
+ () -> {
+ incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS);
+ if (useListV1) {
+ return S3ListResult.v1(
+ s3.listNextBatchOfObjects(prevResult.getV1()));
+ } else {
+ request.getV2().setContinuationToken(prevResult.getV2()
+ .getNextContinuationToken());
+ return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
+ }
+ });
+ }
}

/**
@@ -1697,6 +1733,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
throws AmazonClientException, IOException {
blockRootDelete(key);
incrementWriteOperations();
+ LOG.debug("DELETE {}", key);
invoker.retryUntranslated("Delete "+ bucket + ":/" + key,
DELETE_CONSIDERED_IDEMPOTENT,
()-> {
@@ -1714,9 +1751,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @param key key of entry
* @param isFile is the path a file (used for instrumentation only)
* @throws AmazonClientException problems working with S3
- * @throws IOException IO failure
+ * @throws IOException IO failure in the metastore
*/
- @Retries.RetryRaw
+ @Retries.RetryMixed
void deleteObjectAtPath(Path f, String key, boolean isFile)
throws AmazonClientException, IOException {
if (isFile) {
@@ -1755,7 +1792,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private void deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, AmazonClientException, IOException {
incrementWriteOperations();
- try {
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, false, "DELETE %d keys",
+ deleteRequest.getKeys().size())) {
invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
() -> {
@@ -1892,7 +1931,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
incrementPutCompletedStatistics(true, len);
// update metadata
finishedWrite(putObjectRequest.getKey(), len,
- result.getETag(), result.getVersionId());
+ result.getETag(), result.getVersionId(), null);
return result;
} catch (AmazonClientException e) {
incrementPutCompletedStatistics(false, len);
@@ -1993,23 +2032,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}

/**
- * A helper method to delete a list of keys on a s3-backend.
+ * Delete a list of keys on an s3-backend.
+ * This does <i>not</i> update the metastore.
* Retry policy: retry untranslated; delete considered idempotent.
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
- * @param clearKeys clears the keysToDelete-list after processing the list
- * when set to true
* @param deleteFakeDir indicates whether this is for deleting fake dirs
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
* be deleted in a multiple object delete operation.
- * @throws AmazonClientException amazon-layer failure.
+ * The number of rejected objects will be added to the metric
+ * {@link Statistic#FILES_DELETE_REJECTED}.
+ * @throws AmazonClientException other amazon-layer failure.
*/
- @VisibleForTesting
@Retries.RetryRaw
- void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- boolean clearKeys, boolean deleteFakeDir)
+ private void removeKeysS3(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
if (keysToDelete.isEmpty()) {
@@ -2019,22 +2058,118 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
blockRootDelete(keyVersion.getKey());
}
- if (enableMultiObjectsDelete) {
- deleteObjects(new DeleteObjectsRequest(bucket)
- .withKeys(keysToDelete)
- .withQuiet(true));
- } else {
- for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
- deleteObject(keyVersion.getKey());
+ try {
+ if (enableMultiObjectsDelete) {
+ deleteObjects(new DeleteObjectsRequest(bucket)
+ .withKeys(keysToDelete)
+ .withQuiet(true));
+ } else {
+ for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
+ deleteObject(keyVersion.getKey());
+ }
}
+ } catch (MultiObjectDeleteException ex) {
+ // partial delete.
+ // Update the stats with the count of the actual number of successful
+ // deletions.
+ int rejected = ex.getErrors().size();
+ noteDeleted(keysToDelete.size() - rejected, deleteFakeDir);
+ incrementStatistic(FILES_DELETE_REJECTED, rejected);
+ throw ex;
}
+ noteDeleted(keysToDelete.size(), deleteFakeDir);
+ }
+
+ /**
+ * Note the deletion of files or fake directories deleted.
+ * @param count count of keys deleted.
+ * @param deleteFakeDir are the deletions fake directories?
+ */
+ private void noteDeleted(final int count, final boolean deleteFakeDir) {
if (!deleteFakeDir) {
- instrumentation.fileDeleted(keysToDelete.size());
+ instrumentation.fileDeleted(count);
} else {
- instrumentation.fakeDirsDeleted(keysToDelete.size());
+ instrumentation.fakeDirsDeleted(count);
}
- if (clearKeys) {
- keysToDelete.clear();
+ }
+
+ /**
+ * Invoke {@link #removeKeysS3(List, boolean)} with handling of
+ * {@code MultiObjectDeleteException}.
+ *
+ * @param keysToDelete collection of keys to delete on the s3-backend.
+ * if empty, no request is made of the object store.
+ * @param deleteFakeDir indicates whether this is for deleting fake dirs
+ * @throws InvalidRequestException if the request was rejected due to
+ * a mistaken attempt to delete the root directory.
+ * @throws MultiObjectDeleteException one or more of the keys could not
+ * be deleted in a multiple object delete operation.
+ * @throws AmazonClientException amazon-layer failure.
+ * @throws IOException other IO Exception.
+ */
+ @VisibleForTesting
+ @Retries.RetryMixed
+ void removeKeys(
+ final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ final boolean deleteFakeDir)
+ throws MultiObjectDeleteException, AmazonClientException,
+ IOException {
+ removeKeys(keysToDelete, deleteFakeDir, new ArrayList<>());
+ }
+
+ /**
+ * Invoke {@link #removeKeysS3(List, boolean)} with handling of
+ * {@code MultiObjectDeleteException} before the exception is rethrown.
+ * Specifically:
+ * <ol>
+ * <li>Failure and !deleteFakeDir: S3Guard is updated with all
+ * deleted entries</li>
+ * <li>Failure where deleteFakeDir == true: do nothing with S3Guard</li>
+ * <li>Success: do nothing with S3Guard</li>
+ * </ol>
+ * @param keysToDelete collection of keys to delete on the s3-backend.
+ * if empty, no request is made of the object store.
+ * @param deleteFakeDir indicates whether this is for deleting fake dirs.
+ * @param undeletedObjectsOnFailure List which will be built up of all
+ * files that were not deleted. This happens even as an exception
+ * is raised.
+ * @throws InvalidRequestException if the request was rejected due to
+ * a mistaken attempt to delete the root directory.
+ * @throws MultiObjectDeleteException one or more of the keys could not
+ * be deleted in a multiple object delete operation.
+ * @throws AmazonClientException amazon-layer failure.
+ * @throws IOException other IO Exception.
+ */
+ @VisibleForTesting
+ @Retries.RetryMixed
+ void removeKeys(
+ final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ final boolean deleteFakeDir,
+ final List<Path> undeletedObjectsOnFailure)
+ throws MultiObjectDeleteException, AmazonClientException,
+ IOException {
+ undeletedObjectsOnFailure.clear();
+ try (DurationInfo ignored = new DurationInfo(LOG, false, "Deleting")) {
+ removeKeysS3(keysToDelete, deleteFakeDir);
+ } catch (MultiObjectDeleteException ex) {
+ LOG.debug("Partial delete failure");
+ // what to do if an IOE was raised? Given an exception was being
+ // raised anyway, and the failures are logged, do nothing.
+ if (!deleteFakeDir) {
+ // when deleting fake directories we don't want to delete metastore
+ // entries so we only process these failures on "real" deletes.
+ Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>> results =
+ new MultiObjectDeleteSupport(createStoreContext())
+ .processDeleteFailure(ex, keysToDelete);
+ undeletedObjectsOnFailure.addAll(results.getMiddle());
+ }
+ throw ex;
+ } catch (AmazonClientException | IOException ex) {
+ List<Path> paths = new MultiObjectDeleteSupport(createStoreContext())
+ .processDeleteFailureGenericException(ex, keysToDelete);
+ // other failures. Assume nothing was deleted
+ undeletedObjectsOnFailure.addAll(paths);
+ throw ex;
}
}
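
Reviewer note: the three-argument removeKeys() populates the caller's list
with the paths that were not deleted, even as the exception propagates, so
the caller can reconcile the metastore afterwards. A hedged sketch of a
caller inside this class relying on that contract (only the removeKeys()
signature added by this patch is assumed; the rest is illustrative):

    // Sketch: delete a batch, and on partial failure know exactly which
    // objects still exist in the store.
    List<Path> undeleted = new ArrayList<>();
    try {
      removeKeys(keysToDelete, false, undeleted);
    } catch (AmazonClientException | IOException e) {
      // populated before the exception surfaced: these paths remain in
      // S3, so only their metastore entries should be retained.
      LOG.warn("{} objects were not deleted", undeleted.size());
      throw e;
    }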

@@ -2067,7 +2202,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
return outcome;
} catch (FileNotFoundException e) {
- LOG.debug("Couldn't delete {} - does not exist", f);
+ LOG.debug("Couldn't delete {} - does not exist: {}", f, e.toString());
instrumentation.errorIgnored();
return false;
} catch (AmazonClientException e) {
@@ -2131,22 +2266,25 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
LOG.debug("Got object to delete {}", summary.getKey());

if (keys.size() == MAX_ENTRIES_TO_DELETE) {
- removeKeys(keys, true, false);
+ // delete a single page of keys
+ removeKeys(keys, false);
+ keys.clear();
}
}

if (objects.isTruncated()) {
objects = continueListObjects(request, objects);
} else {
- if (!keys.isEmpty()) {
- // TODO: HADOOP-13761 S3Guard: retries
- removeKeys(keys, false, false);
- }
+ // there is no more data: delete the final set of entries.
+ removeKeys(keys, false);
break;
}
}
}
- metadataStore.deleteSubtree(f, ttlTimeProvider);
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, false, "Delete metastore")) {
+ metadataStore.deleteSubtree(f, ttlTimeProvider);
+ }
} else {
LOG.debug("delete: Path is a file");
deleteObjectAtPath(f, key, true);
@@ -2276,6 +2414,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
while (files.hasNext()) {
result.add(files.next());
}
+ // merge the results. This will update the store as needed
return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
allowAuthoritative, ttlTimeProvider);
} else {
@@ -2464,6 +2603,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
S3AFileStatus innerGetFileStatus(final Path f,
boolean needEmptyDirectoryFlag) throws IOException {
entryPoint(INVOCATION_GET_FILE_STATUS);
+ checkNotClosed();
final Path path = qualify(f);
String key = pathToKey(path);
LOG.debug("Getting path status for {} ({})", path, key);
@@ -2476,8 +2616,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
Set<Path> tombstones = Collections.emptySet();
if (pm != null) {
if (pm.isDeleted()) {
+ OffsetDateTime deletedAt = OffsetDateTime.ofInstant(
+ Instant.ofEpochMilli(pm.getFileStatus().getModificationTime()),
+ ZoneOffset.UTC);
throw new FileNotFoundException("Path " + f + " is recorded as " +
- "deleted by S3Guard");
+ "deleted by S3Guard at " + deletedAt);
}
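
Reviewer note: the richer FileNotFoundException message renders the
metastore's epoch-millisecond timestamp as a UTC OffsetDateTime. The
conversion in isolation, using nothing beyond java.time:

    long modTime = 1_560_000_000_000L;      // example epoch milliseconds
    OffsetDateTime deletedAt = OffsetDateTime.ofInstant(
        Instant.ofEpochMilli(modTime), ZoneOffset.UTC);
    // deletedAt.toString() -> "2019-06-08T13:20Z", which is what the
    // exception message now carries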

// if ms is not authoritative, check S3 if there's any recent
@@ -2504,8 +2647,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
final long s3ModTime = s3AFileStatus.getModificationTime();

if(s3ModTime > msModTime) {
- LOG.debug("S3Guard metadata for {} is outdated, updating it",
- path);
+ LOG.debug("S3Guard metadata for {} is outdated;"
+ + " s3modtime={}; msModTime={} updating metastore",
+ path, s3ModTime, msModTime);
return S3Guard.putAndReturn(metadataStore, s3AFileStatus,
instrumentation, ttlTimeProvider);
}
@@ -2835,7 +2979,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
listener.uploadCompleted();
// post-write actions
finishedWrite(key, info.getLength(),
- result.getETag(), result.getVersionId());
+ result.getETag(), result.getVersionId(), null);
return result;
}

@@ -3168,7 +3312,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/**
* Perform post-write actions.
* Calls {@link #deleteUnnecessaryFakeDirectories(Path)} and then
- * {@link S3Guard#addAncestors(MetadataStore, Path, String)}}.
+ * updates any metastore.
* This operation MUST be called after any PUT/multipart PUT completes
* successfully.
*
@@ -3182,6 +3326,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @param length total length of file written
* @param eTag eTag of the written object
* @param versionId S3 object versionId of the written object
+ * @param operationState state of any ongoing bulk operation.
* @throws MetadataPersistenceException if metadata about the write could
* not be saved to the metadata store and
* fs.s3a.metadatastore.fail.on.write.error=true
@@ -3189,22 +3334,39 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@InterfaceAudience.Private
@Retries.RetryTranslated("Except if failOnMetadataWriteError=false, in which"
+ " case RetryExceptionsSwallowed")
- void finishedWrite(String key, long length, String eTag, String versionId)
+ void finishedWrite(String key, long length, String eTag, String versionId,
+ @Nullable final BulkOperationState operationState)
throws MetadataPersistenceException {
- LOG.debug("Finished write to {}, len {}", key, length);
+ LOG.debug("Finished write to {}, len {}. etag {}, version {}",
+ key, length, eTag, versionId);
Path p = keyToQualifiedPath(key);
Preconditions.checkArgument(length >= 0, "content length is negative");
deleteUnnecessaryFakeDirectories(p.getParent());
+ // this is only set if there is a metastore to update and the
+ // operationState parameter passed in was null.
+ BulkOperationState stateToClose = null;

// See note about failure semantics in S3Guard documentation
try {
if (hasMetadataStore()) {
- S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider);
+ BulkOperationState activeState = operationState;
+ if (activeState == null) {
+ // create an operation state if there was none, so that the
+ // information gleaned from addAncestors is preserved into the
+ // subsequent put.
+ stateToClose = S3Guard.initiateBulkWrite(metadataStore,
+ BulkOperationState.OperationType.Put,
+ keyToPath(key));
+ activeState = stateToClose;
+ }
+ S3Guard.addAncestors(metadataStore, p, ttlTimeProvider, activeState);
S3AFileStatus status = createUploadFileStatus(p,
S3AUtils.objectRepresentsDirectory(key, length), length,
getDefaultBlockSize(p), username, eTag, versionId);
- S3Guard.putAndReturn(metadataStore, status, instrumentation,
- ttlTimeProvider);
+ S3Guard.putAndReturn(metadataStore, status,
+ instrumentation,
+ ttlTimeProvider,
+ activeState);
}
} catch (IOException e) {
if (failOnMetadataWriteError) {
@@ -3214,6 +3376,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
p, e);
}
instrumentation.errorIgnored();
+ } finally {
+ // if a new operation state was created, close it.
+ IOUtils.cleanupWithLogger(LOG, stateToClose);
}
}
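
Reviewer note: finishedWrite() now has an ownership rule for
BulkOperationState: a caller-supplied state is used but never closed here,
while one created locally is closed in the finally clause. The rule in
miniature, as a generic sketch (acquireState() and use() are illustrative):

    // Sketch: close a resource only if this method created it.
    void doWork(AutoCloseable callerState) throws Exception {
      AutoCloseable stateToClose = null;     // set only if created here
      try {
        AutoCloseable active = callerState;
        if (active == null) {
          stateToClose = acquireState();     // hypothetical factory
          active = stateToClose;
        }
        use(active);                         // hypothetical work
      } finally {
        if (stateToClose != null) {
          stateToClose.close();              // never closes callerState
        }
      }
    }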

@@ -3233,7 +3398,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
path = path.getParent();
}
try {
- removeKeys(keysToRemove, false, true);
+ removeKeys(keysToRemove, true);
} catch(AmazonClientException | IOException e) {
instrumentation.errorIgnored();
if (LOG.isDebugEnabled()) {
@@ -3594,11 +3759,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
};
}

- @Retries.OnceTranslated
+ /**
+ * Recursive list of files and empty directories.
+ * @param f path to list from
+ * @return an iterator.
+ * @throws IOException failure
+ */
+ @Retries.RetryTranslated
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
Path f, boolean recursive) throws IOException {
- return innerListFiles(f, recursive,
- new Listing.AcceptAllButS3nDirs());
+ return invoker.retry("list", f.toString(), true,
+ () -> innerListFiles(f, recursive, new Listing.AcceptAllButS3nDirs()));
}

@Retries.OnceTranslated
@@ -3904,8 +4075,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

// readahead range can be dynamically set
long ra = options.getLong(READAHEAD_RANGE, readAhead);
- S3ObjectAttributes objectAttributes = createObjectAttributes(
- path, fileStatus.getETag(), fileStatus.getVersionId());
+ S3ObjectAttributes objectAttributes = createObjectAttributes(fileStatus);
S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy,
changeDetectionPolicy, ra);

@@ -3998,4 +4168,59 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return result;
}

+ /**
+ * Build an immutable store context.
+ * If called while the FS is being initialized,
+ * some of the context will be incomplete.
+ * New store context instances should be created as appropriate.
+ * @return the store context of this FS.
+ */
+ @InterfaceAudience.Private
+ public StoreContext createStoreContext() {
+ return new StoreContext(
+ getUri(),
+ getBucket(),
+ getConf(),
+ getUsername(),
+ owner,
+ boundedThreadPool,
+ executorCapacity,
+ invoker,
+ getInstrumentation(),
+ getStorageStatistics(),
+ getInputPolicy(),
+ changeDetectionPolicy,
+ enableMultiObjectsDelete,
+ metadataStore,
+ useListV1,
+ new ContextAccessorsImpl(),
+ getTtlTimeProvider());
+ }
+
+ /**
+ * The implementation of context accessors.
+ */
+ private class ContextAccessorsImpl implements ContextAccessors {
+
+ @Override
+ public Path keyToPath(final String key) {
+ return keyToQualifiedPath(key);
+ }
+
+ @Override
+ public String pathToKey(final Path path) {
+ return S3AFileSystem.this.pathToKey(path);
+ }
+
+ @Override
+ public File createTempFile(final String prefix, final long size)
+ throws IOException {
+ return createTmpFileForWrite(prefix, size, getConf());
+ }
+
+ @Override
+ public String getBucketLocation() throws IOException {
+ return S3AFileSystem.this.getBucketLocation();
+ }
+ }
}