Browse Source

HADOOP-18688. S3A audit header to include count of items in delete ops (#5621)

The auditor-generated http referrer URL now includes the count of keys
to delete in the "ks" query parameter

Contributed by Viraj Jasani
Viraj Jasani 1 year ago
parent
commit
949d5ca20b

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/AuditConstants.java

@@ -115,4 +115,9 @@ public final class AuditConstants {
    */
   public static final String PARAM_TIMESTAMP = "ts";
 
+  /**
+   * Num of files to be deleted as part of the bulk delete request.
+   */
+  public static final String DELETE_KEYS_SIZE = "ks";
+
 }

+ 23 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java

@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +43,7 @@ import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import static org.apache.hadoop.fs.audit.AuditConstants.DELETE_KEYS_SIZE;
 import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_FILESYSTEM_ID;
 import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_PRINCIPAL;
 import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD0;
@@ -359,6 +362,8 @@ public class LoggingAuditor
         final T request) {
       // attach range for GetObject requests
       attachRangeFromRequest(request);
+      // for delete op, attach the number of files to delete
+      attachDeleteKeySizeAttribute(request);
       // build the referrer header
       final String header = referrer.buildHttpReferrer();
       // update the outer class's field.
@@ -385,6 +390,24 @@ public class LoggingAuditor
       return request;
     }
 
+    /**
+     * For delete requests, attach delete key size as a referrer attribute.
+     *
+     * @param request the request object.
+     * @param <T> type of the request.
+     */
+    private <T extends AmazonWebServiceRequest> void attachDeleteKeySizeAttribute(T request) {
+      if (request instanceof DeleteObjectsRequest) {
+        int keySize = ((DeleteObjectsRequest) request).getKeys().size();
+        this.set(DELETE_KEYS_SIZE, String.valueOf(keySize));
+      } else if (request instanceof DeleteObjectRequest) {
+        String key = ((DeleteObjectRequest) request).getKey();
+        if (key != null && key.length() > 0) {
+          this.set(DELETE_KEYS_SIZE, "1");
+        }
+      }
+    }
+
     @Override
     public String toString() {
       final StringBuilder sb = new StringBuilder(

+ 2 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md

@@ -211,6 +211,7 @@ https://audit.example.org/hadoop/1/op_rename/3c0d9b7e-2a63-43d9-a220-3c574d768ef
     &pr=alice
     &p2=s3a://alice-london/path2
     &ps=235865a0-d399-4696-9978-64568db1b51c
+    &ks=5
     &id=3c0d9b7e-2a63-43d9-a220-3c574d768ef3-3
     &t0=12
     &fs=af5943a9-b6f6-4eec-9c58-008982fc492a
@@ -237,6 +238,7 @@ If any of the field values were `null`, the field is omitted.
 | `t0` | Thread 0: thread span was created in | `100` |
 | `t1` | Thread 1: thread this operation was executed in | `200` |
 | `ts` | Timestamp (UTC epoch millis) | `1617116985923` |
+| `ks` | Key size (num of files) to delete as part of the given request (applicable to delete and rename ops) | `5` |
 
 _Notes_
 

+ 21 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/AbstractAuditingTest.java

@@ -19,9 +19,13 @@
 package org.apache.hadoop.fs.s3a.audit;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import org.junit.After;
@@ -234,4 +238,21 @@ public abstract class AbstractAuditingTest extends AbstractHadoopTestBase {
             .isNull();
   }
 
+  /**
+   * Create head request for bulk delete and pass it through beforeExecution of the manager.
+   *
+   * @param keys keys to be provided in the bulk delete request.
+   * @return a processed request.
+   */
+  protected DeleteObjectsRequest headForBulkDelete(String... keys) {
+    if (keys == null || keys.length == 0) {
+      return null;
+    }
+    List<DeleteObjectsRequest.KeyVersion> keysToDelete = Arrays
+        .stream(keys)
+        .map(DeleteObjectsRequest.KeyVersion::new)
+        .collect(Collectors.toList());
+    return manager.beforeExecution(requestFactory.newBulkDeleteRequest(keysToDelete));
+  }
+
 }

+ 63 - 20
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestHttpReferrerAuditHeader.java

@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.fs.s3a.audit;
 
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.regex.Matcher;
 
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import org.junit.Before;
@@ -36,6 +38,7 @@ import org.apache.hadoop.fs.audit.CommonAuditContext;
 import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import static org.apache.hadoop.fs.audit.AuditConstants.DELETE_KEYS_SIZE;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.loggingAuditConfig;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_FILTER;
 import static org.apache.hadoop.fs.s3a.audit.S3LogParser.*;
@@ -104,16 +107,8 @@ public class TestHttpReferrerAuditHeader extends AbstractAuditingTest {
     LOG.info("Header is {}", header);
     Map<String, String> params
         = HttpReferrerAuditHeader.extractQueryParameters(header);
-    assertMapContains(params, PARAM_PRINCIPAL,
-        UserGroupInformation.getCurrentUser().getUserName());
-    assertMapContains(params, PARAM_FILESYSTEM_ID, auditor.getAuditorId());
-    assertMapContains(params, PARAM_OP, OPERATION);
-    assertMapContains(params, PARAM_PATH, PATH_1);
-    assertMapContains(params, PARAM_PATH2, PATH_2);
-    String threadID = CommonAuditContext.currentThreadID();
-    assertMapContains(params, PARAM_THREAD0, threadID);
-    assertMapContains(params, PARAM_THREAD1, threadID);
-    assertMapContains(params, PARAM_ID, span.getSpanId());
+    final String threadId = CommonAuditContext.currentThreadID();
+    compareCommonHeaders(params, PATH_1, PATH_2, threadId, span);
     assertThat(span.getTimestamp())
         .describedAs("Timestamp of " + span)
         .isEqualTo(ts);
@@ -135,16 +130,8 @@ public class TestHttpReferrerAuditHeader extends AbstractAuditingTest {
     AuditSpan span = getManager().createSpan(OPERATION, p1, p2);
     long ts = span.getTimestamp();
     Map<String, String> params = issueRequestAndExtractParameters();
-    assertMapContains(params, PARAM_PRINCIPAL,
-        UserGroupInformation.getCurrentUser().getUserName());
-    assertMapContains(params, PARAM_FILESYSTEM_ID, auditor.getAuditorId());
-    assertMapContains(params, PARAM_OP, OPERATION);
-    assertMapContains(params, PARAM_PATH, p1);
-    assertMapContains(params, PARAM_PATH2, p2);
-    String threadID = CommonAuditContext.currentThreadID();
-    assertMapContains(params, PARAM_THREAD0, threadID);
-    assertMapContains(params, PARAM_THREAD1, threadID);
-    assertMapContains(params, PARAM_ID, span.getSpanId());
+    final String threadId = CommonAuditContext.currentThreadID();
+    compareCommonHeaders(params, p1, p2, threadId, span);
     assertThat(span.getTimestamp())
         .describedAs("Timestamp of " + span)
         .isEqualTo(ts);
@@ -350,6 +337,62 @@ public class TestHttpReferrerAuditHeader extends AbstractAuditingTest {
     assertMapNotContains(params, PARAM_RANGE);
   }
 
+  @Test
+  public void testHttpReferrerForBulkDelete() throws Throwable {
+    AuditSpan span = span();
+    long ts = span.getTimestamp();
+    DeleteObjectsRequest request = headForBulkDelete(
+        "key_01",
+        "key_02",
+        "key_03");
+    Map<String, String> headers
+        = request.getCustomRequestHeaders();
+    assertThat(headers)
+        .describedAs("Custom headers")
+        .containsKey(HEADER_REFERRER);
+    String header = headers.get(HEADER_REFERRER);
+    LOG.info("Header is {}", header);
+    Map<String, String> params
+        = HttpReferrerAuditHeader.extractQueryParameters(header);
+    final String threadId = CommonAuditContext.currentThreadID();
+    compareCommonHeaders(params, PATH_1, PATH_2, threadId, span);
+    assertMapContains(params, DELETE_KEYS_SIZE, "3");
+    assertThat(span.getTimestamp())
+        .describedAs("Timestamp of " + span)
+        .isEqualTo(ts);
+    assertMapNotContains(params, PARAM_RANGE);
+
+    assertMapContains(params, PARAM_TIMESTAMP,
+        Long.toString(ts));
+  }
+
+  /**
+   * Utility to compare common params from the referer header.
+   *
+   * @param params map of params extracted from the header.
+   * @param path1 first path.
+   * @param path2 second path.
+   * @param threadID thread id.
+   * @param span audit span object.
+   * @throws IOException if login fails and/or current user cannot be retrieved.
+   */
+  private void compareCommonHeaders(final Map<String, String> params,
+      final String path1,
+      final String path2,
+      final String threadID,
+      final AuditSpan span) throws IOException {
+    assertMapContains(params, PARAM_PRINCIPAL,
+        UserGroupInformation.getCurrentUser().getUserName());
+    assertMapContains(params, PARAM_FILESYSTEM_ID,
+        auditor.getAuditorId());
+    assertMapContains(params, PARAM_OP, OPERATION);
+    assertMapContains(params, PARAM_PATH, path1);
+    assertMapContains(params, PARAM_PATH2, path2);
+    assertMapContains(params, PARAM_THREAD0, threadID);
+    assertMapContains(params, PARAM_THREAD1, threadID);
+    assertMapContains(params, PARAM_ID, span.getSpanId());
+  }
+
   /**
    * Expect a field with quote stripping to match the expected value.
    * @param str string to strip