Prechádzať zdrojové kódy

HADOOP-16833. InstrumentedLock should log lock queue time. Contributed by Stephen O'Donnell.

Change-Id: Idddff05051b6f642b88e51694b40c5bb1bef0026
Arpit Agarwal 5 rokov pred
rodič
commit
0cfff16ac0

+ 107 - 14
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java

@@ -55,8 +55,10 @@ public class InstrumentedLock implements Lock {
 
   // Tracking counters for lock statistics.
   private volatile long lockAcquireTimestamp;
-  private final AtomicLong lastLogTimestamp;
-  private final AtomicLong warningsSuppressed = new AtomicLong(0);
+  private final AtomicLong lastHoldLogTimestamp;
+  private final AtomicLong lastWaitLogTimestamp;
+  private final SuppressedStats holdStats = new SuppressedStats();
+  private final SuppressedStats waitStats = new SuppressedStats();
 
   /**
    * Create a instrumented lock instance which logs a warning message
@@ -91,19 +93,24 @@ public class InstrumentedLock implements Lock {
     this.logger = logger;
     minLoggingGap = minLoggingGapMs;
     lockWarningThreshold = lockWarningThresholdMs;
-    lastLogTimestamp = new AtomicLong(
+    lastHoldLogTimestamp = new AtomicLong(
       clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+    lastWaitLogTimestamp = new AtomicLong(lastHoldLogTimestamp.get());
   }
 
   @Override
   public void lock() {
+    long waitStart = clock.monotonicNow(); // taken before blocking on the lock
     lock.lock();
+    check(waitStart, clock.monotonicNow(), false); // false: wait-time check
     startLockTiming();
   }
 
   @Override
   public void lockInterruptibly() throws InterruptedException {
+    long waitStart = clock.monotonicNow(); // taken before blocking on the lock
     lock.lockInterruptibly();
+    check(waitStart, clock.monotonicNow(), false); // skipped if the call above throws
     startLockTiming();
   }
 
@@ -118,11 +125,14 @@ public class InstrumentedLock implements Lock {
 
   @Override
   public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+    long waitStart = clock.monotonicNow(); // taken before the timed attempt
+    boolean retval = false; // stays false when the lock was not obtained
     if (lock.tryLock(time, unit)) {
       startLockTiming();
-      return true;
+      retval = true;
     }
-    return false;
+    check(waitStart, clock.monotonicNow(), false); // runs for success and failure alike
+    return retval;
   }
 
   @Override
@@ -130,7 +140,7 @@ public class InstrumentedLock implements Lock {
     long localLockReleaseTime = clock.monotonicNow();
     long localLockAcquireTime = lockAcquireTimestamp;
     lock.unlock();
-    check(localLockAcquireTime, localLockReleaseTime);
+    check(localLockAcquireTime, localLockReleaseTime, true);
   }
 
   @Override
@@ -139,12 +149,25 @@ public class InstrumentedLock implements Lock {
   }
 
   @VisibleForTesting
-  void logWarning(long lockHeldTime, long suppressed) {
+  void logWarning(long lockHeldTime, SuppressedSnapshot stats) { // stats: suppressed hold warnings
     logger.warn(String.format("Lock held time above threshold: " +
         "lock identifier: %s " +
         "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+        "Longest suppressed LockHeldTimeMs=%d. " +
         "The stack trace is: %s" ,
-        name, lockHeldTime, suppressed,
+        name, lockHeldTime, stats.getSuppressedCount(),
+        stats.getMaxSuppressedWait(), // longest held time among suppressed warnings
+        StringUtils.getStackTrace(Thread.currentThread())));
+  }
+
+  @VisibleForTesting
+  void logWaitWarning(long lockWaitTime, SuppressedSnapshot stats) { // stats: suppressed wait warnings
+    logger.warn(String.format("Waited above threshold to acquire lock: " +
+        "lock identifier: %s " +
+        "waitTimeMs=%d ms. Suppressed %d lock wait warnings. " +
+        "Longest suppressed WaitTimeMs=%d. " +
+        "The stack trace is: %s", name, lockWaitTime,
+        stats.getSuppressedCount(), stats.getMaxSuppressedWait(), // count, then longest wait
         StringUtils.getStackTrace(Thread.currentThread())));
   }
 
@@ -163,27 +186,41 @@ public class InstrumentedLock implements Lock {
    * @param acquireTime  - timestamp just after acquiring the lock.
    * @param releaseTime - timestamp just before releasing the lock.
    */
-  protected void check(long acquireTime, long releaseTime) {
+  protected void check(long acquireTime, long releaseTime,
+       boolean checkLockHeld) { // true: hold-time check; false: wait-time check
     if (!logger.isWarnEnabled()) {
       return;
     }
 
     final long lockHeldTime = releaseTime - acquireTime;
     if (lockWarningThreshold - lockHeldTime < 0) {
+      AtomicLong lastLogTime; // last-log timestamp for the selected warning kind
+      SuppressedStats stats; // suppression counters for the selected warning kind
+      if (checkLockHeld) {
+        lastLogTime = lastHoldLogTimestamp;
+        stats = holdStats;
+      } else {
+        lastLogTime = lastWaitLogTimestamp;
+        stats = waitStats;
+      }
       long now;
       long localLastLogTs;
       do {
         now = clock.monotonicNow();
-        localLastLogTs = lastLogTimestamp.get();
+        localLastLogTs = lastLogTime.get();
         long deltaSinceLastLog = now - localLastLogTs;
         // check should print log or not
         if (deltaSinceLastLog - minLoggingGap < 0) {
-          warningsSuppressed.incrementAndGet();
+          stats.incrementSuppressed(lockHeldTime); // count it and track the longest time
           return;
         }
-      } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
-      long suppressed = warningsSuppressed.getAndSet(0);
-      logWarning(lockHeldTime, suppressed);
+      } while (!lastLogTime.compareAndSet(localLastLogTs, now)); // retry if the timestamp changed
+      SuppressedSnapshot statsSnapshot = stats.snapshot(); // reads the counters and resets them
+      if (checkLockHeld) {
+        logWarning(lockHeldTime, statsSnapshot);
+      } else {
+        logWaitWarning(lockHeldTime, statsSnapshot); // lockHeldTime is the wait time here
+      }
     }
   }
 
@@ -194,4 +231,60 @@ public class InstrumentedLock implements Lock {
   protected Timer getTimer() {
     return clock;
   }
+
+  /**
+   * Tracks how many warnings were suppressed since the last snapshot and the
+   * longest time among them. Both methods synchronize on this instance.
+   */
+  private static class SuppressedStats {
+    private long suppressedCount = 0; // suppressed warnings since the last snapshot
+    private long maxSuppressedWait = 0; // longest suppressed time since the last snapshot
+
+    /**
+     * Adds one to the suppressed counter and raises maxSuppressedWait when the
+     * passed time is greater than the current maximum.
+     * @param wait The time (lock wait or lock held) of the suppressed message
+     */
+    synchronized public void incrementSuppressed(long wait) {
+      suppressedCount++;
+      if (wait > maxSuppressedWait) {
+        maxSuppressedWait = wait;
+      }
+    }
+
+    /**
+     * Copies the current counter values into a new SuppressedSnapshot object
+     * and then resets both counters to zero.
+     *
+     * @return SuppressedSnapshot holding the values from before the reset
+     */
+    synchronized public SuppressedSnapshot snapshot() {
+      SuppressedSnapshot snap =
+          new SuppressedSnapshot(suppressedCount, maxSuppressedWait);
+      suppressedCount = 0;
+      maxSuppressedWait = 0;
+      return snap;
+    }
+  }
+
+  /**
+   * Snapshot of suppressed log stats. NOTE(review): no setters, fields not final.
+   */
+  protected static class SuppressedSnapshot {
+    private long suppressedCount = 0; // overwritten by the constructor
+    private long maxSuppressedWait = 0; // overwritten by the constructor
+
+    public SuppressedSnapshot(long suppressedCount, long maxWait) { // stores both values as given
+      this.suppressedCount = suppressedCount;
+      this.maxSuppressedWait = maxWait;
+    }
+
+    public long getMaxSuppressedWait() { // longest suppressed time captured
+      return maxSuppressedWait;
+    }
+
+    public long getSuppressedCount() { // number of suppressed warnings captured
+      return suppressedCount;
+    }
+  }
 }

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java

@@ -75,7 +75,7 @@ public class InstrumentedReadLock extends InstrumentedLock {
     getLock().unlock();
     if (needReport) {
       readLockHeldTimeStamp.remove();
-      check(localLockAcquireTime, localLockReleaseTime);
+      check(localLockAcquireTime, localLockReleaseTime, true);
     }
   }
 

+ 109 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java

@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.util;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,12 +119,14 @@ public class TestInstrumentedLock {
 
     final AtomicLong wlogged = new AtomicLong(0);
     final AtomicLong wsuppresed = new AtomicLong(0);
+    final AtomicLong wMaxWait = new AtomicLong(0);
     InstrumentedLock lock = new InstrumentedLock(
         testname, LOG, mlock, 2000, 300, mclock) {
       @Override
-      void logWarning(long lockHeldTime, long suppressed) {
+      void logWarning(long lockHeldTime, SuppressedSnapshot stats) {
         wlogged.incrementAndGet();
-        wsuppresed.set(suppressed);
+        wsuppresed.set(stats.getSuppressedCount());
+        wMaxWait.set(stats.getMaxSuppressedWait());
       }
     };
 
@@ -132,12 +136,14 @@ public class TestInstrumentedLock {
     lock.unlock(); // t = 200
     assertEquals(0, wlogged.get());
     assertEquals(0, wsuppresed.get());
+    assertEquals(0, wMaxWait.get());
 
     lock.lock();   // t = 200
     time.set(700);
     lock.unlock(); // t = 700
     assertEquals(1, wlogged.get());
     assertEquals(0, wsuppresed.get());
+    assertEquals(0, wMaxWait.get());
 
     // despite the lock held time is greater than threshold
     // suppress the log warning due to the logging gap
@@ -147,6 +153,7 @@ public class TestInstrumentedLock {
     lock.unlock(); // t = 1100
     assertEquals(1, wlogged.get());
     assertEquals(0, wsuppresed.get());
+    assertEquals(0, wMaxWait.get());
 
     // log a warning message when the lock held time is greater the threshold
     // and the logging time gap is satisfied. Also should display suppressed
@@ -157,6 +164,106 @@ public class TestInstrumentedLock {
     lock.unlock(); // t = 2800
     assertEquals(2, wlogged.get());
     assertEquals(1, wsuppresed.get());
+    assertEquals(400, wMaxWait.get());
+  }
+
+  /**
+   * Test that the lock logs a warning when the lock wait / queue time is
+   * greater than the threshold, and logs no warning otherwise.
+   * @throws Exception on unexpected failure, e.g. interruption while waiting
+   */
+  @Test(timeout=10000)
+  public void testLockLongWaitReport() throws Exception {
+    String testname = name.getMethodName();
+    final AtomicLong time = new AtomicLong(0);
+    Timer mclock = new Timer() {
+      @Override
+      public long monotonicNow() {
+        return time.get(); // clock value is set manually by the test
+      }
+    };
+    Lock mlock = new ReentrantLock(true); // real fair lock, not a mock
+
+    final AtomicLong wlogged = new AtomicLong(0);
+    final AtomicLong wsuppresed = new AtomicLong(0);
+    final AtomicLong wMaxWait = new AtomicLong(0);
+    InstrumentedLock lock = new InstrumentedLock(
+        testname, LOG, mlock, 2000, 300, mclock) { // presumably 2000 ms log gap, 300 ms threshold
+      @Override
+      void logWaitWarning(long lockHeldTime, SuppressedSnapshot stats) {
+        wlogged.incrementAndGet();
+        wsuppresed.set(stats.getSuppressedCount());
+        wMaxWait.set(stats.getMaxSuppressedWait());
+      }
+    };
+
+    // do not log a warning when the lock wait time is short
+    lock.lock();   // t = 0
+
+    Thread competingThread = lockUnlockThread(lock);
+    time.set(200);
+    lock.unlock(); // t = 200
+    competingThread.join();
+    assertEquals(0, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+    assertEquals(0, wMaxWait.get());
+
+
+    lock.lock();   // t = 200
+    competingThread = lockUnlockThread(lock);
+    time.set(700);
+    lock.unlock(); // t = 700
+    competingThread.join();
+
+    // The competing thread will have waited for 500ms, so it should log
+    assertEquals(1, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+    assertEquals(0, wMaxWait.get());
+
+    // despite the lock wait time is greater than threshold
+    // suppress the log warning due to the logging gap
+    // (not recorded in wsuppressed until next log message)
+    lock.lock();   // t = 700
+    competingThread = lockUnlockThread(lock);
+    time.set(1100);
+    lock.unlock(); // t = 1100
+    competingThread.join();
+    assertEquals(1, wlogged.get());
+    assertEquals(0, wsuppresed.get());
+    assertEquals(0, wMaxWait.get());
+
+    // log a warning message when the lock wait time is greater than the
+    // threshold and the logging time gap is satisfied. Also should display
+    // the previously suppressed warnings.
+    time.set(2400);
+    lock.lock();   // t = 2400
+    competingThread = lockUnlockThread(lock);
+    time.set(2800);
+    lock.unlock(); // t = 2800
+    competingThread.join();
+    assertEquals(2, wlogged.get());
+    assertEquals(1, wsuppresed.get());
+    assertEquals(400, wMaxWait.get());
+  }
+
+  private Thread lockUnlockThread(Lock lock) throws InterruptedException { // starts a competing locker
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    Thread t = new Thread(() -> {
+      try {
+        assertFalse(lock.tryLock()); // callers in this test hold the lock at this point
+        countDownLatch.countDown(); // signal that the thread is about to call lock()
+        lock.lock();
+      } finally {
+        lock.unlock();
+      }
+    });
+    t.start();
+    countDownLatch.await(); // NOTE(review): no timeout here if the assert above throws
+    // The latch only shows the thread is about to call lock(); the caller could
+    // still release the lock before the thread is actually blocked on it, so
+    // sleep briefly to let the competing thread block on the lock as intended.
+    Thread.sleep(3);
+    return t;
   }
 
 }

+ 5 - 4
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java

@@ -146,9 +146,10 @@ public class TestInstrumentedReadWriteLock {
     InstrumentedReadLock readLock = new InstrumentedReadLock(testname, LOG,
         readWriteLock, 2000, 300, mclock) {
       @Override
-      protected void logWarning(long lockHeldTime, long suppressed) {
+      protected void logWarning(
+          long lockHeldTime, SuppressedSnapshot stats) {
         wlogged.incrementAndGet();
-        wsuppresed.set(suppressed);
+        wsuppresed.set(stats.getSuppressedCount());
       }
     };
 
@@ -200,9 +201,9 @@ public class TestInstrumentedReadWriteLock {
     InstrumentedWriteLock writeLock = new InstrumentedWriteLock(testname, LOG,
         readWriteLock, 2000, 300, mclock) {
       @Override
-      protected void logWarning(long lockHeldTime, long suppressed) {
+      protected void logWarning(long lockHeldTime, SuppressedSnapshot stats) {
         wlogged.incrementAndGet();
-        wsuppresed.set(suppressed);
+        wsuppresed.set(stats.getSuppressedCount());
       }
     };