
HDFS-9395. Make HDFS audit logging consistent. Contributed by Kuhu Shukla.

Kihwal Lee 9 years ago
parent
commit
d27d7fc72e

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -2854,6 +2854,8 @@ Release 2.7.3 - UNRELEASED
     HDFS-8578. On upgrade, Datanode should process all storage/data dirs in
     parallel.  (vinayakumarb and szetszwo via szetszwo)
 
+    HDFS-9395. Make HDFS audit logging consistant (Kuhu Shukla via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 131 - 62
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
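Every hunk in this file applies the same convention: an AccessControlException is audited from an explicit catch block (while success is still false) and then rethrown, and the success event is logged only after the lock has been released, instead of unconditionally from the finally block. A minimal sketch of the write-path shape, paraphrased from the setQuota hunk below (not a drop-in copy of the real method):

  void setQuota(String src, long nsQuota, long ssQuota, StorageType type)
      throws IOException {
    boolean success = false;
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);
      checkNameNodeSafeMode("Cannot set quota on " + src);
      FSDirAttrOp.setQuota(dir, src, nsQuota, ssQuota, type);
      success = true;
    } catch (AccessControlException ace) {
      // Permission failures are audited (allowed=false) and rethrown.
      logAuditEvent(success, "setQuota", src);
      throw ace;
    } finally {
      writeUnlock();
      if (success) {
        getEditLog().logSync();
      }
    }
    // The success event is audited only after the write lock is released;
    // other exceptions propagate without writing an audit entry.
    logAuditEvent(success, "setQuota", src);
  }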

@@ -1792,13 +1792,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot concat " + target);
       stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
+      throw ace;
     } finally {
       writeUnlock();
       if (success) {
         getEditLog().logSync();
       }
-      logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
     }
+    logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
   }
 
   /**
@@ -2652,9 +2655,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean success = ret != null && ret.success;
     if (success) {
       getEditLog().logSync();
+      logAuditEvent(success, "rename", src, dst, ret.auditStat);
     }
-    logAuditEvent(success, "rename", src, dst,
-        ret == null ? null : ret.auditStat);
     return success;
   }
 
@@ -2868,16 +2870,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     readLock();
     boolean success = true;
+    ContentSummary cs;
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirStatAndListingOp.getContentSummary(dir, src);
+      cs = FSDirStatAndListingOp.getContentSummary(dir, src);
     } catch (AccessControlException ace) {
       success = false;
+      logAuditEvent(success, "contentSummary", src);
       throw ace;
     } finally {
       readUnlock();
-      logAuditEvent(success, "contentSummary", src);
     }
+    logAuditEvent(success, "contentSummary", src);
+    return cs;
   }
 
   /**
@@ -2896,18 +2901,21 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   QuotaUsage getQuotaUsage(final String src) throws IOException {
     checkOperation(OperationCategory.READ);
+    QuotaUsage quotaUsage;
     readLock();
     boolean success = true;
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirStatAndListingOp.getQuotaUsage(dir, src);
+      quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, src);
     } catch (AccessControlException ace) {
       success = false;
+      logAuditEvent(success, "quotaUsage", src);
       throw ace;
     } finally {
       readUnlock();
-      logAuditEvent(success, "quotaUsage", src);
     }
+    logAuditEvent(success, "quotaUsage", src);
+    return quotaUsage;
   }
 
   /**
@@ -2930,13 +2938,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot set quota on " + src);
       FSDirAttrOp.setQuota(dir, src, nsQuota, ssQuota, type);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "setQuota", src);
+      throw ace;
     } finally {
       writeUnlock();
       if (success) {
         getEditLog().logSync();
       }
-      logAuditEvent(success, "setQuota", src);
     }
+    logAuditEvent(success, "setQuota", src);
   }
 
   /** Persist all metadata about this file.
@@ -5775,17 +5786,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   String createSnapshot(String snapshotRoot, String snapshotName,
                         boolean logRetryCache) throws IOException {
     String snapshotPath = null;
+    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
       snapshotPath = FSDirSnapshotOp.createSnapshot(dir,
           snapshotManager, snapshotRoot, snapshotName, logRetryCache);
+      success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "createSnapshot", snapshotRoot,
+          snapshotPath, null);
+      throw ace;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(snapshotPath != null, "createSnapshot", snapshotRoot,
+    logAuditEvent(success, "createSnapshot", snapshotRoot,
         snapshotPath, null);
     return snapshotPath;
   }
@@ -5802,6 +5819,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
     boolean success = false;
+    String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
+    String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -5809,12 +5828,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       FSDirSnapshotOp.renameSnapshot(dir, snapshotManager, path,
           snapshotOldName, snapshotNewName, logRetryCache);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "renameSnapshot", oldSnapshotRoot,
+          newSnapshotRoot, null);
+      throw ace;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
-    String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
     logAuditEvent(success, "renameSnapshot", oldSnapshotRoot,
         newSnapshotRoot, null);
   }
@@ -5836,6 +5857,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.READ);
       status = FSDirSnapshotOp.getSnapshottableDirListing(dir, snapshotManager);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "listSnapshottableDirectory", null, null, null);
+      throw ace;
     } finally {
       readUnlock();
     }
@@ -5862,19 +5886,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String fromSnapshot, String toSnapshot) throws IOException {
     SnapshotDiffReport diffs = null;
     checkOperation(OperationCategory.READ);
+    boolean success = false;
+    String fromSnapshotRoot = (fromSnapshot == null || fromSnapshot.isEmpty()) ?
+        path : Snapshot.getSnapshotPath(path, fromSnapshot);
+    String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
+        path : Snapshot.getSnapshotPath(path, toSnapshot);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, snapshotManager,
           path, fromSnapshot, toSnapshot);
+      success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "computeSnapshotDiff", fromSnapshotRoot,
+          toSnapshotRoot, null);
+      throw ace;
     } finally {
       readUnlock();
     }
-    String fromSnapshotRoot = (fromSnapshot == null || fromSnapshot.isEmpty()) ?
-        path : Snapshot.getSnapshotPath(path, fromSnapshot);
-    String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
-        path : Snapshot.getSnapshotPath(path, toSnapshot);
-    logAuditEvent(diffs != null, "computeSnapshotDiff", fromSnapshotRoot,
+    logAuditEvent(success, "computeSnapshotDiff", fromSnapshotRoot,
         toSnapshotRoot, null);
     return diffs;
   }
@@ -5889,15 +5919,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void deleteSnapshot(String snapshotRoot, String snapshotName,
       boolean logRetryCache) throws IOException {
     boolean success = false;
+    String rootPath = null;
     writeLock();
     BlocksMapUpdateInfo blocksToBeDeleted = null;
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
-
+      rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
       blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
           snapshotRoot, snapshotName, logRetryCache);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "deleteSnapshot", rootPath, null, null);
+      throw ace;
     } finally {
       writeUnlock();
     }
@@ -5908,8 +5942,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (blocksToBeDeleted != null) {
       removeBlocks(blocksToBeDeleted);
     }
-
-    String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
     logAuditEvent(success, "deleteSnapshot", rootPath, null, null);
   }
 
@@ -5967,9 +5999,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "startRollingUpgrade", null, null, null);
-    }
+    logAuditEvent(true, "startRollingUpgrade", null, null, null);
     return rollingUpgradeInfo;
   }
 
@@ -6160,10 +6190,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       // Sync not needed for ha since the edit was rolled after logging.
       getEditLog().logSync();
     }
-
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
-    }
+    logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
     return rollingUpgradeInfo;
   }
 
@@ -6176,6 +6203,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                          EnumSet<CacheFlag> flags, boolean logRetryCache)
       throws IOException {
     CacheDirectiveInfo effectiveDirective = null;
+    boolean success = false;
+    String effectiveDirectiveStr;
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
     }
@@ -6185,24 +6214,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot add cache directive");
       effectiveDirective = FSNDNCacheOp.addCacheDirective(this, cacheManager,
           directive, flags, logRetryCache);
+      success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "addCacheDirective", null,
+          null, null);
+      throw ace;
     } finally {
       writeUnlock();
-      boolean success = effectiveDirective != null;
       if (success) {
         getEditLog().logSync();
       }
-
-      String effectiveDirectiveStr = effectiveDirective != null ?
-          effectiveDirective.toString() : null;
-      logAuditEvent(success, "addCacheDirective", effectiveDirectiveStr,
-          null, null);
     }
-    return effectiveDirective != null ? effectiveDirective.getId() : 0;
+    effectiveDirectiveStr = effectiveDirective.toString();
+    logAuditEvent(success, "addCacheDirective", effectiveDirectiveStr,
+        null, null);
+    return effectiveDirective.getId();
   }
 
   void modifyCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
     boolean success = false;
+    final String idStr = "{id: " + directive.getId() + "}";
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
     }
@@ -6213,31 +6245,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
           logRetryCache);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "modifyCacheDirective", idStr,
+          directive.toString(), null);
+      throw ace;
     } finally {
       writeUnlock();
       if (success) {
         getEditLog().logSync();
       }
-      final String idStr = "{id: " + directive.getId() + "}";
-      logAuditEvent(success, "modifyCacheDirective", idStr,
-          directive.toString(), null);
     }
+    logAuditEvent(success, "modifyCacheDirective", idStr,
+        directive.toString(), null);
   }
 
   void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
     boolean success = false;
+    String idStr = "{id: " + Long.toString(id) + "}";
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove cache directives");
       FSNDNCacheOp.removeCacheDirective(this, cacheManager, id, logRetryCache);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "removeCacheDirective", idStr, null, null);
+      throw ace;
     } finally {
       writeUnlock();
-      String idStr = "{id: " + Long.toString(id) + "}";
-      logAuditEvent(success, "removeCacheDirective", idStr, null,
-          null);
     }
+    logAuditEvent(success, "removeCacheDirective", idStr, null, null);
     getEditLog().logSync();
   }
 
@@ -6253,11 +6290,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
           filter);
       success = true;
-    } finally {
-      readUnlock();
+    } catch (AccessControlException ace) {
       logAuditEvent(success, "listCacheDirectives", filter.toString(), null,
           null);
+      throw ace;
+    } finally {
+      readUnlock();
     }
+    logAuditEvent(success, "listCacheDirectives", filter.toString(), null,
+        null);
     return results;
   }
 
@@ -6274,11 +6315,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           logRetryCache);
       poolInfoStr = info.toString();
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
+      throw ace;
     } finally {
       writeUnlock();
-      logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
     }
-    
+    logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
     getEditLog().logSync();
   }
 
@@ -6286,19 +6329,23 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     writeLock();
     boolean success = false;
+    String poolNameStr = "{poolName: " +
+        (req == null ? null : req.getPoolName()) + "}";
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify cache pool"
           + (req == null ? null : req.getPoolName()));
       FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "modifyCachePool", poolNameStr,
+          req == null ? null : req.toString(), null);
+      throw ace;
     } finally {
       writeUnlock();
-      String poolNameStr = "{poolName: " +
-          (req == null ? null : req.getPoolName()) + "}";
-      logAuditEvent(success, "modifyCachePool", poolNameStr,
-                    req == null ? null : req.toString(), null);
     }
+    logAuditEvent(success, "modifyCachePool", poolNameStr,
+        req == null ? null : req.toString(), null);
 
     getEditLog().logSync();
   }
@@ -6307,18 +6354,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     writeLock();
     boolean success = false;
+    String poolNameStr = "{poolName: " + cachePoolName + "}";
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName);
       FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
           logRetryCache);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
+      throw ace;
     } finally {
       writeUnlock();
-      String poolNameStr = "{poolName: " + cachePoolName + "}";
-      logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
     }
-    
+    logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
     getEditLog().logSync();
   }
 
@@ -6333,10 +6382,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.READ);
       results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "listCachePools", null, null, null);
+      throw ace;
     } finally {
       readUnlock();
-      logAuditEvent(success, "listCachePools", null, null, null);
     }
+    logAuditEvent(success, "listCachePools", null, null, null);
     return results;
   }
 
@@ -6434,17 +6486,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   AclStatus getAclStatus(String src) throws IOException {
     checkOperation(OperationCategory.READ);
-    boolean success = false;
+    final AclStatus ret;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      final AclStatus ret = FSDirAclOp.getAclStatus(dir, src);
-      success = true;
-      return ret;
+      ret = FSDirAclOp.getAclStatus(dir, src);
+    } catch(AccessControlException ace) {
+      logAuditEvent(false, "getAclStatus", src);
+      throw ace;
     } finally {
       readUnlock();
-      logAuditEvent(success, "getAclStatus", src);
     }
+    logAuditEvent(true, "getAclStatus", src);
+    return ret;
   }
 
   /**
@@ -6499,6 +6553,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     throws AccessControlException, UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
     boolean success = false;
+    EncryptionZone encryptionZone;
     final FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     readLock();
@@ -6508,11 +6563,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           .getEZForPath(dir, srcArg, pc);
       success = true;
       resultingStat = ezForPath.getValue();
-      return ezForPath.getKey();
+      encryptionZone = ezForPath.getKey();
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "getEZForPath", srcArg, null, resultingStat);
+      throw ace;
     } finally {
       readUnlock();
-      logAuditEvent(success, "getEZForPath", srcArg, null, resultingStat);
     }
+    logAuditEvent(success, "getEZForPath", srcArg, null, resultingStat);
+    return encryptionZone;
   }
 
   BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
@@ -6556,14 +6615,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       resultingStat = FSDirErasureCodingOp.setErasureCodingPolicy(this,
           srcArg, ecPolicy, logRetryCache);
       success = true;
+    } catch (AccessControlException ace) {
+      logAuditEvent(success, "setErasureCodingPolicy", srcArg, null,
+          resultingStat);
+      throw ace;
     } finally {
       writeUnlock();
       if (success) {
         getEditLog().logSync();
       }
-      logAuditEvent(success, "setErasureCodingPolicy", srcArg, null,
-          resultingStat);
     }
+    logAuditEvent(success, "setErasureCodingPolicy", srcArg, null,
+        resultingStat);
   }
 
   /**
@@ -6617,30 +6680,36 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   List<XAttr> getXAttrs(final String src, List<XAttr> xAttrs)
       throws IOException {
     checkOperation(OperationCategory.READ);
+    List<XAttr> fsXattrs;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
+      fsXattrs = FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
     } catch (AccessControlException e) {
       logAuditEvent(false, "getXAttrs", src);
       throw e;
     } finally {
       readUnlock();
     }
+    logAuditEvent(true, "getXAttrs", src);
+    return fsXattrs;
   }
 
   List<XAttr> listXAttrs(String src) throws IOException {
     checkOperation(OperationCategory.READ);
+    List<XAttr> fsXattrs;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      return FSDirXAttrOp.listXAttrs(dir, src);
+      fsXattrs = FSDirXAttrOp.listXAttrs(dir, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, "listXAttrs", src);
       throw e;
     } finally {
       readUnlock();
     }
+    logAuditEvent(true, "listXAttrs", src);
+    return fsXattrs;
   }
 
   void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
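Read-only operations follow the same convention under the read lock: the denied case is audited from the catch block, the result is captured inside the try, and the successful audit entry and return happen only after readUnlock(). A condensed sketch of that shape, paraphrased from the getAclStatus hunk above:

  AclStatus getAclStatus(String src) throws IOException {
    checkOperation(OperationCategory.READ);
    final AclStatus ret;
    readLock();
    try {
      checkOperation(OperationCategory.READ);
      ret = FSDirAclOp.getAclStatus(dir, src);
    } catch (AccessControlException ace) {
      // A denied read is audited as a failure and the exception rethrown.
      logAuditEvent(false, "getAclStatus", src);
      throw ace;
    } finally {
      readUnlock();
    }
    // The success entry is written after the read lock is dropped.
    logAuditEvent(true, "getAclStatus", src);
    return ret;
  }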

+ 584 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java

@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.doThrow;
+
+public class TestAuditLoggerWithCommands {
+
+  static final int NUM_DATA_NODES = 2;
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  private static MiniDFSCluster cluster = null;
+  private static FileSystem fileSys = null;
+  private static FileSystem fs2 = null;
+  private static FileSystem fs = null;
+  private static LogCapturer auditlog;
+  static Configuration conf;
+  static UserGroupInformation user1;
+  static UserGroupInformation user2;
+  private static NamenodeProtocols proto;
+
+  @BeforeClass
+  public static void initialize() throws Exception {
+    // start a cluster
+    conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,true);
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
+    cluster.waitActive();
+    user1 =
+        UserGroupInformation.createUserForTesting("theDoctor",
+            new String[]{"tardis"});
+    user2 =
+        UserGroupInformation.createUserForTesting("theEngineer",
+            new String[]{"hadoop"});
+    auditlog = LogCapturer.captureLogs(FSNamesystem.auditLog);
+    proto = cluster.getNameNodeRpc();
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    fs2 = DFSTestUtil.getFileSystemAs(user2, conf);
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    fs.close();
+    fs2.close();
+    fileSys.close();
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testGetContentSummary() throws IOException {
+    Path dir1 = new Path("/dir1");
+    Path dir2 = new Path("/dir2");
+    String acePattern =
+        ".*allowed=false.*ugi=theEngineer.*cmd=contentSummary.*";
+    fs.mkdirs(dir1,new FsPermission((short)0600));
+    fs.mkdirs(dir2,new FsPermission((short)0600));
+    fs.setOwner(dir1, user1.getUserName(), user1.getPrimaryGroupName());
+    fs.setOwner(dir2, user2.getUserName(), user2.getPrimaryGroupName());
+    try {
+      fs2.getContentSummary(new Path("/"));
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    int length = verifyAuditLogs(acePattern);
+    try {
+      fs2.getContentSummary(new Path("/dir3"));
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log from getContentSummary",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testSetQuota() throws Exception {
+    Path path = new Path("/testdir/testdir1");
+    fs.mkdirs(path);
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      ((DistributedFileSystem)fileSys).setQuota(path, 10l, 10l);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    String acePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=setQuota.*";
+    int length = verifyAuditLogs(acePattern);
+    fileSys.close();
+    try {
+      ((DistributedFileSystem)fileSys).setQuota(path, 10l, 10l);
+      fail("The operation should have failed with IOException");
+    } catch (IOException ace) {
+    }
+    assertTrue("Unexpected log from getContentSummary",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testConcat() throws Exception {
+    Path file1 = new Path("/file1");
+    Path file2 = new Path("/file2");
+    Path targetDir = new Path("/target");
+    String acePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=concat.*";
+    fs.createNewFile(file1);
+    fs.createNewFile(file2);
+    fs.mkdirs(targetDir);
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      fileSys.concat(targetDir, new Path[]{file1, file2});
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    int length = verifyAuditLogs(acePattern);
+    fileSys.close();
+    try {
+      fileSys.concat(targetDir, new Path[]{file1, file2});
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log from Concat",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testCreateRenameSnapShot() throws Exception {
+    Path srcDir = new Path("/src");
+    String aceCreatePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=createSnapshot.*";
+    String aceRenamePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=renameSnapshot.*";
+    fs.mkdirs(srcDir);
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    cluster.getNamesystem().allowSnapshot(srcDir.toString());
+    try {
+      fileSys.createSnapshot(srcDir);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    verifyAuditLogs(aceCreatePattern);
+    try {
+      Path s1 = fs.createSnapshot(srcDir);
+      fileSys.renameSnapshot(srcDir, s1.getName(), "test");
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    int length = auditlog.getOutput().split("\n").length;
+    verifyAuditLogs(aceRenamePattern);
+    try {
+      fs.createSnapshot(new Path("/test1"));
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    try {
+      fs.renameSnapshot(new Path("/test1"), "abc", "test2");
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testDeleteSnapshot() throws Exception {
+    Path srcDir = new Path("/src");
+    Path s1;
+    fs.mkdirs(srcDir);
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    cluster.getNamesystem().allowSnapshot(srcDir.toString());
+    try {
+      s1 = fs.createSnapshot(srcDir);
+      fileSys.deleteSnapshot(srcDir, s1.getName());
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    String aceDeletePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=deleteSnapshot.*";
+    int length = verifyAuditLogs(aceDeletePattern);
+    fileSys.close();
+    try {
+      s1 = fs.createSnapshot(srcDir);
+      fileSys.deleteSnapshot(srcDir, s1.getName());
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length+1 == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testAddCacheDirective() throws Exception {
+    removeExistingCachePools(null);
+    proto.addCachePool(new CachePoolInfo("pool1").
+        setMode(new FsPermission((short) 0)));
+    CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/alpha")).
+        setPool("pool1").
+        build();
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      ((DistributedFileSystem)fileSys).addCacheDirective(alpha);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    String aceAddCachePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=addCache.*";
+    int length = verifyAuditLogs(aceAddCachePattern);
+    try {
+      fileSys.close();
+      ((DistributedFileSystem)fileSys).addCacheDirective(alpha);
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testModifyCacheDirective() throws Exception {
+    removeExistingCachePools(null);
+    proto.addCachePool(new CachePoolInfo("pool1").
+        setMode(new FsPermission((short) 0)));
+    CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/alpha")).
+        setPool("pool1").
+        build();
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    Long id =
+        ((DistributedFileSystem)fs).addCacheDirective(alpha);
+    try {
+      ((DistributedFileSystem)fileSys).modifyCacheDirective(
+          new CacheDirectiveInfo.Builder().
+              setId(id).
+              setReplication((short) 1).
+              build());
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    String aceModifyCachePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=modifyCache.*";
+    verifyAuditLogs(aceModifyCachePattern);
+    fileSys.close();
+    try {
+      ((DistributedFileSystem)fileSys).modifyCacheDirective(
+          new CacheDirectiveInfo.Builder().
+              setId(id).
+              setReplication((short) 1).
+              build());
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+  }
+
+  @Test
+  public void testRemoveCacheDirective() throws Exception {
+    removeExistingCachePools(null);
+    proto.addCachePool(new CachePoolInfo("pool1").
+        setMode(new FsPermission((short) 0)));
+    CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
+        setPath(new Path("/alpha")).
+        setPool("pool1").
+        build();
+    String aceRemoveCachePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=removeCache.*";
+    int length = -1;
+        Long id =((DistributedFileSystem)fs).addCacheDirective(alpha);
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      ((DistributedFileSystem) fileSys).removeCacheDirective(id);
+      fail("It should have failed with an AccessControlException");
+    } catch (AccessControlException ace) {
+      length = verifyAuditLogs(aceRemoveCachePattern);
+    }
+    try {
+      fileSys.close();
+      ((DistributedFileSystem)fileSys).removeCacheDirective(id);
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testGetSnapshotDiffReport() throws Exception {
+    Path snapshotDirPath = new Path("/test");
+    fs.mkdirs(snapshotDirPath, new FsPermission((short) 0));
+    cluster.getNamesystem().allowSnapshot(snapshotDirPath.toString());
+    Path s1 = fs.createSnapshot(snapshotDirPath);
+    Path s2 = fs.createSnapshot(snapshotDirPath);
+    int length;
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      ((DistributedFileSystem) fileSys).getSnapshotDiffReport(snapshotDirPath,
+          s1.getName(), s2.getName());
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    String aceSnapshotDiffPattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=computeSnapshotDiff.*";
+    length = verifyAuditLogs(aceSnapshotDiffPattern);
+    try {
+      fileSys.close();
+      ((DistributedFileSystem) fileSys).getSnapshotDiffReport(snapshotDirPath,
+          s1.getName(), s2.getName());
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testGetQuotaUsage() throws Exception {
+    Path path = new Path("/test");
+    fs.mkdirs(path, new FsPermission((short) 0));
+    String aceGetQuotaUsagePattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=quotaUsage.*";
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      fileSys.getQuotaUsage(path);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    int length = verifyAuditLogs(aceGetQuotaUsagePattern);
+    fileSys.close();
+    try {
+      fileSys.getQuotaUsage(path);
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testAddCachePool() throws Exception {
+    removeExistingCachePools(null);
+    CachePoolInfo cacheInfo = new CachePoolInfo("pool1").
+        setMode(new FsPermission((short) 0));
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      ((DistributedFileSystem) fileSys).addCachePool(cacheInfo);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    String aceAddCachePoolPattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=addCachePool.*";
+    int length = verifyAuditLogs(aceAddCachePoolPattern);
+    try {
+      fileSys.close();
+      ((DistributedFileSystem) fileSys).addCachePool(cacheInfo);
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testModifyCachePool() throws Exception {
+    removeExistingCachePools(null);
+    CachePoolInfo cacheInfo = new CachePoolInfo("pool1").
+        setMode(new FsPermission((short) 0));
+      ((DistributedFileSystem) fs).addCachePool(cacheInfo);
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      ((DistributedFileSystem) fileSys).modifyCachePool(cacheInfo);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    String aceModifyCachePoolPattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=modifyCachePool.*";
+    int length = verifyAuditLogs(aceModifyCachePoolPattern);
+    try {
+      fileSys.close();
+      ((DistributedFileSystem) fileSys).modifyCachePool(cacheInfo);
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testRemoveCachePool() throws Exception {
+    removeExistingCachePools(null);
+    CachePoolInfo cacheInfo = new CachePoolInfo("pool1").
+        setMode(new FsPermission((short) 0));
+    ((DistributedFileSystem) fs).addCachePool(cacheInfo);
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    try {
+      ((DistributedFileSystem) fileSys).removeCachePool("pool1");
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    String aceRemoveCachePoolPattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=removeCachePool.*";
+    int length = verifyAuditLogs(aceRemoveCachePoolPattern);
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+    try {
+      fileSys.close();
+      ((DistributedFileSystem) fileSys).removeCachePool("pool1");
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testGetEZForPath() throws Exception {
+    Path path = new Path("/test");
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    fs.mkdirs(path,new FsPermission((short)0));
+    String aceGetEzForPathPattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=getEZForPath.*";
+    try {
+      ((DistributedFileSystem) fileSys).getEZForPath(path);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace){
+    }
+    int length = verifyAuditLogs(aceGetEzForPathPattern);
+    fileSys.close();
+    try {
+      ((DistributedFileSystem) fileSys).getEZForPath(path);
+      fail("The operation should have failed with IOException");
+    } catch (IOException e){
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testRenameTo() throws Exception {
+    Path path = new Path("/test");
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    fs.mkdirs(path,new FsPermission((short)0));
+    String aceRenameToPattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=rename.*";
+    try {
+      fileSys.rename(path, path);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    int length = verifyAuditLogs(aceRenameToPattern);
+    fileSys.close();
+    try {
+      fileSys.rename(path, path);
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testGetXattrs() throws Exception {
+    Path path = new Path("/test");
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    fs.mkdirs(path,new FsPermission((short)0));
+    String aceGetXattrsPattern =
+        ".*allowed=false.*ugi=theDoctor.*cmd=getXAttrs.*";
+    try {
+      fileSys.getXAttrs(path);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    int length = verifyAuditLogs(aceGetXattrsPattern);
+    fileSys.close();
+    try {
+      fileSys.getXAttrs(path);
+      fail("The operation should have failed with IOException");
+    } catch (IOException e) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+  }
+
+  @Test
+  public void testListXattrs() throws Exception {
+    Path path = new Path("/test");
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    fs.mkdirs(path);
+    fs.setOwner(path,user1.getUserName(),user1.getPrimaryGroupName());
+    String aceListXattrsPattern =
+    ".*allowed=true.*ugi=theDoctor.*cmd=listXAttrs.*";
+    fileSys.listXAttrs(path);
+    verifyAuditLogs(aceListXattrsPattern);
+  }
+
+  @Test
+  public void testGetAclStatus() throws Exception {
+    Path path = new Path("/test");
+    fs.mkdirs(path);
+    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
+    fs.setOwner(path, user1.getUserName(), user1.getPrimaryGroupName());
+    final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
+    final FSDirectory mockedDir = Mockito.spy(dir);
+    AccessControlException ex = new AccessControlException();
+    doThrow(ex).when(mockedDir).getPermissionChecker();
+    cluster.getNamesystem().setFSDirectory(mockedDir);
+    String aceGetAclStatus =
+        ".*allowed=false.*ugi=theDoctor.*cmd=getAclStatus.*";
+    try {
+      fileSys.getAclStatus(path);
+      fail("The operation should have failed with AccessControlException");
+    } catch (AccessControlException ace) {
+    }
+    int length = verifyAuditLogs(aceGetAclStatus);
+    fileSys.close();
+    try {
+      fileSys.getAclStatus(path);
+      verifyAuditLogs(aceGetAclStatus);
+      fail("The operation should have failed with IOException");
+    } catch (IOException ace) {
+    }
+    assertTrue("Unexpected log!",
+        length == auditlog.getOutput().split("\n").length);
+    cluster.getNamesystem().setFSDirectory(dir);
+  }
+
+  private int verifyAuditLogs(String pattern) {
+    int length = auditlog.getOutput().split("\n").length;
+    String lastAudit = auditlog.getOutput().split("\n")[length - 1];
+    assertTrue("Unexpected log!", lastAudit.matches(pattern));
+    return length;
+  }
+
+  private void removeExistingCachePools(String prevPool) throws Exception {
+    BatchedRemoteIterator.BatchedEntries<CachePoolEntry> entries =
+        proto.listCachePools(prevPool);
+    for(int i =0;i < entries.size();i++) {
+      proto.removeCachePool(entries.get(i).getInfo().getPoolName());
+    }
+  }
+}
+
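Each test in TestAuditLoggerWithCommands follows the same recipe: perform the operation as a user who lacks permission, assert via verifyAuditLogs that the last captured audit line matches an allowed=false pattern for that command, then repeat the call in a way that fails with an ordinary IOException and assert that the audit line count did not grow. A condensed sketch of that recipe using getContentSummary, paraphrased from testGetContentSummary above; the method name here is illustrative only, and the pattern assumes the NameNode's default allowed=/ugi=/cmd= audit line layout:

  @Test
  public void testDeniedCommandAuditSketch() throws Exception {
    // Hypothetical condensed form of the recipe; the real test is
    // testGetContentSummary above.
    String acePattern =
        ".*allowed=false.*ugi=theEngineer.*cmd=contentSummary.*";
    try {
      fs2.getContentSummary(new Path("/"));   // denied for user2
      fail("The operation should have failed with AccessControlException");
    } catch (AccessControlException ace) {
    }
    // verifyAuditLogs() asserts the last captured audit line matches the
    // pattern and returns the current number of audit lines.
    int length = verifyAuditLogs(acePattern);
    try {
      fs2.getContentSummary(new Path("/dir3"));   // nonexistent path
      fail("The operation should have failed with IOException");
    } catch (IOException e) {
    }
    // A non-AccessControlException failure must not add an audit entry.
    assertTrue(length == auditlog.getOutput().split("\n").length);
  }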