瀏覽代碼

HADOOP-18180. Replace use of twitter util-core with java futures in S3A prefetching stream (#4115)

Contributed by PJ Fanning.
PJ Fanning 3 年之前
父節點
當前提交
5a1f4dd5c1

+ 0 - 6
hadoop-tools/hadoop-aws/pom.xml

@@ -468,12 +468,6 @@
       <artifactId>aws-java-sdk-bundle</artifactId>
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>util-core_2.11</artifactId>
-      <version>21.2.0</version>
-      <scope>compile</scope>
-    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>

+ 2 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java

@@ -22,10 +22,9 @@ package org.apache.hadoop.fs.common;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.zip.CRC32;
 
-import com.twitter.util.Awaitable.CanAwait;
-import com.twitter.util.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -263,8 +262,6 @@ public class BufferData {
     return false;
   }
 
-  private static final CanAwait CAN_AWAIT = () -> false;
-
   public String toString() {
 
     return String.format(
@@ -281,7 +278,7 @@ public class BufferData {
     if (f == null) {
       return "--";
     } else {
-      return this.action.isReady(CAN_AWAIT) ? "done" : "not done";
+      return this.action.isDone() ? "done" : "not done";
     }
   }
 

+ 2 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java

@@ -26,9 +26,8 @@ import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
 
-import com.twitter.util.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,7 +232,7 @@ public class BufferPool implements Closeable {
     for (BufferData data : this.getAll()) {
       Future<Void> actionFuture = data.getActionFuture();
       if (actionFuture != null) {
-        actionFuture.raise(new CancellationException("BufferPool is closing."));
+        actionFuture.cancel(true);
       }
     }
 

+ 18 - 16
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java

@@ -21,13 +21,13 @@ package org.apache.hadoop.fs.common;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
-import com.twitter.util.Await;
-import com.twitter.util.ExceptionalFunction0;
-import com.twitter.util.Future;
-import com.twitter.util.FuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,9 +37,10 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class CachingBlockManager extends BlockManager {
   private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
+  private static final int TIMEOUT_MINUTES = 60;
 
   // Asynchronous tasks are performed in this pool.
-  private final FuturePool futurePool;
+  private final ExecutorServiceFuturePool futurePool;
 
   // Pool of shared ByteBuffer instances.
   private BufferPool bufferPool;
@@ -78,7 +79,7 @@ public abstract class CachingBlockManager extends BlockManager {
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
   public CachingBlockManager(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       BlockData blockData,
       int bufferPoolSize) {
     super(blockData);
@@ -247,7 +248,7 @@ public abstract class CachingBlockManager extends BlockManager {
 
       BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber);
       PrefetchTask prefetchTask = new PrefetchTask(data, this);
-      Future<Void> prefetchFuture = this.futurePool.apply(prefetchTask);
+      Future<Void> prefetchFuture = this.futurePool.executeFunction(prefetchTask);
       data.setPrefetch(prefetchFuture);
       this.ops.end(op);
     }
@@ -344,7 +345,7 @@ public abstract class CachingBlockManager extends BlockManager {
   /**
    * Read task that is submitted to the future pool.
    */
-  private static class PrefetchTask extends ExceptionalFunction0<Void> {
+  private static class PrefetchTask implements Supplier<Void> {
     private final BufferData data;
     private final CachingBlockManager blockManager;
 
@@ -354,7 +355,7 @@ public abstract class CachingBlockManager extends BlockManager {
     }
 
     @Override
-    public Void applyE() {
+    public Void get() {
       try {
         this.blockManager.prefetch(data);
       } catch (Exception e) {
@@ -412,11 +413,13 @@ public abstract class CachingBlockManager extends BlockManager {
       if (state == BufferData.State.PREFETCHING) {
         blockFuture = data.getActionFuture();
       } else {
-        blockFuture = Future.value(null);
+        CompletableFuture<Void> cf = new CompletableFuture<>();
+        cf.complete(null);
+        blockFuture = cf;
       }
 
       CachePutTask task = new CachePutTask(data, blockFuture, this);
-      Future<Void> actionFuture = this.futurePool.apply(task);
+      Future<Void> actionFuture = this.futurePool.executeFunction(task);
       data.setCaching(actionFuture);
       this.ops.end(op);
     }
@@ -433,14 +436,13 @@ public abstract class CachingBlockManager extends BlockManager {
     }
 
     try {
-      Await.result(blockFuture);
+      blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
       if (data.stateEqualsOneOf(BufferData.State.DONE)) {
         // There was an error during prefetch.
         return;
       }
     } catch (Exception e) {
-      String message = String.format("error waitng on blockFuture: %s", data);
-      LOG.error(message, e);
+      LOG.error("error waiting on blockFuture: {}", data, e);
       data.setDone();
       return;
     }
@@ -500,7 +502,7 @@ public abstract class CachingBlockManager extends BlockManager {
     this.cache.put(blockNumber, buffer);
   }
 
-  private static class CachePutTask extends ExceptionalFunction0<Void> {
+  private static class CachePutTask implements Supplier<Void> {
     private final BufferData data;
 
     // Block being asynchronously fetched.
@@ -519,7 +521,7 @@ public abstract class CachingBlockManager extends BlockManager {
     }
 
     @Override
-    public Void applyE() {
+    public Void get() {
       this.blockManager.addToCacheAndRelease(this.data, this.blockFuture);
       return null;
     }

+ 70 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java

@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.util.Locale;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+
+/**
+ * A FuturePool implementation backed by a java.util.concurrent.ExecutorService.
+ *
+ * If a piece of work has started, it cannot (currently) be cancelled.
+ *
+ * This class is a simplified version of <code>com.twitter:util-core_2.11</code>
+ * ExecutorServiceFuturePool designed to avoid depending on that Scala library.
+ * One problem with using a Scala library is that many downstream projects
+ * (eg Apache Spark) use Scala, and they might want to use a different version of Scala
+ * from the version that Hadoop chooses to use.
+ *
+ */
+public class ExecutorServiceFuturePool {
+  private ExecutorService executor;
+
+  public ExecutorServiceFuturePool(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * @param f function to run in future on executor pool
+   * @return future
+   * @throws java.util.concurrent.RejectedExecutionException can be thrown
+   * @throws NullPointerException if f param is null
+   */
+  public Future<Void> executeFunction(final Supplier<Void> f) {
+    return executor.submit(f::get);
+  }
+
+  /**
+   * @param r runnable to run in future on executor pool
+   * @return future
+   * @throws java.util.concurrent.RejectedExecutionException can be thrown
+   * @throws NullPointerException if r param is null
+   */
+  @SuppressWarnings("unchecked")
+  public Future<Void> executeRunnable(final Runnable r) {
+    return (Future<Void>) executor.submit(r::run);
+  }
+
+  public String toString() {
+    return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor);
+  }
+}

+ 3 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -79,8 +79,7 @@ import com.amazonaws.services.s3.transfer.model.CopyResult;
 import com.amazonaws.services.s3.transfer.model.UploadResult;
 import com.amazonaws.event.ProgressListener;
 
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -294,7 +293,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private ThreadPoolExecutor unboundedThreadPool;
 
   // S3 reads are prefetched asynchronously using this future pool.
-  private FuturePool futurePool;
+  private ExecutorServiceFuturePool futurePool;
 
   // If true, the prefetching input stream is used for reads.
   private boolean prefetchEnabled;
@@ -1620,7 +1619,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         statisticsContext,
         fileStatus,
         vectoredIOContext,
-        IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
+        IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(),
         futurePool,
         prefetchBlockSize,
         prefetchBlockCount)

+ 7 - 8
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java

@@ -18,11 +18,10 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import com.twitter.util.FuturePool;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
@@ -77,7 +76,7 @@ public class S3AReadOpContext extends S3AOpContext {
   private final IOStatisticsAggregator ioStatisticsAggregator;
 
   // S3 reads are prefetched asynchronously using this future pool.
-  private FuturePool futurePool;
+  private ExecutorServiceFuturePool futurePool;
 
   // Size in bytes of a single prefetch block.
   private final int prefetchBlockSize;
@@ -94,7 +93,7 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param dstFileStatus target file status
    * @param vectoredIOContext context for vectored read operation.
    * @param ioStatisticsAggregator IOStatistics aggregator for each thread.
-   * @param futurePool the FuturePool instance used by async prefetches.
+   * @param futurePool the ExecutorServiceFuturePool instance used by async prefetches.
    * @param prefetchBlockSize the size (in number of bytes) of each prefetched block.
    * @param prefetchBlockCount maximum number of prefetched blocks.
    */
@@ -106,7 +105,7 @@ public class S3AReadOpContext extends S3AOpContext {
       FileStatus dstFileStatus,
       VectoredIOContext vectoredIOContext,
       IOStatisticsAggregator ioStatisticsAggregator,
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       int prefetchBlockSize,
       int prefetchBlockCount) {
 
@@ -258,11 +257,11 @@ public class S3AReadOpContext extends S3AOpContext {
   }
 
   /**
-   * Gets the {@code FuturePool} used for asynchronous prefetches.
+   * Gets the {@code ExecutorServiceFuturePool} used for asynchronous prefetches.
    *
-   * @return the {@code FuturePool} used for asynchronous prefetches.
+   * @return the {@code ExecutorServiceFuturePool} used for asynchronous prefetches.
    */
-  public FuturePool getFuturePool() {
+  public ExecutorServiceFuturePool getFuturePool() {
     return this.futurePool;
   }
 

+ 2 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java

@@ -22,12 +22,12 @@ package org.apache.hadoop.fs.s3a.read;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import com.twitter.util.FuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.common.BlockData;
 import org.apache.hadoop.fs.common.CachingBlockManager;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.common.Validate;
 
 /**
@@ -52,7 +52,7 @@ public class S3CachingBlockManager extends CachingBlockManager {
    * @throws IllegalArgumentException if reader is null.
    */
   public S3CachingBlockManager(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       S3Reader reader,
       BlockData blockData,
       int bufferPoolSize) {

+ 2 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java

@@ -21,13 +21,13 @@ package org.apache.hadoop.fs.s3a.read;
 
 import java.io.IOException;
 
-import com.twitter.util.FuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.common.BlockData;
 import org.apache.hadoop.fs.common.BlockManager;
 import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@@ -186,7 +186,7 @@ public class S3CachingInputStream extends S3InputStream {
   }
 
   protected BlockManager createBlockManager(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       S3Reader reader,
       BlockData blockData,
       int bufferPoolSize) {

+ 6 - 4
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java

@@ -24,8 +24,8 @@ import java.nio.ReadOnlyBufferException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
-import com.twitter.util.Future;
 import org.junit.Test;
 
 import org.apache.hadoop.test.AbstractHadoopTestBase;
@@ -83,13 +83,14 @@ public class TestBufferData extends AbstractHadoopTestBase {
 
     assertEquals(BufferData.State.BLANK, data.getState());
 
-    Future<Void> actionFuture = Future.value(null);
+    CompletableFuture<Void> actionFuture = new CompletableFuture<>();
+    actionFuture.complete(null);
     data.setPrefetch(actionFuture);
     assertEquals(BufferData.State.PREFETCHING, data.getState());
     assertNotNull(data.getActionFuture());
     assertSame(actionFuture, data.getActionFuture());
 
-    Future<Void> actionFuture2 = Future.value(null);
+    CompletableFuture<Void> actionFuture2 = new CompletableFuture<>();
     data.setCaching(actionFuture2);
     assertEquals(BufferData.State.CACHING, data.getState());
     assertNotNull(data.getActionFuture());
@@ -117,7 +118,8 @@ public class TestBufferData extends AbstractHadoopTestBase {
 
   @Test
   public void testInvalidStateUpdates() throws Exception {
-    Future<Void> actionFuture = Future.value(null);
+    CompletableFuture<Void> actionFuture = new CompletableFuture<>();
+    actionFuture.complete(null);
     testInvalidStateUpdatesHelper(
         (d) -> d.setPrefetch(actionFuture),
         BufferData.State.BLANK,

+ 92 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestExecutorServiceFuturePool extends AbstractHadoopTestBase {
+
+  private ExecutorService executorService;
+
+  @Before
+  public void setUp() {
+    executorService = Executors.newFixedThreadPool(3);
+  }
+
+  @After
+  public void tearDown() {
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testRunnableSucceeds() throws Exception {
+    ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService);
+    final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+    Future<Void> future = futurePool.executeRunnable(() -> atomicBoolean.set(true));
+    future.get(30, TimeUnit.SECONDS);
+    assertTrue("atomicBoolean set to true?", atomicBoolean.get());
+  }
+
+  @Test
+  public void testSupplierSucceeds() throws Exception {
+    ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService);
+    final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+    Future<Void> future = futurePool.executeFunction(() -> {
+      atomicBoolean.set(true);
+      return null;
+    });
+    future.get(30, TimeUnit.SECONDS);
+    assertTrue("atomicBoolean set to true?", atomicBoolean.get());
+  }
+
+  @Test
+  public void testRunnableFails() throws Exception {
+    ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService);
+    Future<Void> future = futurePool.executeRunnable(() -> {
+      throw new IllegalStateException("deliberate");
+    });
+    LambdaTestUtils.intercept(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testSupplierFails() throws Exception {
+    ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService);
+    Future<Void> future = futurePool.executeFunction(() -> {
+      throw new IllegalStateException("deliberate");
+    });
+    LambdaTestUtils.intercept(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS));
+  }
+}

+ 7 - 7
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java

@@ -34,11 +34,11 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import com.twitter.util.FuturePool;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.common.BlockCache;
 import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.common.SingleFilePerBlockCache;
 import org.apache.hadoop.fs.common.Validate;
 import org.apache.hadoop.fs.s3a.Invoker;
@@ -109,7 +109,7 @@ public final class Fakes {
   }
 
   public static S3AReadOpContext createReadContext(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       String key,
       int fileSize,
       int prefetchBlockSize,
@@ -195,7 +195,7 @@ public final class Fakes {
 
   public static S3InputStream createInputStream(
       Class<? extends S3InputStream> clazz,
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       String bucket,
       String key,
       int fileSize,
@@ -225,7 +225,7 @@ public final class Fakes {
   }
 
   public static TestS3InMemoryInputStream createS3InMemoryInputStream(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       String bucket,
       String key,
       int fileSize) {
@@ -235,7 +235,7 @@ public final class Fakes {
   }
 
   public static TestS3CachingInputStream createS3CachingInputStream(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       String bucket,
       String key,
       int fileSize,
@@ -322,7 +322,7 @@ public final class Fakes {
 
   public static class TestS3CachingBlockManager extends S3CachingBlockManager {
     public TestS3CachingBlockManager(
-        FuturePool futurePool,
+        ExecutorServiceFuturePool futurePool,
         S3Reader reader,
         BlockData blockData,
         int bufferPoolSize) {
@@ -359,7 +359,7 @@ public final class Fakes {
 
     @Override
     protected S3CachingBlockManager createBlockManager(
-        FuturePool futurePool,
+        ExecutorServiceFuturePool futurePool,
         S3Reader reader,
         BlockData blockData,
         int bufferPoolSize) {

+ 3 - 4
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java

@@ -24,13 +24,12 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.common.BlockData;
 import org.apache.hadoop.fs.common.BufferData;
 import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 import static org.junit.Assert.assertEquals;
@@ -41,7 +40,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
   static final int POOL_SIZE = 3;
 
   private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
-  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
 
   private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE);
 
@@ -106,7 +105,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
    */
   static class TestBlockManager extends S3CachingBlockManager {
     TestBlockManager(
-        FuturePool futurePool,
+        ExecutorServiceFuturePool futurePool,
         S3Reader reader,
         BlockData blockData,
         int bufferPoolSize) {

+ 2 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java

@@ -22,11 +22,10 @@ package org.apache.hadoop.fs.s3a.read;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@@ -36,7 +35,7 @@ import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 public class TestS3File extends AbstractHadoopTestBase {
   private final ExecutorService threadPool = Executors.newFixedThreadPool(1);
-  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
   private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket");
 
   @Test

+ 2 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java

@@ -24,12 +24,11 @@ import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@@ -45,7 +44,7 @@ public class TestS3InputStream extends AbstractHadoopTestBase {
   private static final int FILE_SIZE = 10;
 
   private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
-  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
   private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket");
 
   @Test