Browse Source

HDFS-7218. FSNamesystem ACL operations should write to audit log on failure. (clamb via yliu)

yliu 10 years ago
parent
commit
73e601259f

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1010,6 +1010,9 @@ Release 2.6.0 - UNRELEASED
     fails on Windows, because we cannot deny access to the file owner.
     fails on Windows, because we cannot deny access to the file owner.
     (Chris Nauroth via wheat9)
     (Chris Nauroth via wheat9)
 
 
+    HDFS-7218. FSNamesystem ACL operations should write to audit log on
+    failure. (clamb via yliu)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
   
       HDFS-6387. HDFS CLI admin tool for creating & deleting an
       HDFS-6387. HDFS CLI admin tool for creating & deleting an

+ 25 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -7862,6 +7862,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   public FSDirectory getFSDirectory() {
   public FSDirectory getFSDirectory() {
     return dir;
     return dir;
   }
   }
+  /** Set the FSDirectory. */
+  @VisibleForTesting
+  public void setFSDirectory(FSDirectory dir) {
+    this.dir = dir;
+  }
   /** @return the cache manager. */
   /** @return the cache manager. */
   public CacheManager getCacheManager() {
   public CacheManager getCacheManager() {
     return cacheManager;
     return cacheManager;
@@ -8728,6 +8733,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
       List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
       getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "modifyAclEntries", srcArg);
+      throw e;
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
@@ -8752,6 +8760,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
       List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
       getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "removeAclEntries", srcArg);
+      throw e;
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
@@ -8775,6 +8786,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       List<AclEntry> newAcl = dir.removeDefaultAcl(src);
       List<AclEntry> newAcl = dir.removeDefaultAcl(src);
       getEditLog().logSetAcl(src, newAcl);
       getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "removeDefaultAcl", srcArg);
+      throw e;
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
@@ -8798,6 +8812,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       dir.removeAcl(src);
       dir.removeAcl(src);
       getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
       getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "removeAcl", srcArg);
+      throw e;
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
@@ -8821,6 +8838,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
       List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
       getEditLog().logSetAcl(src, newAcl);
       getEditLog().logSetAcl(src, newAcl);
       resultingStat = getAuditFileInfo(src, false);
       resultingStat = getAuditFileInfo(src, false);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "setAcl", srcArg);
+      throw e;
     } finally {
     } finally {
       writeUnlock();
       writeUnlock();
     }
     }
@@ -8833,6 +8853,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     FSPermissionChecker pc = getPermissionChecker();
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    boolean success = false;
     readLock();
     readLock();
     try {
     try {
       checkOperation(OperationCategory.READ);
       checkOperation(OperationCategory.READ);
@@ -8840,9 +8861,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (isPermissionEnabled) {
       if (isPermissionEnabled) {
         checkPermission(pc, src, false, null, null, null, null);
         checkPermission(pc, src, false, null, null, null, null);
       }
       }
-      return dir.getAclStatus(src);
+      final AclStatus ret = dir.getAclStatus(src);
+      success = true;
+      return ret;
     } finally {
     } finally {
       readUnlock();
       readUnlock();
+      logAuditEvent(success, "getAclStatus", src);
     }
     }
   }
   }
 
 

+ 95 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java

@@ -19,30 +19,39 @@
 package org.apache.hadoop.hdfs.server.namenode;
 package org.apache.hadoop.hdfs.server.namenode;
 
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyString;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URISyntaxException;
+import java.util.List;
 
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.authorize.ProxyServers;
 import org.apache.hadoop.security.authorize.ProxyServers;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 
 /**
 /**
  * Tests for the {@link AuditLogger} custom audit logging interface.
  * Tests for the {@link AuditLogger} custom audit logging interface.
@@ -166,6 +175,87 @@ public class TestAuditLogger {
     }
     }
   }
   }
 
 
+  @Test
+  public void testAuditLogWithAclFailure() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    conf.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
+        DummyAuditLogger.class.getName());
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitClusterUp();
+      final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
+      // Set up mock FSDirectory to test FSN audit logging during failure
+      final FSDirectory mockedDir = Mockito.spy(dir);
+      Mockito.doThrow(new AccessControlException("mock setAcl exception")).
+          when(mockedDir).
+          setAcl(anyString(), anyListOf(AclEntry.class));
+      Mockito.doThrow(new AccessControlException("mock getAclStatus exception")).
+          when(mockedDir).
+          getAclStatus(anyString());
+      Mockito.doThrow(new AccessControlException("mock removeAcl exception")).
+          when(mockedDir).
+          removeAcl(anyString());
+      Mockito.doThrow(new AccessControlException("mock removeDefaultAcl exception")).
+          when(mockedDir).
+          removeDefaultAcl(anyString());
+      Mockito.doThrow(new AccessControlException("mock removeAclEntries exception")).
+          when(mockedDir).
+          removeAclEntries(anyString(), anyListOf(AclEntry.class));
+      Mockito.doThrow(new AccessControlException("mock modifyAclEntries exception")).
+          when(mockedDir).
+          modifyAclEntries(anyString(), anyListOf(AclEntry.class));
+      // Replace the FSD with the mock FSD.
+      cluster.getNamesystem().setFSDirectory(mockedDir);
+      assertTrue(DummyAuditLogger.initialized);
+      DummyAuditLogger.resetLogCount();
+
+      final FileSystem fs = cluster.getFileSystem();
+      final Path p = new Path("/");
+      final List<AclEntry> acls = Lists.newArrayList();
+
+      try {
+        fs.getAclStatus(p);
+      } catch (AccessControlException e) {
+        assertExceptionContains("mock getAclStatus exception", e);
+      }
+
+      try {
+        fs.setAcl(p, acls);
+      } catch (AccessControlException e) {
+        assertExceptionContains("mock setAcl exception", e);
+      }
+
+      try {
+        fs.removeAcl(p);
+      } catch (AccessControlException e) {
+        assertExceptionContains("mock removeAcl exception", e);
+      }
+
+      try {
+        fs.removeDefaultAcl(p);
+      } catch (AccessControlException e) {
+        assertExceptionContains("mock removeDefaultAcl exception", e);
+      }
+
+      try {
+        fs.removeAclEntries(p, acls);
+      } catch (AccessControlException e) {
+        assertExceptionContains("mock removeAclEntries exception", e);
+      }
+
+      try {
+        fs.modifyAclEntries(p, acls);
+      } catch (AccessControlException e) {
+        assertExceptionContains("mock modifyAclEntries exception", e);
+      }
+      assertEquals(6, DummyAuditLogger.logCount);
+      assertEquals(6, DummyAuditLogger.unsuccessfulCount);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
   /**
    * Tests that a broken audit logger causes requests to fail.
    * Tests that a broken audit logger causes requests to fail.
    */
    */
@@ -194,6 +284,7 @@ public class TestAuditLogger {
 
 
     static boolean initialized;
     static boolean initialized;
     static int logCount;
     static int logCount;
+    static int unsuccessfulCount;
     static short foundPermission;
     static short foundPermission;
     static String remoteAddr;
     static String remoteAddr;
     
     
@@ -203,6 +294,7 @@ public class TestAuditLogger {
 
 
     public static void resetLogCount() {
     public static void resetLogCount() {
       logCount = 0;
       logCount = 0;
+      unsuccessfulCount = 0;
     }
     }
 
 
     public void logAuditEvent(boolean succeeded, String userName,
     public void logAuditEvent(boolean succeeded, String userName,
@@ -210,6 +302,9 @@ public class TestAuditLogger {
         FileStatus stat) {
         FileStatus stat) {
       remoteAddr = addr.getHostAddress();
       remoteAddr = addr.getHostAddress();
       logCount++;
       logCount++;
+      if (!succeeded) {
+        unsuccessfulCount++;
+      }
       if (stat != null) {
       if (stat != null) {
         foundPermission = stat.getPermission().toShort();
         foundPermission = stat.getPermission().toShort();
       }
       }