
HADOOP-18162. hadoop-common support for MAPREDUCE-7341 Manifest Committer

* New statistic names in StoreStatisticNames
  (for joint use with s3a committers)
* Improvements to IOStatistics implementation classes
* RateLimiting wrapper to guava RateLimiter
* S3A committer Tasks moved over as TaskPool and
  added support for RemoteIterator
* JsonSerialization.load() to fail fast if source does not exist

+ tests.

This commit is a prerequisite for the main MAPREDUCE-7341 Manifest Committer
patch.

Contributed by Steve Loughran

Change-Id: Ia92e2ab5083ac3d8d3d713a4d9cb3e9e0278f654
Commit 9037f9a334 by Steve Loughran

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

@@ -112,6 +112,9 @@ public final class StoreStatisticNames {
   /** {@value}. */
   public static final String OP_MODIFY_ACL_ENTRIES = "op_modify_acl_entries";
 
+  /** {@value}. */
+  public static final String OP_MSYNC = "op_msync";
+
   /** {@value}. */
   public static final String OP_OPEN = "op_open";
 
@@ -172,6 +175,9 @@ public final class StoreStatisticNames {
   public static final String STORE_IO_THROTTLED
       = "store_io_throttled";
 
+  /** Rate limiting was reported: {@value}. */
+  public static final String STORE_IO_RATE_LIMITED = "store_io_rate_limited";
+
   /** Requests made of a store: {@value}. */
   public static final String STORE_IO_REQUEST
       = "store_io_request";

+ 28 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.statistics.impl;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -450,12 +451,37 @@ public final class IOStatisticsBinding {
    * @param factory factory of duration trackers
    * @param statistic statistic key
    * @param input input callable.
+   * @throws IOException IO failure.
    */
   public static void trackDurationOfInvocation(
       DurationTrackerFactory factory,
       String statistic,
       InvocationRaisingIOE input) throws IOException {
 
+    measureDurationOfInvocation(factory, statistic, input);
+  }
+
+  /**
+   * Given an IOException raising callable/lambda expression,
+   * execute it and update the relevant statistic,
+   * returning the measured duration.
+   *
+   * Equivalent to
+   * {@link #trackDurationOfInvocation(DurationTrackerFactory, String, InvocationRaisingIOE)},
+   * but with the duration returned for logging etc.; added as a new
+   * method to avoid linking problems with any code compiled against the
+   * existing method.
+   *
+   * @param factory factory of duration trackers
+   * @param statistic statistic key
+   * @param input input callable.
+   * @return the duration of the operation, as measured by the duration tracker.
+   * @throws IOException IO failure.
+   */
+  public static Duration measureDurationOfInvocation(
+      DurationTrackerFactory factory,
+      String statistic,
+      InvocationRaisingIOE input) throws IOException {
+
     // create the tracker outside try-with-resources so
     // that failures can be set in the catcher.
     DurationTracker tracker = createTracker(factory, statistic);
@@ -473,6 +499,7 @@ public final class IOStatisticsBinding {
       // set the failed flag.
       tracker.close();
     }
+    return tracker.asDuration();
   }
 
   /**
@@ -622,7 +649,7 @@ public final class IOStatisticsBinding {
    * @param statistic statistic to track
    * @return a duration tracker.
    */
-  private static DurationTracker createTracker(
+  public static DurationTracker createTracker(
       @Nullable final DurationTrackerFactory factory,
       final String statistic) {
     return factory != null
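
A hedged usage sketch of the new method: measure an operation and log the
returned duration. It assumes an IOStatisticsStore "stats" is in scope as the
DurationTrackerFactory, built with duration tracking for the statistic; the
statistic and the rename call are illustrative.

    // measure a rename, updating the "op_rename" statistics, and log the time
    Duration d = IOStatisticsBinding.measureDurationOfInvocation(
        stats,
        StoreStatisticNames.OP_RENAME,
        () -> fs.rename(source, dest));
    LOG.info("rename took {}", d);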

+ 11 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsStore.java

@@ -255,4 +255,15 @@ public interface IOStatisticsStore extends IOStatistics,
    */
   void addTimedOperation(String prefix, Duration duration);
 
+  /**
+   * Add a statistics sample, updating the counter and the
+   * minimum, maximum and mean statistics of the given key.
+   * @param key key to update.
+   * @param count sample value.
+   */
+  default void addSample(String key, long count) {
+    incrementCounter(key, count);
+    addMeanStatisticSample(key, count);
+    addMaximumSample(key, count);
+    addMinimumSample(key, count);
+  }
 }
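
For example, one observation can feed four aggregations at once, provided the
store was built with the key registered for each of them. A sketch; the
statistic name "bytes_uploaded" and the fileLength variable are illustrative:

    IOStatisticsStore store = IOStatisticsBinding.iostatisticsStore()
        .withCounters("bytes_uploaded")
        .withMinimums("bytes_uploaded")
        .withMaximums("bytes_uploaded")
        .withMeanStatistics("bytes_uploaded")
        .build();
    // increments the counter by the sample and updates min/max/mean
    store.addSample("bytes_uploaded", fileLength);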

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/PairedDurationTrackerFactory.java

@@ -88,6 +88,11 @@ final class PairedDurationTrackerFactory implements DurationTrackerFactory {
     public Duration asDuration() {
       return firstDuration.asDuration();
     }
+
+    @Override
+    public String toString() {
+      return firstDuration.toString();
+    }
   }
 
 }

+ 7 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StatisticDurationTracker.java

@@ -103,4 +103,11 @@ public class StatisticDurationTracker extends OperationDuration
     }
     iostats.addTimedOperation(name, asDuration());
   }
+
+  @Override
+  public String toString() {
+    return " Duration of " +
+        (failed? (key + StoreStatisticNames.SUFFIX_FAILURES) : key)
+        + ": " + super.toString();
+  }
 }

+ 4 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java

@@ -172,6 +172,9 @@ public class JsonSerialization<T> {
   @SuppressWarnings("unchecked")
   public synchronized T load(File jsonFile)
       throws IOException, JsonParseException, JsonMappingException {
+    if (!jsonFile.exists()) {
+      throw new FileNotFoundException("No such file: " + jsonFile);
+    }
     if (!jsonFile.isFile()) {
       throw new FileNotFoundException("Not a file: " + jsonFile);
     }
@@ -181,7 +184,7 @@ public class JsonSerialization<T> {
     try {
       return mapper.readValue(jsonFile, classType);
     } catch (IOException e) {
-      LOG.error("Exception while parsing json file {}", jsonFile, e);
+      LOG.warn("Exception while parsing json file {}", jsonFile, e);
       throw e;
     }
   }
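
With the added existence check, a missing file now fails fast as a
FileNotFoundException rather than surfacing as a Jackson parse error. A
minimal sketch, assuming the (classType, failOnUnknownProperties, pretty)
constructor; the MyManifest type and path are illustrative:

    JsonSerialization<MyManifest> serializer =
        new JsonSerialization<>(MyManifest.class, false, true);
    try {
      MyManifest m = serializer.load(new File("/tmp/manifest.json"));
    } catch (FileNotFoundException e) {
      // raised before Jackson is invoked when the file is absent
    }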

+ 52 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Minimal subset of google rate limiter class.
+ * Can be used to throttle use of object stores where excess load
+ * will trigger cluster-wide throttling, backoff etc. and so collapse
+ * performance.
+ * The time waited is returned as a Duration type.
+ * The google rate limiter implements this by allowing a caller to ask for
+ * more capacity than is available. This will be granted
+ * but the subsequent request will be blocked if the bucket of
+ * capacity hasn't yet refilled to the point where there is
+ * capacity again.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface RateLimiting {
+
+  /**
+   * Acquire rate limiter capacity.
+   * If there is not enough space, the permits will be acquired,
+   * but the subsequent call will block until the capacity has been
+   * refilled.
+   * @param requestedCapacity capacity to acquire.
+   * @return time spent waiting for the capacity.
+   */
+  Duration acquire(int requestedCapacity);
+
+}

+ 102 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java

@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Factory for Rate Limiting.
+ * This should be the only place in the code where the guava RateLimiter is imported.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class RateLimitingFactory {
+
+  private static final RateLimiting UNLIMITED = new NoRateLimiting();
+
+  /**
+   * No waiting took place.
+   */
+  private static final Duration INSTANTLY = Duration.ofMillis(0);
+
+  private RateLimitingFactory() {
+  }
+
+  /**
+   * No Rate Limiting.
+   */
+  private static class NoRateLimiting implements RateLimiting {
+
+    @Override
+    public Duration acquire(int requestedCapacity) {
+      return INSTANTLY;
+    }
+  }
+
+  /**
+   * Rate limiting restricted to that of a google rate limiter.
+   */
+  private static final class RestrictedRateLimiting implements RateLimiting {
+    private final RateLimiter limiter;
+
+    /**
+     * Constructor.
+     * @param capacityPerSecond capacity in permits/second.
+     */
+    private RestrictedRateLimiting(int capacityPerSecond) {
+      this.limiter = RateLimiter.create(capacityPerSecond);
+    }
+
+    @Override
+    public Duration acquire(int requestedCapacity) {
+      // guava's RateLimiter.acquire() returns the time slept in seconds,
+      // so scale to milliseconds for the Duration.
+      final double delaySeconds = limiter.acquire(requestedCapacity);
+      return delaySeconds == 0
+          ? INSTANTLY
+          : Duration.ofMillis((long) (delaySeconds * 1000));
+    }
+
+  }
+
+  /**
+   * Get the unlimited rate.
+   * @return a rate limiter which always has capacity.
+   */
+  public static RateLimiting unlimitedRate() {
+    return UNLIMITED;
+  }
+
+  /**
+   * Create an instance.
+   * If the capacity is 0, the unlimited rate limiter is returned.
+   * @param capacity capacity in permits/second.
+   * @return limiter restricted to the given capacity.
+   */
+  public static RateLimiting create(int capacity) {
+
+    return capacity == 0
+        ? unlimitedRate()
+        : new RestrictedRateLimiting(capacity);
+  }
+
+}
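
As a usage sketch: create a shared limiter and charge each operation against
it before issuing the request. The capacity of 100 permits/second and the
pageSize variable are illustrative:

    RateLimiting limiter = RateLimitingFactory.create(100);

    // block until the page's worth of permits is available
    Duration wait = limiter.acquire(pageSize);
    if (!wait.isZero()) {
      LOG.debug("Rate limited for {}", wait);
    }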

+ 74 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CloseableTaskPoolSubmitter.java

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.Closeable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A task submitter which is closeable, and whose close() call
+ * shuts down the pool. This can help manage
+ * thread pool lifecycles.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class CloseableTaskPoolSubmitter implements TaskPool.Submitter,
+    Closeable {
+
+  /** Executor pool; set to null in close(). */
+  private ExecutorService pool;
+
+  /**
+   * Constructor.
+   * @param pool non-null executor.
+   */
+  public CloseableTaskPoolSubmitter(final ExecutorService pool) {
+    this.pool = requireNonNull(pool);
+  }
+
+  /**
+   * Get the pool.
+   * @return the pool.
+   */
+  public ExecutorService getPool() {
+    return pool;
+  }
+
+  /**
+   * Shut down the pool.
+   */
+  @Override
+  public void close() {
+    if (pool != null) {
+      pool.shutdown();
+      pool = null;
+    }
+  }
+
+  @Override
+  public Future<?> submit(final Runnable task) {
+    // fail meaningfully if the submitter has already been closed
+    return requireNonNull(pool, "task pool is closed").submit(task);
+  }
+}
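
A sketch of the intended lifecycle: wrap an executor so that closing the
submitter, for example via try-with-resources, shuts the pool down. The pool
size and task body are illustrative:

    ExecutorService pool = Executors.newFixedThreadPool(8);
    try (CloseableTaskPoolSubmitter submitter =
             new CloseableTaskPoolSubmitter(pool)) {
      submitter.submit(() -> LOG.info("task ran"));
    } // close() calls pool.shutdown() here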

+ 613 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java

@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;
+
+/**
+ * Utility class for parallel execution, takes closures for the various
+ * actions.
+ * There is no retry logic: it is expected to be handled by the closures.
+ * From {@code org.apache.hadoop.fs.s3a.commit.Tasks} which came from
+ * the Netflix committer patch.
+ * Apache Iceberg has its own version of this, with a common ancestor
+ * at some point in its history.
+ * A key difference with this class is that the iterator is always,
+ * internally, a {@link RemoteIterator}.
+ * This is to allow tasks to be scheduled while incremental operations
+ * such as paged directory listings are still collecting in results.
+ *
+ * While awaiting completion, the calling thread spins, sleeping for
+ * {@link #SLEEP_INTERVAL_AWAITING_COMPLETION} milliseconds between polls;
+ * being a busy-wait, this is inefficient.
+ * There's an implicit assumption that remote IO is being performed, and
+ * so this is not impacting throughput/performance.
+ *
+ * History:
+ * This class came with the Netflix contributions to the S3A committers
+ * in HADOOP-13786.
+ * It was moved into hadoop-common for use in the manifest committer and
+ * anywhere else it is needed, and renamed in the process as
+ * "Tasks" has too many meanings in the hadoop source.
+ * The iterator was then changed from a normal java iterable
+ * to a hadoop {@link org.apache.hadoop.fs.RemoteIterator}.
+ * This allows a task pool to be supplied with incremental listings
+ * from object stores, scheduling work as pages of listing
+ * results come in, rather than blocking until the entire
+ * directory/directory tree etc has been enumerated.
+ *
+ * There is a variant of this in Apache Iceberg in
+ * {@code org.apache.iceberg.util.Tasks}
+ * That is not derived from any version in the hadoop codebase, it
+ * just shares a common ancestor somewhere in the Netflix codebase.
+ * It is the more sophisticated version.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TaskPool {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TaskPool.class);
+
+  /**
+   * Interval in milliseconds to await completion.
+   */
+  private static final int SLEEP_INTERVAL_AWAITING_COMPLETION = 10;
+
+  private TaskPool() {
+  }
+
+  /**
+   * Callback invoked to process an item.
+   * @param <I> item type being processed
+   * @param <E> exception class which may be raised
+   */
+  @FunctionalInterface
+  public interface Task<I, E extends Exception> {
+    void run(I item) throws E;
+  }
+
+  /**
+   * Callback invoked on a failure.
+   * @param <I> item type being processed
+   * @param <E> exception class which may be raised
+   */
+  @FunctionalInterface
+  public interface FailureTask<I, E extends Exception> {
+
+    /**
+     * Process a failure.
+     * @param item item the task is processing
+     * @param exception the exception which was raised.
+     * @throws E Exception of type E
+     */
+    void run(I item, Exception exception) throws E;
+  }
+
+  /**
+   * Builder for task execution.
+   * @param <I> item type
+   */
+  public static class Builder<I> {
+    private final RemoteIterator<I> items;
+    private Submitter service = null;
+    private FailureTask<I, ?> onFailure = null;
+    private boolean stopOnFailure = false;
+    private boolean suppressExceptions = false;
+    private Task<I, ?> revertTask = null;
+    private boolean stopRevertsOnFailure = false;
+    private Task<I, ?> abortTask = null;
+    private boolean stopAbortsOnFailure = false;
+    private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION;
+
+    /**
+     * Create the builder.
+     * @param items items to process
+     */
+    Builder(RemoteIterator<I> items) {
+      this.items = requireNonNull(items, "items");
+    }
+
+    /**
+     * Create the builder.
+     * @param items items to process
+     */
+    Builder(Iterable<I> items) {
+      this(remoteIteratorFromIterable(items));
+    }
+
+    /**
+     * Declare executor service: if null, the tasks are executed in a single
+     * thread.
+     * @param submitter service to schedule tasks with.
+     * @return this builder.
+     */
+    public Builder<I> executeWith(@Nullable Submitter submitter) {
+
+      this.service = submitter;
+      return this;
+    }
+
+    /**
+     * Task to invoke on failure.
+     * @param task task
+     * @return the builder
+     */
+    public Builder<I> onFailure(FailureTask<I, ?> task) {
+      this.onFailure = task;
+      return this;
+    }
+
+    /**
+     * Stop executing tasks after the first failure.
+     * @return the builder
+     */
+    public Builder<I> stopOnFailure() {
+      this.stopOnFailure = true;
+      return this;
+    }
+
+    /**
+     * Suppress exceptions from tasks.
+     * RemoteIterator exceptions are not suppressible.
+     * @return the builder.
+     */
+    public Builder<I> suppressExceptions() {
+      return suppressExceptions(true);
+    }
+
+    /**
+     * Suppress exceptions from tasks.
+     * RemoteIterator exceptions are not suppressible.
+     * @param suppress new value
+     * @return the builder.
+     */
+    public Builder<I> suppressExceptions(boolean suppress) {
+      this.suppressExceptions = suppress;
+      return this;
+    }
+
+    /**
+     * Task to revert with after another task failed.
+     * @param task task to execute
+     * @return the builder
+     */
+    public Builder<I> revertWith(Task<I, ?> task) {
+      this.revertTask = task;
+      return this;
+    }
+
+    /**
+     * Stop trying to revert if one operation fails.
+     * @return the builder
+     */
+    public Builder<I> stopRevertsOnFailure() {
+      this.stopRevertsOnFailure = true;
+      return this;
+    }
+
+    /**
+     * Task to abort with after another task failed.
+     * @param task task to execute
+     * @return the builder
+     */
+    public Builder<I> abortWith(Task<I, ?> task) {
+      this.abortTask = task;
+      return this;
+    }
+
+    /**
+     * Stop trying to abort if one operation fails.
+     * @return the builder
+     */
+    public Builder<I> stopAbortsOnFailure() {
+      this.stopAbortsOnFailure = true;
+      return this;
+    }
+
+    /**
+     * Set the sleep interval.
+     * @param value new value
+     * @return the builder
+     */
+    public Builder<I> sleepInterval(final int value) {
+      sleepInterval = value;
+      return this;
+    }
+
+    /**
+     * Execute the task across the data.
+     * @param task task to execute
+     * @param <E> exception which may be raised in execution.
+     * @return true if the operation executed successfully
+     * @throws E any exception raised.
+     * @throws IOException IOExceptions raised by remote iterator or in execution.
+     */
+    public <E extends Exception> boolean run(Task<I, E> task) throws E, IOException {
+      requireNonNull(items, "items");
+      if (!items.hasNext()) {
+        // if there are no items, return without worrying about
+        // execution pools, errors etc.
+        return true;
+      }
+      if (service != null) {
+        // thread pool, so run in parallel
+        return runParallel(task);
+      } else {
+        // single threaded execution.
+        return runSingleThreaded(task);
+      }
+    }
+
+    /**
+     * Single threaded execution.
+     * @param task task to execute
+     * @param <E> exception which may be raised in execution.
+     * @return true if the operation executed successfully
+     * @throws E any exception raised.
+     * @throws IOException IOExceptions raised by remote iterator or in execution.
+     */
+    private <E extends Exception> boolean runSingleThreaded(Task<I, E> task)
+        throws E, IOException {
+      List<I> succeeded = new ArrayList<>();
+      List<Exception> exceptions = new ArrayList<>();
+
+      RemoteIterator<I> iterator = items;
+      boolean threw = true;
+      try {
+        while (iterator.hasNext()) {
+          I item = iterator.next();
+          try {
+            task.run(item);
+            succeeded.add(item);
+
+          } catch (Exception e) {
+            exceptions.add(e);
+
+            if (onFailure != null) {
+              try {
+                onFailure.run(item, e);
+              } catch (Exception failException) {
+                LOG.error("Failed to clean up on failure", failException);
+                // keep going
+              }
+            }
+
+            if (stopOnFailure) {
+              break;
+            }
+          }
+        }
+
+        threw = false;
+      } catch (IOException iteratorIOE) {
+        // an IOE is raised here during iteration
+        LOG.debug("IOException when iterating through {}", iterator, iteratorIOE);
+        throw iteratorIOE;
+      } finally {
+        // threw handles exceptions that were *not* caught by the catch block,
+        // and exceptions that were caught and possibly handled by onFailure
+        // are kept in exceptions.
+        if (threw || !exceptions.isEmpty()) {
+          if (revertTask != null) {
+            boolean failed = false;
+            for (I item : succeeded) {
+              try {
+                revertTask.run(item);
+              } catch (Exception e) {
+                LOG.error("Failed to revert task", e);
+                failed = true;
+                // keep going
+              }
+              if (stopRevertsOnFailure && failed) {
+                break;
+              }
+            }
+          }
+
+          if (abortTask != null) {
+            boolean failed = false;
+            while (iterator.hasNext()) {
+              try {
+                abortTask.run(iterator.next());
+              } catch (Exception e) {
+                failed = true;
+                LOG.error("Failed to abort task", e);
+                // keep going
+              }
+              if (stopAbortsOnFailure && failed) {
+                break;
+              }
+            }
+          }
+        }
+      }
+
+      if (!suppressExceptions && !exceptions.isEmpty()) {
+        TaskPool.<E>throwOne(exceptions);
+      }
+
+      return exceptions.isEmpty();
+    }
+
+    /**
+     * Parallel execution.
+     * @param task task to execute
+     * @param <E> exception which may be raised in execution.
+     * @return true if the operation executed successfully
+     * @throws E any exception raised.
+     * @throws IOException IOExceptions raised by remote iterator or in execution.
+     */
+    private <E extends Exception> boolean runParallel(final Task<I, E> task)
+        throws E, IOException {
+      final Queue<I> succeeded = new ConcurrentLinkedQueue<>();
+      final Queue<Exception> exceptions = new ConcurrentLinkedQueue<>();
+      final AtomicBoolean taskFailed = new AtomicBoolean(false);
+      final AtomicBoolean abortFailed = new AtomicBoolean(false);
+      final AtomicBoolean revertFailed = new AtomicBoolean(false);
+
+      List<Future<?>> futures = new ArrayList<>();
+
+      IOException iteratorIOE = null;
+      final RemoteIterator<I> iterator = this.items;
+      try {
+        while (iterator.hasNext()) {
+          final I item = iterator.next();
+          // submit a task for each item that will either run or abort the task
+          futures.add(service.submit(() -> {
+            if (!(stopOnFailure && taskFailed.get())) {
+              // run the task
+              boolean threw = true;
+              try {
+                LOG.debug("Executing task");
+                task.run(item);
+                succeeded.add(item);
+                LOG.debug("Task succeeded");
+
+                threw = false;
+
+              } catch (Exception e) {
+                taskFailed.set(true);
+                exceptions.add(e);
+                LOG.info("Task failed {}", e.toString());
+                LOG.debug("Task failed", e);
+
+                if (onFailure != null) {
+                  try {
+                    onFailure.run(item, e);
+                  } catch (Exception failException) {
+                    LOG.warn("Failed to clean up on failure", failException);
+                    // swallow the exception
+                  }
+                }
+              } finally {
+                if (threw) {
+                  taskFailed.set(true);
+                }
+              }
+
+            } else if (abortTask != null) {
+              // abort the task instead of running it
+              if (stopAbortsOnFailure && abortFailed.get()) {
+                return;
+              }
+
+              boolean failed = true;
+              try {
+                LOG.info("Aborting task");
+                abortTask.run(item);
+                failed = false;
+              } catch (Exception e) {
+                LOG.error("Failed to abort task", e);
+                // swallow the exception
+              } finally {
+                if (failed) {
+                  abortFailed.set(true);
+                }
+              }
+            }
+          }));
+        }
+      } catch (IOException e) {
+        // iterator failure.
+        LOG.debug("IOException when iterating through {}", iterator, e);
+        iteratorIOE = e;
+        // mark as a task failure so all submitted tasks will halt/abort
+        taskFailed.set(true);
+      }
+
+      // let the above tasks complete (or abort)
+      waitFor(futures, sleepInterval);
+      int futureCount = futures.size();
+      futures.clear();
+
+      if (taskFailed.get() && revertTask != null) {
+        // at least one task failed, revert any that succeeded
+        LOG.info("Reverting all {} succeeded tasks from {} futures",
+            succeeded.size(), futureCount);
+        for (final I item : succeeded) {
+          futures.add(service.submit(() -> {
+            if (stopRevertsOnFailure && revertFailed.get()) {
+              return;
+            }
+
+            boolean failed = true;
+            try {
+              revertTask.run(item);
+              failed = false;
+            } catch (Exception e) {
+              LOG.error("Failed to revert task", e);
+              // swallow the exception
+            } finally {
+              if (failed) {
+                revertFailed.set(true);
+              }
+            }
+          }));
+        }
+
+        // let the revert tasks complete
+        waitFor(futures, sleepInterval);
+      }
+
+      // give priority to execution exceptions over
+      // iterator exceptions.
+      if (!suppressExceptions && !exceptions.isEmpty()) {
+        // there's an exception list to build up, cast and throw.
+        TaskPool.<E>throwOne(exceptions);
+      }
+
+      // raise any iterator exception.
+      // this can not be suppressed.
+      if (iteratorIOE != null) {
+        throw iteratorIOE;
+      }
+
+      // return true if all tasks succeeded.
+      return !taskFailed.get();
+    }
+  }
+
+  /**
+   * Wait for all the futures to complete; there's a small sleep between
+   * each iteration; enough to yield the CPU.
+   * @param futures futures.
+   * @param sleepInterval Interval in milliseconds to await completion.
+   */
+  private static void waitFor(Collection<Future<?>> futures, int sleepInterval) {
+    int size = futures.size();
+    LOG.debug("Waiting for {} tasks to complete", size);
+    int oldNumFinished = 0;
+    while (true) {
+      int numFinished = (int) futures.stream().filter(Future::isDone).count();
+
+      if (oldNumFinished != numFinished) {
+        LOG.debug("Finished count -> {}/{}", numFinished, size);
+        oldNumFinished = numFinished;
+      }
+
+      if (numFinished == size) {
+        // all of the futures are done, stop looping
+        break;
+      } else {
+        try {
+          Thread.sleep(sleepInterval);
+        } catch (InterruptedException e) {
+          futures.forEach(future -> future.cancel(true));
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a task builder for the iterable.
+   * @param items item source.
+   * @param <I> type of result.
+   * @return builder.
+   */
+  public static <I> Builder<I> foreach(Iterable<I> items) {
+    return new Builder<>(requireNonNull(items, "items"));
+  }
+
+  /**
+   * Create a task builder for the remote iterator.
+   * @param items item source.
+   * @param <I> type of result.
+   * @return builder.
+   */
+  public static <I> Builder<I> foreach(RemoteIterator<I> items) {
+    return new Builder<>(items);
+  }
+
+  /**
+   * Create a task builder for the array.
+   * @param items item source.
+   * @param <I> type of result.
+   * @return builder.
+   */
+  public static <I> Builder<I> foreach(I[] items) {
+    return new Builder<>(Arrays.asList(requireNonNull(items, "items")));
+  }
+
+  /**
+   * Throw one exception, adding the others as suppressed
+   * exceptions attached to the one thrown.
+   * This method never completes normally.
+   * @param exceptions collection of exceptions
+   * @param <E> class of exceptions
+   * @throws E an extracted exception.
+   */
+  private static <E extends Exception> void throwOne(
+      Collection<Exception> exceptions)
+      throws E {
+    Iterator<Exception> iter = exceptions.iterator();
+    Exception e = iter.next();
+    Class<? extends Exception> exceptionClass = e.getClass();
+
+    while (iter.hasNext()) {
+      Exception other = iter.next();
+      if (!exceptionClass.isInstance(other)) {
+        e.addSuppressed(other);
+      }
+    }
+
+    TaskPool.<E>castAndThrow(e);
+  }
+
+  /**
+   * Raise an exception of the declared type.
+   * This method never completes normally.
+   * @param e exception
+   * @param <E> class of exceptions
+   * @throws E a recast exception.
+   */
+  @SuppressWarnings("unchecked")
+  private static <E extends Exception> void castAndThrow(Exception e) throws E {
+    if (e instanceof RuntimeException) {
+      throw (RuntimeException) e;
+    }
+    throw (E) e;
+  }
+
+  /**
+   * Interface to whatever lets us submit tasks.
+   */
+  public interface Submitter {
+
+    /**
+     * Submit work.
+     * @param task task to execute
+     * @return the future of the submitted task.
+     */
+    Future<?> submit(Runnable task);
+  }
+
+}
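
Pulling the pieces together, a hedged sketch of driving the pool from an
incremental listing. The filesystem, path and submitter variables are assumed
to be in scope; the delete task is illustrative:

    // schedule deletions while the recursive listing is still paging in
    RemoteIterator<LocatedFileStatus> listing = fs.listFiles(dir, true);
    TaskPool.foreach(listing)
        .executeWith(submitter)          // null would mean single-threaded
        .stopOnFailure()
        .onFailure((status, ex) ->
            LOG.warn("Failed to delete {}", status.getPath(), ex))
        .run(status -> fs.delete(status.getPath(), false));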

+ 585 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestTaskPool.java

@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test Task Pool class.
+ * This is pulled straight out of the S3A version.
+ */
+@RunWith(Parameterized.class)
+public class TestTaskPool extends HadoopTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestTaskPool.class);
+
+  public static final int ITEM_COUNT = 16;
+
+  private static final int FAILPOINT = 8;
+
+  private final int numThreads;
+
+  /**
+   * Thread pool for task execution.
+   */
+  private ExecutorService threadPool;
+
+  /**
+   * Task submitter bonded to the thread pool, or
+   * null for the 0-thread case.
+   */
+  private TaskPool.Submitter submitter;
+
+  private final CounterTask failingTask
+      = new CounterTask("failing committer", FAILPOINT, Item::commit);
+
+  private final FailureCounter failures
+      = new FailureCounter("failures", 0, null);
+
+  private final CounterTask reverter
+      = new CounterTask("reverter", 0, Item::revert);
+
+  private final CounterTask aborter
+      = new CounterTask("aborter", 0, Item::abort);
+
+  /**
+   * Test array for parameterized test runs: how many threads
+   * to use. Threading makes some of the assertions brittle; there are
+   * more checks on single thread than parallel ops.
+   * @return a list of parameter tuples.
+   */
+  @Parameterized.Parameters(name = "threads={0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {0},
+        {1},
+        {3},
+        {8},
+        {16},
+    });
+  }
+
+  private List<Item> items;
+
+  /**
+   * Construct the parameterized test.
+   * @param numThreads number of threads
+   */
+  public TestTaskPool(int numThreads) {
+    this.numThreads = numThreads;
+  }
+
+  /**
+   * In a parallel test run there is more than one thread doing the execution.
+   * @return true if the threadpool size is >1
+   */
+  public boolean isParallel() {
+    return numThreads > 1;
+  }
+
+  @Before
+  public void setup() {
+    items = IntStream.rangeClosed(1, ITEM_COUNT)
+        .mapToObj(i -> new Item(i,
+            String.format("With %d threads", numThreads)))
+        .collect(Collectors.toList());
+
+    if (numThreads > 0) {
+      threadPool = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat(getMethodName() + "-pool-%d")
+              .build());
+      submitter = new PoolSubmitter();
+    } else {
+      submitter = null;
+    }
+
+  }
+
+  @After
+  public void teardown() {
+    if (threadPool != null) {
+      threadPool.shutdown();
+      threadPool = null;
+    }
+  }
+
+  private class PoolSubmitter implements TaskPool.Submitter {
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+      return threadPool.submit(task);
+    }
+
+  }
+
+  /**
+   * Create the builder.
+   * @return pre-initialized builder.
+   */
+  private TaskPool.Builder<Item> builder() {
+    return TaskPool.foreach(items).executeWith(submitter);
+  }
+
+  private void assertRun(TaskPool.Builder<Item> builder,
+      CounterTask task) throws IOException {
+    boolean b = builder.run(task);
+    assertTrue("Run of " + task + " failed", b);
+  }
+
+  private void assertFailed(TaskPool.Builder<Item> builder,
+      CounterTask task) throws IOException {
+    boolean b = builder.run(task);
+    assertFalse("Run of " + task + " unexpectedly succeeded", b);
+  }
+
+  private String itemsToString() {
+    return "[" + items.stream().map(Item::toString)
+        .collect(Collectors.joining("\n")) + "]";
+  }
+
+  @Test
+  public void testSimpleInvocation() throws Throwable {
+    CounterTask t = new CounterTask("simple", 0, Item::commit);
+    assertRun(builder(), t);
+    t.assertInvoked("", ITEM_COUNT);
+  }
+
+  @Test
+  public void testFailNoStoppingSuppressed() throws Throwable {
+    assertFailed(builder().suppressExceptions(), failingTask);
+    failingTask.assertInvoked("Continued through operations", ITEM_COUNT);
+    items.forEach(Item::assertCommittedOrFailed);
+  }
+
+  @Test
+  public void testFailFastSuppressed() throws Throwable {
+    assertFailed(builder()
+            .suppressExceptions()
+            .stopOnFailure(),
+        failingTask);
+    if (isParallel()) {
+      failingTask.assertInvokedAtLeast("stop fast", FAILPOINT);
+    } else {
+      failingTask.assertInvoked("stop fast", FAILPOINT);
+    }
+  }
+
+  @Test
+  public void testFailedCallAbortSuppressed() throws Throwable {
+    assertFailed(builder()
+            .stopOnFailure()
+            .suppressExceptions()
+            .abortWith(aborter),
+        failingTask);
+    failingTask.assertInvokedAtLeast("success", FAILPOINT);
+    if (!isParallel()) {
+      aborter.assertInvokedAtLeast("abort", 1);
+      // all uncommitted items were aborted
+      items.stream().filter(i -> !i.committed)
+          .forEach(Item::assertAborted);
+      items.stream().filter(i -> i.committed)
+          .forEach(i -> assertFalse(i.toString(), i.aborted));
+    }
+  }
+
+  @Test
+  public void testFailedCalledWhenNotStoppingSuppressed() throws Throwable {
+    assertFailed(builder()
+            .suppressExceptions()
+            .onFailure(failures),
+        failingTask);
+    failingTask.assertInvokedAtLeast("success", FAILPOINT);
+    // only one failure was triggered
+    failures.assertInvoked("failure event", 1);
+  }
+
+  @Test
+  public void testFailFastCallRevertSuppressed() throws Throwable {
+    assertFailed(builder()
+            .stopOnFailure()
+            .revertWith(reverter)
+            .abortWith(aborter)
+            .suppressExceptions()
+            .onFailure(failures),
+        failingTask);
+    failingTask.assertInvokedAtLeast("success", FAILPOINT);
+    if (!isParallel()) {
+      aborter.assertInvokedAtLeast("abort", 1);
+      // all uncommitted items were aborted
+      items.stream().filter(i -> !i.committed)
+          .filter(i -> !i.failed)
+          .forEach(Item::assertAborted);
+    }
+    // all committed were reverted
+    items.stream().filter(i -> i.committed && !i.failed)
+        .forEach(Item::assertReverted);
+    // all reverted items are committed
+    items.stream().filter(i -> i.reverted)
+        .forEach(Item::assertCommitted);
+
+    // only one failure was triggered
+    failures.assertInvoked("failure event", 1);
+  }
+
+  @Test
+  public void testFailSlowCallRevertSuppressed() throws Throwable {
+    assertFailed(builder()
+            .suppressExceptions()
+            .revertWith(reverter)
+            .onFailure(failures),
+        failingTask);
+    failingTask.assertInvokedAtLeast("success", FAILPOINT);
+    // all committed were reverted
+    // identify which task failed from the set
+    int failing = failures.getItem().id;
+    items.stream()
+        .filter(i -> i.id != failing)
+        .filter(i -> i.committed)
+        .forEach(Item::assertReverted);
+    // all reverted items are committed
+    items.stream().filter(i -> i.reverted)
+        .forEach(Item::assertCommitted);
+
+    // only one failure was triggered
+    failures.assertInvoked("failure event", 1);
+  }
+
+  @Test
+  public void testFailFastExceptions() throws Throwable {
+    intercept(IOException.class,
+        () -> builder()
+            .stopOnFailure()
+            .run(failingTask));
+    if (isParallel()) {
+      failingTask.assertInvokedAtLeast("stop fast", FAILPOINT);
+    } else {
+      failingTask.assertInvoked("stop fast", FAILPOINT);
+    }
+  }
+
+  @Test
+  public void testFailSlowExceptions() throws Throwable {
+    intercept(IOException.class,
+        () -> builder()
+            .run(failingTask));
+    failingTask.assertInvoked("continued through operations", ITEM_COUNT);
+    items.forEach(Item::assertCommittedOrFailed);
+  }
+
+  @Test
+  public void testFailFastExceptionsWithAbortFailure() throws Throwable {
+    CounterTask failFirst = new CounterTask("task", 1, Item::commit);
+    CounterTask a = new CounterTask("aborter", 1, Item::abort);
+    intercept(IOException.class,
+        () -> builder()
+            .stopOnFailure()
+            .abortWith(a)
+            .run(failFirst));
+    if (!isParallel()) {
+      // expect the other tasks to be aborted
+      a.assertInvokedAtLeast("abort", ITEM_COUNT - 1);
+    }
+  }
+
+  @Test
+  public void testFailFastExceptionsWithAbortFailureStopped() throws Throwable {
+    CounterTask failFirst = new CounterTask("task", 1, Item::commit);
+    CounterTask a = new CounterTask("aborter", 1, Item::abort);
+    intercept(IOException.class,
+        () -> builder()
+            .stopOnFailure()
+            .stopAbortsOnFailure()
+            .abortWith(a)
+            .run(failFirst));
+    if (!isParallel()) {
+      // expect the other tasks to be aborted
+      a.assertInvoked("abort", 1);
+    }
+  }
+
+  /**
+   * Fail the last one committed, all the rest will be reverted.
+   * The actual ID of the last task has to be picked up from the
+   * failure callback, as in the pool it may be any of them.
+   */
+  @Test
+  public void testRevertAllSuppressed() throws Throwable {
+    CounterTask failLast = new CounterTask("task", ITEM_COUNT, Item::commit);
+
+    assertFailed(builder()
+            .suppressExceptions()
+            .stopOnFailure()
+            .revertWith(reverter)
+            .abortWith(aborter)
+            .onFailure(failures),
+        failLast);
+    failLast.assertInvoked("success", ITEM_COUNT);
+    int abCount = aborter.getCount();
+    int revCount = reverter.getCount();
+    assertEquals(ITEM_COUNT, 1 + abCount + revCount);
+    // identify which task failed from the set
+    int failing = failures.getItem().id;
+    // all committed were reverted
+    items.stream()
+        .filter(i -> i.id != failing)
+        .filter(i -> i.committed)
+        .forEach(Item::assertReverted);
+    items.stream()
+        .filter(i -> i.id != failing)
+        .filter(i -> !i.committed)
+        .forEach(Item::assertAborted);
+    // all reverted items are committed
+    items.stream().filter(i -> i.reverted)
+        .forEach(Item::assertCommitted);
+
+    // only one failure was triggered
+    failures.assertInvoked("failure event", 1);
+  }
+
+  /**
+   * The Item which tasks process.
+   */
+  private final class Item {
+
+    private final int id;
+
+    private final String text;
+
+    private volatile boolean committed, aborted, reverted, failed;
+
+    private Item(int item, String text) {
+      this.id = item;
+      this.text = text;
+    }
+
+    boolean commit() {
+      committed = true;
+      return true;
+    }
+
+    boolean abort() {
+      aborted = true;
+      return true;
+    }
+
+    boolean revert() {
+      reverted = true;
+      return true;
+    }
+
+    boolean fail() {
+      failed = true;
+      return true;
+    }
+
+    public Item assertCommitted() {
+      assertTrue(toString() + " was not committed in\n"
+              + itemsToString(),
+          committed);
+      return this;
+    }
+
+    public Item assertCommittedOrFailed() {
+      assertTrue(toString() + " was not committed nor failed in\n"
+              + itemsToString(),
+          committed || failed);
+      return this;
+    }
+
+    public Item assertAborted() {
+      assertTrue(toString() + " was not aborted in\n"
+              + itemsToString(),
+          aborted);
+      return this;
+    }
+
+    public Item assertReverted() {
+      assertTrue(toString() + " was not reverted in\n"
+              + itemsToString(),
+          reverted);
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("Item{");
+      sb.append(String.format("[%02d]", id));
+      sb.append(", committed=").append(committed);
+      sb.append(", aborted=").append(aborted);
+      sb.append(", reverted=").append(reverted);
+      sb.append(", failed=").append(failed);
+      sb.append(", text=").append(text);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Class which can count invocations and, if limit > 0, will raise
+   * an exception on the specific invocation of {@link #process(Item)}
+   * whose count == limit.
+   */
+  private class BaseCounter {
+
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    private final int limit;
+
+    private final String name;
+
+    private Item item;
+
+    private final Optional<Function<Item, Boolean>> action;
+
+    /**
+     * Base counter, tracks items.
+     * @param name name for string/exception/logs.
+     * @param limit limit at which an exception is raised, 0 == never
+     * @param action optional action to invoke after the increment,
+     * before limit check
+     */
+    BaseCounter(String name,
+        int limit,
+        Function<Item, Boolean> action) {
+      this.name = name;
+      this.limit = limit;
+      this.action = Optional.ofNullable(action);
+    }
+
+    /**
+     * Apply the action to an item; log at info afterwards with both the
+     * before and after string values of the item.
+     * @param i item to process.
+     * @throws IOException failure in the action
+     */
+    void process(Item i) throws IOException {
+      this.item = i;
+      int count = counter.incrementAndGet();
+      if (limit == count) {
+        i.fail();
+        LOG.info("{}: Failed {}", this, i);
+        throw new IOException(String.format("%s: Limit %d reached for %s",
+            this, limit, i));
+      }
+      String before = i.toString();
+      action.map(a -> a.apply(i));
+      LOG.info("{}: {} -> {}", this, before, i);
+    }
+
+    int getCount() {
+      return counter.get();
+    }
+
+    Item getItem() {
+      return item;
+    }
+
+    void assertInvoked(String text, int expected) {
+      assertEquals(toString() + ": " + text, expected, getCount());
+    }
+
+    void assertInvokedAtLeast(String text, int expected) {
+      int actual = getCount();
+      assertTrue(toString() + ": " + text
+              + "-expected " + expected
+              + " invocations, but got " + actual
+              + " in " + itemsToString(),
+          expected <= actual);
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "BaseCounter{");
+      sb.append("name='").append(name).append('\'');
+      sb.append(", count=").append(counter.get());
+      sb.append(", limit=").append(limit);
+      sb.append(", item=").append(item);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  private final class CounterTask
+      extends BaseCounter implements TaskPool.Task<Item, IOException> {
+
+    private CounterTask(String name, int limit,
+        Function<Item, Boolean> action) {
+      super(name, limit, action);
+    }
+
+    @Override
+    public void run(Item item) throws IOException {
+      process(item);
+    }
+
+  }
+
+  private final class FailureCounter
+      extends BaseCounter
+      implements TaskPool.FailureTask<Item, IOException> {
+
+    private Exception exception;
+
+    private FailureCounter(String name, int limit,
+        Function<Item, Boolean> action) {
+      super(name, limit, action);
+    }
+
+    @Override
+    public void run(Item item, Exception ex) throws IOException {
+      process(item);
+      this.exception = ex;
+    }
+
+    private Exception getException() {
+      return exception;
+    }
+
+  }
+
+}