|
@@ -26,6 +26,11 @@ import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.hadoop.fs.s3.S3Credentials;
|
|
|
|
|
@@ -77,6 +82,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
private String bucket;
|
|
|
private int maxKeys;
|
|
|
private long partSize;
|
|
|
+ private TransferManager transfers;
|
|
|
private int partSizeThreshold;
|
|
|
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
|
|
|
private CannedAccessControlList cannedACL;
|
|
@@ -85,6 +91,55 @@ public class S3AFileSystem extends FileSystem {
|
|
|
// The maximum number of entries that can be deleted in any call to s3
|
|
|
private static final int MAX_ENTRIES_TO_DELETE = 1000;
|
|
|
|
|
|
+ private static final AtomicInteger poolNumber = new AtomicInteger(1);
|
|
|
+  /**
|
|
|
+   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
|
|
|
+   * with a common prefix, in the form {@code prefix-poolN-tM}.
|
|
|
+   * @param prefix The prefix of every created Thread's name
|
|
|
+   * @return a factory creating named threads in the SecurityManager's (else the caller's) group
|
|
|
+   */
|
|
|
+  public static ThreadFactory getNamedThreadFactory(final String prefix) {
|
|
|
+    SecurityManager s = System.getSecurityManager();
|
|
|
+    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
|
|
|
+        .getThreadGroup();
|
|
|
+
|
|
|
+    return new ThreadFactory() {
|
|
|
+      final AtomicInteger threadNumber = new AtomicInteger(1);
|
|
|
+      private final int poolNum = poolNumber.getAndIncrement();
|
|
|
+      final ThreadGroup group = threadGroup;
|
|
|
+
|
|
|
+      @Override
|
|
|
+      public Thread newThread(Runnable r) {
|
|
|
+        final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
|
|
|
+        return new Thread(group, r, name);
|
|
|
+      }
|
|
|
+    };
|
|
|
+  }
|
|
|
+
|
|
|
+  /**
|
|
|
+   * Get a named {@link ThreadFactory} that just builds daemon threads.
|
|
|
+   * @param prefix name prefix for all threads created from the factory
|
|
|
+   * @return a thread factory that creates named, daemon threads with
|
|
|
+   *         normal priority (no uncaught-exception handler is installed)
|
|
|
+   */
|
|
|
+  private static ThreadFactory newDaemonThreadFactory(final String prefix) {
|
|
|
+    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
|
|
|
+    return new ThreadFactory() {
|
|
|
+      @Override
|
|
|
+      public Thread newThread(Runnable r) {
|
|
|
+        Thread t = namedFactory.newThread(r);
|
|
|
+        if (!t.isDaemon()) {
|
|
|
+          t.setDaemon(true);
|
|
|
+        }
|
|
|
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
|
|
|
+          t.setPriority(Thread.NORM_PRIORITY);
|
|
|
+        }
|
|
|
+        return t;
|
|
|
+      }
|
|
|
+
|
|
|
+    };
|
|
|
+  }
|
|
|
+
|
|
|
/** Called after a new FileSystem instance is constructed.
|
|
|
* @param name a uri whose authority section names the host, port, etc.
|
|
|
* for this FileSystem
|
|
@@ -93,7 +148,6 @@ public class S3AFileSystem extends FileSystem {
|
|
|
public void initialize(URI name, Configuration conf) throws IOException {
|
|
|
super.initialize(name, conf);
|
|
|
|
|
|
-
|
|
|
uri = URI.create(name.getScheme() + "://" + name.getAuthority());
|
|
|
workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
|
|
|
this.getWorkingDirectory());
|
|
@@ -138,6 +192,34 @@ public class S3AFileSystem extends FileSystem {
|
|
|
partSizeThreshold = 5 * 1024 * 1024;
|
|
|
}
|
|
|
|
|
|
+ int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
|
|
|
+ int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
|
|
|
+ if (maxThreads == 0) {
|
|
|
+ maxThreads = Runtime.getRuntime().availableProcessors() * 8;
|
|
|
+ }
|
|
|
+ if (coreThreads == 0) {
|
|
|
+ coreThreads = Runtime.getRuntime().availableProcessors() * 8;
|
|
|
+ }
|
|
|
+ long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
|
|
|
+ LinkedBlockingQueue<Runnable> workQueue =
|
|
|
+ new LinkedBlockingQueue<Runnable>(maxThreads *
|
|
|
+ conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
|
|
|
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(
|
|
|
+ coreThreads,
|
|
|
+ maxThreads,
|
|
|
+ keepAliveTime,
|
|
|
+ TimeUnit.SECONDS,
|
|
|
+ workQueue,
|
|
|
+ newDaemonThreadFactory("s3a-transfer-shared-"));
|
|
|
+ tpe.allowCoreThreadTimeOut(true);
|
|
|
+
|
|
|
+ TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
|
|
|
+ transferConfiguration.setMinimumUploadPartSize(partSize);
|
|
|
+ transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
|
|
|
+
|
|
|
+ transfers = new TransferManager(s3, tpe);
|
|
|
+ transfers.setConfiguration(transferConfiguration);
|
|
|
+
|
|
|
String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
|
|
|
if (!cannedACLName.isEmpty()) {
|
|
|
cannedACL = CannedAccessControlList.valueOf(cannedACLName);
|
|
@@ -155,11 +237,10 @@ public class S3AFileSystem extends FileSystem {
|
|
|
DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
|
|
|
|
|
|
if (purgeExistingMultipart) {
|
|
|
- TransferManager transferManager = new TransferManager(s3);
|
|
|
Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
|
|
|
|
|
|
- transferManager.abortMultipartUploads(bucket, purgeBefore);
|
|
|
- transferManager.shutdownNow(false);
|
|
|
+ transfers.abortMultipartUploads(bucket, purgeBefore);
|
|
|
+ transfers.shutdownNow(false);
|
|
|
}
|
|
|
|
|
|
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
|
@@ -245,7 +326,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
|
|
|
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
|
|
|
- return new FSDataOutputStream(new S3AOutputStream(getConf(), s3, this,
|
|
|
+ return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
|
|
|
bucket, key, progress, cannedACL, statistics,
|
|
|
serverSideEncryptionAlgorithm), null);
|
|
|
}
|