Преглед изворни кода

HDFS-1315. Add fsck event to audit log and remove other audit log events corresponding to FSCK listStatus and open calls. Contributed by Suresh Srinivas.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@967293 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas пре 15 година
родитељ
комит
912cbf6116

+ 2 - 0
CHANGES.txt

@@ -88,6 +88,8 @@ Trunk (unreleased changes)
     HDFS-1302. The HDFS side of the changes corresponding to HADOOP-6861.
     HDFS-1302. The HDFS side of the changes corresponding to HADOOP-6861.
     (Jitendra Pandey & Owen O'Malley via ddas)
     (Jitendra Pandey & Owen O'Malley via ddas)
     
     
+    HDFS-1315. Add fsck event to audit log and remove other audit log events 
+    corresponding to FSCK listStatus and open calls. (suresh)
 
 
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 

+ 42 - 19
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -675,7 +675,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     checkOwner(src);
     checkOwner(src);
     dir.setPermission(src, permission);
     dir.setPermission(src, permission);
     getEditLog().logSync();
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
@@ -704,7 +704,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     }
     }
     dir.setOwner(src, username, group);
     dir.setOwner(src, username, group);
     getEditLog().logSync();
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
@@ -719,7 +719,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   LocatedBlocks getBlockLocations(String clientMachine, String src,
   LocatedBlocks getBlockLocations(String clientMachine, String src,
       long offset, long length) throws AccessControlException,
       long offset, long length) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, IOException {
       FileNotFoundException, UnresolvedLinkException, IOException {
-    LocatedBlocks blocks = getBlockLocations(src, offset, length, true);
+    LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
     if (blocks != null) {
     if (blocks != null) {
       //sort the blocks
       //sort the blocks
       DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
       DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
@@ -737,7 +737,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * @throws FileNotFoundException
    * @throws FileNotFoundException
    */
    */
   LocatedBlocks getBlockLocations(String src, long offset, long length,
   LocatedBlocks getBlockLocations(String src, long offset, long length,
-      boolean doAccessTime) throws FileNotFoundException,
+      boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
       UnresolvedLinkException, IOException {
       UnresolvedLinkException, IOException {
     if (isPermissionEnabled) {
     if (isPermissionEnabled) {
       checkPathAccess(src, FsAction.READ);
       checkPathAccess(src, FsAction.READ);
@@ -752,8 +752,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
           "Negative length is not supported. File: " + src);
           "Negative length is not supported. File: " + src);
     }
     }
     final LocatedBlocks ret = getBlockLocationsInternal(src,
     final LocatedBlocks ret = getBlockLocationsInternal(src,
-        offset, length, doAccessTime);  
-    if (auditLog.isInfoEnabled()) {
+        offset, length, doAccessTime, needBlockToken);  
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
                     "open", src, null, null);
                     "open", src, null, null);
@@ -764,7 +764,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   private synchronized LocatedBlocks getBlockLocationsInternal(String src,
   private synchronized LocatedBlocks getBlockLocationsInternal(String src,
                                                        long offset, 
                                                        long offset, 
                                                        long length,
                                                        long length,
-                                                       boolean doAccessTime)
+                                                       boolean doAccessTime, 
+                                                       boolean needBlockToken)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
       throws FileNotFoundException, UnresolvedLinkException, IOException {
     INodeFile inode = dir.getFileINode(src);
     INodeFile inode = dir.getFileINode(src);
     if (inode == null)
     if (inode == null)
@@ -793,7 +794,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
         LOG.debug("last = " + last);
         LOG.debug("last = " + last);
       }
       }
 
 
-      if(isBlockTokenEnabled) setBlockTokens(locatedblocks);
+      if(isBlockTokenEnabled && needBlockToken) {
+        setBlockTokens(locatedblocks);
+      }
 
 
       if (last.isComplete()) {
       if (last.isComplete()) {
         return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
         return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
@@ -957,7 +960,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     getEditLog().logSync();
     getEditLog().logSync();
    
    
     
     
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(target, false);
       final HdfsFileStatus stat = dir.getFileInfo(target, false);
       logAuditEvent(UserGroupInformation.getLoginUser(),
       logAuditEvent(UserGroupInformation.getLoginUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
@@ -985,7 +988,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     INodeFile inode = dir.getFileINode(src);
     INodeFile inode = dir.getFileINode(src);
     if (inode != null) {
     if (inode != null) {
       dir.setTimes(src, inode, mtime, atime, true);
       dir.setTimes(src, inode, mtime, atime, true);
-      if (auditLog.isInfoEnabled()) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         final HdfsFileStatus stat = dir.getFileInfo(src, false);
         final HdfsFileStatus stat = dir.getFileInfo(src, false);
         logAuditEvent(UserGroupInformation.getCurrentUser(),
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
                       Server.getRemoteIp(),
@@ -1007,7 +1010,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     }
     }
     createSymlinkInternal(target, link, dirPerms, createParent);
     createSymlinkInternal(target, link, dirPerms, createParent);
     getEditLog().logSync();
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(link, false);
       final HdfsFileStatus stat = dir.getFileInfo(link, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
@@ -1063,7 +1066,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     throws IOException, UnresolvedLinkException {
     throws IOException, UnresolvedLinkException {
     boolean status = setReplicationInternal(src, replication);
     boolean status = setReplicationInternal(src, replication);
     getEditLog().logSync();
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled()) {
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
                     "setReplication", src, null, null);
                     "setReplication", src, null, null);
@@ -1148,7 +1151,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     startFileInternal(src, permissions, holder, clientMachine, flag,
     startFileInternal(src, permissions, holder, clientMachine, flag,
         createParent, replication, blockSize);
         createParent, replication, blockSize);
     getEditLog().logSync();
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
@@ -1407,7 +1410,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       }
       }
     }
     }
 
 
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
                     "append", src, null, null);
                     "append", src, null, null);
@@ -1695,7 +1698,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     throws IOException, UnresolvedLinkException {
     throws IOException, UnresolvedLinkException {
     boolean status = renameToInternal(src, dst);
     boolean status = renameToInternal(src, dst);
     getEditLog().logSync();
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled()) {
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
@@ -1738,7 +1741,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       throws IOException, UnresolvedLinkException {
       throws IOException, UnresolvedLinkException {
     renameToInternal(src, dst, options);
     renameToInternal(src, dst, options);
     getEditLog().logSync();
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       StringBuilder cmd = new StringBuilder("rename options=");
       StringBuilder cmd = new StringBuilder("rename options=");
       for (Rename option : options) {
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
         cmd.append(option.value()).append(" ");
@@ -1787,7 +1790,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
         NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
         NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
       }
       }
       boolean status = deleteInternal(src, true);
       boolean status = deleteInternal(src, true);
-      if (status && auditLog.isInfoEnabled()) {
+      if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
                       Server.getRemoteIp(),
                       "delete", src, null, null);
                       "delete", src, null, null);
@@ -1895,7 +1898,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       boolean createParent) throws IOException, UnresolvedLinkException {
       boolean createParent) throws IOException, UnresolvedLinkException {
     boolean status = mkdirsInternal(src, permissions, createParent);
     boolean status = mkdirsInternal(src, permissions, createParent);
     getEditLog().logSync();
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled()) {
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
@@ -2268,7 +2271,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
         checkTraverse(src);
         checkTraverse(src);
       }
       }
     }
     }
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     Server.getRemoteIp(),
                     "listStatus", src, null, null);
                     "listStatus", src, null, null);
@@ -4679,4 +4682,24 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     }
     }
     return authMethod;
     return authMethod;
   }
   }
+  
+  /**
+   * If the remote IP for namenode method invokation is null, then the
+   * invocation is internal to the namenode. Client invoked methods are invoked
+   * over RPC and always have address != null.
+   */
+  private boolean isExternalInvocation() {
+    return Server.getRemoteIp() != null;
+  }
+  
+  /**
+   * Log fsck event in the audit log 
+   */
+  void logFsckEvent(String src, InetAddress remoteAddress) throws IOException {
+    if (auditLog.isInfoEnabled()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    remoteAddress,
+                    "fsck", src, null, null);
+    }
+  }
 }
 }

+ 4 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.Map;
 
 
@@ -46,6 +47,8 @@ public class FsckServlet extends DfsServlet {
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     final Map<String,String[]> pmap = request.getParameterMap();
     final Map<String,String[]> pmap = request.getParameterMap();
     final PrintWriter out = response.getWriter();
     final PrintWriter out = response.getWriter();
+    final InetAddress remoteAddress = 
+      InetAddress.getByName(request.getRemoteAddr());
     final Configuration conf = 
     final Configuration conf = 
       (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
       (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
 
 
@@ -64,7 +67,7 @@ public class FsckServlet extends DfsServlet {
           final short minReplication = namesystem.getMinReplication();
           final short minReplication = namesystem.getMinReplication();
 
 
           new NamenodeFsck(conf, nn, nn.getNetworkTopology(), pmap, out,
           new NamenodeFsck(conf, nn, nn.getNetworkTopology(), pmap, out,
-              totalDatanodes, minReplication).fsck();
+              totalDatanodes, minReplication, remoteAddress).fsck();
           
           
           return null;
           return null;
         }
         }

+ 0 - 13
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -719,19 +719,6 @@ public class NameNode implements NamenodeProtocols, FSConstants {
                                         src, offset, length);
                                         src, offset, length);
   }
   }
   
   
-  /**
-   * The specification of this method matches that of
-   * {@link getBlockLocations(Path)}
-   * except that it does not update the file's access time.
-   */
-  LocatedBlocks getBlockLocationsNoATime(String src, 
-                                         long offset, 
-                                         long length)
-      throws IOException {
-    myMetrics.numGetBlockLocations.inc();
-    return namesystem.getBlockLocations(src, offset, length, false);
-  }
-  
   private static String getClientMachine() {
   private static String getClientMachine() {
     String clientMachine = Server.getRemoteAddress();
     String clientMachine = Server.getRemoteAddress();
     if (clientMachine == null) {
     if (clientMachine == null) {

+ 19 - 6
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -49,6 +50,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 
 
 /**
 /**
  * This class provides rudimentary checking of DFS volumes for errors and
  * This class provides rudimentary checking of DFS volumes for errors and
@@ -93,6 +95,7 @@ public class NamenodeFsck {
   private final NetworkTopology networktopology;
   private final NetworkTopology networktopology;
   private final int totalDatanodes;
   private final int totalDatanodes;
   private final short minReplication;
   private final short minReplication;
+  private final InetAddress remoteAddress;
 
 
   private String lostFound = null;
   private String lostFound = null;
   private boolean lfInited = false;
   private boolean lfInited = false;
@@ -113,20 +116,24 @@ public class NamenodeFsck {
    * Filesystem checker.
    * Filesystem checker.
    * @param conf configuration (namenode config)
    * @param conf configuration (namenode config)
    * @param nn namenode that this fsck is going to use
    * @param nn namenode that this fsck is going to use
-   * @param pmap key=value[] map that is passed to the http servlet as url parameters
-   * @param response the object into which  this servelet writes the url contents
+   * @param pmap key=value[] map passed to the http servlet as url parameters
+   * @param out output stream to write the fsck output
+   * @param totalDatanodes number of live datanodes
+   * @param minReplication minimum replication
+   * @param remoteAddress source address of the fsck request
    * @throws IOException
    * @throws IOException
    */
    */
   NamenodeFsck(Configuration conf, NameNode namenode,
   NamenodeFsck(Configuration conf, NameNode namenode,
       NetworkTopology networktopology, 
       NetworkTopology networktopology, 
       Map<String,String[]> pmap, PrintWriter out,
       Map<String,String[]> pmap, PrintWriter out,
-      int totalDatanodes, short minReplication) {
+      int totalDatanodes, short minReplication, InetAddress remoteAddress) {
     this.conf = conf;
     this.conf = conf;
     this.namenode = namenode;
     this.namenode = namenode;
     this.networktopology = networktopology;
     this.networktopology = networktopology;
     this.out = out;
     this.out = out;
     this.totalDatanodes = totalDatanodes;
     this.totalDatanodes = totalDatanodes;
     this.minReplication = minReplication;
     this.minReplication = minReplication;
+    this.remoteAddress = remoteAddress;
 
 
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
       String key = it.next();
@@ -148,7 +155,11 @@ public class NamenodeFsck {
   public void fsck() {
   public void fsck() {
     final long startTime = System.currentTimeMillis();
     final long startTime = System.currentTimeMillis();
     try {
     try {
-      out.println("Namenode FSCK started at " + new Date());
+      String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
+          + " from " + remoteAddress + " for path " + path + " at " + new Date();
+      LOG.info(msg);
+      out.println(msg);
+      namenode.getNamesystem().logFsckEvent(path, remoteAddress);
 
 
       final HdfsFileStatus file = namenode.getFileInfo(path);
       final HdfsFileStatus file = namenode.getFileInfo(path);
       if (file != null) {
       if (file != null) {
@@ -182,7 +193,6 @@ public class NamenodeFsck {
       } else {
       } else {
         out.print("\n\nPath '" + path + "' " + NONEXISTENT_STATUS);
         out.print("\n\nPath '" + path + "' " + NONEXISTENT_STATUS);
       }
       }
-
     } catch (Exception e) {
     } catch (Exception e) {
       String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
       String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
       LOG.warn(errMsg, e);
       LOG.warn(errMsg, e);
@@ -270,7 +280,10 @@ public class NamenodeFsck {
       return;
       return;
     }
     }
     long fileLen = file.getLen();
     long fileLen = file.getLen();
-    LocatedBlocks blocks = namenode.getBlockLocationsNoATime(path, 0, fileLen);
+    // Get block locations without updating the file access time 
+    // and without block access tokens
+    LocatedBlocks blocks = namenode.getNamesystem().getBlockLocations(path, 0,
+        fileLen, false, false);
     if (blocks == null) { // the file is deleted
     if (blocks == null) { // the file is deleted
       return;
       return;
     }
     }

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -38,7 +38,7 @@ public class NameNodeAdapter {
   public static LocatedBlocks getBlockLocations(NameNode namenode,
   public static LocatedBlocks getBlockLocations(NameNode namenode,
       String src, long offset, long length) throws IOException {
       String src, long offset, long length) throws IOException {
     return namenode.getNamesystem().getBlockLocations(
     return namenode.getNamesystem().getBlockLocations(
-        src, offset, length, false);
+        src, offset, length, false, true);
   }
   }
 
 
   /**
   /**

+ 46 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -18,8 +18,10 @@
 
 
 package org.apache.hadoop.hdfs.server.namenode;
 package org.apache.hadoop.hdfs.server.namenode;
 
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.io.RandomAccessFile;
@@ -27,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Random;
 import java.util.Random;
+import java.util.regex.Pattern;
 
 
 import junit.framework.TestCase;
 import junit.framework.TestCase;
 
 
@@ -52,11 +55,25 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
 import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
 
 
 /**
 /**
  * A JUnit test for doing fsck
  * A JUnit test for doing fsck
  */
  */
 public class TestFsck extends TestCase {
 public class TestFsck extends TestCase {
+  static final String auditLogFile = System.getProperty("test.build.dir",
+      "build/test") + "/audit.log";
+  
+  // Pattern for: 
+  // ugi=name ip=/address cmd=FSCK src=/ dst=null perm=null
+  static final Pattern fsckPattern = Pattern.compile(
+      "ugi=.*?\\s" + 
+      "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" + 
+      "cmd=fsck\\ssrc=\\/\\sdst=null\\s" + 
+      "perm=null");
+  
   static String runFsck(Configuration conf, int expectedErrCode, 
   static String runFsck(Configuration conf, int expectedErrCode, 
                         boolean checkErrorCode,String... path) 
                         boolean checkErrorCode,String... path) 
                         throws Exception {
                         throws Exception {
@@ -91,7 +108,9 @@ public class TestFsck extends TestCase {
       final Path file = new Path(fileName);
       final Path file = new Path(fileName);
       long aTime = fs.getFileStatus(file).getAccessTime();
       long aTime = fs.getFileStatus(file).getAccessTime();
       Thread.sleep(precision);
       Thread.sleep(precision);
+      setupAuditLogs();
       String outStr = runFsck(conf, 0, true, "/");
       String outStr = runFsck(conf, 0, true, "/");
+      verifyAuditLogs();
       assertEquals(aTime, fs.getFileStatus(file).getAccessTime());
       assertEquals(aTime, fs.getFileStatus(file).getAccessTime());
       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
       System.out.println(outStr);
       System.out.println(outStr);
@@ -117,6 +136,33 @@ public class TestFsck extends TestCase {
     }
     }
   }
   }
 
 
+  /** Sets up log4j logger for auditlogs */
+  private void setupAuditLogs() throws IOException {
+    File file = new File(auditLogFile);
+    if (file.exists()) {
+      file.delete();
+    }
+    Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
+    logger.setLevel(Level.INFO);
+    PatternLayout layout = new PatternLayout("%m%n");
+    RollingFileAppender appender = new RollingFileAppender(layout, auditLogFile);
+    logger.addAppender(appender);
+  }
+  
+  private void verifyAuditLogs() throws IOException {
+    // Turn off the logs
+    Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
+    logger.setLevel(Level.OFF);
+    
+    // Ensure audit log has only one for FSCK
+    BufferedReader reader = new BufferedReader(new FileReader(auditLogFile));
+    String line = reader.readLine();
+    assertNotNull(line);
+    assertTrue("Expected fsck event not found in audit log",
+        fsckPattern.matcher(line).matches());
+    assertNull("Unexpected event in audit log", reader.readLine());
+  }
+  
   public void testFsckNonExistent() throws Exception {
   public void testFsckNonExistent() throws Exception {
     DFSTestUtil util = new DFSTestUtil("TestFsck", 20, 3, 8*1024);
     DFSTestUtil util = new DFSTestUtil("TestFsck", 20, 3, 8*1024);
     MiniDFSCluster cluster = null;
     MiniDFSCluster cluster = null;