Browse Source

HDFS-17469. Audit log for reportBadBlocks RPC (#6731)

cxzl25 1 year ago
parent
commit
23286b0632

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -993,6 +993,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
     checkNNStartup();
     namesystem.reportBadBlocks(blocks);
+    namesystem.logAuditEvent(true, "reportBadBlocks", null);
   }
 
   @Override // ClientProtocol

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLoggerWithCommands.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.ipc.RPC;
@@ -1218,6 +1219,25 @@ public class TestAuditLoggerWithCommands {
     verifyAuditLogs(aceDeletePattern);
   }
 
+  @Test
+  public void testReportBadBlocks() throws IOException {
+    String auditLogString =
+            ".*allowed=true.*cmd=reportBadBlocks.*";
+    FSNamesystem fsNamesystem = spy(cluster.getNamesystem());
+    when(fsNamesystem.isExternalInvocation()).thenReturn(true);
+    Server.Call call = spy(new Server.Call(
+            1, 1, null, null, RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
+    when(call.getRemoteUser()).thenReturn(
+            UserGroupInformation.createRemoteUser(System.getProperty("user.name")));
+    Server.getCurCall().set(call);
+    try {
+      cluster.getNameNodeRpc().reportBadBlocks(new LocatedBlock[]{});
+      verifyAuditLogs(auditLogString);
+    } catch (Exception e) {
+      fail(" The operation threw an exception" + e);
+    }
+  }
+
   private void verifyAuditRestoreFailedStorageACE(
       FSNamesystem fsNamesystem, String arg) throws IOException {
     String operationName = fsNamesystem.getFailedStorageCommand(arg);