Browse Source

HDFS-13791. Limit logging frequency of edit tail related statements. Contributed by Erik Krogen.

Chen Liang 6 years ago
parent
commit
b4fc93d90b

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java

@@ -272,6 +272,24 @@ public class LogThrottlingHelper {
     }
   }
 
+  /**
+   * Helper function to create a message about how many log statements were
+   * suppressed in the provided log action. If no statements were suppressed,
+   * this returns an empty string. The message has the format (without quotes):
+   *
+   * <p/>' (suppressed logging <i>{suppression_count}</i> times)'
+   *
+   * @param action The log action to produce a message about.
+   * @return A message about suppression within this action.
+   */
+  public static String getLogSupressionMessage(LogAction action) {
+    if (action.getCount() > 1) {
+      return " (suppressed logging " + (action.getCount() - 1) + " times)";
+    } else {
+      return "";
+    }
+  }
+
   /**
    * A standard log action which keeps track of all of the values which have
    * been logged. This is also used for internal bookkeeping via its private

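A minimal usage sketch of the new helper (not part of the patch; the class name, `onEvent`, `LOG`, and the 1-second interval are illustrative): `record()` counts an occurrence, `shouldLog()` says whether this occurrence may be logged, and `getLogSupressionMessage()` turns the suppressed count into a suffix for the log line.

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;

class ThrottledLoggingExample {
  private static final Log LOG = LogFactory.getLog(ThrottledLoggingExample.class);

  // Illustrative: allow at most one statement per second.
  private final LogThrottlingHelper throttler = new LogThrottlingHelper(1000);

  void onEvent() {
    LogAction action = throttler.record();
    if (action.shouldLog()) {
      // Appends " (suppressed logging N times)" for N = getCount() - 1,
      // or the empty string when nothing was suppressed.
      LOG.info("Handling event" +
          LogThrottlingHelper.getLogSupressionMessage(action));
    }
  }
}
```
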
+ 13 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java

@@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -105,6 +107,11 @@ public class QuorumJournalManager implements JournalManager {
 
   private int outputBufferCapacity = 512 * 1024;
   private final URLConnectionFactory connectionFactory;
+
+  /** Limit logging about input stream selection to every 5 seconds max. */
+  private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper selectInputStreamLogHelper =
+      new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);
   
   public QuorumJournalManager(Configuration conf,
       URI uri, NamespaceInfo nsInfo) throws IOException {
@@ -565,8 +572,12 @@ public class QuorumJournalManager implements JournalManager {
           "ID " + fromTxnId);
       return;
     }
-    LOG.info("Selected loggers with >= " + maxAllowedTxns +
-        " transactions starting from " + fromTxnId);
+    LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
+    if (logAction.shouldLog()) {
+      LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
+          "starting from lowest txn ID " + logAction.getStats(0).getMin() +
+          LogThrottlingHelper.getLogSupressionMessage(logAction));
+    }
     PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
         responseMap.size(), JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (GetJournaledEditsResponseProto resp : responseMap.values()) {

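For context on the `getStats(0).getMin()` call above, a hedged sketch of the value-tracking behavior (values and timings illustrative, not from the patch): each `record(...)` may carry values, and a loggable action aggregates the values from every call since the last logged statement, which is how the line above can report the lowest starting txn ID seen across the suppressed window.

```java
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;

class MinStatExample {
  public static void main(String[] args) throws InterruptedException {
    LogThrottlingHelper helper = new LogThrottlingHelper(5000);

    LogAction a1 = helper.record(100);  // first call: loggable, stats cover only 100
    LogAction a2 = helper.record(50);   // within 5 s: suppressed
    LogAction a3 = helper.record(75);   // within 5 s: suppressed

    Thread.sleep(5001);
    LogAction a4 = helper.record(60);   // loggable again: getCount() == 3 and
    // getStats(0).getMin() == 50.0, since the aggregate spans the suppressed
    // calls plus the current one.
  }
}
```
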
+ 37 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
-import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -101,27 +100,45 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.log.LogThrottlingHelper;
 import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.util.Timer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
+
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class FSEditLogLoader {
   static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
   static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
 
+  /** Limit logging about edit loading to every 5 seconds max. */
+  @VisibleForTesting
+  static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper loadEditsLogHelper =
+      new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
   private final FSNamesystem fsNamesys;
   private final BlockManager blockManager;
+  private final Timer timer;
   private long lastAppliedTxId;
   /** Total number of end transactions loaded. */
   private int totalEdits = 0;
   
   public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
+    this(fsNamesys, lastAppliedTxId, new Timer());
+  }
+
+  @VisibleForTesting
+  FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId, Timer timer) {
     this.fsNamesys = fsNamesys;
     this.blockManager = fsNamesys.getBlockManager();
     this.lastAppliedTxId = lastAppliedTxId;
+    this.timer = timer;
   }
   
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
@@ -141,13 +158,25 @@ public class FSEditLogLoader {
     prog.beginStep(Phase.LOADING_EDITS, step);
     fsNamesys.writeLock();
     try {
-      long startTime = monotonicNow();
-      FSImage.LOG.info("Start loading edits file " + edits.getName());
+      long startTime = timer.monotonicNow();
+      LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
+      if (preLogAction.shouldLog()) {
+        FSImage.LOG.info("Start loading edits file " + edits.getName() +
+            LogThrottlingHelper.getLogSupressionMessage(preLogAction));
+      }
       long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
           startOpt, recovery);
-      FSImage.LOG.info("Edits file " + edits.getName() 
-          + " of size " + edits.length() + " edits # " + numEdits 
-          + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
+      long endTime = timer.monotonicNow();
+      LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
+          numEdits, edits.length(), endTime - startTime);
+      if (postLogAction.shouldLog()) {
+        String msg = "Loaded " + postLogAction.getCount() + " edits file(s) "
+            + "(the last named " + edits.getName() + ") of " +
+            "total size " + postLogAction.getStats(1).getSum() +
+            ", total edits " + postLogAction.getStats(0).getSum() +
+            ", total load time " + postLogAction.getStats(2).getSum() + " ms";
+        FSImage.LOG.info(msg);
+      }
       return numEdits;
     } finally {
       edits.close();
@@ -182,7 +211,7 @@ public class FSEditLogLoader {
     Step step = createStartupProgressStep(in);
     prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
     Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
-    long lastLogTime = monotonicNow();
+    long lastLogTime = timer.monotonicNow();
     long lastInodeId = fsNamesys.dir.getLastInodeId();
     
     try {
@@ -262,7 +291,7 @@ public class FSEditLogLoader {
           }
           // log progress
           if (op.hasTransactionId()) {
-            long now = monotonicNow();
+            long now = timer.monotonicNow();
             if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
               long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
               int percent = Math.round((float) deltaTxId / numTxns * 100);

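A hedged sketch of the two-recorder pattern introduced here (class, method, and parameter names are illustrative): one helper gates the "pre" and "post" statements together. The recorder named first ("pre") acts as the primary recorder driving the 5-second throttling decision, and the dependent "post" recorder logs whenever the primary does, so the totals in the "post" line always cover the same window the "pre" line announced. The `Timer` injection in the constructor above exists so tests can substitute a `FakeTimer`, as the new test further down does.

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;

class PrePostLoggingExample {
  private static final Log LOG = LogFactory.getLog(PrePostLoggingExample.class);

  // One helper gates both statements; "pre", recorded first, is the primary
  // recorder that drives the 5 s throttling decision.
  private final LogThrottlingHelper helper = new LogThrottlingHelper(5000);

  void loadOne(String name, long now, long numEdits, long sizeBytes, long elapsedMs) {
    LogAction pre = helper.record("pre", now);
    if (pre.shouldLog()) {
      LOG.info("Start loading " + name +
          LogThrottlingHelper.getLogSupressionMessage(pre));
    }
    // ... perform the load ...
    LogAction post = helper.record("post", now, numEdits, sizeBytes, elapsedMs);
    if (post.shouldLog()) {
      // getStats(i) aggregates the i-th recorded value over every call since
      // the last logged statement, so the totals cover all suppressed loads.
      LOG.info("Loaded " + post.getCount() + " edits file(s), total edits " +
          post.getStats(0).getSum() + ", total size " + post.getStats(1).getSum() +
          ", total load time " + post.getStats(2).getSum() + " ms");
    }
  }
}
```
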
+ 17 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -68,6 +68,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.Time;
@@ -128,6 +130,11 @@ public class FSImage implements Closeable {
   private final Set<Long> currentlyCheckpointing =
       Collections.<Long>synchronizedSet(new HashSet<Long>());
 
+  /** Limit logging about edit loading to every 5 seconds max. */
+  private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper loadEditLogHelper =
+      new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
   /**
    * Construct an FSImage
    * @param conf Configuration
@@ -891,8 +898,16 @@ public class FSImage implements Closeable {
       
       // Load latest edits
       for (EditLogInputStream editIn : editStreams) {
-        LOG.info("Reading " + editIn + " expecting start txid #" +
-              (lastAppliedTxId + 1));
+        LogAction logAction = loadEditLogHelper.record();
+        if (logAction.shouldLog()) {
+          String logSuppressed = "";
+          if (logAction.getCount() > 1) {
+            logSuppressed = "; suppressed logging for " +
+                (logAction.getCount() - 1) + " edit reads";
+          }
+          LOG.info("Reading " + editIn + " expecting start txid #" +
+              (lastAppliedTxId + 1) + logSuppressed);
+        }
         try {
           loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
         } finally {

+ 13 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 
 /**
  * A merged input stream that handles failover between different edit logs.
@@ -43,6 +45,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
   private long prevTxId;
   private final EditLogInputStream[] streams;
 
+  /** Limit logging about fast forwarding the stream to every 5 seconds max. */
+  private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper fastForwardLoggingHelper =
+      new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
+
   /**
    * States that the RedundantEditLogInputStream can be in.
    *
@@ -174,8 +181,12 @@ class RedundantEditLogInputStream extends EditLogInputStream {
       case SKIP_UNTIL:
        try {
           if (prevTxId != HdfsServerConstants.INVALID_TXID) {
-            LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
-                "' to transaction ID " + (prevTxId + 1));
+            LogAction logAction = fastForwardLoggingHelper.record();
+            if (logAction.shouldLog()) {
+              LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
+                  "' to transaction ID " + (prevTxId + 1) +
+                  LogThrottlingHelper.getLogSupressionMessage(logAction));
+            }
             streams[curIdx].skipUntil(prevTxId + 1);
           }
         } catch (IOException e) {

+ 47 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java

@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedInputStream;
 import java.io.File;
@@ -50,8 +53,10 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.log4j.Level;
+import org.apache.hadoop.util.FakeTimer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -90,6 +95,7 @@ public class TestFSEditLogLoader {
   private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
 
   private static final int NUM_DATA_NODES = 0;
+  private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
 
   @Test
   public void testDisplayRecentEditLogOpCodes() throws IOException {
@@ -442,4 +448,45 @@ public class TestFSEditLogLoader {
           fromByte(code), FSEditLogOpCodes.fromByte(code));
     }
   }
+  
+  @Test
+  public void testLoadFSEditLogThrottling() throws Exception {
+    FSNamesystem namesystem = mock(FSNamesystem.class);
+    namesystem.dir = mock(FSDirectory.class);
+
+    FakeTimer timer = new FakeTimer();
+    FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
+
+    LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
+    loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
+    assertTrue(capture.getOutput().contains("Start loading edits file " +
+        FAKE_EDIT_STREAM_NAME));
+    assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)"));
+    assertFalse(capture.getOutput().contains("suppressed"));
+
+    timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2);
+    capture.clearOutput();
+    loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11);
+    assertFalse(capture.getOutput().contains("Start loading edits file"));
+    assertFalse(capture.getOutput().contains("edits file(s)"));
+
+    timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS);
+    capture.clearOutput();
+    loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21);
+    assertTrue(capture.getOutput().contains("Start loading edits file " +
+        FAKE_EDIT_STREAM_NAME));
+    assertTrue(capture.getOutput().contains("suppressed logging 1 times"));
+    assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)"));
+    assertTrue(capture.getOutput().contains("total size 2.0"));
+  }
+
+  private EditLogInputStream getFakeEditLogInputStream(long startTx, long endTx)
+      throws IOException {
+    EditLogInputStream fakeStream = mock(EditLogInputStream.class);
+    when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME);
+    when(fakeStream.getFirstTxId()).thenReturn(startTx);
+    when(fakeStream.getLastTxId()).thenReturn(endTx);
+    when(fakeStream.length()).thenReturn(1L);
+    return fakeStream;
+  }
 }