@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -30,21 +30,26 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
@@ -55,6 +60,8 @@ import com.amazonaws.services.s3.transfer.Upload;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.event.ProgressEvent;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,6 +75,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -118,21 +126,26 @@ public class S3AFileSystem extends FileSystem {
   private long partSize;
   private boolean enableMultiObjectsDelete;
   private TransferManager transfers;
-  private ExecutorService threadPoolExecutor;
+  private ListeningExecutorService threadPoolExecutor;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
+  private static final Logger PROGRESS =
+      LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
+  private LocalDirAllocator directoryAllocator;
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
   private S3AInstrumentation instrumentation;
   private S3AStorageStatistics storageStatistics;
   private long readAhead;
   private S3AInputPolicy inputPolicy;
-  private static final AtomicBoolean warnedOfCoreThreadDeprecation =
-      new AtomicBoolean(false);
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
+  private boolean blockUploadEnabled;
+  private String blockOutputBuffer;
+  private S3ADataBlocks.BlockFactory blockFactory;
+  private int blockOutputActiveBlocks;
 
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.
@@ -159,18 +172,11 @@ public class S3AFileSystem extends FileSystem {
 
       maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
       listing = new Listing(this);
-      partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-      if (partSize < 5 * 1024 * 1024) {
-        LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
-        partSize = 5 * 1024 * 1024;
-      }
+      partSize = getMultipartSizeProperty(conf,
+          MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+      multiPartThreshold = getMultipartSizeProperty(conf,
+          MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
 
-      multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
-          DEFAULT_MIN_MULTIPART_THRESHOLD);
-      if (multiPartThreshold < 5 * 1024 * 1024) {
-        LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
-        multiPartThreshold = 5 * 1024 * 1024;
-      }
       //check but do not store the block size
       longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
@@ -186,26 +192,19 @@ public class S3AFileSystem extends FileSystem {
         }
       });
 
-      if (conf.get("fs.s3a.threads.core") != null &&
-          warnedOfCoreThreadDeprecation.compareAndSet(false, true)) {
-        LoggerFactory.getLogger(
-            "org.apache.hadoop.conf.Configuration.deprecation")
-            .warn("Unsupported option \"fs.s3a.threads.core\"" +
-                " will be ignored {}", conf.get("fs.s3a.threads.core"));
-      }
       int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
       if (maxThreads < 2) {
         LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
         maxThreads = 2;
       }
-      int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
-      if (totalTasks < 1) {
-        LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
-        totalTasks = 1;
-      }
-      long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
-      threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
-          maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+      int totalTasks = intOption(conf,
+          MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
+      long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
+          DEFAULT_KEEPALIVE_TIME, 0);
+      threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+          maxThreads,
+          maxThreads + totalTasks,
+          keepAliveTime, TimeUnit.SECONDS,
           "s3a-transfer-shared");
 
       initTransferManager();
@@ -218,8 +217,25 @@ public class S3AFileSystem extends FileSystem {
 
       serverSideEncryptionAlgorithm =
           conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+      LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
+
+      blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD);
+
+      if (blockUploadEnabled) {
+        blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
+            DEFAULT_FAST_UPLOAD_BUFFER);
+        partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
+        blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
+        blockOutputActiveBlocks = intOption(conf,
+            FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
+        LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
+            " queue limit={}",
+            blockOutputBuffer, partSize, blockOutputActiveBlocks);
+      } else {
+        LOG.debug("Using S3AOutputStream");
+      }
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
     }
@@ -345,6 +361,33 @@ public class S3AFileSystem extends FileSystem {
     return inputPolicy;
   }
 
+  /**
+   * Demand create the directory allocator, then create a temporary file.
+   * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
+   * @param pathStr prefix for the temporary file
+   * @param size the size of the file that is going to be written
+   * @param conf the Configuration object
+   * @return a unique temporary file
+   * @throws IOException IO problems
+   */
+  synchronized File createTmpFileForWrite(String pathStr, long size,
+      Configuration conf) throws IOException {
+    if (directoryAllocator == null) {
+      String bufferDir = conf.get(BUFFER_DIR) != null
+          ? BUFFER_DIR : "hadoop.tmp.dir";
+      directoryAllocator = new LocalDirAllocator(bufferDir);
+    }
+    return directoryAllocator.createTmpFileForWrite(pathStr, size, conf);
+  }
+
+  /**
+   * Get the bucket of this filesystem.
+   * @return the bucket
+   */
+  public String getBucket() {
+    return bucket;
+  }
+
   /**
    * Change the input policy for this FS.
    * @param inputPolicy new policy
@@ -469,6 +512,7 @@ public class S3AFileSystem extends FileSystem {
    * @see #setPermission(Path, FsPermission)
    */
   @Override
+  @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
@@ -493,28 +537,33 @@
 
     }
     instrumentation.fileCreated();
-    if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
-      return new FSDataOutputStream(
-          new S3AFastOutputStream(s3,
-              this,
-              bucket,
+    FSDataOutputStream output;
+    if (blockUploadEnabled) {
+      output = new FSDataOutputStream(
+          new S3ABlockOutputStream(this,
               key,
+              new SemaphoredDelegatingExecutor(threadPoolExecutor,
+                  blockOutputActiveBlocks, true),
               progress,
-              cannedACL,
               partSize,
-              multiPartThreshold,
-              threadPoolExecutor),
-          statistics);
+              blockFactory,
+              instrumentation.newOutputStreamStatistics(),
+              new WriteOperationHelper(key)
+          ),
+          null);
+    } else {
+
+      // We pass null to FSDataOutputStream so it won't count writes that
+      // are being buffered to a file
+      output = new FSDataOutputStream(
+          new S3AOutputStream(getConf(),
+              this,
+              key,
+              progress
+          ),
+          null);
     }
-    // We pass null to FSDataOutputStream so it won't count writes that
-    // are being buffered to a file
-    return new FSDataOutputStream(
-        new S3AOutputStream(getConf(),
-            this,
-            key,
-            progress
-        ),
-        null);
+    return output;
   }
 
   /**
@@ -757,6 +806,33 @@
     storageStatistics.incrementCounter(statistic, count);
   }
 
+  /**
+   * Decrement a gauge by a specific value.
+   * @param statistic The operation to decrement
+   * @param count the count to decrement
+   */
+  protected void decrementGauge(Statistic statistic, long count) {
+    instrumentation.decrementGauge(statistic, count);
+  }
+
+  /**
+   * Increment a gauge by a specific value.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementGauge(Statistic statistic, long count) {
+    instrumentation.incrementGauge(statistic, count);
+  }
+
+  /**
+   * Get the storage statistics of this filesystem.
+   * @return the storage statistics
+   */
+  @Override
+  public S3AStorageStatistics getStorageStatistics() {
+    return storageStatistics;
+  }
+
   /**
    * Request object metadata; increments counters in the process.
    * @param key key
@@ -904,7 +980,9 @@
    */
   public ObjectMetadata newObjectMetadata(long length) {
     final ObjectMetadata om = newObjectMetadata();
-    om.setContentLength(length);
+    if (length >= 0) {
+      om.setContentLength(length);
+    }
     return om;
   }
 
@@ -926,7 +1004,41 @@
       len = putObjectRequest.getMetadata().getContentLength();
     }
     incrementPutStartStatistics(len);
-    return transfers.upload(putObjectRequest);
+    try {
+      Upload upload = transfers.upload(putObjectRequest);
+      incrementPutCompletedStatistics(true, len);
+      return upload;
+    } catch (AmazonClientException e) {
+      incrementPutCompletedStatistics(false, len);
+      throw e;
+    }
+  }
+
+  /**
+   * PUT an object directly (i.e. not via the transfer manager).
+   * Byte length is calculated from the file length, or, if there is no
+   * file, from the content length of the header.
+   * @param putObjectRequest the request
+   * @return the upload initiated
+   * @throws AmazonClientException on problems
+   */
+  public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
+      throws AmazonClientException {
+    long len;
+    if (putObjectRequest.getFile() != null) {
+      len = putObjectRequest.getFile().length();
+    } else {
+      len = putObjectRequest.getMetadata().getContentLength();
+    }
+    incrementPutStartStatistics(len);
+    try {
+      PutObjectResult result = s3.putObject(putObjectRequest);
+      incrementPutCompletedStatistics(true, len);
+      return result;
+    } catch (AmazonClientException e) {
+      incrementPutCompletedStatistics(false, len);
+      throw e;
+    }
   }
 
   /**
@@ -934,10 +1046,20 @@
    * Increments the write and put counters
    * @param request request
    * @return the result of the operation.
+   * @throws AmazonClientException on problems
    */
-  public UploadPartResult uploadPart(UploadPartRequest request) {
-    incrementPutStartStatistics(request.getPartSize());
-    return s3.uploadPart(request);
+  public UploadPartResult uploadPart(UploadPartRequest request)
+      throws AmazonClientException {
+    long len = request.getPartSize();
+    incrementPutStartStatistics(len);
+    try {
+      UploadPartResult uploadPartResult = s3.uploadPart(request);
+      incrementPutCompletedStatistics(true, len);
+      return uploadPartResult;
+    } catch (AmazonClientException e) {
+      incrementPutCompletedStatistics(false, len);
+      throw e;
+    }
   }
 
   /**
@@ -950,9 +1072,28 @@
     LOG.debug("PUT start {} bytes", bytes);
     incrementWriteOperations();
     incrementStatistic(OBJECT_PUT_REQUESTS);
+    incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
+    if (bytes > 0) {
+      incrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
+    }
+  }
+
+  /**
+   * At the end of a put/multipart upload operation, update the
+   * relevant counters and gauges.
+   *
+   * @param success did the operation succeed?
+   * @param bytes bytes in the request.
+   */
+  public void incrementPutCompletedStatistics(boolean success, long bytes) {
+    LOG.debug("PUT completed success={}; {} bytes", success, bytes);
+    incrementWriteOperations();
     if (bytes > 0) {
       incrementStatistic(OBJECT_PUT_BYTES, bytes);
+      decrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
     }
+    incrementStatistic(OBJECT_PUT_REQUESTS_COMPLETED);
+    decrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
   }
 
   /**
@@ -963,7 +1104,7 @@
    * @param bytes bytes successfully uploaded.
    */
   public void incrementPutProgressStatistics(String key, long bytes) {
-    LOG.debug("PUT {}: {} bytes", key, bytes);
+    PROGRESS.debug("PUT {}: {} bytes", key, bytes);
     incrementWriteOperations();
     if (bytes > 0) {
       statistics.incrementBytesWritten(bytes);
@@ -1483,7 +1624,7 @@
     LocalFileSystem local = getLocal(getConf());
     File srcfile = local.pathToFile(src);
 
-    final ObjectMetadata om = newObjectMetadata();
+    final ObjectMetadata om = newObjectMetadata(srcfile.length());
    PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
     Upload up = putObject(putObjectRequest);
     ProgressableProgressListener listener = new ProgressableProgressListener(
@@ -1751,6 +1892,10 @@
           .append(serverSideEncryptionAlgorithm)
           .append('\'');
     }
+    if (blockFactory != null) {
+      sb.append(", blockFactory=").append(blockFactory);
+    }
+    sb.append(", executor=").append(threadPoolExecutor);
     sb.append(", statistics {")
         .append(statistics)
         .append("}");
@@ -1958,4 +2103,163 @@
         getFileBlockLocations(status, 0, status.getLen())
         : null);
   }
+
+  /**
+   * Helper for an ongoing write operation.
+   * <p>
+   * It hides direct access to the S3 API from the output stream,
+   * and is a location where the object upload process can be evolved/enhanced.
+   * <p>
+   * Features
+   * <ul>
+   *   <li>Methods to create and submit requests to S3, so avoiding
+   *   all direct interaction with the AWS APIs.</li>
+   *   <li>Some extra preflight checks of arguments, so failing fast on
+   *   errors.</li>
+   *   <li>Callbacks to let the FS know of events in the output stream
+   *   upload process.</li>
+   * </ul>
+   *
+   * Each instance of this state is unique to a single output stream.
+   */
+  final class WriteOperationHelper {
+    private final String key;
+
+    private WriteOperationHelper(String key) {
+      this.key = key;
+    }
+
+    /**
+     * Create a {@link PutObjectRequest} request.
+     * The metadata is assumed to have been configured with the size of the
+     * operation.
+     * @param inputStream source data.
+     * @param length size, if known. Use -1 for not known
+     * @return the request
+     */
+    PutObjectRequest newPutRequest(InputStream inputStream, long length) {
+      return newPutObjectRequest(key, newObjectMetadata(length), inputStream);
+    }
+
+    /**
+     * Callback on a successful write.
+     */
+    void writeSuccessful() {
+      finishedWrite(key);
+    }
+
+    /**
+     * Callback on a write failure.
+     * @param e Any exception raised which triggered the failure.
+     */
+    void writeFailed(Exception e) {
+      LOG.debug("Write to {} failed", this, e);
+    }
+
+    /**
+     * Create a new object metadata instance.
+     * Any standard metadata headers are added here, for example:
+     * encryption.
+     * @param length size, if known. Use -1 for not known
+     * @return a new metadata instance
+     */
+    public ObjectMetadata newObjectMetadata(long length) {
+      return S3AFileSystem.this.newObjectMetadata(length);
+    }
+
+    /**
+     * Start the multipart upload process.
+     * @return the upload result containing the ID
+     * @throws IOException IO problem
+     */
+    String initiateMultiPartUpload() throws IOException {
+      LOG.debug("Initiating Multipart upload");
+      final InitiateMultipartUploadRequest initiateMPURequest =
+          new InitiateMultipartUploadRequest(bucket,
+              key,
+              newObjectMetadata(-1));
+      initiateMPURequest.setCannedACL(cannedACL);
+      try {
+        return s3.initiateMultipartUpload(initiateMPURequest)
+            .getUploadId();
+      } catch (AmazonClientException ace) {
+        throw translateException("initiate MultiPartUpload", key, ace);
+      }
+    }
+
+    /**
+     * Complete a multipart upload operation.
+     * @param uploadId multipart operation Id
+     * @param partETags list of partial uploads
+     * @return the result
+     * @throws AmazonClientException on problems.
+     */
+    CompleteMultipartUploadResult completeMultipartUpload(String uploadId,
+        List<PartETag> partETags) throws AmazonClientException {
+      Preconditions.checkNotNull(uploadId);
+      Preconditions.checkNotNull(partETags);
+      Preconditions.checkArgument(!partETags.isEmpty(),
+          "No partitions have been uploaded");
+      return s3.completeMultipartUpload(
+          new CompleteMultipartUploadRequest(bucket,
+              key,
+              uploadId,
+              partETags));
+    }
+
+    /**
+     * Abort a multipart upload operation.
+     * @param uploadId multipart operation Id
+     * @return the result
+     * @throws AmazonClientException on problems.
+     */
+    void abortMultipartUpload(String uploadId) throws AmazonClientException {
+      s3.abortMultipartUpload(
+          new AbortMultipartUploadRequest(bucket, key, uploadId));
+    }
+
+    /**
+     * Create and initialize a part request of a multipart upload.
+     * @param uploadId ID of ongoing upload
+     * @param uploadStream source of data to upload
+     * @param partNumber current part number of the upload
+     * @param size amount of data
+     * @return the request.
+     */
+    UploadPartRequest newUploadPartRequest(String uploadId,
+        InputStream uploadStream,
+        int partNumber,
+        int size) {
+      Preconditions.checkNotNull(uploadId);
+      Preconditions.checkNotNull(uploadStream);
+      Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
+      Preconditions.checkArgument(partNumber> 0 && partNumber <=10000,
+          "partNumber must be between 1 and 10000 inclusive, but is %s",
+          partNumber);
+
+      LOG.debug("Creating part upload request for {} #{} size {}",
+          uploadId, partNumber, size);
+      return new UploadPartRequest()
+          .withBucketName(bucket)
+          .withKey(key)
+          .withUploadId(uploadId)
+          .withInputStream(uploadStream)
+          .withPartNumber(partNumber)
+          .withPartSize(size);
+    }
+
+    /**
+     * The toString method is intended to be used in logging/toString calls.
+     * @return a string description.
+     */
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "{bucket=").append(bucket);
+      sb.append(", key='").append(key).append('\'');
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
 }
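
The block-output path introduced by this patch is driven entirely by configuration read in initialize(). Below is a minimal client-side sketch of enabling it, assuming the key names behind the FAST_UPLOAD, FAST_UPLOAD_BUFFER and FAST_UPLOAD_ACTIVE_BLOCKS constants used above ("fs.s3a.fast.upload", "fs.s3a.fast.upload.buffer", "fs.s3a.fast.upload.active.blocks"); the bucket URI is a placeholder and credentials must be configured separately:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class S3ABlockUploadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // FAST_UPLOAD: route writes through the new S3ABlockOutputStream.
    conf.setBoolean("fs.s3a.fast.upload", true);
    // FAST_UPLOAD_BUFFER: buffer pending blocks on local disk; "array"
    // and "bytebuffer" select the in-memory S3ADataBlocks factories.
    conf.set("fs.s3a.fast.upload.buffer", "disk");
    // FAST_UPLOAD_ACTIVE_BLOCKS: at most four blocks queued or uploading
    // per stream, enforced by the SemaphoredDelegatingExecutor that
    // create() wraps around the shared thread pool.
    conf.setInt("fs.s3a.fast.upload.active.blocks", 4);
    // "s3a://example-bucket/" is a placeholder URI.
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    System.out.println("Opened " + fs.getUri() + " with block upload enabled");
  }
}

The per-stream active-block limit is the design point: the S3AFastOutputStream removed here kept every pending block in heap memory, while the new stream can spill blocks to disk and blocks the writer once the limit is reached, keeping buffer consumption bounded.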