浏览代码

YARN-7225. Add queue and partition info to RM audit log. Contributed by Eric Payne

(cherry picked from commit 62ec800bd4fcb2b6704cd7ee306ed192b07d636f)
Jonathan Hung 6 年之前
父节点
当前提交
2becae83fd

+ 8 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -558,7 +558,8 @@ public class ClientRMService extends AbstractService implements
       LOG.warn("Unable to get the current user.", ie);
       RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
           ie.getMessage(), "ClientRMService",
-          "Exception in submitting application", applicationId, callerContext);
+          "Exception in submitting application", applicationId, callerContext,
+          submissionContext.getQueue());
       throw RPCUtil.getRemoteException(ie);
     }
 
@@ -580,7 +581,8 @@ public class ClientRMService extends AbstractService implements
             ". Flow run should be a long integer", e);
         RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
             e.getMessage(), "ClientRMService",
-            "Exception in submitting application", applicationId);
+            "Exception in submitting application", applicationId,
+            submissionContext.getQueue());
         throw RPCUtil.getRemoteException(e);
       }
     }
@@ -639,12 +641,14 @@ public class ClientRMService extends AbstractService implements
       LOG.info("Application with id " + applicationId.getId() + 
           " submitted by user " + user);
       RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
-          "ClientRMService", applicationId, callerContext);
+          "ClientRMService", applicationId, callerContext,
+          submissionContext.getQueue());
     } catch (YarnException e) {
       LOG.info("Exception in submitting " + applicationId, e);
       RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
           e.getMessage(), "ClientRMService",
-          "Exception in submitting application", applicationId, callerContext);
+          "Exception in submitting application", applicationId, callerContext,
+          submissionContext.getQueue());
       throw e;
     }
 

+ 71 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java

@@ -40,7 +40,7 @@ public class RMAuditLogger {
   static enum Keys {USER, OPERATION, TARGET, RESULT, IP, PERMISSIONS,
                     DESCRIPTION, APPID, APPATTEMPTID, CONTAINERID, 
                     CALLERCONTEXT, CALLERSIGNATURE, RESOURCE, QUEUENAME,
-                    INCLUDEAPPS, INCLUDECHILDQUEUES, RECURSIVE}
+                    INCLUDEAPPS, INCLUDECHILDQUEUES, RECURSIVE, NODELABEL}
 
   public static class AuditConstants {
     static final String SUCCESS = "SUCCESS";
@@ -98,7 +98,7 @@ public class RMAuditLogger {
       ApplicationId appId, ApplicationAttemptId attemptId,
       ContainerId containerId, Resource resource) {
     return createSuccessLog(user, operation, target, appId, attemptId,
-        containerId, resource, null, Server.getRemoteIp());
+        containerId, resource, null, Server.getRemoteIp(), null, null);
   }
 
   /**
@@ -124,7 +124,7 @@ public class RMAuditLogger {
   static String createSuccessLog(String user, String operation, String target,
       ApplicationId appId, ApplicationAttemptId attemptId,
       ContainerId containerId, Resource resource, CallerContext callerContext,
-      InetAddress ip) {
+      InetAddress ip, String queueName, String partition) {
     StringBuilder b =
         createStringBuilderForSuccessEvent(user, operation, target, ip);
     if (appId != null) {
@@ -140,6 +140,12 @@ public class RMAuditLogger {
       add(Keys.RESOURCE, resource.toString(), b);
     }
     appendCallerContext(b, callerContext);
+    if (queueName != null) {
+      add(Keys.QUEUENAME, queueName, b);
+    }
+    if (partition != null) {
+      add(Keys.NODELABEL, partition, b);
+    }
     return b.toString();
   }
   
@@ -202,6 +208,32 @@ public class RMAuditLogger {
     }
   }
 
+  /**
+   * Create a readable and parseable audit log string for a successful event.
+   *
+   * @param user User who made the service request to the ResourceManager
+   * @param operation Operation requested by the user.
+   * @param target The target on which the operation is being performed.
+   * @param appId Application Id in which operation was performed.
+   * @param containerId Container Id in which operation was performed.
+   * @param resource Resource associated with container.
+   * @param queueName Name of queue.
+   * @param partition Name of labeled partition.
+   *
+   * <br><br>
+   * Note that the {@link RMAuditLogger} uses tabs ('\t') as a key-val delimiter
+   * and hence the value fields should not contains tabs ('\t').
+   */
+  public static void logSuccess(String user, String operation, String target,
+      ApplicationId appId, ContainerId containerId, Resource resource,
+      String queueName, String partition) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info(createSuccessLog(user, operation, target, appId, null,
+          containerId, resource, null, Server.getRemoteIp(), queueName,
+          partition));
+    }
+  }
+
   /**
    * Create a general readable and parseable audit log string for a successful
    * event.
@@ -263,12 +295,20 @@ public class RMAuditLogger {
           null, null));
     }
   }
-  
+
   public static void logSuccess(String user, String operation, String target,
       ApplicationId appId, CallerContext callerContext) {
     if (LOG.isInfoEnabled()) {
       LOG.info(createSuccessLog(user, operation, target, appId, null, null,
-          null, callerContext, Server.getRemoteIp()));
+          null, callerContext, Server.getRemoteIp(), null, null));
+    }
+  }
+
+  public static void logSuccess(String user, String operation, String target,
+      ApplicationId appId, CallerContext callerContext, String queueName) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info(createSuccessLog(user, operation, target, appId, null, null,
+          null, callerContext, Server.getRemoteIp(), queueName, null));
     }
   }
 
@@ -296,7 +336,7 @@ public class RMAuditLogger {
       ApplicationId appId, InetAddress ip) {
     if (LOG.isInfoEnabled()) {
       LOG.info(createSuccessLog(user, operation, target, appId, null, null,
-          null, null, ip));
+          null, null, ip, null, null));
     }
   }
 
@@ -355,7 +395,7 @@ public class RMAuditLogger {
   static String createFailureLog(String user, String operation, String perm,
       String target, String description, ApplicationId appId,
       ApplicationAttemptId attemptId, ContainerId containerId,
-      Resource resource, CallerContext callerContext) {
+      Resource resource, CallerContext callerContext, String queueName) {
     StringBuilder b = createStringBuilderForFailureLog(user,
         operation, target, description, perm);
     if (appId != null) {
@@ -371,6 +411,9 @@ public class RMAuditLogger {
       add(Keys.RESOURCE, resource.toString(), b);
     }
     appendCallerContext(b, callerContext);
+    if (queueName != null) {
+      add(Keys.QUEUENAME, queueName, b);
+    }
     return b.toString();
   }
 
@@ -381,7 +424,7 @@ public class RMAuditLogger {
       String target, String description, ApplicationId appId,
       ApplicationAttemptId attemptId, ContainerId containerId, Resource resource) {
     return createFailureLog(user, operation, perm, target, description, appId,
-        attemptId, containerId, resource, null);
+        attemptId, containerId, resource, null, null);
   }
 
   /**
@@ -447,13 +490,22 @@ public class RMAuditLogger {
           appId, attemptId, null, null));
     }
   }
-  
+
   public static void logFailure(String user, String operation, String perm,
       String target, String description, ApplicationId appId,
       CallerContext callerContext) {
     if (LOG.isWarnEnabled()) {
       LOG.warn(createFailureLog(user, operation, perm, target, description,
-          appId, null, null, null, callerContext));
+          appId, null, null, null, callerContext, null));
+    }
+  }
+
+  public static void logFailure(String user, String operation, String perm,
+      String target, String description, ApplicationId appId,
+      CallerContext callerContext, String queueName) {
+    if (LOG.isWarnEnabled()) {
+      LOG.warn(createFailureLog(user, operation, perm, target, description,
+          appId, null, null, null, callerContext, queueName));
     }
   }
 
@@ -480,6 +532,15 @@ public class RMAuditLogger {
     }
   }
 
+  public static void logFailure(String user, String operation, String perm,
+      String target, String description, ApplicationId appId,
+      String queueName) {
+    if (LOG.isWarnEnabled()) {
+      LOG.warn(createFailureLog(user, operation, perm, target, description,
+          appId, null, null, null, null, queueName));
+    }
+  }
+
   /**
    * Create a readable and parseable audit log string for a failed event.
    *

+ 17 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -191,9 +191,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
       containersToPreempt.remove(containerId);
 
+      // In order to save space in the audit log, only include the partition
+      // if it is not the default partition.
+      String containerPartition = null;
+      if (partition != null && !partition.isEmpty()) {
+        containerPartition = partition;
+      }
       Resource containerResource = rmContainer.getContainer().getResource();
       RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
-          "SchedulerApp", getApplicationId(), containerId, containerResource);
+          "SchedulerApp", getApplicationId(), containerId, containerResource,
+          getQueueName(), containerPartition);
 
       // Update usage metrics
       queue.getMetrics().releaseResources(partition,
@@ -557,9 +564,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
                 + " host=" + rmContainer.getAllocatedNode().getHost()
                 + " type=" + allocation.getAllocationLocalityType());
           }
+          // In order to save space in the audit log, only include the partition
+          // if it is not the default partition.
+          String partition =
+              schedulerContainer.getSchedulerNode().getPartition();
+          if (partition != null && partition.isEmpty()) {
+            partition = null;
+          }
           RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
               "SchedulerApp", getApplicationId(), containerId,
-              allocation.getAllocatedOrReservedResource());
+              allocation.getAllocatedOrReservedResource(), getQueueName(),
+              partition);
         } else {
           // If the rmContainer's state is already updated to RESERVED, this is
           // a reReservation

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -168,7 +168,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
       Resource containerResource = rmContainer.getContainer().getResource();
       RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
-          "SchedulerApp", getApplicationId(), containerId, containerResource);
+          "SchedulerApp", getApplicationId(), containerId, containerResource,
+          rmContainer.getQueueName(), null);
 
       // Update usage metrics
       queue.getMetrics().releaseResources(
@@ -486,7 +487,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       }
       RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
           "SchedulerApp", getApplicationId(), container.getId(),
-          container.getResource());
+          container.getResource(), getQueueName(), null);
     } finally {
       writeLock.unlock();
     }

+ 9 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java

@@ -98,9 +98,17 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
             .getApplicationAttemptId() + " container=" + containerId + " host="
             + container.getNodeId().getHost() + " type=" + type);
       }
+      // In order to save space in the audit log, only include the partition
+      // if it is not the default partition.
+      String partition = null;
+      if (appAMNodePartitionName != null &&
+            !appAMNodePartitionName.isEmpty()) {
+        partition = appAMNodePartitionName;
+      }
       RMAuditLogger.logSuccess(getUser(),
           RMAuditLogger.AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
-          getApplicationId(), containerId, container.getResource());
+          getApplicationId(), containerId, container.getResource(),
+          getQueueName(), partition);
 
       return rmContainer;
     } finally {

+ 21 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java

@@ -67,6 +67,7 @@ public class TestRMAuditLogger {
   private static final Resource RESOURCE = mock(Resource.class);
   private static final String CALLER_CONTEXT = "context";
   private static final byte[] CALLER_SIGNATURE = "signature".getBytes();
+  private static final String PARTITION = "label1";
 
   @Before
   public void setUp() throws Exception {
@@ -132,6 +133,14 @@ public class TestRMAuditLogger {
         ApplicationAttemptId attemptId, ContainerId containerId,
         CallerContext callerContext, Resource resource, InetAddress remoteIp,
         RMAuditLogger.ArgsBuilder args) {
+    testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId,
+        callerContext, resource, remoteIp, args, null, null);
+  }
+
+  private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
+        ApplicationAttemptId attemptId, ContainerId containerId,
+        CallerContext callerContext, Resource resource, InetAddress remoteIp,
+        RMAuditLogger.ArgsBuilder args, String queueName, String partition) {
     String sLog;
     InetAddress tmpIp = checkIP ? remoteIp : null;
     if (args != null) {
@@ -139,7 +148,8 @@ public class TestRMAuditLogger {
           tmpIp, args);
     } else {
       sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, appId,
-          attemptId, containerId, resource, callerContext, tmpIp);
+          attemptId, containerId, resource, callerContext, tmpIp, queueName,
+          partition);
     }
     StringBuilder expLog = new StringBuilder();
     expLog.append("USER=test\t");
@@ -177,6 +187,13 @@ public class TestRMAuditLogger {
     if (args != null) {
       expLog.append("\tQUEUENAME=root");
       expLog.append("\tRECURSIVE=true");
+    } else {
+      if (queueName != null) {
+        expLog.append("\tQUEUENAME=" + QUEUE);
+      }
+    }
+    if (partition != null) {
+      expLog.append("\tNODELABEL=" + PARTITION);
     }
     assertEquals(expLog.toString(), sLog);
   }
@@ -258,6 +275,8 @@ public class TestRMAuditLogger {
         .append(Keys.QUEUENAME, QUEUE).append(Keys.RECURSIVE, "true");
     testSuccessLogFormatHelper(checkIP, null, null, null, null, null,
         Server.getRemoteIp(), args);
+    testSuccessLogFormatHelper(checkIP, null, null, null, null, null,
+        Server.getRemoteIp(), null, QUEUE, PARTITION);
     testSuccessLogFormatHelperWithIP(checkIP, APPID, ATTEMPTID, CONTAINERID);
     testSuccessLogNulls(checkIP);
   }
@@ -283,7 +302,7 @@ public class TestRMAuditLogger {
         RMAuditLogger.ArgsBuilder args) {
     String fLog = args == null ?
       RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC,
-          appId, attemptId, containerId, resource, callerContext) :
+          appId, attemptId, containerId, resource, callerContext, null) :
         RMAuditLogger.createFailureLog(USER, OPERATION, PERM, TARGET, DESC,
             args);
     StringBuilder expLog = new StringBuilder();