浏览代码

HADOOP-18091. S3A auditing leaks memory through ThreadLocal references (#3930)

Adds a new map type WeakReferenceMap, which stores weak
references to values, and a WeakReferenceThreadMap subclass
to more closely resemble a thread local type, as it is a
map of threadId to value.

Construct it with a factory method and optional callback
for notification on loss and regeneration.

 WeakReferenceThreadMap<WrappingAuditSpan> activeSpan =
      new WeakReferenceThreadMap<>(
          (k) -> getUnbondedSpan(),
          this::noteSpanReferenceLost);

This is used in ActiveAuditManagerS3A for span tracking.

Relates to
* HADOOP-17511. Add an Audit plugin point for S3A
* HADOOP-18094. Disable S3A auditing by default.

Contributed by Steve Loughran.
Steve Loughran 3 年之前
父节点
当前提交
efdec92cab
共有 18 个文件被更改,包括 1345 次插入38 次删除
  1. 24 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java
  2. 54 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java
  3. 261 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java
  4. 11 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  5. 18 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java
  6. 199 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java
  7. 1 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
  8. 11 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java
  9. 1 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java
  10. 110 20
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java
  11. 3 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
  12. 2 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
  13. 2 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
  14. 8 7
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md
  15. 101 6
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md
  16. 130 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java
  17. 406 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java
  18. 3 0
      hadoop-tools/hadoop-aws/src/test/resources/log4j.properties

+ 24 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java

@@ -24,6 +24,9 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -69,11 +72,16 @@ import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD1;
  * {@link #currentAuditContext()} to get the thread-local
  * context for the caller, which can then be manipulated.
  *
+ * For further information, especially related to memory consumption,
+ * read the document `auditing_architecture` in the `hadoop-aws` module.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public final class CommonAuditContext {
 
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CommonAuditContext.class);
+
   /**
    * Process ID; currently built from UUID and timestamp.
    */
@@ -92,7 +100,7 @@ public final class CommonAuditContext {
    * Supplier operations must themselves be thread safe.
    */
   private final Map<String, Supplier<String>> evaluatedEntries =
-      new ConcurrentHashMap<>();
+      new ConcurrentHashMap<>(1);
 
   static {
     // process ID is fixed.
@@ -108,7 +116,7 @@ public final class CommonAuditContext {
    * the span is finalized.
    */
   private static final ThreadLocal<CommonAuditContext> ACTIVE_CONTEXT =
-      ThreadLocal.withInitial(() -> createInstance());
+      ThreadLocal.withInitial(CommonAuditContext::createInstance);
 
   private CommonAuditContext() {
   }
@@ -125,11 +133,21 @@ public final class CommonAuditContext {
 
   /**
    * Put a context entry dynamically evaluated on demand.
+   * Important: as these supplier methods are long-lived,
+   * the supplier function <i>MUST NOT</i> be part of/refer to
+   * any object instance of significant memory size.
+   * Applications SHOULD remove references when they are
+   * no longer needed.
+   * When logged at TRACE, prints the key and stack trace of the caller,
+   * to allow for debugging of any problems.
    * @param key key
    * @param value new value
    * @return old value or null
    */
   public Supplier<String> put(String key, Supplier<String> value) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Adding context entry {}", key, new Exception(key));
+    }
     return evaluatedEntries.put(key, value);
   }
 
@@ -138,6 +156,9 @@ public final class CommonAuditContext {
    * @param key key
    */
   public void remove(String key) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Remove context entry {}", key);
+    }
     evaluatedEntries.remove(key);
   }
 
@@ -168,7 +189,7 @@ public final class CommonAuditContext {
   private void init() {
 
     // thread 1 is dynamic
-    put(PARAM_THREAD1, () -> currentThreadID());
+    put(PARAM_THREAD1, CommonAuditContext::currentThreadID);
   }
 
   /**

+ 54 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java

@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.util.WeakReferenceMap;
+
+/**
+ * A WeakReferenceMap for threads.
+ * @param <V> value type of the map
+ */
+public class WeakReferenceThreadMap<V> extends WeakReferenceMap<Long, V> {
+
+  public WeakReferenceThreadMap(final Function<? super Long, ? extends V> factory,
+      @Nullable final Consumer<? super Long> referenceLost) {
+    super(factory, referenceLost);
+  }
+
+  public V getForCurrentThread() {
+    return get(currentThreadId());
+  }
+
+  public V removeForCurrentThread() {
+    return remove(currentThreadId());
+  }
+
+  public long currentThreadId() {
+    return Thread.currentThread().getId();
+  }
+
+  public V setForCurrentThread(V newVal) {
+    return put(currentThreadId(), newVal);
+  }
+
+}

+ 261 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java

@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A map of keys type K to objects of type V which uses weak references,
+ * so does lot leak memory through long-lived references
+ * <i>at the expense of losing references when GC takes place.</i>.
+ *
+ * This class is intended be used instead of ThreadLocal storage when
+ * references are to be cleaned up when the instance holding.
+ * In this use case, the key is the Long key.
+ *
+ * Concurrency.
+ * The class assumes that map entries are rarely contended for when writing,
+ * and that not blocking other threads is more important than atomicity.
+ * - a ConcurrentHashMap is used to map keys to weak references, with
+ *   all its guarantees.
+ * - there is no automatic pruning.
+ * - see {@link #create(Object)} for the concurrency semantics on entry creation.
+ */
+@InterfaceAudience.Private
+public class WeakReferenceMap<K, V> {
+
+  /**
+   * The reference map.
+   */
+  private final Map<K, WeakReference<V>> map = new ConcurrentHashMap<>();
+
+  /**
+   * Supplier of new instances.
+   */
+  private final Function<? super K, ? extends V> factory;
+
+  /**
+   * Nullable callback when a get on a key got a weak reference back.
+   * The assumption is that this is for logging/stats, which is why
+   * no attempt is made to use the call as a supplier of a new value.
+   */
+  private final Consumer<? super K> referenceLost;
+
+  /**
+   * Counter of references lost.
+   */
+  private final AtomicLong referenceLostCount = new AtomicLong();
+
+  /**
+   * Counter of entries created.
+   */
+  private final AtomicLong entriesCreatedCount = new AtomicLong();
+
+  /**
+   * instantiate.
+   * @param factory supplier of new instances
+   * @param referenceLost optional callback on lost references.
+   */
+  public WeakReferenceMap(
+      Function<? super K, ? extends V> factory,
+      @Nullable final Consumer<? super K> referenceLost) {
+
+    this.factory = requireNonNull(factory);
+    this.referenceLost = referenceLost;
+  }
+
+  @Override
+  public String toString() {
+    return "WeakReferenceMap{" +
+        "size=" + size() +
+        ", referenceLostCount=" + referenceLostCount +
+        ", entriesCreatedCount=" + entriesCreatedCount +
+        '}';
+  }
+
+  /**
+   * Map size.
+   * @return the current map size.
+   */
+  public int size() {
+    return map.size();
+  }
+
+  /**
+   * Clear all entries.
+   */
+  public void clear() {
+    map.clear();
+  }
+
+  /**
+   * look up the value, returning the possibly empty weak reference
+   * to a value, or null if no value was found.
+   * @param key key to look up
+   * @return null if there is no entry, a weak reference if found
+   */
+  public WeakReference<V> lookup(K key) {
+    return map.get(key);
+  }
+
+  /**
+   * Get the value, creating if needed.
+   * @param key key.
+   * @return an instance.
+   */
+  public V get(K key) {
+    final WeakReference<V> current = lookup(key);
+    V val = resolve(current);
+    if (val != null) {
+      // all good.
+      return  val;
+    }
+
+    // here, either no ref, or the value is null
+    if (current != null) {
+      noteLost(key);
+    }
+    return create(key);
+  }
+
+  /**
+   * Create a new instance under a key.
+   * The instance is created, added to the map and then the
+   * map value retrieved.
+   * This ensures that the reference returned is that in the map,
+   * even if there is more than one entry being created at the same time.
+   * @param key key
+   * @return the value
+   */
+  public V create(K key) {
+    entriesCreatedCount.incrementAndGet();
+    WeakReference<V> newRef = new WeakReference<>(
+        requireNonNull(factory.apply(key)));
+    map.put(key, newRef);
+    return map.get(key).get();
+  }
+
+  /**
+   * Put a value under the key.
+   * A null value can be put, though on a get() call
+   * a new entry is generated
+   *
+   * @param key key
+   * @param value value
+   * @return any old non-null reference.
+   */
+  public V put(K key, V value) {
+    return resolve(map.put(key, new WeakReference<>(value)));
+  }
+
+  /**
+   * Remove any value under the key.
+   * @param key key
+   * @return any old non-null reference.
+   */
+  public V remove(K key) {
+    return resolve(map.remove(key));
+  }
+
+  /**
+   * Does the map have a valid reference for this object?
+   * no-side effects: there's no attempt to notify or cleanup
+   * if the reference is null.
+   * @param key key to look up
+   * @return true if there is a valid reference.
+   */
+  public boolean containsKey(K key) {
+    final WeakReference<V> current = lookup(key);
+    return resolve(current) != null;
+  }
+
+  /**
+   * Given a possibly null weak reference, resolve
+   * its value.
+   * @param r reference to resolve
+   * @return the value or null
+   */
+  private V resolve(WeakReference<V> r) {
+    return r == null ? null : r.get();
+  }
+
+  /**
+   * Prune all null weak references, calling the referenceLost
+   * callback for each one.
+   *
+   * non-atomic and non-blocking.
+   * @return the number of entries pruned.
+   */
+  public int prune() {
+    int count = 0;
+    final Iterator<Map.Entry<K, WeakReference<V>>> it = map.entrySet().iterator();
+    while (it.hasNext()) {
+      final Map.Entry<K, WeakReference<V>> next = it.next();
+      if (next.getValue().get() == null) {
+        it.remove();
+        count++;
+        noteLost(next.getKey());
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Notify the reference lost callback.
+   * @param key key of lost reference
+   */
+  private void noteLost(final K key) {
+    // incrment local counter
+    referenceLostCount.incrementAndGet();
+
+    // and call any notification function supplied in the constructor
+    if (referenceLost != null) {
+      referenceLost.accept(key);
+    }
+  }
+
+  /**
+   * Get count of references lost as detected
+   * during prune() or get() calls.
+   * @return count of references lost
+   */
+  public final long getReferenceLostCount() {
+    return referenceLostCount.get();
+  }
+
+  /**
+   * Get count of entries created on demand.
+   * @return count of entries created
+   */
+  public final long getEntriesCreatedCount() {
+    return entriesCreatedCount.get();
+  }
+}

+ 11 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2025,6 +2025,17 @@
   </description>
 </property>
 
+<!--
+The switch to turn S3A auditing on or off.
+-->
+<property>
+  <name>fs.s3a.audit.enabled</name>
+  <value>true</value>
+  <description>
+    Should auditing of S3A requests be enabled?
+  </description>
+</property>
+
   <!-- Azure file system properties -->
 <property>
   <name>fs.AbstractFileSystem.wasb.impl</name>

+ 18 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java

@@ -132,6 +132,15 @@ public class TestCommonAuditContext extends AbstractHadoopTestBase {
         .describedAs("Value of context element %s", key)
         .isNotBlank();
   }
+  /**
+   * Assert a context value is null.
+   * @param key key to look up
+   */
+  private void assertContextValueIsNull(final String key) {
+    assertThat(context.get(key))
+        .describedAs("Value of context element %s", key)
+        .isNull();
+  }
 
   @Test
   public void testNoteEntryPoint() throws Throwable {
@@ -158,4 +167,13 @@ public class TestCommonAuditContext extends AbstractHadoopTestBase {
     return anAssert;
   }
 
+  @Test
+  public void testAddRemove() throws Throwable {
+    final String key = "testAddRemove";
+    assertContextValueIsNull(key);
+    context.put(key, key);
+    assertContextValue(key).isEqualTo(key);
+    context.remove(key);
+    assertContextValueIsNull(key);
+  }
 }

+ 199 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java

@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+/**
+ * Test {@link WeakReferenceMap}.
+ * There's no attempt to force GC here, so the tests are
+ * more about the basic behavior not the handling of empty references.
+ */
+public class TestWeakReferenceMap extends AbstractHadoopTestBase {
+
+  public static final String FACTORY_STRING = "recreated %d";
+
+  /**
+   * The map to test.
+   */
+  private WeakReferenceMap<Integer, String> referenceMap;
+
+  /**
+   * List of references notified of loss.
+   */
+  private List<Integer> lostReferences;
+
+  @Before
+  public void setup() {
+    lostReferences = new ArrayList<>();
+    referenceMap = new WeakReferenceMap<>(
+        this::factory,
+        this::referenceLost);
+  }
+
+  /**
+   * Reference lost callback.
+   * @param key key lost
+   */
+  private void referenceLost(Integer key) {
+    lostReferences.add(key);
+  }
+
+
+  /**
+   * Basic insertions and lookups of those values.
+   */
+  @Test
+  public void testBasicOperationsWithValidReferences() {
+
+    referenceMap.put(1, "1");
+    referenceMap.put(2, "2");
+    assertMapSize(2);
+    assertMapContainsKey(1);
+    assertMapEntryEquals(1, "1");
+    assertMapEntryEquals(2, "2");
+    // overwrite
+    referenceMap.put(1, "3");
+    assertMapEntryEquals(1, "3");
+
+    // remove an entry
+    referenceMap.remove(1);
+    assertMapDoesNotContainKey(1);
+    assertMapSize(1);
+
+    // clear the map
+    referenceMap.clear();
+    assertMapSize(0);
+  }
+
+  /**
+   * pruning removes null entries, leaves the others alone.
+   */
+  @Test
+  public void testPruneNullEntries() {
+    referenceMap.put(1, "1");
+    assertPruned(0);
+    referenceMap.put(2, null);
+    assertMapSize(2);
+    assertPruned(1);
+    assertMapSize(1);
+    assertMapDoesNotContainKey(2);
+    assertMapEntryEquals(1, "1");
+    assertLostCount(1);
+  }
+
+  /**
+   * Demand create entries.
+   */
+  @Test
+  public void testDemandCreateEntries() {
+
+    // ask for an unknown key and expect a generated value
+    assertMapEntryEquals(1, factory(1));
+    assertMapSize(1);
+    assertMapContainsKey(1);
+    assertLostCount(0);
+
+    // an empty ref has the same outcome
+    referenceMap.put(2, null);
+    assertMapEntryEquals(2, factory(2));
+    // but the lost coun goes up
+    assertLostCount(1);
+
+  }
+
+  /**
+   * Assert that the value of a map entry is as expected.
+   * Will trigger entry creation if the key is absent.
+   * @param key key
+   * @param val expected valued
+   */
+  private void assertMapEntryEquals(int key, String val) {
+    Assertions.assertThat(referenceMap.get(key))
+        .describedAs("map enty of key %d", key)
+        .isEqualTo(val);
+  }
+
+  /**
+   * Assert that a map entry is present.
+   * @param key key
+   */
+  private void assertMapContainsKey(int key) {
+    Assertions.assertThat(referenceMap.containsKey(key))
+        .describedAs("map enty of key %d should be present", key)
+        .isTrue();
+  }
+
+  /**
+   * Assert that a map entry is not present.
+   * @param key key
+   */
+  private void assertMapDoesNotContainKey(int key) {
+    Assertions.assertThat(referenceMap.containsKey(key))
+        .describedAs("map enty of key %d should be absent", key)
+        .isFalse();
+  }
+
+  /**
+   * Assert map size.
+   * @param size expected size.
+   */
+  private void assertMapSize(int size) {
+    Assertions.assertThat(referenceMap.size())
+        .describedAs("size of map %s", referenceMap)
+        .isEqualTo(size);
+  }
+
+  /**
+   * Assert prune returned the given count.
+   * @param count expected count.
+   */
+  private void assertPruned(int count) {
+    Assertions.assertThat(referenceMap.prune())
+        .describedAs("number of entries pruned from map %s", referenceMap)
+        .isEqualTo(count);
+  }
+
+  /**
+   * Assert number of entries lost matches expected count.
+   * @param count expected count.
+   */
+  private void assertLostCount(int count) {
+    Assertions.assertThat(lostReferences)
+        .describedAs("number of entries lost from map %s", referenceMap)
+        .hasSize(count);
+  }
+
+  /**
+   * Factory operation.
+   * @param key map key
+   * @return a string
+   */
+  private String factory(Integer key) {
+    return String.format(FACTORY_STRING, key);
+  }
+
+}

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java

@@ -210,7 +210,7 @@ public class AWSRequestAnalyzer {
    * @param request request
    * @return true if the transfer manager creates them.
    */
-  public static final boolean
+  public static boolean
       isRequestNotAlwaysInSpan(final Object request) {
     return request instanceof CopyPartRequest
         || request instanceof CompleteMultipartUploadRequest

+ 11 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java

@@ -70,4 +70,15 @@ public interface OperationAuditor extends Service,
    * @return ID
    */
   String getAuditorId();
+
+  /**
+   * Span reference lost from GC operations.
+   * This is only called when an attempt is made to retrieve on
+   * the active thread or when a prune operation is cleaning up.
+   *
+   * @param threadId thread ID.
+   */
+  default void noteSpanReferenceLost(long threadId) {
+
+  }
 }

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java

@@ -44,7 +44,7 @@ public final class S3AAuditConstants {
    * Default auditing flag.
    * Value: {@value}.
    */
-  public static final boolean AUDIT_ENABLED_DEFAULT = false;
+  public static final boolean AUDIT_ENABLED_DEFAULT = true;
 
 
   /**

+ 110 - 20
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.AmazonWebServiceRequest;
 import com.amazonaws.HandlerContextAware;
@@ -33,16 +34,17 @@ import com.amazonaws.handlers.HandlerAfterAttemptContext;
 import com.amazonaws.handlers.HandlerBeforeAttemptContext;
 import com.amazonaws.handlers.RequestHandler2;
 import com.amazonaws.http.HttpResponse;
-import com.amazonaws.services.s3.transfer.Transfer;
 import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener;
 import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.audit.AWSAuditEventCallbacks;
@@ -88,6 +90,11 @@ import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_REQUEST_HAN
  * then the IOStatistics counter for {@code AUDIT_FAILURE}
  * is incremented.
  *
+ * Uses the WeakReferenceThreadMap to store spans for threads.
+ * Provided a calling class retains a reference to the span,
+ * the active span will be retained.
+ *
+ *
  */
 @InterfaceAudience.Private
 public final class ActiveAuditManagerS3A
@@ -111,6 +118,14 @@ public final class ActiveAuditManagerS3A
   public static final String NOT_A_WRAPPED_SPAN
       = "Span attached to request is not a wrapped span";
 
+  /**
+   * Arbitrary threshold for triggering pruning on deactivation.
+   * High enough it doesn't happen very often, low enough
+   * that it will happen regularly on a busy system.
+   * Value: {@value}.
+   */
+  static final int PRUNE_THRESHOLD = 10_000;
+
   /**
    * Audit service.
    */
@@ -127,12 +142,27 @@ public final class ActiveAuditManagerS3A
    */
   private WrappingAuditSpan unbondedSpan;
 
+  /**
+   * How many spans have to be deactivated before a prune is triggered?
+   * Fixed as a constant for now unless/until some pressing need
+   * for it to be made configurable ever surfaces.
+   */
+  private final int pruneThreshold = PRUNE_THRESHOLD;
+
+  /**
+   * Count down to next pruning.
+   */
+  private final AtomicInteger deactivationsBeforePrune = new AtomicInteger();
+
   /**
    * Thread local span. This defaults to being
    * the unbonded span.
    */
-  private final ThreadLocal<WrappingAuditSpan> activeSpan =
-      ThreadLocal.withInitial(() -> getUnbondedSpan());
+
+  private final WeakReferenceThreadMap<WrappingAuditSpan> activeSpanMap =
+      new WeakReferenceThreadMap<>(
+          (k) -> getUnbondedSpan(),
+          this::noteSpanReferenceLost);
 
   /**
    * Destination for recording statistics, especially duration/count of
@@ -147,6 +177,7 @@ public final class ActiveAuditManagerS3A
   public ActiveAuditManagerS3A(final IOStatisticsStore iostatistics) {
     super("ActiveAuditManagerS3A");
     this.ioStatisticsStore = iostatistics;
+    this.deactivationsBeforePrune.set(pruneThreshold);
   }
 
   @Override
@@ -178,6 +209,13 @@ public final class ActiveAuditManagerS3A
     LOG.debug("Started audit service {}", auditor);
   }
 
+  @Override
+  protected void serviceStop() throws Exception {
+    // clear all references.
+    activeSpanMap.clear();
+    super.serviceStop();
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(super.toString());
@@ -225,7 +263,7 @@ public final class ActiveAuditManagerS3A
    * @return the active WrappingAuditSpan
    */
   private WrappingAuditSpan activeSpan() {
-    return activeSpan.get();
+    return activeSpanMap.getForCurrentThread();
   }
 
   /**
@@ -247,13 +285,66 @@ public final class ActiveAuditManagerS3A
    */
   private WrappingAuditSpan switchToActiveSpan(WrappingAuditSpan span) {
     if (span != null && span.isValidSpan()) {
-      activeSpan.set(span);
+      activeSpanMap.setForCurrentThread(span);
     } else {
-      activeSpan.set(unbondedSpan);
+      activeSpanMap.removeForCurrentThread();
     }
     return activeSpan();
   }
 
+  /**
+   * Span reference lost from GC operations.
+   * This is only called when an attempt is made to retrieve on
+   * the active thread or when a prune operation is cleaning up.
+   *
+   * @param threadId thread ID.
+   */
+  private void noteSpanReferenceLost(long threadId) {
+    auditor.noteSpanReferenceLost(threadId);
+  }
+
+  /**
+   * Prune all null weak references, calling the referenceLost
+   * callback for each one.
+   *
+   * non-atomic and non-blocking.
+   * @return the number of entries pruned.
+   */
+  @VisibleForTesting
+  int prune() {
+    return activeSpanMap.prune();
+  }
+
+  /**
+   * remove the span from the reference map, shrinking the map in the process.
+   * if/when a new span is activated in the thread, a new entry will be created.
+   * and if queried for a span, the unbounded span will be automatically
+   * added to the map for this thread ID.
+   *
+   */
+  @VisibleForTesting
+  boolean removeActiveSpanFromMap() {
+    // remove from the map
+    activeSpanMap.removeForCurrentThread();
+    if (deactivationsBeforePrune.decrementAndGet() == 0) {
+      // trigger a prune
+      activeSpanMap.prune();
+      deactivationsBeforePrune.set(pruneThreshold);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Get the map of threads to active spans; allows
+   * for testing of weak reference resolution after GC.
+   * @return the span map
+   */
+  @VisibleForTesting
+  WeakReferenceThreadMap<WrappingAuditSpan> getActiveSpanMap() {
+    return activeSpanMap;
+  }
+
   /**
    * The Span ID in the audit manager is the ID of the auditor,
    * which can be used in the filesystem toString() method
@@ -331,13 +422,7 @@ public final class ActiveAuditManagerS3A
   @Override
   public TransferStateChangeListener createStateChangeListener() {
     final WrappingAuditSpan span = activeSpan();
-    return new TransferStateChangeListener() {
-      @Override
-      public void transferStateChanged(final Transfer transfer,
-          final Transfer.TransferState state) {
-        switchToActiveSpan(span);
-      }
-    };
+    return (transfer, state) -> switchToActiveSpan(span);
   }
 
   @Override
@@ -641,16 +726,21 @@ public final class ActiveAuditManagerS3A
      */
     @Override
     public void deactivate() {
-      // no-op for invalid spans,
+
+      // span is inactive; ignore
+      if (!isActive()) {
+        return;
+      }
+      // skipped for invalid spans,
       // so as to prevent the unbounded span from being closed
       // and everything getting very confused.
-      if (!isValid || !isActive()) {
-        return;
+      if (isValid) {
+        // deactivate the span
+        span.deactivate();
       }
-      // deactivate the span
-      span.deactivate();
-      // and go to the unbounded one.
-      switchToActiveSpan(getUnbondedSpan());
+      // remove the span from the reference map,
+      // sporadically triggering a prune operation.
+      removeActiveSpanFromMap();
     }
 
     /**

+ 3 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java

@@ -904,6 +904,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter
       jobCompleted(false);
       abortJobInternal(context, true);
       throw e;
+    } finally {
+      resetCommonContext();
     }
   }
 
@@ -946,6 +948,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter
     } finally {
       destroyThreadPool();
       cleanupStagingDirs();
+      resetCommonContext();
     }
   }
 

+ 2 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java

@@ -169,6 +169,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
       // delete the task attempt so there's no possibility of a second attempt
       deleteTaskAttemptPathQuietly(context);
       destroyThreadPool();
+      resetCommonContext();
     }
     getCommitOperations().taskCompleted(true);
     LOG.debug("aggregate statistics\n{}",
@@ -252,6 +253,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
           attemptPath.getFileSystem(context.getConfiguration()),
           attemptPath, true);
       destroyThreadPool();
+      resetCommonContext();
     }
   }
 

+ 2 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java

@@ -600,6 +600,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
       throw e;
     } finally {
       destroyThreadPool();
+      resetCommonContext();
     }
     getCommitOperations().taskCompleted(true);
   }
@@ -739,6 +740,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
       throw e;
     } finally {
       destroyThreadPool();
+      resetCommonContext();
     }
   }
 

+ 8 - 7
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md

@@ -24,15 +24,17 @@ this document covers its use.
 
 ## Important: Auditing is disabled by default
 
-Due to a memory leak from the use of `ThreadLocal` fields, this auditing feature leaks memory as S3A filesystem
-instances are created and deleted.
-This causes problems in long-lived processes which either do not re-use filesystem
+Due to a memory leak from the use of `ThreadLocal` fields, this auditing feature
+leaked memory as S3A filesystem instances were created and deleted.
+This caused problems in long-lived processes which either do not re-use filesystem
 instances, or attempt to delete all instances belonging to specific users.
 See [HADOOP-18091](https://issues.apache.org/jira/browse/HADOOP-18091) _S3A auditing leaks memory through ThreadLocal references_.
 
-To avoid these memory leaks, auditing is disabled by default.
+To avoid these memory leaks, auditing was disabled by default in the hadoop 3.3.2 release.
 
-To turn auditing on, set `fs.s3a.audit.enabled` to `true`.
+As these memory leaks have now been fixed, auditing has been re-enabled.
+
+To disable it, set `fs.s3a.audit.enabled` to `false`.
 
 ## Auditing workflow
 
@@ -84,7 +86,7 @@ Other auditor classes may be used instead.
 
 | Option | Meaning | Default Value |
 |--------|---------|---------------|
-| `fs.s3a.audit.enabled` | Is auditing enabled | `false` |
+| `fs.s3a.audit.enabled` | Is auditing enabled? | `true` |
 | `fs.s3a.audit.service.classname` | Auditor classname | `org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor` |
 | `fs.s3a.audit.request.handlers` | List of extra subclasses of AWS SDK RequestHandler2 to include in handler chain | `""` |
 | `fs.s3a.audit.referrer.enabled` | Logging auditor to publish the audit information in the HTTP Referrer header | `true` |
@@ -138,7 +140,6 @@ The Logging Auditor is enabled by providing its classname in the option
 </property>
 ```
 
-
 To print auditing events in the local client logs, set the associated Log4J log
 to log at debug:
 

+ 101 - 6
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md

@@ -119,16 +119,78 @@ The auditor then creates and returns a span for the specific operation.
 The AuditManagerS3A will automatically activate the span returned by the auditor
 (i.e. assign it the thread local variable tracking the active span in each thread).
 
-### Memory Leakage through `ThreadLocal` use
+### Memory Leakage through `ThreadLocal` misuse
 
-This architecture contains a critical defect,
+The original implementation of the integration with the S3AFileSystem class
+contained a critical defect,
 [HADOOP-18091](https://issues.apache.org/jira/browse/HADOOP-18091) _S3A auditing leaks memory through ThreadLocal references_.
 
-The code was written assuming that when the `ActiveAuditManagerS3A` service is
-stopped, it's `ThreadLocal` fields would be freed.
-In fact, they are retained until the threads with references are terminated.
+The original code was written with the assumption that when the `ActiveAuditManagerS3A` service was
+garbage collected, references in its `ThreadLocal` field would be freed.
+In fact, they are retained until all threads with references are terminated.
+If any long-lived thread had performed an s3 operation which created a span,
+a reference back to the audit manager instance was created
+*whose lifetime was that of the thread*
+
+In short-lived processes, and long-lived processes where a limited set of
+`S3AFileSystem` instances were reused, this had no adverse effect.
+Indeed, if the filesystem instances were retained in the cache until
+the process was shut down, there would be strong references to the owning
+`S3AFileSystem` instance anyway.
+
+Where it did have problems was when the following conditions were met
+1. Process was long-lived.
+2. Long-lived threads in the process invoked filesystem operations on `s3a://` URLs.
+3. Either `S3AFileSystem` instances were created repeatedly, rather than retrieved
+   from the cache of active instances.
+4. Or, after a query for a specific user was completed,
+   `FileSystem.closeAllForUGI(UserGroupInformation)` was invoked to remove all
+   cached FS instances of that user.
+
+Conditions 1, 2 and 4 are exactly those which long-lived Hive services can
+encounter.
+
+_Auditing was disabled by default until a fix was implemented._
+
+The memory leak has been fixed using a new class `org.apache.hadoop.util.WeakReferenceMap`
+to store a map of thread IDs to active spans. When the S3A filesystem is closed,
+its audit manager service is stopped and all references to spans removed from the
+map of thread ID to span.
+
+Weak References are used for the map so that span references do not consume memory even if
+threads are terminated without resetting the span reference of that thread.
+There is therefore a theoretical risk that if a garbage collection takes place during
+execution of a spanned operation, the reference will be lost.
+
+This is not considered an issue as all bounded entry points into the S3A filesystem
+retain a strong reference to their audit span.
+
+All entry points which return an object which can invoke s3 operations (input and output
+streams, list iterators, etc.) also retain a strong reference to their span, a reference
+they activate before performing S3 operations.
+A factory method is provided to demand-generate a new span if, somehow, these conditions
+are not met. The "unbounded span" is used here.
+Except in deployments where `fs.s3a.audit.reject.out.of.span.operations` is true,
+invoking S3 operations within the unbounded span are permitted.
+That option is set to `true` within S3A test suites.
+Therefore it is unlikely that any operations are invoked in unbounded spans except
+for the special case of copy operations invoked by the transfer manager threads.
+Those are already ignored in the logging auditor, whose unbounded span ignores
+requests which `AWSRequestAnalyzer.isRequestNotAlwaysInSpan()` indicates
+may happen outside of a span.
+This is restricted to bucket location probes possibly performed by the SDK
+on instantiation, and copy part/complete calls.
+
+
+```java
+  public static boolean
+      isRequestNotAlwaysInSpan(final Object request) {
+    return request instanceof CopyPartRequest
+        || request instanceof CompleteMultipartUploadRequest
+        || request instanceof GetBucketLocationRequest;
+  }
+```
 
-This is why auditing is now disabled by default until a fix is implemented.
 
 ### Class `org.apache.hadoop.fs.audit.CommonAuditContext`
 
@@ -149,6 +211,39 @@ Spans may be used on different thread from that which they were created.
 Spans MUST always use the values from the `currentAuditContext()` in the creation
 thread.
 
+#### Memory Usage of `CommonAuditContext`
+
+The `CommonAuditContext` map has a `ThreadLocal` field to store global
+information which is intended to span multiple operations across multiple
+filesystems, for example the MapReduce or Spark job ID, which is set
+in the S3A committers.
+
+Applications and Hadoop code MUST NOT attach context entries
+which directly or indirectly consumes lots of memory, as the life
+of that memory use will become that of the thread.
+
+Applications and Hadoop code SHOULD remove context entries when
+no-longer needed.
+
+If memory leakage is suspected here, set the log
+`org.apache.hadoop.fs.audit.CommonAuditContext` to `TRACE`
+to log the origin of operations which add log entries.
+
+This will produce a log entry whose stack trace will show the caller chain.
+```
+2022-01-26 16:10:28,384 TRACE audit.CommonAuditContext (CommonAuditContext.java:put(149)) - Adding context entry t1
+java.lang.Exception: t1
+    at org.apache.hadoop.fs.audit.CommonAuditContext.put(CommonAuditContext.java:149)
+    at org.apache.hadoop.fs.audit.CommonAuditContext.init(CommonAuditContext.java:192)
+    at org.apache.hadoop.fs.audit.CommonAuditContext.createInstance(CommonAuditContext.java:210)
+    at org.apache.hadoop.fs.audit.CommonAuditContext.lambda$static$0(CommonAuditContext.java:119)
+    at java.lang.ThreadLocal$SuppliedThreadLocal.initialValue(ThreadLocal.java:284)
+    at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180)
+    at java.lang.ThreadLocal.get(ThreadLocal.java:170)
+    at org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext(CommonAuditContext.java:219)
+    at org.apache.hadoop.fs.audit.TestCommonAuditContext.<init>(TestCommonAuditContext.java:54)
+```
+
 
 ### class `NoopAuditor`
 

+ 130 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java

@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.s3a.audit.impl.AbstractAuditSpanImpl;
+import org.apache.hadoop.fs.s3a.audit.impl.AbstractOperationAuditor;
+
+
+/**
+ * An audit service which consumes lots of memory.
+ */
+public class MemoryHungryAuditor extends AbstractOperationAuditor {
+
+  public static final String NAME = "org.apache.hadoop.fs.s3a.audit.MemoryHungryAuditor";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MemoryHungryAuditor.class);
+  /**
+   * How big is each manager?
+   */
+  public static final int MANAGER_SIZE = 10 * 1024 * 1024;
+
+  /**
+   * How big is each span?
+   */
+  public static final int SPAN_SIZE = 512 * 1024;
+
+  private static final AtomicLong INSTANCE_COUNT = new AtomicLong();
+
+  private final AtomicLong spanCount = new AtomicLong();
+
+  private final byte[] data = new byte[MANAGER_SIZE];
+
+  /**
+   * unbonded span created on demand.
+   */
+  private AuditSpanS3A unbondedSpan;
+
+
+  /**
+   * Constructor.
+   */
+  public MemoryHungryAuditor() {
+    super("MemoryHungryAuditor");
+    INSTANCE_COUNT.incrementAndGet();
+  }
+
+  public long getSpanCount() {
+    return spanCount.get();
+  }
+
+  @Override
+  public AuditSpanS3A createSpan(
+      final String operation,
+      @Nullable final String path1,
+      @Nullable final String path2) {
+    spanCount.incrementAndGet();
+    return new MemorySpan(createSpanID(), operation);
+  }
+
+  @Override
+  public AuditSpanS3A getUnbondedSpan() {
+    if (unbondedSpan == null) {
+      unbondedSpan = new MemorySpan(createSpanID(), "unbonded");
+    }
+    return unbondedSpan;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s instance %d span count %d",
+        super.toString(),
+        getInstanceCount(),
+        getSpanCount());
+  }
+
+  @Override
+  public void noteSpanReferenceLost(final long threadId) {
+    LOG.info("Span lost for thread {}", threadId);
+  }
+
+  public static long getInstanceCount() {
+    return INSTANCE_COUNT.get();
+  }
+
+  /**
+   * A span which consumes a lot of memory.
+   */
+  private static final class MemorySpan extends AbstractAuditSpanImpl {
+
+    private final byte[] data = new byte[SPAN_SIZE];
+
+    private MemorySpan(final String spanId, final String operationName) {
+      super(spanId, operationName);
+    }
+
+    @Override
+    public AuditSpanS3A activate() {
+      return this;
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+  }
+
+}

+ 406 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java

@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.impl;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.audit.MemoryHungryAuditor;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_SERVICE_CLASSNAME;
+import static org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A.PRUNE_THRESHOLD;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore;
+
+/**
+ * This test attempts to recreate the OOM problems of
+ * HADOOP-18091. S3A auditing leaks memory through ThreadLocal references
+ * it does this by creating a thread pool, then
+ * creates and destroys FS instances, with threads in
+ * the pool (but not the main JUnit test thread) creating
+ * audit spans.
+ *
+ * With a custom audit span with a large memory footprint,
+ * memory demands will be high, and if the closed instances
+ * don't get cleaned up, memory runs out.
+ * GCs are forced.
+ * It is critical no spans are created in the junit thread because they will
+ * last for the duration of the test JVM.
+ */
+@SuppressWarnings("ResultOfMethodCallIgnored")
+public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase {
+  /**
+   * Logging.
+   */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestActiveAuditManagerThreadLeakage.class);
+
+  /** how many managers to sequentially create. */
+  private static final int MANAGER_COUNT = 500;
+
+  /** size of long lived thread pool. */
+  private static final int THREAD_COUNT = 20;
+  private ExecutorService workers;
+
+  /**
+   * count of prunings which have taken place in the manager lifecycle
+   * operations.
+   */
+  private int pruneCount;
+
+  /**
+   * As audit managers are created they are added to the list,
+   * so we can verify they get GC'd.
+   */
+  private final List<WeakReference<ActiveAuditManagerS3A>> auditManagers =
+      new ArrayList<>();
+
+  @After
+  public void teardown() {
+    if (workers != null) {
+      workers.shutdown();
+    }
+  }
+
+
+  /**
+   * When the service is stopped, the span map is
+   * cleared immediately.
+   */
+  @Test
+  public void testSpanMapClearedInServiceStop() throws IOException {
+    try (ActiveAuditManagerS3A auditManager =
+             new ActiveAuditManagerS3A(emptyStatisticsStore())) {
+      auditManager.init(createMemoryHungryConfiguration());
+      auditManager.start();
+      auditManager.getActiveAuditSpan();
+      // get the span map
+      final WeakReferenceThreadMap<?> spanMap
+          = auditManager.getActiveSpanMap();
+      Assertions.assertThat(spanMap.size())
+          .describedAs("map size")
+          .isEqualTo(1);
+      auditManager.stop();
+      Assertions.assertThat(spanMap.size())
+          .describedAs("map size")
+          .isEqualTo(0);
+    }
+  }
+
+  @Test
+  public void testMemoryLeak() throws Throwable {
+    workers = Executors.newFixedThreadPool(THREAD_COUNT);
+    for (int i = 0; i < MANAGER_COUNT; i++) {
+      final long oneAuditConsumption = createAndTestOneAuditor();
+      LOG.info("manager {} memory retained {}", i, oneAuditConsumption);
+    }
+
+    // pruning must have taken place.
+    // that's somewhat implicit in the test not going OOM.
+    // but if memory allocation in test runs increase, it
+    // may cease to hold. in which case: create more
+    // audit managers
+    LOG.info("Total prune count {}", pruneCount);
+
+    Assertions.assertThat(pruneCount)
+        .describedAs("Total prune count")
+        .isNotZero();
+
+    // now count number of audit managers GC'd
+    // some must have been GC'd, showing that no other
+    // references are being retained internally.
+    Assertions.assertThat(auditManagers.stream()
+            .filter((r) -> r.get() == null)
+            .count())
+        .describedAs("number of audit managers garbage collected")
+        .isNotZero();
+  }
+
+  /**
+   * Create, use and then shutdown one auditor in a unique thread.
+   * @return memory consumed/released
+   */
+  private long createAndTestOneAuditor() throws Exception {
+    long original = Runtime.getRuntime().freeMemory();
+    ExecutorService factory = Executors.newSingleThreadExecutor();
+
+    try {
+      pruneCount += factory.submit(this::createAuditorAndWorkers).get();
+    } finally {
+      factory.shutdown();
+      factory.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
+
+    final long current = Runtime.getRuntime().freeMemory();
+    return current - original;
+
+  }
+
+  /**
+   * This is the core of the leakage test.
+   * Create an audit manager and spans across multiple threads.
+   * The spans are created in the long-lived pool, so if there is
+   * any bonding of the life of managers/spans to that of threads,
+   * it will surface as OOM events.
+   * @return count of weak references whose reference values were
+   * nullified.
+   */
+  private int createAuditorAndWorkers()
+      throws IOException, InterruptedException, ExecutionException {
+    try (ActiveAuditManagerS3A auditManager =
+             new ActiveAuditManagerS3A(emptyStatisticsStore())) {
+      auditManager.init(createMemoryHungryConfiguration());
+      auditManager.start();
+      LOG.info("Using {}", auditManager);
+      auditManagers.add(new WeakReference<>(auditManager));
+
+      // no guarantee every thread gets used, so track
+      // in a set. This will give us the thread ID of every
+      // entry in the map.
+
+      Set<Long> threadIds = new HashSet<>();
+
+      List<Future<Result>> futures = new ArrayList<>(THREAD_COUNT);
+
+      // perform the spanning operation in a long lived thread.
+      for (int i = 0; i < THREAD_COUNT; i++) {
+        futures.add(workers.submit(() -> spanningOperation(auditManager)));
+      }
+
+      // get the results and so determine the thread IDs
+      for (Future<Result> future : futures) {
+        final Result r = future.get();
+        threadIds.add(r.getThreadId());
+      }
+
+      final int threadsUsed = threadIds.size();
+      final Long[] threadIdArray = threadIds.toArray(new Long[0]);
+
+      // gc
+      System.gc();
+      // get the span map
+      final WeakReferenceThreadMap<?> spanMap
+          = auditManager.getActiveSpanMap();
+
+      // count number of spans removed
+      final long derefenced = threadIds.stream()
+          .filter((id) -> !spanMap.containsKey(id))
+          .count();
+      if (derefenced > 0) {
+        LOG.info("{} executed across {} threads and dereferenced {} entries",
+            auditManager, threadsUsed, derefenced);
+      }
+
+      // resolve not quite all of the threads.
+      // why not all? leaves at least one for pruning
+      // but it does complicate some of the assertions...
+      int spansRecreated = 0;
+      int subset = threadIdArray.length - 1;
+      LOG.info("Resolving {} thread references", subset);
+      for (int i = 0; i < subset; i++) {
+        final long id = threadIdArray[i];
+
+        // note whether or not the span is present
+        final boolean present = spanMap.containsKey(id);
+
+        // get the the span for that ID. which must never be
+        // null
+        Assertions.assertThat(spanMap.get(id))
+            .describedAs("Span map entry for thread %d", id)
+            .isNotNull();
+
+        // if it wasn't present, the unbounded span must therefore have been
+        // bounded to this map entry.
+        if (!present) {
+          spansRecreated++;
+        }
+      }
+      LOG.info("Recreated {} spans", subset);
+
+      // if the number of spans lost is more than the number
+      // of entries not probed, then at least one span was
+      // recreated
+      if (derefenced > threadIdArray.length - subset) {
+        Assertions.assertThat(spansRecreated)
+            .describedAs("number of recreated spans")
+            .isGreaterThan(0);
+      }
+
+      // now prune.
+      int pruned = auditManager.prune();
+      if (pruned > 0) {
+        LOG.info("{} executed across {} threads and pruned {} entries",
+            auditManager, threadsUsed, pruned);
+      }
+      Assertions.assertThat(pruned)
+          .describedAs("Count of references pruned")
+          .isEqualTo(derefenced - spansRecreated);
+      return pruned + (int) derefenced;
+    }
+
+  }
+
+  private Configuration createMemoryHungryConfiguration() {
+    final Configuration conf = new Configuration(false);
+    conf.set(AUDIT_SERVICE_CLASSNAME, MemoryHungryAuditor.NAME);
+    return conf;
+  }
+
+  /**
+   * The operation in each worker thread.
+   * @param auditManager audit manager
+   * @return result of the call
+   * @throws IOException troluble
+   */
+  private Result spanningOperation(final ActiveAuditManagerS3A auditManager)
+      throws IOException {
+    auditManager.getActiveAuditSpan();
+    final AuditSpanS3A auditSpan =
+        auditManager.createSpan("span", null, null);
+    Assertions.assertThat(auditSpan)
+        .describedAs("audit span for current thread")
+        .isNotNull();
+    // this is needed to ensure that more of the thread pool is used up
+    Thread.yield();
+    return new Result(Thread.currentThread().getId());
+  }
+
+  /**
+   * Result of the spanning operation.
+   */
+  private static final class Result {
+    /** thread operation took place in. */
+    private final long threadId;
+
+
+    private Result(final long threadId) {
+      this.threadId = threadId;
+    }
+
+    private long getThreadId() {
+      return threadId;
+    }
+
+
+  }
+
+  /**
+   * Verify that pruning takes place intermittently.
+   */
+  @Test
+  public void testRegularPruning() throws Throwable {
+    try (ActiveAuditManagerS3A auditManager =
+             new ActiveAuditManagerS3A(emptyStatisticsStore())) {
+      auditManager.init(createMemoryHungryConfiguration());
+      auditManager.start();
+      // get the span map
+      final WeakReferenceThreadMap<?> spanMap
+          = auditManager.getActiveSpanMap();
+      // add a null entry at a thread ID other than this one
+      spanMap.put(Thread.currentThread().getId() + 1, null);
+
+      // remove this span enough times that pruning shall take
+      // place twice
+      // this verifies that pruning takes place and that the
+      // counter is reset
+      int pruningCount = 0;
+      for (int i = 0; i < PRUNE_THRESHOLD * 2 + 1; i++) {
+        boolean pruned = auditManager.removeActiveSpanFromMap();
+        if (pruned) {
+          pruningCount++;
+        }
+      }
+      // pruning must have taken place
+      Assertions.assertThat(pruningCount)
+          .describedAs("Intermittent pruning count")
+          .isEqualTo(2);
+    }
+  }
+
+  /**
+   * Verify span deactivation removes the entry from the map.
+   */
+  @Test
+  public void testSpanDeactivationRemovesEntryFromMap() throws Throwable {
+    try (ActiveAuditManagerS3A auditManager =
+             new ActiveAuditManagerS3A(emptyStatisticsStore())) {
+      auditManager.init(createMemoryHungryConfiguration());
+      auditManager.start();
+      // get the span map
+      final WeakReferenceThreadMap<?> spanMap
+          = auditManager.getActiveSpanMap();
+      final AuditSpanS3A auditSpan =
+          auditManager.createSpan("span", null, null);
+      Assertions.assertThat(auditManager.getActiveAuditSpan())
+          .describedAs("active span")
+          .isSameAs(auditSpan);
+      // this assert gets used repeatedly, so define a lambda-exp
+      // which can be envoked with different arguments
+      Consumer<Boolean> assertMapHasKey = expected ->
+          Assertions.assertThat(spanMap.containsKey(spanMap.currentThreadId()))
+              .describedAs("map entry for current thread")
+              .isEqualTo(expected);
+
+      // sets the span to null
+      auditSpan.deactivate();
+
+      // there's no entry
+      assertMapHasKey.accept(false);
+
+      // asking for the current span will return the unbonded one
+      final AuditSpanS3A newSpan = auditManager.getActiveAuditSpan();
+      Assertions.assertThat(newSpan)
+          .describedAs("active span")
+          .isNotNull()
+          .matches(s -> !s.isValidSpan());
+      // which is in the map
+      // there's an entry
+      assertMapHasKey.accept(true);
+
+      // deactivating the old span does nothing
+      auditSpan.deactivate();
+      assertMapHasKey.accept(true);
+
+      // deactivating the current unbounded span does
+      // remove the entry
+      newSpan.deactivate();
+      assertMapHasKey.accept(false);
+    }
+  }
+}

+ 3 - 0
hadoop-tools/hadoop-aws/src/test/resources/log4j.properties

@@ -82,3 +82,6 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO
 #log4j.logger.org.apache.hadoop.fs.s3a.audit=DEBUG
 # log request creation, span lifecycle and other low-level details
 #log4j.logger.org.apache.hadoop.fs.s3a.audit=TRACE
+
+# uncomment this to trace where context entries are set
+# log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE