|
@@ -16,7 +16,7 @@
|
|
* limitations under the License.
|
|
* limitations under the License.
|
|
*/
|
|
*/
|
|
|
|
|
|
-package org.apache.hadoop.fs.s3a;
|
|
|
|
|
|
+package org.apache.hadoop.util;
|
|
|
|
|
|
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
|
|
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
|
|
import com.google.common.util.concurrent.Futures;
|
|
import com.google.common.util.concurrent.Futures;
|
|
@@ -42,17 +42,13 @@ import java.util.concurrent.TimeoutException;
|
|
* This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
|
|
* This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
|
|
* contains the thread pool logic, whereas this isolates the semaphore
|
|
* contains the thread pool logic, whereas this isolates the semaphore
|
|
* and submit logic for use with other thread pools and delegation models.
|
|
* and submit logic for use with other thread pools and delegation models.
|
|
- * In particular, it <i>permits multiple per stream executors to share a
|
|
|
|
- * single per-FS-instance executor; the latter to throttle overall
|
|
|
|
- * load from the the FS, the others to limit the amount of load which
|
|
|
|
- * a single output stream can generate.</i>
|
|
|
|
* <p>
|
|
* <p>
|
|
* This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
|
|
* This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
|
|
* this s4 threadpool</a>
|
|
* this s4 threadpool</a>
|
|
*/
|
|
*/
|
|
@SuppressWarnings("NullableProblems")
|
|
@SuppressWarnings("NullableProblems")
|
|
@InterfaceAudience.Private
|
|
@InterfaceAudience.Private
|
|
-class SemaphoredDelegatingExecutor extends
|
|
|
|
|
|
+public class SemaphoredDelegatingExecutor extends
|
|
ForwardingListeningExecutorService {
|
|
ForwardingListeningExecutorService {
|
|
|
|
|
|
private final Semaphore queueingPermits;
|
|
private final Semaphore queueingPermits;
|
|
@@ -65,7 +61,8 @@ class SemaphoredDelegatingExecutor extends
|
|
* @param permitCount number of permits into the queue permitted
|
|
* @param permitCount number of permits into the queue permitted
|
|
* @param fair should the semaphore be "fair"
|
|
* @param fair should the semaphore be "fair"
|
|
*/
|
|
*/
|
|
- SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
|
|
|
|
|
|
+ public SemaphoredDelegatingExecutor(
|
|
|
|
+ ListeningExecutorService executorDelegatee,
|
|
int permitCount,
|
|
int permitCount,
|
|
boolean fair) {
|
|
boolean fair) {
|
|
this.permitCount = permitCount;
|
|
this.permitCount = permitCount;
|
|
@@ -190,7 +187,7 @@ class SemaphoredDelegatingExecutor extends
|
|
|
|
|
|
private Runnable delegatee;
|
|
private Runnable delegatee;
|
|
|
|
|
|
- public RunnableWithPermitRelease(Runnable delegatee) {
|
|
|
|
|
|
+ RunnableWithPermitRelease(Runnable delegatee) {
|
|
this.delegatee = delegatee;
|
|
this.delegatee = delegatee;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -212,7 +209,7 @@ class SemaphoredDelegatingExecutor extends
|
|
|
|
|
|
private Callable<T> delegatee;
|
|
private Callable<T> delegatee;
|
|
|
|
|
|
- public CallableWithPermitRelease(Callable<T> delegatee) {
|
|
|
|
|
|
+ CallableWithPermitRelease(Callable<T> delegatee) {
|
|
this.delegatee = delegatee;
|
|
this.delegatee = delegatee;
|
|
}
|
|
}
|
|
|
|
|