瀏覽代碼

HDFS-4680. Audit logging of delegation tokens for MR tracing. (Andrew Wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1522047 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 11 年之前
父節點
當前提交
ea643ffce6

+ 19 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/TokenIdentifier.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.security.token;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,6 +36,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Evolving
 public abstract class TokenIdentifier implements Writable {
+
+  private String trackingId = null;
+
   /**
    * Get the token kind
    * @return the kind of the token
@@ -62,4 +66,19 @@ public abstract class TokenIdentifier implements Writable {
     }
     return Arrays.copyOf(buf.getData(), buf.getLength());
   }
+
+  /**
+   * Returns a tracking identifier that can be used to associate usages of a
+   * token across multiple client sessions.
+   *
+   * Currently, this function just returns an MD5 of {{@link #getBytes()}.
+   *
+   * @return tracking identifier
+   */
+  public String getTrackingId() {
+    if (trackingId == null) {
+      trackingId = DigestUtils.md5Hex(getBytes());
+    }
+    return trackingId;
+  }
 }

+ 34 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java

@@ -86,6 +86,11 @@ extends AbstractDelegationTokenIdentifier>
   private long tokenMaxLifetime;
   private long tokenRemoverScanInterval;
   private long tokenRenewInterval;
+  /**
+   * Whether to store a token's tracking ID in its TokenInformation.
+   * Can be overridden by a subclass.
+   */
+  protected boolean storeTokenTrackingId;
   private Thread tokenRemoverThread;
   protected volatile boolean running;
 
@@ -102,6 +107,7 @@ extends AbstractDelegationTokenIdentifier>
     this.tokenMaxLifetime = delegationTokenMaxLifetime;
     this.tokenRenewInterval = delegationTokenRenewInterval;
     this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
+    this.storeTokenTrackingId = false;
   }
 
   /** should be called before this object is used */
@@ -201,7 +207,7 @@ extends AbstractDelegationTokenIdentifier>
     }
     if (currentTokens.get(identifier) == null) {
       currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
-          password));
+          password, getTrackingIdIfEnabled(identifier)));
     } else {
       throw new IOException(
           "Same delegation token being added twice.");
@@ -280,7 +286,7 @@ extends AbstractDelegationTokenIdentifier>
     byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
     storeNewToken(identifier, now + tokenRenewInterval);
     currentTokens.put(identifier, new DelegationTokenInformation(now
-        + tokenRenewInterval, password));
+        + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
     return password;
   }
 
@@ -299,6 +305,21 @@ extends AbstractDelegationTokenIdentifier>
     return info.getPassword();
   }
 
+  protected String getTrackingIdIfEnabled(TokenIdent ident) {
+    if (storeTokenTrackingId) {
+      return ident.getTrackingId();
+    }
+    return null;
+  }
+
+  public synchronized String getTokenTrackingId(TokenIdent identifier) {
+    DelegationTokenInformation info = currentTokens.get(identifier);
+    if (info == null) {
+      return null;
+    }
+    return info.getTrackingId();
+  }
+
   /**
    * Verifies that the given identifier and password are valid and match.
    * @param identifier Token identifier.
@@ -359,8 +380,9 @@ extends AbstractDelegationTokenIdentifier>
           + " is trying to renew a token with " + "wrong password");
     }
     long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
+    String trackingId = getTrackingIdIfEnabled(id);
     DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
-        password);
+        password, trackingId);
 
     if (currentTokens.get(id) == null) {
       throw new InvalidToken("Renewal request for unknown token");
@@ -420,9 +442,13 @@ extends AbstractDelegationTokenIdentifier>
   public static class DelegationTokenInformation {
     long renewDate;
     byte[] password;
-    public DelegationTokenInformation(long renewDate, byte[] password) {
+    String trackingId;
+
+    public DelegationTokenInformation(long renewDate, byte[] password,
+        String trackingId) {
       this.renewDate = renewDate;
       this.password = password;
+      this.trackingId = trackingId;
     }
     /** returns renew date */
     public long getRenewDate() {
@@ -432,6 +458,10 @@ extends AbstractDelegationTokenIdentifier>
     byte[] getPassword() {
       return password;
     }
+    /** returns tracking id */
+    public String getTrackingId() {
+      return trackingId;
+    }
   }
   
   /** Remove expired delegation tokens from cache */

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -72,6 +72,8 @@ Release 2.1.1-beta - UNRELEASED
 
     HDFS-5150. Allow per NN SPN for internal SPNEGO. (kihwal)
 
+    HDFS-4680. Audit logging of delegation tokens for MR tracing. (Andrew Wang)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -264,6 +264,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
   public static final String  DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
   public static final String  DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
+  public static final String  DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
+  public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
 
   // Much code in hdfs is not yet updated to use these keys.
   public static final String  DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";

+ 15 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java

@@ -58,6 +58,15 @@ public class DelegationTokenSecretManager
       .getLog(DelegationTokenSecretManager.class);
   
   private final FSNamesystem namesystem;
+
+  public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+      long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+      long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+    this(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+        delegationTokenRenewInterval, delegationTokenRemoverScanInterval, false,
+        namesystem);
+  }
+
   /**
    * Create a secret manager
    * @param delegationKeyUpdateInterval the number of seconds for rolling new
@@ -67,13 +76,16 @@ public class DelegationTokenSecretManager
    * @param delegationTokenRenewInterval how often the tokens must be renewed
    * @param delegationTokenRemoverScanInterval how often the tokens are scanned
    *        for expired tokens
+   * @param storeTokenTrackingId whether to store the token's tracking id
    */
   public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
       long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
-      long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+      long delegationTokenRemoverScanInterval, boolean storeTokenTrackingId,
+      FSNamesystem namesystem) {
     super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
         delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
     this.namesystem = namesystem;
+    this.storeTokenTrackingId = storeTokenTrackingId;
   }
 
   @Override //SecretManager
@@ -184,7 +196,7 @@ public class DelegationTokenSecretManager
     }
     if (currentTokens.get(identifier) == null) {
       currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
-          password));
+          password, getTrackingIdIfEnabled(identifier)));
     } else {
       throw new IOException(
           "Same delegation token being added twice; invalid entry in fsimage or editlogs");
@@ -223,7 +235,7 @@ public class DelegationTokenSecretManager
       byte[] password = createPassword(identifier.getBytes(), allKeys
           .get(keyId).getKey());
       currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
-          password));
+          password, getTrackingIdIfEnabled(identifier)));
     }
   }
 

+ 40 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -36,6 +36,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
@@ -218,6 +220,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -293,8 +297,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           stat.getGroup(), symlink, path);
     }
     for (AuditLogger logger : auditLoggers) {
-      logger.logAuditEvent(succeeded, ugi.toString(), addr,
-          cmd, src, dst, status);
+      if (logger instanceof HdfsAuditLogger) {
+        HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
+        hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
+            status, ugi, dtSecretManager);
+      } else {
+        logger.logAuditEvent(succeeded, ugi.toString(), addr,
+            cmd, src, dst, status);
+      }
     }
   }
 
@@ -5840,7 +5850,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
             DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
         conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
             DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
-        DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL, this);
+        DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL,
+        conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
+            DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT),
+        this);
   }
 
   /**
@@ -6647,17 +6660,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * defined in the config file. It can also be explicitly listed in the
    * config file.
    */
-  private static class DefaultAuditLogger implements AuditLogger {
+  private static class DefaultAuditLogger extends HdfsAuditLogger {
+
+    private boolean logTokenTrackingId;
 
     @Override
     public void initialize(Configuration conf) {
-      // Nothing to do.
+      logTokenTrackingId = conf.getBoolean(
+          DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
+          DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
     }
 
     @Override
     public void logAuditEvent(boolean succeeded, String userName,
         InetAddress addr, String cmd, String src, String dst,
-        FileStatus status) {
+        FileStatus status, UserGroupInformation ugi,
+        DelegationTokenSecretManager dtSecretManager) {
       if (auditLog.isInfoEnabled()) {
         final StringBuilder sb = auditBuffer.get();
         sb.setLength(0);
@@ -6675,6 +6693,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           sb.append(status.getGroup()).append(":");
           sb.append(status.getPermission());
         }
+        if (logTokenTrackingId) {
+          sb.append("\t").append("trackingId=");
+          String trackingId = null;
+          if (ugi != null && dtSecretManager != null
+              && ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
+            for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
+              if (tid instanceof DelegationTokenIdentifier) {
+                DelegationTokenIdentifier dtid =
+                    (DelegationTokenIdentifier)tid;
+                trackingId = dtSecretManager.getTokenTrackingId(dtid);
+                break;
+              }
+            }
+          }
+          sb.append(trackingId);
+        }
         auditLog.info(sb);
       }
     }

+ 66 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/HdfsAuditLogger.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.net.InetAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Extension of {@link AuditLogger}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class HdfsAuditLogger implements AuditLogger {
+
+  @Override
+  public void logAuditEvent(boolean succeeded, String userName,
+      InetAddress addr, String cmd, String src, String dst,
+      FileStatus status) {
+    logAuditEvent(succeeded, userName, addr, cmd, src, dst, status, null,
+        null);
+  }
+
+  /**
+   * Same as
+   * {@link #logAuditEvent(boolean, String, InetAddress, String, String, String, FileStatus)}
+   * with additional parameters related to logging delegation token tracking
+   * IDs.
+   * 
+   * @param succeeded Whether authorization succeeded.
+   * @param userName Name of the user executing the request.
+   * @param addr Remote address of the request.
+   * @param cmd The requested command.
+   * @param src Path of affected source file.
+   * @param dst Path of affected destination file (if any).
+   * @param stat File information for operations that change the file's metadata
+   *          (permissions, owner, times, etc).
+   * @param ugi UserGroupInformation of the current user, or null if not logging
+   *          token tracking information
+   * @param dtSecretManager The token secret manager, or null if not logging
+   *          token tracking information
+   */
+  public abstract void logAuditEvent(boolean succeeded, String userName,
+      InetAddress addr, String cmd, String src, String dst,
+      FileStatus stat, UserGroupInformation ugi,
+      DelegationTokenSecretManager dtSecretManager);
+}