Browse Source

HADOOP-19330. S3A: Add LeakReporter; use in S3AInputStream (#7151)

If a file opened for reading through the S3A connector
is not closed, then when garbage collection takes place:

* An error message is reported at WARN, including the file name.
* A stack trace of where the stream was created is reported
  at INFO.
* A best-effort attempt is made to release any active HTTPS
  connection.
* The filesystem IOStatistic stream_leaks is incremented.

The intent is to make it easier to identify where streams
are being opened and not closed -as these consume resources,
often including HTTPS connections from a connection pool
of limited size.

It MUST NOT be relied on as a way to clean up open
files/streams automatically; some of the normal actions of
the close() method are omitted.

Instead: view the warning messages and IOStatistics as a
sign of a problem, the stack trace as a way of identifying
what application code/library needs to be investigated.

Contributed by Steve Loughran
Steve Loughran 5 months ago
parent
commit
7999db55da

+ 143 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/LeakReporter.java

@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.util.functional.RunnableRaisingIOE;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A class to report leaks of streams.
+ * <p>
+ * It is created during object creation, and closed during finalization.
+ * A predicate should be supplied for the {@link #isOpen} probe to check
+ * if the resource is still open, along with an operation to actually
+ * close the target.
+ */
+public class LeakReporter implements Closeable {
+
+  /**
+   * Name of logger used to report leaks: {@value}.
+   */
+  public static final String RESOURCE_LEAKS_LOG_NAME = "org.apache.hadoop.fs.resource.leaks";
+
+  /**
+   * Special log for leaked streams.
+   */
+  private static final Logger LEAK_LOG =
+      LoggerFactory.getLogger(RESOURCE_LEAKS_LOG_NAME);
+
+  /**
+   * Format string used to build the thread information: {@value}.
+   */
+  @VisibleForTesting
+  static final String THREAD_FORMAT =
+      "; thread: %s; id: %d";
+
+  /**
+   * Re-entrancy check.
+   */
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  /**
+   * Predicate to check if the resource is open.
+   */
+  private final BooleanSupplier isOpen;
+
+  /**
+   * Action to close the resource.
+   */
+  private final RunnableRaisingIOE closeAction;
+
+  /**
+   * Stack trace of object creation; used to
+   * report unclosed streams in finalize().
+   */
+  private final IOException leakException;
+
+  /**
+   * Constructor.
+   * <p>
+   * Validates the parameters and builds the stack trace,
+   * appending "; thread: " + the thread name.
+   * @param message error message
+   * @param isOpen open predicate
+   * @param closeAction action to close
+   */
+  public LeakReporter(
+      final String message,
+      final BooleanSupplier isOpen,
+      final RunnableRaisingIOE closeAction) {
+    this.isOpen = requireNonNull(isOpen);
+    this.closeAction = requireNonNull(closeAction);
+    // build the warning message.
+    // This includes the error string to print, so as to avoid
+    // constructing objects in finalize().
+    this.leakException = new IOException(message
+        + String.format(THREAD_FORMAT,
+        Thread.currentThread().getName(),
+        Thread.currentThread().getId()));
+  }
+
+  /**
+   * Close the resource.
+   */
+  @Override
+  public void close() {
+    try {
+      if (!closed.getAndSet(true) && isOpen.getAsBoolean()) {
+        // log a warning with the creation stack
+        LEAK_LOG.warn(leakException.getMessage());
+        // The creation stack is logged at INFO, so that
+        // it is possible to configure logging to print
+        // the name of files left open, without printing
+        // the stacks. This is better for production use.
+
+        LEAK_LOG.info("stack", leakException);
+        closeAction.apply();
+      }
+    } catch (Exception e) {
+      LEAK_LOG.info("executing leak cleanup actions", e);
+    }
+  }
+
+  public IOException getLeakException() {
+    return leakException;
+  }
+
+  public boolean isClosed() {
+    return closed.get();
+  }
+
+  @Override
+  public String toString() {
+    return "LeakReporter{" +
+        "closed=" + closed.get() +
+        '}';
+  }
+}

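To make the intended wiring concrete, here is a minimal sketch (an editorial illustration, not part of this patch; `TrackedResource` and its fields are hypothetical) of how a resource class could use `LeakReporter`, following the same pattern `S3AInputStream` adopts later in this change:

```java
import java.io.Closeable;
import java.io.IOException;

import org.apache.hadoop.fs.impl.LeakReporter;

/** Hypothetical resource class demonstrating the LeakReporter pattern. */
public class TrackedResource implements Closeable {

  /** Volatile so the leak probe can read it without synchronization. */
  private volatile boolean open = true;

  /** Created eagerly so the construction-time stack trace is captured. */
  private final LeakReporter leakReporter = new LeakReporter(
      "Resource not closed",     // message logged at WARN on a leak
      () -> open,                // probe: is the resource still open?
      this::release);            // best-effort cleanup action

  /** Release underlying resources; may raise IOException. */
  private void release() throws IOException {
    open = false;
    // free sockets, buffers, file handles, ...
  }

  @Override
  public void close() throws IOException {
    release();
  }

  @Override
  protected void finalize() throws Throwable {
    // Only logs and cleans up if the probe reports the resource open.
    leakReporter.close();
    super.finalize();
  }
}
```
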
+ 8 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

@@ -40,6 +40,14 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public final class StreamStatisticNames {
 
+  /**
+   * Count of Stream leaks from an application which
+   * is not cleaning up correctly.
+   * Value: {@value}.
+   */
+  public static final String STREAM_LEAKS =
+      "stream_leaks";
+
   /**
    * Count of times the TCP stream was aborted.
    * Value: {@value}.

+ 47 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RunnableRaisingIOE.java

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/**
+ * Runnable interface whose {@link #apply()} method may raise
+ * an IOE.
+ * The implementation of {@link Runnable#run} invokes this
+ * and converts any raised IOE into an {@link UncheckedIOException}.
+ */
+@FunctionalInterface
+public interface RunnableRaisingIOE extends Runnable {
+
+  /**
+   * Apply the operation.
+   * @throws IOException Any IO failure
+   */
+  void apply() throws IOException;
+
+  @Override
+  default void run() {
+    try {
+      apply();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}

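A short illustration (editorial, not part of the patch) of the interface's dual use: invoke `apply()` where the checked `IOException` can be handled, or pass the instance to any `Runnable`-consuming API and catch the `UncheckedIOException` instead:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.hadoop.util.functional.RunnableRaisingIOE;

public final class RunnableRaisingIOEDemo {
  public static void main(String[] args) {
    // The lambda targets apply(), so it may throw a checked IOException.
    RunnableRaisingIOE action = () -> {
      throw new IOException("simulated failure");
    };

    // Used through run() - e.g. when handed to an Executor - the
    // default method rethrows the IOException as UncheckedIOException.
    try {
      action.run();
    } catch (UncheckedIOException e) {
      System.out.println("unchecked: " + e.getCause().getMessage());
    }
  }
}
```
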
+ 165 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestLeakReporter.java

@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.impl.LeakReporter.THREAD_FORMAT;
+import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
+public final class TestLeakReporter extends AbstractHadoopTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestLeakReporter.class);
+
+  /**
+   * Count of close calls.
+   */
+  private final AtomicInteger closeCount = new AtomicInteger();
+
+  /**
+   * Big test: creates a reporter, closes it.
+   * Verifies that the error message and stack trace are printed when
+   * open, and that the close callback was invoked.
+   * <p>
+   * After the first invocation, a second invocation is ignored.
+   */
+  @Test
+  public void testLeakInvocation() throws Throwable {
+
+    final String message = "<message>";
+    final LeakReporter reporter = new LeakReporter(message,
+        () -> true,
+        this::closed);
+
+    // store the old thread name and change it,
+    // so the log test can verify that the old thread name is printed.
+    String oldName = Thread.currentThread().getName();
+    Thread.currentThread().setName("thread");
+    // Capture the logs
+    GenericTestUtils.LogCapturer logs =
+        captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
+    expectClose(reporter, 1);
+
+    // check the log
+    logs.stopCapturing();
+    final String output = logs.getOutput();
+    LOG.info("output of leak log is {}", output);
+
+    final String threadInfo = String.format(THREAD_FORMAT,
+        oldName,
+        Thread.currentThread().getId());
+    // log auditing
+    Assertions.assertThat(output)
+        .describedAs("output from the logs")
+        .contains("WARN")
+        .contains(message)
+        .contains(Thread.currentThread().getName())
+        .contains(threadInfo)
+        .contains("TestLeakReporter.testLeakInvocation")
+        .contains("INFO")
+        .contains("stack");
+
+    // no reentrancy
+    expectClose(reporter, 1);
+  }
+
+  /**
+   * Expect the close operation to leave
+   * the close count at the expected value.
+   * @param reporter leak reporter
+   * @param expected expected value after the close
+   */
+  private void expectClose(final LeakReporter reporter, final int expected) {
+    reporter.close();
+    assertCloseCount(expected);
+  }
+
+  /**
+   * Close operation: increments the counter.
+   */
+  private void closed() {
+    closeCount.incrementAndGet();
+  }
+
+  /**
+   * When the source is closed, no leak cleanup takes place.
+   */
+  @Test
+  public void testLeakSkipped() throws Throwable {
+
+    final LeakReporter reporter = new LeakReporter("<message>",
+        () -> false,
+        this::closed);
+    expectClose(reporter, 0);
+  }
+
+  /**
+   * If the probe raises an exception, the exception is swallowed
+   * and the close action is never invoked.
+   */
+  @Test
+  public void testProbeFailureSwallowed() throws Throwable {
+    final LeakReporter reporter = new LeakReporter("<message>",
+        this::raiseNPE,
+        this::closed);
+    expectClose(reporter, 0);
+  }
+
+  /**
+   * Any exception raised in the close action is swallowed.
+   */
+  @Test
+  public void testCloseActionSwallowed() throws Throwable {
+    final LeakReporter reporter = new LeakReporter("<message>",
+        () -> true,
+        this::raiseNPE);
+    reporter.close();
+
+    Assertions.assertThat(reporter.isClosed())
+        .describedAs("reporter closed")
+        .isTrue();
+  }
+
+  /**
+   * Always raises an NPE.
+   * @return never
+   */
+  private boolean raiseNPE() {
+    throw new NullPointerException("oops");
+  }
+
+  /**
+   * Assert that the value of {@link #closeCount} is as expected.
+   * @param ex expected.
+   */
+  private void assertCloseCount(final int ex) {
+    Assertions.assertThat(closeCount.get())
+        .describedAs("close count")
+        .isEqualTo(ex);
+  }
+}

+ 5 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -160,6 +160,7 @@ import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
@@ -5587,6 +5588,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     case AWS_S3_ACCESS_GRANTS_ENABLED:
       return s3AccessGrantsEnabled;
 
+      // stream leak detection.
+    case StreamStatisticNames.STREAM_LEAKS:
+      return !prefetchEnabled;
+
     default:
       // is it a performance flag?
       if (performanceFlags.hasCapability(capability)) {

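Given the new `hasPathCapability()` case above, client code can probe whether leak detection will be active; a hedged sketch (the bucket name and configuration are placeholders), assuming the standard `FileSystem.hasPathCapability()` API of Hadoop 3.3+:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class LeakCapabilityProbe {
  public static void main(String[] args) throws Exception {
    Path bucket = new Path("s3a://example-bucket/");
    FileSystem fs = bucket.getFileSystem(new Configuration());
    // true when the classic input stream is in use; false when
    // prefetching is enabled, which has no leak reporting (yet).
    boolean detects = fs.hasPathCapability(bucket, "stream_leaks");
    System.out.println("stream leak detection: " + detects);
  }
}
```
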
+ 69 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntFunction;
@@ -39,6 +40,8 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.impl.LeakReporter;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -116,6 +119,9 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    */
   private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3AInputStream.class);
+
   /**
    * Atomic boolean variable to stop all ongoing vectored read operation
    * for this input stream. This will be set to true when the stream is
@@ -159,8 +165,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   private final Optional<Long> fileLength;
 
   private final String uri;
-  private static final Logger LOG =
-      LoggerFactory.getLogger(S3AInputStream.class);
+
   private final S3AInputStreamStatistics streamStatistics;
   private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
@@ -202,6 +207,12 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   /** Aggregator used to aggregate per thread IOStatistics. */
   private final IOStatisticsAggregator threadIOStatistics;
 
+  /**
+   * Reporter of leaks; used to report and abort
+   * unclosed streams in finalize().
+   */
+  private final LeakReporter leakReporter;
+
   /**
    * Create the stream.
    * This does not attempt to open it; that is only done on the first
@@ -242,6 +253,60 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     this.boundedThreadPool = boundedThreadPool;
     this.vectoredIOContext = context.getVectoredIOContext();
     this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
+    // build the leak reporter
+    this.leakReporter = new LeakReporter(
+        "Stream not closed while reading " + uri,
+        this::isStreamOpen,
+        () -> abortInFinalizer());
+  }
+
+  /**
+   * Finalizer.
+   * <p>
+   * Verify that the inner stream is closed.
+   * <p>
+   * If it is not, it means streams are being leaked in application code.
+   * Log a warning, including the stack trace of the caller,
+   * then abort the stream.
+   * <p>
+   * This does not attempt to invoke {@link #close()} as that is
+   * a more complex operation, and this method is being executed
+   * during a GC finalization phase.
+   * <p>
+   * Applications MUST close their streams; this is a defensive
+   * operation to return http connections and warn the end users
+   * that their applications are at risk of running out of connections.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  protected void finalize() throws Throwable {
+    leakReporter.close();
+    super.finalize();
+  }
+
+  /**
+   * Probe for stream being open.
+   * Not synchronized; the flag is volatile.
+   * @return true if the stream is still open.
+   */
+  private boolean isStreamOpen() {
+    return !closed;
+  }
+
+  /**
+   * Brute force stream close; invoked by {@link LeakReporter}.
+   * All exceptions raised are ignored.
+   */
+  private void abortInFinalizer() {
+    try {
+      // stream was leaked: update statistic
+      streamStatistics.streamLeaked();
+      // abort the stream. This merges statistics into the filesystem.
+      closeStream("finalize()", true, true).get();
+    } catch (InterruptedException | ExecutionException ignored) {
+      /* ignore failures during this shutdown path */
+    }
   }
 
   /**
@@ -710,7 +775,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
         forceAbort ? "abort" : "soft");
     boolean shouldAbort = forceAbort || remaining > readahead;
     CompletableFuture<Boolean> operation;
-    SDKStreamDrainer drainer = new SDKStreamDrainer(
+    SDKStreamDrainer<ResponseInputStream<GetObjectResponse>> drainer = new SDKStreamDrainer<>(
         uri,
         wrappedStream,
         shouldAbort,
@@ -1357,6 +1422,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     switch (toLowerCase(capability)) {
     case StreamCapabilities.IOSTATISTICS:
     case StreamCapabilities.IOSTATISTICS_CONTEXT:
+    case StreamStatisticNames.STREAM_LEAKS:
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.UNBUFFER:
     case StreamCapabilities.VECTOREDIO:

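The same capability is now declared at the stream level; a brief sketch (editorial; the path is a placeholder) probing it through the `StreamCapabilities` support of `FSDataInputStream`:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class StreamLeakCapabilityCheck {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data.bin");
    FileSystem fs = path.getFileSystem(new Configuration());
    try (FSDataInputStream in = fs.open(path)) {
      // After this change, S3AInputStream declares "stream_leaks".
      System.out.println("leak detection on stream: "
          + in.hasCapability("stream_leaks"));
    } // closed here, so the LeakReporter never fires
  }
}
```
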
+ 10 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -862,6 +862,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       this.filesystemStatistics = filesystemStatistics;
       IOStatisticsStore st = iostatisticsStore()
           .withCounters(
+              StreamStatisticNames.STREAM_LEAKS,
               StreamStatisticNames.STREAM_READ_ABORTED,
               StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_ABORT,
               StreamStatisticNames.STREAM_READ_CLOSED,
@@ -1126,6 +1127,15 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       merge(true);
     }
 
+    /**
+     * Stream was leaked.
+     */
+    public void streamLeaked() {
+      increment(StreamStatisticNames.STREAM_LEAKS);
+      // merge as if closed.
+      merge(true);
+    }
+
     /**
      * {@inheritDoc}.
      * As well as incrementing the {@code STREAM_READ_SEEK_POLICY_CHANGED}

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

@@ -312,6 +312,10 @@ public enum Statistic {
       StoreStatisticNames.OBJECT_PUT_BYTES_PENDING,
       "number of bytes queued for upload/being actively uploaded",
       TYPE_GAUGE),
+  STREAM_LEAKS(
+      StreamStatisticNames.STREAM_LEAKS,
+      "Streams detected as not closed safely",
+      TYPE_COUNTER),
   STREAM_READ_ABORTED(
       StreamStatisticNames.STREAM_READ_ABORTED,
       "Count of times the TCP stream was aborted",

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java

@@ -210,4 +210,8 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
    */
   DurationTracker initiateInnerStreamClose(boolean abort);
 
+  /**
+   * Stream was leaked.
+   */
+  default void streamLeaked() {}
 }

+ 84 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md

@@ -485,6 +485,90 @@ If `storediag` doesn't connect to your S3 store, *nothing else will*.
 Based on the experience of people who field support calls, here are
 some of the main connectivity issues which cause problems.
 
+### <a name="Not-enough-connections"></a> Connection pool overloaded
+
+If more connections are needed than the HTTP connection pool has,
+then worker threads will block until one is freed.
+
+If the wait exceeds the time set in `fs.s3a.connection.acquisition.timeout`,
+the operation will fail with `Timeout waiting for connection from pool`.
+
+This may be retried, but time has been lost, which results in slower operations.
+If queries suddenly get slower as the number of active operations increases,
+then this is a possible cause.
+
+Fixes:
+
+Increase the value of `fs.s3a.connection.maximum`.
+This is the general fix on query engines such as Apache Spark and Apache Impala,
+which run many worker threads simultaneously, and do not keep files open past
+the duration of a single task within a larger query.
+
+It can also surface with applications which deliberately keep files open
+for extended periods.
+These should ideally call `unbuffer()` on the input streams.
+This will free up the connection until another read operation is invoked -yet
+still re-open faster than if `open(Path)` were invoked.
+
+Applications may also be "leaking" http connections by failing to
+`close()` them. This is potentially fatal as eventually the connection pool
+can get exhausted -at which point the program will no longer work.
+
+This can only be fixed in the application code: it is _not_ a bug in
+the S3A filesystem.
+
+1. Applications MUST call `close()` on an input stream when the contents of
+   the file are no longer needed.
+2. If long-lived applications eventually fail with unrecoverable
+   `ApiCallTimeout` exceptions, they are not doing so.
+
+To aid in identifying the location of these leaks, when a JVM garbage
+collection releases an unreferenced `S3AInputStream` instance,
+it will log at `WARN` level that it has not been closed,
+listing the file URL, and the thread name + ID of the the thread
+which creating the file.
+The the stack trace of the `open()` call will be logged at `INFO`
+
+```
+2024-11-13 12:48:24,537 [Finalizer] WARN  resource.leaks (LeakReporter.java:close(114)) - Stream not closed while reading s3a://bucket/test/testFinalizer; thread: JUnit-testFinalizer; id: 11
+2024-11-13 12:48:24,537 [Finalizer] INFO  resource.leaks (LeakReporter.java:close(120)) - stack
+java.io.IOException: Stream not closed while reading s3a://bucket/test/testFinalizer; thread: JUnit-testFinalizer; id: 11
+    at org.apache.hadoop.fs.impl.LeakReporter.<init>(LeakReporter.java:101)
+    at org.apache.hadoop.fs.s3a.S3AInputStream.<init>(S3AInputStream.java:257)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.executeOpen(S3AFileSystem.java:1891)
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1841)
+    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:997)
+    at org.apache.hadoop.fs.s3a.ITestS3AInputStreamLeakage.testFinalizer(ITestS3AInputStreamLeakage.java:99)
+```
+
+It will also `abort()` the HTTP connection, freeing up space in the connection pool.
+This automated cleanup is _not_ a substitute for applications correctly closing
+input streams -it only happens during garbage collection, and this may not be
+rapid enough to prevent an application running out of connections.
+
+It is possible to stop these warning messages from being logged,
+by restricting the log `org.apache.hadoop.fs.resource.leaks` to
+only log at `ERROR` or above.
+This will also disable error logging for _all_ other resources whose leaks
+are detected.
+
+```properties
+log4j.logger.org.apache.hadoop.fs.resource.leaks=ERROR
+```
+
+To disable the stack traces while retaining the URI/thread information, set the log level to `WARN`
+
+```properties
+log4j.logger.org.apache.hadoop.fs.resource.leaks=WARN
+```
+
+This is better for production deployments: leakages are still reported,
+while the stack traces, which are only of relevance to application
+developers, are omitted.
+
+Finally, note that the filesystem and thread context IOStatistic `stream_leaks` is updated;
+if these statistics are collected, then the existence of leakages can be detected.
+
 ### <a name="inconsistent-config"></a> Inconsistent configuration across a cluster
 
 All hosts in the cluster need to have the configuration secrets;

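To act on the documentation above, a monitoring hook might read the `stream_leaks` counter from the filesystem's IOStatistics; a sketch (editorial; assumes the source implements `IOStatisticsSource`, as `S3AFileSystem` does):

```java
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

public final class LeakCounterProbe {

  /** Returns the number of leaked streams recorded so far, or 0. */
  public static long leakCount(IOStatisticsSource source) {
    IOStatistics stats = source.getIOStatistics();
    if (stats == null) {
      return 0;  // source does not publish statistics
    }
    Long leaks = stats.counters().get("stream_leaks");
    return leaks == null ? 0 : leaks;
  }
}
```
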
+ 161 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java

@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.lang.ref.WeakReference;
+import java.time.Duration;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS;
+import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
+/**
+ * Test Stream leakage.
+ */
+public class ITestS3AInputStreamLeakage extends AbstractS3ATestBase {
+
+  /**
+   * How big a file to create?
+   */
+  public static final int FILE_SIZE = 1024;
+
+  public static final byte[] DATASET = dataset(FILE_SIZE, '0', 10);
+
+  /**
+   * Time to wait after a GC/finalize is triggered before looking at the log.
+   */
+  public static final long GC_DELAY = Duration.ofSeconds(1).toMillis();
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    assume("Stream leak detection not avaialable",
+        getFileSystem().hasCapability(STREAM_LEAKS));
+  }
+
+  /**
+   * This test forces a GC of an open file then verifies that the
+   * log contains the error message.
+   * <p>
+   * Care is needed here to ensure that no strong references are held to the
+   * stream, otherwise: no GC.
+   * <p>
+   * It also assumes that {@code System.gc()} will do enough of a treewalk to
+   * prepare the stream for garbage collection (a weak ref is used to verify
+   * that it was removed as a reference), and that
+   * {@code System.runFinalization()} will then
+   * invoke the finalization.
+   * <p>
+   * The finalize code runs its own thread "Finalizer"; this is async enough
+   * that assertions on log entries only work if there is a pause after
+   * finalization is triggered and the log is reviewed.
+   * <p>
+   * The stream leak counter of the FileSystem is also updated; this
+   * is verified.
+   * <p>
+   * Note: if the stream under test is not an S3AInputStream (i.e. it is a prefetching one),
+   * this test is skipped. If/when the prefetching stream adds the same code,
+   * this check can be removed.
+   */
+  @Test
+  public void testFinalizer() throws Throwable {
+    Path path = methodPath();
+    final S3AFileSystem fs = getFileSystem();
+
+    ContractTestUtils.createFile(fs, path, true, DATASET);
+
+    // DO NOT use try-with-resources; this
+    // test MUST be able to remove all references
+    // to the stream
+    FSDataInputStream in = fs.open(path);
+
+    try {
+      Assertions.assertThat(in.hasCapability(STREAM_LEAKS))
+          .describedAs("Stream leak detection not supported in: " + in.getClass())
+          .isTrue();
+
+      Assertions.assertThat(in.read())
+          .describedAs("first byte read from %s", in)
+          .isEqualTo(DATASET[0]);
+
+      // get a weak ref so that after a GC we can look for it and verify it is gone
+      Assertions.assertThat(((S3AInputStream) in.getWrappedStream()).isObjectStreamOpen())
+          .describedAs("stream http connection status")
+          .isTrue();
+      // weak reference to track GC progress
+      WeakReference<S3AInputStream> wrs =
+          new WeakReference<>((S3AInputStream) in.getWrappedStream());
+
+      // Capture the logs
+      GenericTestUtils.LogCapturer logs =
+          captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
+
+      LOG.info("captured log");
+
+      // remove strong reference to the stream
+      in = null;
+      // force the gc.
+      System.gc();
+      // make sure the GC removed the S3AInputStream.
+      Assertions.assertThat(wrs.get())
+          .describedAs("weak stream reference wasn't GC'd")
+          .isNull();
+
+      // finalize
+      System.runFinalization();
+
+      // finalize is async, so add a brief wait for it to be called.
+      // without this the log may or may not be empty
+      Thread.sleep(GC_DELAY);
+      LOG.info("end of log");
+
+      // check the log
+      logs.stopCapturing();
+      final String output = logs.getOutput();
+      LOG.info("output of leak log is {}", output);
+      Assertions.assertThat(output)
+          .describedAs("output from the logs during GC")
+          .contains("drain or abort reason finalize()")  // stream release
+          .contains(path.toUri().toString())             // path
+          .contains(Thread.currentThread().getName())    // thread
+          .contains("testFinalizer");                    // stack
+
+      // verify that leakages are added to the FS statistics
+      assertThatStatisticCounter(fs.getIOStatistics(), STREAM_LEAKS)
+          .isEqualTo(1);
+    } finally {
+      if (in != null) {
+        IOUtils.cleanupWithLogger(LOG, in);
+      }
+    }
+  }
+}