
HDFS-5241. Provide alternate queuing audit logger to reduce logging contention (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1560761 13f79535-47bb-0310-9956-ffa450edef68
Daryn Sharp 11 years ago
commit a4e66e09e9

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -511,6 +511,9 @@ Release 2.4.0 - UNRELEASED
 
     HDFS-5681. renewLease should not hold fsn write lock. (daryn via Kihwal)
 
+    HDFS-5241. Provide alternate queuing audit logger to reduce logging
+    contention (daryn)
+
   BUG FIXES
 
     HDFS-5034.  Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -304,6 +304,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
   public static final String  DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
   public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
+  public static final String  DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
+  public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false;
 
   // Much code in hdfs is not yet updated to use these keys.
   public static final String  DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
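For reference, the new key is read through the ordinary Hadoop Configuration API. A minimal sketch of enabling it programmatically, using the constants added above; the EnableAsyncAuditLogDemo class is illustrative and not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Illustrative demo class, not part of the commit.
public class EnableAsyncAuditLogDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Opt in to the queuing audit logger; the shipped default is false,
    // so existing deployments keep synchronous audit logging.
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, true);
    System.out.println(conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
        DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)); // prints: true
  }
}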

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -38,6 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECI
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
@@ -122,6 +124,7 @@ import javax.management.StandardMBean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -250,6 +253,9 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AsyncAppender;
+import org.apache.log4j.Logger;
 import org.mortbay.util.ajax.JSON;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -654,6 +660,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
       throws IOException {
+    if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
+                        DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
+      LOG.info("Enabling async auditlog");
+      enableAsyncAuditLog();
+    }
     boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
     LOG.info("fsLock is fair:" + fair);
     fsLock = new FSNamesystemLock(fair);
@@ -7389,5 +7400,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       auditLog.info(message);
     }
   }
+
+  private static void enableAsyncAuditLog() {
+    if (!(auditLog instanceof Log4JLogger)) {
+      LOG.warn("Log4j is required to enable async auditlog");
+      return;
+    }
+    Logger logger = ((Log4JLogger)auditLog).getLogger();
+    @SuppressWarnings("unchecked")
+    List<Appender> appenders = Collections.list(logger.getAllAppenders());
+    // failsafe against trying to async it more than once
+    if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
+      AsyncAppender asyncAppender = new AsyncAppender();
+      // change logger to have an async appender containing all the
+      // previously configured appenders
+      for (Appender appender : appenders) {
+        logger.removeAppender(appender);
+        asyncAppender.addAppender(appender);
+      }
+      logger.addAppender(asyncAppender);
+    }
+  }
 }
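The enableAsyncAuditLog() method above is plain log4j 1.x plumbing: every appender currently attached to the audit logger is detached and re-attached behind a single AsyncAppender, so audit calls merely enqueue an event while AsyncAppender's background thread performs the actual I/O. A self-contained sketch of the same wrapping technique, assuming only log4j 1.x on the classpath; the "demo.audit" logger name and console appender are illustrative:

import java.util.Collections;
import java.util.List;

import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

// Illustrative demo, not part of the commit.
public class AsyncWrapDemo {
  public static void main(String[] args) {
    Logger logger = Logger.getLogger("demo.audit");
    logger.addAppender(new ConsoleAppender(new PatternLayout("%m%n")));

    @SuppressWarnings("unchecked")
    List<Appender> appenders = Collections.list(logger.getAllAppenders());
    // Same failsafe as the patch: skip if the logger is already wrapped.
    if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
      AsyncAppender asyncAppender = new AsyncAppender();
      for (Appender appender : appenders) {
        logger.removeAppender(appender);     // detach from the logger...
        asyncAppender.addAppender(appender); // ...and re-home behind the queue
      }
      logger.addAppender(asyncAppender);
    }
    logger.info("written by AsyncAppender's dispatcher thread");
  }
}

Note the trade-off: AsyncAppender buffers events in memory (128 by default) and, when the buffer fills, either blocks callers or discards events depending on its blocking setting, so the reduced lock contention in FSNamesystem comes at the cost of a small window of audit records that can be lost on an unclean shutdown.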
 

+ 44 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java

@@ -18,16 +18,17 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -46,6 +47,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AsyncAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -54,13 +57,30 @@ import org.apache.log4j.RollingFileAppender;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * A JUnit test that audit logs are generated
  */
+@RunWith(Parameterized.class)
 public class TestAuditLogs {
   static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log";
+  boolean useAsyncLog;
   
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{new Boolean(false)});
+    params.add(new Object[]{new Boolean(true)});
+    return params;
+  }
+  
+  public TestAuditLogs(boolean useAsyncLog) {
+    this.useAsyncLog = useAsyncLog;
+  }
+
   // Pattern for: 
   // allowed=(true|false) ugi=name ip=/address cmd={cmd} src={path} dst=null perm=null
   static final Pattern auditPattern = Pattern.compile(
@@ -84,17 +104,28 @@ public class TestAuditLogs {
 
   @Before
   public void setupCluster() throws Exception {
+    // must configure prior to instantiating the namesystem because it
+    // will reconfigure the logger if async is enabled
+    configureAuditLogs();
     conf = new HdfsConfiguration();
     final long precision = 1L;
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision);
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
     conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog);
     util = new DFSTestUtil.Builder().setName("TestAuditAllowed").
         setNumFiles(20).build();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
     fs = cluster.getFileSystem();
     util.createFiles(fs, fileName);
 
+    // make sure the appender is what it's supposed to be
+    Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
+    @SuppressWarnings("unchecked")
+    List<Appender> appenders = Collections.list(logger.getAllAppenders());
+    assertEquals(1, appenders.size());
+    assertEquals(useAsyncLog, appenders.get(0) instanceof AsyncAppender);
+    
     fnames = util.getFileNames(fileName);
     util.waitReplication(fs, fileName, (short)3);
     userGroupInfo = UserGroupInformation.createUserForTesting(username, groups);
@@ -203,6 +234,7 @@ public class TestAuditLogs {
     try {
       hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(conf);
       InputStream istream = hftpFs.open(file);
+      @SuppressWarnings("unused")
       int val = istream.read();
       istream.close();
 
@@ -234,6 +266,12 @@ public class TestAuditLogs {
 
   /** Sets up log4j logger for auditlogs */
   private void setupAuditLogs() throws IOException {
+    Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
+    // enable logging now that the test is ready to run
+    logger.setLevel(Level.INFO);
+  }
+  
+  private void configureAuditLogs() throws IOException {
     // Shutdown the LogManager to release all logger open file handles.
     // Unfortunately, Apache commons logging library does not provide
     // means to release underlying loggers. For additional info look up
@@ -245,7 +283,8 @@ public class TestAuditLogs {
       assertTrue(file.delete());
     }
     Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
-    logger.setLevel(Level.INFO);
+    // disable logging while the cluster startup preps files
+    logger.setLevel(Level.OFF);
     PatternLayout layout = new PatternLayout("%m%n");
     RollingFileAppender appender = new RollingFileAppender(layout, auditLogFile);
     logger.addAppender(appender);
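With these changes the test class runs in its entirety twice via JUnit 4's Parameterized runner, once with useAsyncLog=false and once with true, and the setupCluster() assertions confirm that the single configured appender is (or is not) an AsyncAppender accordingly. The runner pattern itself, reduced to a minimal standalone sketch; the SquareTest example is illustrative and unrelated to HDFS:

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

// Illustrative example, not part of the commit.
@RunWith(Parameterized.class)
public class SquareTest {
  private final int input;
  private final int expected;

  @Parameters
  public static Collection<Object[]> data() {
    // Each Object[] becomes one constructor call, i.e. one run of every @Test.
    return Arrays.asList(new Object[][] { {2, 4}, {3, 9} });
  }

  public SquareTest(int input, int expected) {
    this.input = input;
    this.expected = expected;
  }

  @Test
  public void squares() {
    assertEquals(expected, input * input);
  }
}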