Browse Source

HADOOP-18456. NullPointerException in ObjectListingIterator. (#4909)

This problem surfaced in Impala integration tests
   IMPALA-11592. TestLocalCatalogRetries.test_fetch_metadata_retry fails in S3 build
after the change
  HADOOP-17461. Add thread-level IOStatistics Context
The actual GC race condition came with
 HADOOP-18091. S3A auditing leaks memory through ThreadLocal references

The fix for this is, if our hypothesis is correct, in WeakReferenceMap.create()
where a strong reference to the new value is kept in a local variable
*and referred to later* so that the JVM will not GC it.

Along with the fix, extra assertions ensure that if the problem is not fixed,
applications will fail faster/more meaningfully. 

Contributed by Steve Loughran.
Steve Loughran 2 years ago
parent
commit
0676495950

+ 33 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java

@@ -25,6 +25,8 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.util.WeakReferenceMap;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * A WeakReferenceMap for threads.
  * @param <V> value type of the map
@@ -36,30 +38,55 @@ public class WeakReferenceThreadMap<V> extends WeakReferenceMap<Long, V> {
     super(factory, referenceLost);
   }
 
+  /**
+   * Get the value for the current thread, creating if needed.
+   * @return an instance.
+   */
   public V getForCurrentThread() {
     return get(currentThreadId());
   }
 
+  /**
+   * Remove the reference for the current thread.
+   * @return any reference value which existed.
+   */
   public V removeForCurrentThread() {
     return remove(currentThreadId());
   }
 
+  /**
+   * Get the current thread ID.
+   * @return thread ID.
+   */
   public long currentThreadId() {
     return Thread.currentThread().getId();
   }
 
+  /**
+   * Set the new value for the current thread.
+   * @param newVal new reference to set for the active thread.
+   * @return the previously set value, possibly null
+   */
   public V setForCurrentThread(V newVal) {
+    requireNonNull(newVal);
     long id = currentThreadId();
 
     // if the same object is already in the map, just return it.
-    WeakReference<V> ref = lookup(id);
-    // Reference value could be set to null. Thus, ref.get() could return
-    // null. Should be handled accordingly while using the returned value.
-    if (ref != null && ref.get() == newVal) {
-      return ref.get();
+    WeakReference<V> existingWeakRef = lookup(id);
+
+    // The looked up reference could be one of
+    // 1. null: nothing there
+    // 2. valid but get() == null : reference lost by GC.
+    // 3. different from the new value
+    // 4. the same as the new value
+    if (resolve(existingWeakRef) == newVal) {
+      // case 4: do nothing, return the new value
+      return newVal;
+    } else {
+      // cases 1, 2, 3: update the map and return the old value
+      return put(id, newVal);
     }
 
-    return put(id, newVal);
   }
 
 }

+ 7 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.fs.statistics;
 
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * An interface defined to capture thread-level IOStatistics by using per
  * thread context.
@@ -67,7 +69,11 @@ public interface IOStatisticsContext extends IOStatisticsSource {
    * @return instance of IOStatisticsContext for the context.
    */
   static IOStatisticsContext getCurrentIOStatisticsContext() {
-    return IOStatisticsContextIntegration.getCurrentIOStatisticsContext();
+    // the null check is just a safety check to highlight exactly where a null value would
+    // be returned if HADOOP-18456 has resurfaced.
+    return requireNonNull(
+        IOStatisticsContextIntegration.getCurrentIOStatisticsContext(),
+        "Null IOStatisticsContext");
   }
 
   /**

+ 8 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java

@@ -100,7 +100,10 @@ public final class IOStatisticsContextIntegration {
    * @return an instance of IOStatisticsContext.
    */
   private static IOStatisticsContext createNewInstance(Long key) {
-    return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
+    IOStatisticsContextImpl instance =
+        new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
+    LOG.debug("Created instance {}", instance);
+    return instance;
   }
 
   /**
@@ -131,9 +134,11 @@ public final class IOStatisticsContextIntegration {
       IOStatisticsContext statisticsContext) {
     if (isThreadIOStatsEnabled) {
       if (statisticsContext == null) {
+        // new value is null, so remove it
         ACTIVE_IOSTATS_CONTEXT.removeForCurrentThread();
-      }
-      if (ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() != statisticsContext) {
+      } else {
+        // the setter is efficient in that it does not create a new
+        // reference if the context is unchanged.
         ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext);
       }
     }

+ 84 - 13
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java

@@ -28,7 +28,11 @@ import java.util.function.Function;
 
 import javax.annotation.Nullable;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
 
 import static java.util.Objects.requireNonNull;
 
@@ -52,6 +56,9 @@ import static java.util.Objects.requireNonNull;
 @InterfaceAudience.Private
 public class WeakReferenceMap<K, V> {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WeakReferenceMap.class);
+
   /**
    * The reference map.
    */
@@ -79,6 +86,12 @@ public class WeakReferenceMap<K, V> {
    */
   private final AtomicLong entriesCreatedCount = new AtomicLong();
 
+  /**
+   * Log to report loss of a reference during the create phase, which
+   * is believed to be a cause of HADOOP-18456.
+   */
+  private final LogExactlyOnce referenceLostDuringCreation = new LogExactlyOnce(LOG);
+
   /**
    * instantiate.
    * @param factory supplier of new instances
@@ -132,35 +145,93 @@ public class WeakReferenceMap<K, V> {
    * @return an instance.
    */
   public V get(K key) {
-    final WeakReference<V> current = lookup(key);
-    V val = resolve(current);
-    if (val != null) {
+    final WeakReference<V> currentWeakRef = lookup(key);
+    // resolve it, after which if not null, we have a strong reference
+    V strongVal = resolve(currentWeakRef);
+    if (strongVal != null) {
       // all good.
-      return  val;
+      return  strongVal;
     }
 
-    // here, either no ref, or the value is null
-    if (current != null) {
+    // here, either currentWeakRef was null, or its reference was GC'd.
+    if (currentWeakRef != null) {
+      // garbage collection removed the reference.
+
+      // explicitly remove the weak ref from the map if it has not
+      // been updated by this point
+      // this is here just for completeness.
+      map.remove(key, currentWeakRef);
+
+      // log/report the loss.
       noteLost(key);
     }
+
+    // create a new value and add it to the map
     return create(key);
   }
 
   /**
    * Create a new instance under a key.
+   * <p>
    * The instance is created, added to the map and then the
    * map value retrieved.
    * This ensures that the reference returned is that in the map,
    * even if there is more than one entry being created at the same time.
+   * If that race does occur, it will be logged the first time it happens
+   * for this specific map instance.
+   * <p>
+   * HADOOP-18456 highlighted the risk of a concurrent GC resulting in a null
+   * value being retrieved and so returned.
+   * To prevent this:
+   * <ol>
+   *   <li>A strong reference is retained to the newly created instance
+   *       in a local variable.</li>
+   *   <li>That variable is used after the resolution process, to ensure
+   *       the JVM doesn't consider it "unreachable" and so eligible for GC.</li>
+   *   <li>A check is made for the resolved reference being null, and if so,
+   *       the put() is repeated</li>
+   * </ol>
    * @param key key
-   * @return the value
+   * @return the created value
    */
   public V create(K key) {
     entriesCreatedCount.incrementAndGet();
-    WeakReference<V> newRef = new WeakReference<>(
-        requireNonNull(factory.apply(key)));
-    map.put(key, newRef);
-    return map.get(key).get();
+    /*
+     Get a strong ref so even if a GC happens in this method the reference is not lost.
+     It is NOT enough to have a reference in a local variable, it MUST be used
+     so as to ensure the reference isn't optimized away prematurely.
+     "A reachable object is any object that can be accessed in any potential continuing
+      computation from any live thread."
+    */

+    final V strongRef = requireNonNull(factory.apply(key),
+        "factory returned a null instance");
+    V resolvedStrongRef;
+    do {
+      WeakReference<V> newWeakRef = new WeakReference<>(strongRef);

+      // put it in the map
+      map.put(key, newWeakRef);

+      // get it back from the map
+      WeakReference<V> retrievedWeakRef = map.get(key);
+      // resolve that reference, handling the situation where somehow it was removed from the map
+      // between the put() and the get()
+      resolvedStrongRef = resolve(retrievedWeakRef);
+      if (resolvedStrongRef == null) {
+        // use the SLF4J "{}" placeholder: the logger behind LogExactlyOnce does not
+        // expand printf-style "%s", which would leave the key out of the message.
+        referenceLostDuringCreation.warn("reference to {} lost during creation", key);
+        noteLost(key);
+      }
+    } while (resolvedStrongRef == null);

+    // note if there was any change in the reference.
+    // as this forces strongRef to be kept in scope
+    if (strongRef != resolvedStrongRef) {
+      LOG.debug("Created instance for key {}: {} overwritten by {}",
+          key, strongRef, resolvedStrongRef);
+    }

+    return resolvedStrongRef;
   }
 
   /**
@@ -203,7 +274,7 @@ public class WeakReferenceMap<K, V> {
    * @param r reference to resolve
    * @return the value or null
    */
-  private V resolve(WeakReference<V> r) {
+  protected V resolve(WeakReference<V> r) {
     return r == null ? null : r.get();
   }
 
@@ -233,7 +304,7 @@ public class WeakReferenceMap<K, V> {
    * @param key key of lost reference
    */
   private void noteLost(final K key) {
-    // incrment local counter
+    // increment local counter
     referenceLostCount.incrementAndGet();
 
     // and call any notification function supplied in the constructor

+ 88 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java

@@ -20,15 +20,19 @@ package org.apache.hadoop.util;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
 /**
- * Test {@link WeakReferenceMap}.
+ * Test {@link WeakReferenceMap} and {@link WeakReferenceThreadMap}.
  * There's no attempt to force GC here, so the tests are
  * more about the basic behavior not the handling of empty references.
  */
@@ -125,11 +129,92 @@ public class TestWeakReferenceMap extends AbstractHadoopTestBase {
 
   }
 
+  /**
+   * It is an error to have a factory which returns null.
+   */
+  @Test
+  public void testFactoryReturningNull() throws Throwable {
+    referenceMap = new WeakReferenceMap<>(
+        (k) -> null,
+            null);
+    intercept(NullPointerException.class, () ->
+        referenceMap.get(0));
+  }
+
+  /**
+   * Test the WeakReferenceThreadMap extension.
+   */
+  @Test
+  public void testWeakReferenceThreadMapAssignment()
+      throws Throwable {
+
+    // counters for the callbacks
+    final AtomicLong created = new AtomicLong();
+    final AtomicLong lost = new AtomicLong();
+
+    WeakReferenceThreadMap<String> threadMap = new WeakReferenceThreadMap<>(
+        id -> "Entry for thread ID " + id + " (" + created.incrementAndGet() + ")",
+        id -> lost.incrementAndGet());
+
+    Assertions.assertThat(threadMap.setForCurrentThread("hello"))
+        .describedAs("current thread map value on first set")
+        .isNull();
+
+    // second attempt returns itself
+    Assertions.assertThat(threadMap.setForCurrentThread("hello"))
+        .describedAs("current thread map value on second set")
+        .isEqualTo("hello");
+
+    // it is forbidden to explicitly set to null via the set() call.
+    intercept(NullPointerException.class, () ->
+        threadMap.setForCurrentThread(null));
+
+    // the map is unchanged
+    Assertions.assertThat(threadMap.getForCurrentThread())
+        .describedAs("current thread map value")
+        .isEqualTo("hello");
+
+    // remove the value and assert what the removed entry was
+    Assertions.assertThat(threadMap.removeForCurrentThread())
+        .describedAs("removed thread map value")
+        .isEqualTo("hello");
+
+    // remove the value again; this time the removed value is null
+    Assertions.assertThat(threadMap.removeForCurrentThread())
+        .describedAs("removed thread map value on second call")
+        .isNull();
+
+    // lookup will return a new instance created by the factory
+    long c1 = created.get();
+    String dynamicValue = threadMap.getForCurrentThread();
+    Assertions.assertThat(dynamicValue)
+        .describedAs("dynamically created thread map value")
+        .startsWith("Entry for thread ID")
+        .contains("(" + (c1 + 1) + ")");
+
+    // and we can overwrite that
+    Assertions.assertThat(threadMap.setForCurrentThread("hello2"))
+        .describedAs("value before the thread entry is changed")
+        .isEqualTo(dynamicValue);
+
+    // simulate a weak gc
+    long threadId = threadMap.currentThreadId();
+    threadMap.put(threadId, null);
+    String updated = threadMap.getForCurrentThread();
+    Assertions.assertThat(lost.get())
+        .describedAs("lost count")
+        .isEqualTo(1);
+    Assertions.assertThat(updated)
+        .describedAs("dynamically created thread map value")
+        .startsWith("Entry for thread ID")
+        .contains("(" + (c1 + 2) + ")");
+  }
+
   /**
    * Assert that the value of a map entry is as expected.
    * Will trigger entry creation if the key is absent.
    * @param key key
-   * @param val expected valued
+   * @param val expected value
    */
   private void assertMapEntryEquals(int key, String val) {
     Assertions.assertThat(referenceMap.get(key))
@@ -143,7 +228,7 @@ public class TestWeakReferenceMap extends AbstractHadoopTestBase {
    */
   private void assertMapContainsKey(int key) {
     Assertions.assertThat(referenceMap.containsKey(key))
-        .describedAs("map enty of key %d should be present", key)
+        .describedAs("map entry of key %d should be present", key)
         .isTrue();
   }