Ver código fonte

Revert "HDFS-9395. Make HDFS audit logging consistant. Contributed by Kuhu Shukla."

Reverting this on branch-2.* as it's an incompatible change.

This reverts commit 83f7f62be379045ad6933689b21b76c7086f919d.

(cherry picked from commit 2486c4c63a35fcef7338ea63f0d8aafa778cd05d)
Vinod Kumar Vavilapalli (I am also known as @tshooter.) 8 anos atrás
pai
commit
d65024eddc

+ 60 - 125
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1855,16 +1855,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot concat " + target);
       stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
-      throw ace;
     } finally {
       writeUnlock();
       if (success) {
         getEditLog().logSync();
       }
+      logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
     }
-    logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
   }
 
   /**
@@ -2740,8 +2737,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean success = ret != null && ret.success;
     if (success) {
       getEditLog().logSync();
-      logAuditEvent(success, "rename", src, dst, ret.auditStat);
     }
+    logAuditEvent(success, "rename", src, dst,
+        ret == null ? null : ret.auditStat);
     return success;
   }
 
@@ -3005,19 +3003,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     checkOperation(OperationCategory.READ);
     readLock();
     boolean success = true;
-    ContentSummary cs;
     try {
       checkOperation(OperationCategory.READ);
-      cs = FSDirStatAndListingOp.getContentSummary(dir, src);
+      return FSDirStatAndListingOp.getContentSummary(dir, src);
     } catch (AccessControlException ace) {
       success = false;
-      logAuditEvent(success, "contentSummary", src);
       throw ace;
     } finally {
       readUnlock();
+      logAuditEvent(success, "contentSummary", src);
     }
-    logAuditEvent(success, "contentSummary", src);
-    return cs;
   }
 
   /**
@@ -3036,21 +3031,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   QuotaUsage getQuotaUsage(final String src) throws IOException {
     checkOperation(OperationCategory.READ);
-    QuotaUsage quotaUsage;
     readLock();
     boolean success = true;
     try {
       checkOperation(OperationCategory.READ);
-      quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, src);
+      return FSDirStatAndListingOp.getQuotaUsage(dir, src);
     } catch (AccessControlException ace) {
       success = false;
-      logAuditEvent(success, "quotaUsage", src);
       throw ace;
     } finally {
       readUnlock();
+      logAuditEvent(success, "quotaUsage", src);
     }
-    logAuditEvent(success, "quotaUsage", src);
-    return quotaUsage;
   }
 
   /**
@@ -3073,16 +3065,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot set quota on " + src);
       FSDirAttrOp.setQuota(dir, src, nsQuota, ssQuota, type);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "setQuota", src);
-      throw ace;
     } finally {
       writeUnlock();
       if (success) {
         getEditLog().logSync();
       }
+      logAuditEvent(success, "setQuota", src);
     }
-    logAuditEvent(success, "setQuota", src);
   }
 
   /** Persist all metadata about this file.
@@ -6614,23 +6603,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   String createSnapshot(String snapshotRoot, String snapshotName,
                         boolean logRetryCache) throws IOException {
     String snapshotPath = null;
-    boolean success = false;
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
       snapshotPath = FSDirSnapshotOp.createSnapshot(dir,
           snapshotManager, snapshotRoot, snapshotName, logRetryCache);
-      success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "createSnapshot", snapshotRoot,
-          snapshotPath, null);
-      throw ace;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    logAuditEvent(success, "createSnapshot", snapshotRoot,
+    logAuditEvent(snapshotPath != null, "createSnapshot", snapshotRoot,
         snapshotPath, null);
     return snapshotPath;
   }
@@ -6647,8 +6630,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
     boolean success = false;
-    String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
-    String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -6656,14 +6637,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       FSDirSnapshotOp.renameSnapshot(dir, snapshotManager, path,
           snapshotOldName, snapshotNewName, logRetryCache);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "renameSnapshot", oldSnapshotRoot,
-          newSnapshotRoot, null);
-      throw ace;
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
+    String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
+    String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
     logAuditEvent(success, "renameSnapshot", oldSnapshotRoot,
         newSnapshotRoot, null);
   }
@@ -6685,9 +6664,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.READ);
       status = FSDirSnapshotOp.getSnapshottableDirListing(dir, snapshotManager);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "listSnapshottableDirectory", null, null, null);
-      throw ace;
     } finally {
       readUnlock();
     }
@@ -6714,25 +6690,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String fromSnapshot, String toSnapshot) throws IOException {
     SnapshotDiffReport diffs = null;
     checkOperation(OperationCategory.READ);
-    boolean success = false;
-    String fromSnapshotRoot = (fromSnapshot == null || fromSnapshot.isEmpty()) ?
-        path : Snapshot.getSnapshotPath(path, fromSnapshot);
-    String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
-        path : Snapshot.getSnapshotPath(path, toSnapshot);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
       diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, snapshotManager,
           path, fromSnapshot, toSnapshot);
-      success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "computeSnapshotDiff", fromSnapshotRoot,
-          toSnapshotRoot, null);
-      throw ace;
     } finally {
       readUnlock();
     }
-    logAuditEvent(success, "computeSnapshotDiff", fromSnapshotRoot,
+    String fromSnapshotRoot = (fromSnapshot == null || fromSnapshot.isEmpty()) ?
+        path : Snapshot.getSnapshotPath(path, fromSnapshot);
+    String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
+        path : Snapshot.getSnapshotPath(path, toSnapshot);
+    logAuditEvent(diffs != null, "computeSnapshotDiff", fromSnapshotRoot,
         toSnapshotRoot, null);
     return diffs;
   }
@@ -6747,19 +6717,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void deleteSnapshot(String snapshotRoot, String snapshotName,
       boolean logRetryCache) throws IOException {
     boolean success = false;
-    String rootPath = null;
     writeLock();
     BlocksMapUpdateInfo blocksToBeDeleted = null;
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
-      rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
+
       blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
           snapshotRoot, snapshotName, logRetryCache);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "deleteSnapshot", rootPath, null, null);
-      throw ace;
     } finally {
       writeUnlock();
     }
@@ -6770,6 +6736,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (blocksToBeDeleted != null) {
       removeBlocks(blocksToBeDeleted);
     }
+
+    String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
     logAuditEvent(success, "deleteSnapshot", rootPath, null, null);
   }
 
@@ -6827,7 +6795,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     getEditLog().logSync();
-    logAuditEvent(true, "startRollingUpgrade", null, null, null);
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(true, "startRollingUpgrade", null, null, null);
+    }
     return rollingUpgradeInfo;
   }
 
@@ -7018,7 +6988,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       // Sync not needed for ha since the edit was rolled after logging.
       getEditLog().logSync();
     }
-    logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
+
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(true, "finalizeRollingUpgrade", null, null, null);
+    }
     return rollingUpgradeInfo;
   }
 
@@ -7031,8 +7004,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                          EnumSet<CacheFlag> flags, boolean logRetryCache)
       throws IOException {
     CacheDirectiveInfo effectiveDirective = null;
-    boolean success = false;
-    String effectiveDirectiveStr;
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
     }
@@ -7042,27 +7013,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkNameNodeSafeMode("Cannot add cache directive");
       effectiveDirective = FSNDNCacheOp.addCacheDirective(this, cacheManager,
           directive, flags, logRetryCache);
-      success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "addCacheDirective", null,
-          null, null);
-      throw ace;
     } finally {
       writeUnlock();
+      boolean success = effectiveDirective != null;
       if (success) {
         getEditLog().logSync();
       }
+
+      String effectiveDirectiveStr = effectiveDirective != null ?
+          effectiveDirective.toString() : null;
+      logAuditEvent(success, "addCacheDirective", effectiveDirectiveStr,
+          null, null);
     }
-    effectiveDirectiveStr = effectiveDirective.toString();
-    logAuditEvent(success, "addCacheDirective", effectiveDirectiveStr,
-        null, null);
-    return effectiveDirective.getId();
+    return effectiveDirective != null ? effectiveDirective.getId() : 0;
   }
 
   void modifyCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
     boolean success = false;
-    final String idStr = "{id: " + directive.getId() + "}";
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
     }
@@ -7073,36 +7041,31 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       FSNDNCacheOp.modifyCacheDirective(this, cacheManager, directive, flags,
           logRetryCache);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "modifyCacheDirective", idStr,
-          directive.toString(), null);
-      throw ace;
     } finally {
       writeUnlock();
       if (success) {
         getEditLog().logSync();
       }
+      final String idStr = "{id: " + directive.getId() + "}";
+      logAuditEvent(success, "modifyCacheDirective", idStr,
+          directive.toString(), null);
     }
-    logAuditEvent(success, "modifyCacheDirective", idStr,
-        directive.toString(), null);
   }
 
   void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
     boolean success = false;
-    String idStr = "{id: " + Long.toString(id) + "}";
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot remove cache directives");
       FSNDNCacheOp.removeCacheDirective(this, cacheManager, id, logRetryCache);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "removeCacheDirective", idStr, null, null);
-      throw ace;
     } finally {
       writeUnlock();
+      String idStr = "{id: " + Long.toString(id) + "}";
+      logAuditEvent(success, "removeCacheDirective", idStr, null,
+          null);
     }
-    logAuditEvent(success, "removeCacheDirective", idStr, null, null);
     getEditLog().logSync();
   }
 
@@ -7118,15 +7081,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       results = FSNDNCacheOp.listCacheDirectives(this, cacheManager, startId,
           filter);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "listCacheDirectives", filter.toString(), null,
-          null);
-      throw ace;
     } finally {
       readUnlock();
+      logAuditEvent(success, "listCacheDirectives", filter.toString(), null,
+          null);
     }
-    logAuditEvent(success, "listCacheDirectives", filter.toString(), null,
-        null);
     return results;
   }
 
@@ -7143,13 +7102,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           logRetryCache);
       poolInfoStr = info.toString();
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
-      throw ace;
     } finally {
       writeUnlock();
+      logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
     }
-    logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
+    
     getEditLog().logSync();
   }
 
@@ -7157,23 +7114,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     writeLock();
     boolean success = false;
-    String poolNameStr = "{poolName: " +
-        (req == null ? null : req.getPoolName()) + "}";
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify cache pool"
           + (req == null ? null : req.getPoolName()));
       FSNDNCacheOp.modifyCachePool(this, cacheManager, req, logRetryCache);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "modifyCachePool", poolNameStr,
-          req == null ? null : req.toString(), null);
-      throw ace;
     } finally {
       writeUnlock();
+      String poolNameStr = "{poolName: " +
+          (req == null ? null : req.getPoolName()) + "}";
+      logAuditEvent(success, "modifyCachePool", poolNameStr,
+                    req == null ? null : req.toString(), null);
     }
-    logAuditEvent(success, "modifyCachePool", poolNameStr,
-        req == null ? null : req.toString(), null);
 
     getEditLog().logSync();
   }
@@ -7182,20 +7135,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throws IOException {
     writeLock();
     boolean success = false;
-    String poolNameStr = "{poolName: " + cachePoolName + "}";
     try {
       checkOperation(OperationCategory.WRITE);
       checkNameNodeSafeMode("Cannot modify cache pool" + cachePoolName);
       FSNDNCacheOp.removeCachePool(this, cacheManager, cachePoolName,
           logRetryCache);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
-      throw ace;
     } finally {
       writeUnlock();
+      String poolNameStr = "{poolName: " + cachePoolName + "}";
+      logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
     }
-    logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
+    
     getEditLog().logSync();
   }
 
@@ -7210,13 +7161,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       checkOperation(OperationCategory.READ);
       results = FSNDNCacheOp.listCachePools(this, cacheManager, prevKey);
       success = true;
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "listCachePools", null, null, null);
-      throw ace;
     } finally {
       readUnlock();
+      logAuditEvent(success, "listCachePools", null, null, null);
     }
-    logAuditEvent(success, "listCachePools", null, null, null);
     return results;
   }
 
@@ -7314,19 +7262,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   AclStatus getAclStatus(String src) throws IOException {
     checkOperation(OperationCategory.READ);
-    final AclStatus ret;
+    boolean success = false;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      ret = FSDirAclOp.getAclStatus(dir, src);
-    } catch(AccessControlException ace) {
-      logAuditEvent(false, "getAclStatus", src);
-      throw ace;
+      final AclStatus ret = FSDirAclOp.getAclStatus(dir, src);
+      success = true;
+      return ret;
     } finally {
       readUnlock();
+      logAuditEvent(success, "getAclStatus", src);
     }
-    logAuditEvent(true, "getAclStatus", src);
-    return ret;
   }
 
   /**
@@ -7381,7 +7327,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     throws AccessControlException, UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
     boolean success = false;
-    EncryptionZone encryptionZone;
     final FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     readLock();
@@ -7391,15 +7336,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           .getEZForPath(dir, srcArg, pc);
       success = true;
       resultingStat = ezForPath.getValue();
-      encryptionZone = ezForPath.getKey();
-    } catch (AccessControlException ace) {
-      logAuditEvent(success, "getEZForPath", srcArg, null, resultingStat);
-      throw ace;
+      return ezForPath.getKey();
     } finally {
       readUnlock();
+      logAuditEvent(success, "getEZForPath", srcArg, null, resultingStat);
     }
-    logAuditEvent(success, "getEZForPath", srcArg, null, resultingStat);
-    return encryptionZone;
   }
 
   BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
@@ -7443,36 +7384,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   List<XAttr> getXAttrs(final String src, List<XAttr> xAttrs)
       throws IOException {
     checkOperation(OperationCategory.READ);
-    List<XAttr> fsXattrs;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      fsXattrs = FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
+      return FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
     } catch (AccessControlException e) {
       logAuditEvent(false, "getXAttrs", src);
       throw e;
     } finally {
       readUnlock();
     }
-    logAuditEvent(true, "getXAttrs", src);
-    return fsXattrs;
   }
 
   List<XAttr> listXAttrs(String src) throws IOException {
     checkOperation(OperationCategory.READ);
-    List<XAttr> fsXattrs;
     readLock();
     try {
       checkOperation(OperationCategory.READ);
-      fsXattrs = FSDirXAttrOp.listXAttrs(dir, src);
+      return FSDirXAttrOp.listXAttrs(dir, src);
     } catch (AccessControlException e) {
       logAuditEvent(false, "listXAttrs", src);
       throw e;
     } finally {
       readUnlock();
     }
-    logAuditEvent(true, "listXAttrs", src);
-    return fsXattrs;
   }
 
   void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)

+ 0 - 585
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java

@@ -1,585 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BatchedRemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
-import java.io.IOException;
-
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import static org.mockito.Mockito.doThrow;
-
-public class TestAuditLoggerWithCommands {
-
-  static final int NUM_DATA_NODES = 2;
-  static final long seed = 0xDEADBEEFL;
-  static final int blockSize = 8192;
-  private static MiniDFSCluster cluster = null;
-  private static FileSystem fileSys = null;
-  private static FileSystem fs2 = null;
-  private static FileSystem fs = null;
-  private static LogCapturer auditlog;
-  static Configuration conf;
-  static UserGroupInformation user1;
-  static UserGroupInformation user2;
-  private static NamenodeProtocols proto;
-
-  @BeforeClass
-  public static void initialize() throws Exception {
-    // start a cluster
-    conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,true);
-    cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
-    cluster.waitActive();
-    user1 =
-        UserGroupInformation.createUserForTesting("theDoctor",
-            new String[]{"tardis"});
-    user2 =
-        UserGroupInformation.createUserForTesting("theEngineer",
-            new String[]{"hadoop"});
-    auditlog = LogCapturer.captureLogs(FSNamesystem.auditLog);
-    proto = cluster.getNameNodeRpc();
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    fs2 = DFSTestUtil.getFileSystemAs(user2, conf);
-    fs = cluster.getFileSystem();
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    fs.close();
-    fs2.close();
-    fileSys.close();
-    cluster.shutdown();
-  }
-
-  @Test
-  public void testGetContentSummary() throws IOException {
-    Path dir1 = new Path("/dir1");
-    Path dir2 = new Path("/dir2");
-    String acePattern =
-        ".*allowed=false.*ugi=theEngineer.*cmd=contentSummary.*";
-    fs.mkdirs(dir1,new FsPermission((short)0600));
-    fs.mkdirs(dir2,new FsPermission((short)0600));
-    fs.setOwner(dir1, user1.getUserName(), user1.getPrimaryGroupName());
-    fs.setOwner(dir2, user2.getUserName(), user2.getPrimaryGroupName());
-    try {
-      fs2.getContentSummary(new Path("/"));
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    int length = verifyAuditLogs(acePattern);
-    try {
-      fs2.getContentSummary(new Path("/dir3"));
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log from getContentSummary",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testSetQuota() throws Exception {
-    Path path = new Path("/testdir/testdir1");
-    fs.mkdirs(path);
-    try {
-      ((DistributedFileSystem)fileSys).setQuota(path, 10l, 10l);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    String acePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=setQuota.*";
-    verifyAuditLogs(acePattern);
-    String ioePattern =
-        ".*allowed=true.*ugi=" + fs.getFileStatus(path).getOwner().toString() +
-            ".*cmd=delete.*";
-    fs.delete(path, true);
-    try {
-      ((DistributedFileSystem)fs).setQuota(path, 10l, 10l);
-      fail("The operation should have failed with IOException");
-    } catch (IOException ace) {
-    }
-    verifyAuditLogs(ioePattern);
-  }
-
-  @Test
-  public void testConcat() throws Exception {
-    Path file1 = new Path("/file1");
-    Path file2 = new Path("/file2");
-    Path targetDir = new Path("/target");
-    String acePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=concat.*";
-    fs.createNewFile(file1);
-    fs.createNewFile(file2);
-    fs.mkdirs(targetDir);
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    try {
-      fileSys.concat(targetDir, new Path[]{file1, file2});
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    int length = verifyAuditLogs(acePattern);
-    fileSys.close();
-    try {
-      fileSys.concat(targetDir, new Path[]{file1, file2});
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log from Concat",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testCreateRenameSnapShot() throws Exception {
-    Path srcDir = new Path("/src");
-    String aceCreatePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=createSnapshot.*";
-    String aceRenamePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=renameSnapshot.*";
-    fs.mkdirs(srcDir);
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    cluster.getNamesystem().allowSnapshot(srcDir.toString());
-    try {
-      fileSys.createSnapshot(srcDir);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    verifyAuditLogs(aceCreatePattern);
-    try {
-      Path s1 = fs.createSnapshot(srcDir);
-      fileSys.renameSnapshot(srcDir, s1.getName(), "test");
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    int length = auditlog.getOutput().split("\n").length;
-    verifyAuditLogs(aceRenamePattern);
-    try {
-      fs.createSnapshot(new Path("/test1"));
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    try {
-      fs.renameSnapshot(new Path("/test1"), "abc", "test2");
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testDeleteSnapshot() throws Exception {
-    Path srcDir = new Path("/src");
-    Path s1;
-    fs.mkdirs(srcDir);
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    cluster.getNamesystem().allowSnapshot(srcDir.toString());
-    try {
-      s1 = fs.createSnapshot(srcDir);
-      fileSys.deleteSnapshot(srcDir, s1.getName());
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    String aceDeletePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=deleteSnapshot.*";
-    int length = verifyAuditLogs(aceDeletePattern);
-    fileSys.close();
-    try {
-      s1 = fs.createSnapshot(srcDir);
-      fileSys.deleteSnapshot(srcDir, s1.getName());
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length+1 == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testAddCacheDirective() throws Exception {
-    removeExistingCachePools(null);
-    proto.addCachePool(new CachePoolInfo("pool1").
-        setMode(new FsPermission((short) 0)));
-    CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
-        setPath(new Path("/alpha")).
-        setPool("pool1").
-        build();
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    try {
-      ((DistributedFileSystem)fileSys).addCacheDirective(alpha);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    String aceAddCachePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=addCache.*";
-    int length = verifyAuditLogs(aceAddCachePattern);
-    try {
-      fileSys.close();
-      ((DistributedFileSystem)fileSys).addCacheDirective(alpha);
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testModifyCacheDirective() throws Exception {
-    removeExistingCachePools(null);
-    proto.addCachePool(new CachePoolInfo("pool1").
-        setMode(new FsPermission((short) 0)));
-    CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
-        setPath(new Path("/alpha")).
-        setPool("pool1").
-        build();
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    Long id =
-        ((DistributedFileSystem)fs).addCacheDirective(alpha);
-    try {
-      ((DistributedFileSystem)fileSys).modifyCacheDirective(
-          new CacheDirectiveInfo.Builder().
-              setId(id).
-              setReplication((short) 1).
-              build());
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    String aceModifyCachePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=modifyCache.*";
-    verifyAuditLogs(aceModifyCachePattern);
-    fileSys.close();
-    try {
-      ((DistributedFileSystem)fileSys).modifyCacheDirective(
-          new CacheDirectiveInfo.Builder().
-              setId(id).
-              setReplication((short) 1).
-              build());
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-  }
-
-  @Test
-  public void testRemoveCacheDirective() throws Exception {
-    removeExistingCachePools(null);
-    proto.addCachePool(new CachePoolInfo("pool1").
-        setMode(new FsPermission((short) 0)));
-    CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
-        setPath(new Path("/alpha")).
-        setPool("pool1").
-        build();
-    String aceRemoveCachePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=removeCache.*";
-    int length = -1;
-        Long id =((DistributedFileSystem)fs).addCacheDirective(alpha);
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    try {
-      ((DistributedFileSystem) fileSys).removeCacheDirective(id);
-      fail("It should have failed with an AccessControlException");
-    } catch (AccessControlException ace) {
-      length = verifyAuditLogs(aceRemoveCachePattern);
-    }
-    try {
-      fileSys.close();
-      ((DistributedFileSystem)fileSys).removeCacheDirective(id);
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testGetSnapshotDiffReport() throws Exception {
-    Path snapshotDirPath = new Path("/test");
-    fs.mkdirs(snapshotDirPath, new FsPermission((short) 0));
-    cluster.getNamesystem().allowSnapshot(snapshotDirPath.toString());
-    Path s1 = fs.createSnapshot(snapshotDirPath);
-    Path s2 = fs.createSnapshot(snapshotDirPath);
-    int length;
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    try {
-      ((DistributedFileSystem) fileSys).getSnapshotDiffReport(snapshotDirPath,
-          s1.getName(), s2.getName());
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    String aceSnapshotDiffPattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=computeSnapshotDiff.*";
-    length = verifyAuditLogs(aceSnapshotDiffPattern);
-    try {
-      fileSys.close();
-      ((DistributedFileSystem) fileSys).getSnapshotDiffReport(snapshotDirPath,
-          s1.getName(), s2.getName());
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testGetQuotaUsage() throws Exception {
-    Path path = new Path("/test");
-    fs.mkdirs(path, new FsPermission((short) 0));
-    String aceGetQuotaUsagePattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=quotaUsage.*";
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    try {
-      fileSys.getQuotaUsage(path);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    int length = verifyAuditLogs(aceGetQuotaUsagePattern);
-    fileSys.close();
-    try {
-      fileSys.getQuotaUsage(path);
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testAddCachePool() throws Exception {
-    removeExistingCachePools(null);
-    CachePoolInfo cacheInfo = new CachePoolInfo("pool1").
-        setMode(new FsPermission((short) 0));
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    try {
-      ((DistributedFileSystem) fileSys).addCachePool(cacheInfo);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    String aceAddCachePoolPattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=addCachePool.*";
-    int length = verifyAuditLogs(aceAddCachePoolPattern);
-    try {
-      fileSys.close();
-      ((DistributedFileSystem) fileSys).addCachePool(cacheInfo);
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testModifyCachePool() throws Exception {
-    removeExistingCachePools(null);
-    CachePoolInfo cacheInfo = new CachePoolInfo("pool1").
-        setMode(new FsPermission((short) 0));
-      ((DistributedFileSystem) fs).addCachePool(cacheInfo);
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    try {
-      ((DistributedFileSystem) fileSys).modifyCachePool(cacheInfo);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    String aceModifyCachePoolPattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=modifyCachePool.*";
-    int length = verifyAuditLogs(aceModifyCachePoolPattern);
-    try {
-      fileSys.close();
-      ((DistributedFileSystem) fileSys).modifyCachePool(cacheInfo);
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testRemoveCachePool() throws Exception {
-    removeExistingCachePools(null);
-    CachePoolInfo cacheInfo = new CachePoolInfo("pool1").
-        setMode(new FsPermission((short) 0));
-    ((DistributedFileSystem) fs).addCachePool(cacheInfo);
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    try {
-      ((DistributedFileSystem) fileSys).removeCachePool("pool1");
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    String aceRemoveCachePoolPattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=removeCachePool.*";
-    int length = verifyAuditLogs(aceRemoveCachePoolPattern);
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-    try {
-      fileSys.close();
-      ((DistributedFileSystem) fileSys).removeCachePool("pool1");
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testGetEZForPath() throws Exception {
-    Path path = new Path("/test");
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    fs.mkdirs(path,new FsPermission((short)0));
-    String aceGetEzForPathPattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=getEZForPath.*";
-    try {
-      ((DistributedFileSystem) fileSys).getEZForPath(path);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace){
-    }
-    int length = verifyAuditLogs(aceGetEzForPathPattern);
-    fileSys.close();
-    try {
-      ((DistributedFileSystem) fileSys).getEZForPath(path);
-      fail("The operation should have failed with IOException");
-    } catch (IOException e){
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testRenameTo() throws Exception {
-    Path path = new Path("/test");
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    fs.mkdirs(path,new FsPermission((short)0));
-    String aceRenameToPattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=rename.*";
-    try {
-      fileSys.rename(path, path);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    int length = verifyAuditLogs(aceRenameToPattern);
-    fileSys.close();
-    try {
-      fileSys.rename(path, path);
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testGetXattrs() throws Exception {
-    Path path = new Path("/test");
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    fs.mkdirs(path,new FsPermission((short)0));
-    String aceGetXattrsPattern =
-        ".*allowed=false.*ugi=theDoctor.*cmd=getXAttrs.*";
-    try {
-      fileSys.getXAttrs(path);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    int length = verifyAuditLogs(aceGetXattrsPattern);
-    fileSys.close();
-    try {
-      fileSys.getXAttrs(path);
-      fail("The operation should have failed with IOException");
-    } catch (IOException e) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-  }
-
-  @Test
-  public void testListXattrs() throws Exception {
-    Path path = new Path("/test");
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    fs.mkdirs(path);
-    fs.setOwner(path,user1.getUserName(),user1.getPrimaryGroupName());
-    String aceListXattrsPattern =
-    ".*allowed=true.*ugi=theDoctor.*cmd=listXAttrs.*";
-    fileSys.listXAttrs(path);
-    verifyAuditLogs(aceListXattrsPattern);
-  }
-
-  @Test
-  public void testGetAclStatus() throws Exception {
-    Path path = new Path("/test");
-    fs.mkdirs(path);
-    fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
-    fs.setOwner(path, user1.getUserName(), user1.getPrimaryGroupName());
-    final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
-    final FSDirectory mockedDir = Mockito.spy(dir);
-    AccessControlException ex = new AccessControlException();
-    doThrow(ex).when(mockedDir).getPermissionChecker();
-    cluster.getNamesystem().setFSDirectory(mockedDir);
-    String aceGetAclStatus =
-        ".*allowed=false.*ugi=theDoctor.*cmd=getAclStatus.*";
-    try {
-      fileSys.getAclStatus(path);
-      fail("The operation should have failed with AccessControlException");
-    } catch (AccessControlException ace) {
-    }
-    int length = verifyAuditLogs(aceGetAclStatus);
-    fileSys.close();
-    try {
-      fileSys.getAclStatus(path);
-      verifyAuditLogs(aceGetAclStatus);
-      fail("The operation should have failed with IOException");
-    } catch (IOException ace) {
-    }
-    assertTrue("Unexpected log!",
-        length == auditlog.getOutput().split("\n").length);
-    cluster.getNamesystem().setFSDirectory(dir);
-  }
-
-  private int verifyAuditLogs(String pattern) {
-    int length = auditlog.getOutput().split("\n").length;
-    String lastAudit = auditlog.getOutput().split("\n")[length - 1];
-    assertTrue("Unexpected log!", lastAudit.matches(pattern));
-    return length;
-  }
-
-  private void removeExistingCachePools(String prevPool) throws Exception {
-    BatchedRemoteIterator.BatchedEntries<CachePoolEntry> entries =
-        proto.listCachePools(prevPool);
-    for(int i =0;i < entries.size();i++) {
-      proto.removeCachePool(entries.get(i).getInfo().getPoolName());
-    }
-  }
-}
-