浏览代码

HADOOP-15039/HADOOP-15189. Move SemaphoredDelegatingExecutor to hadoop-common
Contributed by Genmao Yu

Steve Loughran 7 年之前
父节点
当前提交
ec4f5f0f0d

+ 3 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java → hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.s3a;
+package org.apache.hadoop.util;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -42,7 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
  * this s4 threadpool</a>
  */
 @InterfaceAudience.Private
-final class BlockingThreadPoolExecutorService
+public final class BlockingThreadPoolExecutorService
     extends SemaphoredDelegatingExecutor {
 
   private static Logger LOG = LoggerFactory
@@ -86,7 +86,7 @@ final class BlockingThreadPoolExecutorService
    * @return a thread factory that creates named, daemon threads with
    * the supplied exception handler and normal priority
    */
-  static ThreadFactory newDaemonThreadFactory(final String prefix) {
+  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
     final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
     return new ThreadFactory() {
       @Override

+ 6 - 9
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java → hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.fs.s3a;
+package org.apache.hadoop.util;
 
 import com.google.common.util.concurrent.ForwardingListeningExecutorService;
 import com.google.common.util.concurrent.Futures;
@@ -42,17 +42,13 @@ import java.util.concurrent.TimeoutException;
  * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
  * contains the thread pool logic, whereas this isolates the semaphore
  * and submit logic for use with other thread pools and delegation models.
- * In particular, it <i>permits multiple per stream executors to share a
- * single per-FS-instance executor; the latter to throttle overall
- * load from the the FS, the others to limit the amount of load which
- * a single output stream can generate.</i>
  * <p>
  * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
  * this s4 threadpool</a>
  */
 @SuppressWarnings("NullableProblems")
 @InterfaceAudience.Private
-class SemaphoredDelegatingExecutor extends
+public class SemaphoredDelegatingExecutor extends
     ForwardingListeningExecutorService {
 
   private final Semaphore queueingPermits;
@@ -65,7 +61,8 @@ class SemaphoredDelegatingExecutor extends
    * @param permitCount number of permits into the queue permitted
    * @param fair should the semaphore be "fair"
    */
-  SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
+  public SemaphoredDelegatingExecutor(
+      ListeningExecutorService executorDelegatee,
       int permitCount,
       boolean fair) {
     this.permitCount = permitCount;
@@ -190,7 +187,7 @@ class SemaphoredDelegatingExecutor extends
 
     private Runnable delegatee;
 
-    public RunnableWithPermitRelease(Runnable delegatee) {
+    RunnableWithPermitRelease(Runnable delegatee) {
       this.delegatee = delegatee;
     }
 
@@ -212,7 +209,7 @@ class SemaphoredDelegatingExecutor extends
 
     private Callable<T> delegatee;
 
-    public CallableWithPermitRelease(Callable<T> delegatee) {
+    CallableWithPermitRelease(Callable<T> delegatee) {
       this.delegatee = delegatee;
     }
 

+ 2 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -103,8 +103,10 @@ import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;

+ 2 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
 import org.apache.hadoop.util.StopWatch;
 
 import org.junit.AfterClass;