瀏覽代碼

HADOOP-12482. Race condition in JMX cache update. (Tony Wu via lei)

(cherry picked from commit 0eb9c60c5bec79f531da8cb3226d7e8b1d7e6639)
(cherry picked from commit 929699f7eb692d1cc0d02cee4d36778e4cd37b96)
Lei Xu 9 年之前
父節點
當前提交
76fa5a924e

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -60,6 +60,8 @@ Release 2.6.5 - UNRELEASED
     HADOOP-12348. MetricsSystemImpl creates MetricsSourceAdapter with wrong
     time unit parameter. (zxu via rkanter)
 
+    HADOOP-12482. Race condition in JMX cache update. (Tony Wu via lei)
+
 Release 2.6.4 - 2016-02-11
 
   INCOMPATIBLE CHANGES

+ 11 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java

@@ -60,6 +60,7 @@ class MetricsSourceAdapter implements DynamicMBean {
   private final Iterable<MetricsTag> injectedTags;
 
   private Iterable<MetricsRecordImpl> lastRecs;
+  private boolean lastRecsCleared;
   private long jmxCacheTS = 0;
   private long jmxCacheTTL;
   private MBeanInfo infoCache;
@@ -80,6 +81,9 @@ class MetricsSourceAdapter implements DynamicMBean {
     this.metricFilter = metricFilter;
     this.jmxCacheTTL = checkArg(jmxCacheTTL, jmxCacheTTL > 0, "jmxCacheTTL");
     this.startMBeans = startMBeans;
+    // Initialize to true so that the first call to updateJmxCache always
+    // triggers an update of the MBeanInfo cache.
+    this.lastRecsCleared = true;
   }
 
   MetricsSourceAdapter(String prefix, String name, String description,
@@ -158,8 +162,12 @@ class MetricsSourceAdapter implements DynamicMBean {
       if (Time.now() - jmxCacheTS >= jmxCacheTTL) {
         // temporarilly advance the expiry while updating the cache
         jmxCacheTS = Time.now() + jmxCacheTTL;
-        if (lastRecs == null) {
+        // lastRecs might have been set to an object already by another thread.
+        // Track the fact that lastRecs has been reset once to make sure refresh
+        // is correctly triggered.
+        if (lastRecsCleared) {
           getAllMetrics = true;
+          lastRecsCleared = false;
         }
       }
       else {
@@ -178,7 +186,8 @@ class MetricsSourceAdapter implements DynamicMBean {
         updateInfoCache();
       }
       jmxCacheTS = Time.now();
-      lastRecs = null;  // in case regular interval update is not running
+      lastRecs = null; // in case regular interval update is not running
+      lastRecsCleared = true;
     }
   }
 

+ 193 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSourceAdapter.java

@@ -22,7 +22,13 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -36,13 +42,14 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import static org.apache.hadoop.metrics2.lib.Interns.info;
 import static org.junit.Assert.assertEquals;
 
+import org.apache.log4j.Logger;
 import org.junit.Test;
 
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
 
 public class TestMetricsSourceAdapter {
-
+  private static final int RACE_TEST_RUNTIME = 10000; // 10 seconds
 
   @Test
   public void testPurgeOldMetrics() throws Exception {
@@ -73,7 +80,7 @@ public class TestMetricsSourceAdapter {
   }
 
   //generate a new key per each call
-  class PurgableSource implements MetricsSource {
+  private static class PurgableSource implements MetricsSource {
     int nextKey = 0;
     String lastKeyName = null;
     @Override
@@ -135,4 +142,188 @@ public class TestMetricsSourceAdapter {
       c1.incr();
     }
   }
+
+  /**
+   * Test a race condition when updating the JMX cache (HADOOP-12482):
+   * 1. Thread A reads the JMX metric every 2 JMX cache TTL. It marks the JMX
+   *    cache to be updated by setting lastRecs to null. After this it adds a
+   *    new key to the metrics. The next call to read should pick up this new
+   *    key.
+   * 2. Thread B triggers JMX metric update every 1 JMX cache TTL. It assigns
+   *    lastRecs to a new object (not null any more).
+   * 3. Thread A tries to read JMX metric again, sees lastRecs is not null and
+   *    does not update JMX cache. As a result the read does not pick up the
+   *    new metric.
+   * @throws Exception if the test thread is interrupted while waiting
+   */
+  @Test
+  public void testMetricCacheUpdateRace() throws Exception {
+    // Create test source with a single metric counter of value 1.
+    TestMetricsSource source = new TestMetricsSource();
+    MetricsSourceBuilder sourceBuilder =
+        MetricsAnnotations.newSourceBuilder(source);
+
+    final long JMX_CACHE_TTL = 250; // ms
+    List<MetricsTag> injectedTags = new ArrayList<MetricsTag>();
+    MetricsSourceAdapter sourceAdapter =
+        new MetricsSourceAdapter("test", "test",
+            "test JMX cache update race condition", sourceBuilder.build(),
+            injectedTags, null, null, JMX_CACHE_TTL, false);
+
+    ScheduledExecutorService updaterExecutor =
+        Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
+    ScheduledExecutorService readerExecutor =
+        Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().build());
+
+    final AtomicBoolean hasError = new AtomicBoolean(false);
+
+    // Wake up every 1 JMX cache TTL to set lastRecs before updateJmxCache() is
+    // called.
+    SourceUpdater srcUpdater = new SourceUpdater(sourceAdapter, hasError);
+    ScheduledFuture<?> updaterFuture =
+        updaterExecutor.scheduleAtFixedRate(srcUpdater,
+            sourceAdapter.getJmxCacheTTL(), sourceAdapter.getJmxCacheTTL(),
+            TimeUnit.MILLISECONDS);
+    srcUpdater.setFuture(updaterFuture);
+
+    // Wake up every 2 JMX cache TTL so updateJmxCache() will try to update
+    // JMX cache.
+    SourceReader srcReader = new SourceReader(source, sourceAdapter, hasError);
+    ScheduledFuture<?> readerFuture =
+        readerExecutor.scheduleAtFixedRate(srcReader,
+            0, // set JMX info cache at the beginning
+            2 * sourceAdapter.getJmxCacheTTL(), TimeUnit.MILLISECONDS);
+    srcReader.setFuture(readerFuture);
+    try {
+      // Let the threads do their work.
+      Thread.sleep(RACE_TEST_RUNTIME);
+      assertFalse("Hit error", hasError.get());
+    } finally {
+      // Always stop the worker threads, even if the check above fails.
+      updaterExecutor.shutdownNow();
+      readerExecutor.shutdownNow();
+      updaterExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      readerExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Metrics source exposing a single key/value gauge. Every read and write
+   * of the pair is guarded by this object's monitor.
+   */
+  private static class TestMetricsSource implements MetricsSource {
+    private String key = "key0";
+    private int val = 0;
+
+    synchronized String getKey() {
+      return this.key;
+    }
+
+    synchronized void setKV(final String newKey, final int newVal) {
+      this.val = newVal;
+      this.key = newKey;
+    }
+
+    @Override
+    public void getMetrics(MetricsCollector collector, boolean all) {
+      final MetricsRecordBuilder record =
+          collector.addRecord("TestMetricsSource").setContext("test");
+      synchronized (this) {
+        record.addGauge(info(this.key, "TestMetricsSource key"), this.val);
+      }
+    }
+  }
+
+  /**
+   * A task that updates the metrics source every 1 JMX cache TTL.
+   */
+  private static class SourceUpdater implements Runnable {
+    private final MetricsSourceAdapter sa;
+    private volatile ScheduledFuture<?> future = null; // set after scheduling
+    private final AtomicBoolean hasError;
+    private static final Logger LOG = Logger.getLogger(SourceUpdater.class);
+
+    public SourceUpdater(MetricsSourceAdapter sourceAdapter,
+        AtomicBoolean err) {
+      sa = sourceAdapter;
+      hasError = err;
+    }
+
+    public void setFuture(ScheduledFuture<?> f) {
+      future = f;
+    }
+
+    @Override
+    public void run() {
+      MetricsCollectorImpl builder = new MetricsCollectorImpl();
+      try {
+        // This resets lastRecs.
+        sa.getMetrics(builder, true);
+        LOG.info("reset lastRecs");
+      } catch (Exception e) {
+        hasError.set(true);
+        LOG.error("Failed to update metrics", e); // logs the stack trace
+      } finally {
+        final ScheduledFuture<?> f = future; // may not have been set yet
+        if (hasError.get() && f != null) {
+          LOG.error("Hit error, stopping now");
+          f.cancel(false);
+        }
+      }
+    }
+  }
+
+  /**
+   * A task that reads the metrics source every 2 JMX cache TTLs. After each
+   * read it updates the metric source to report a new key. The next read must
+   * be able to pick up this new key.
+   */
+  private static class SourceReader implements Runnable {
+    private final MetricsSourceAdapter sa;
+    private final TestMetricsSource src;
+    private int cnt = 0;
+    private volatile ScheduledFuture<?> future = null; // set after scheduling
+    private final AtomicBoolean hasError;
+    private static final Logger LOG = Logger.getLogger(SourceReader.class);
+
+    public SourceReader(
+        TestMetricsSource source, MetricsSourceAdapter sourceAdapter,
+        AtomicBoolean err) {
+      src = source;
+      sa = sourceAdapter;
+      hasError = err;
+    }
+
+    public void setFuture(ScheduledFuture<?> f) {
+      future = f;
+    }
+
+    @Override
+    public void run() {
+      try {
+        // This will trigger updateJmxCache().
+        MBeanInfo info = sa.getMBeanInfo();
+        final String key = src.getKey();
+        for (MBeanAttributeInfo mBeanAttributeInfo : info.getAttributes()) {
+          // Found the new key, update the metric source and move on.
+          if (mBeanAttributeInfo.getName().equals(key)) {
+            LOG.info("found key/val=" + cnt + "/" + cnt);
+            cnt++;
+            src.setKV("key" + cnt, cnt);
+            return;
+          }
+        }
+        LOG.error("key=" + key + " not found. Stopping now.");
+        hasError.set(true);
+      } catch (Exception e) {
+        hasError.set(true);
+        LOG.error("Failed to read JMX metrics", e); // logs the stack trace
+      } finally {
+        final ScheduledFuture<?> f = future; // may be unset on first run
+        if (hasError.get() && f != null) {
+          f.cancel(false);
+        }
+      }
+    }
+  }
 }