Browse Source

YARN-4720. Skip unnecessary NN operations in log aggregation. (Jun Gong via mingma)

Ming Ma 9 years ago
parent
commit
7f3139e54d

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -884,6 +884,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4066. Large number of queues choke fair scheduler.
     (Johan Gustavsson via kasha)
 
+    YARN-4720. Skip unnecessary NN operations in log aggregation.
+    (Jun Gong via mingma)
+
   BUG FIXES
 
     YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena 

+ 44 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -127,6 +127,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   // This variable is only for testing
   private final AtomicBoolean waiting = new AtomicBoolean(false);
 
+  // This variable is only for testing
+  private int logAggregationTimes = 0;
+
   private boolean renameTemporaryLogFileFailed = false;
 
   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
@@ -311,7 +314,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
 
     LogWriter writer = null;
+    String diagnosticMessage = "";
+    boolean logAggregationSucceedInThisCycle = true;
     try {
+      if (pendingContainerInThisCycle.isEmpty()) {
+        return;
+      }
+
+      logAggregationTimes++;
+
       try {
         writer =
             new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
@@ -321,6 +332,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         writer.writeApplicationOwner(this.userUgi.getShortUserName());
 
       } catch (IOException e1) {
+        logAggregationSucceedInThisCycle = false;
         LOG.error("Cannot create writer for app " + this.applicationId
             + ". Skip log upload this time. ", e1);
         return;
@@ -369,20 +381,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
                 remoteNodeLogFileForApp.getName() + "_"
                     + currentTime);
 
-      String diagnosticMessage = "";
-      boolean logAggregationSucceedInThisCycle = true;
       final boolean rename = uploadedLogsInThisCycle;
       try {
         userUgi.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
             FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
-            if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
-              if (rename) {
-                remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
-              } else {
-                remoteFS.delete(remoteNodeTmpLogFileForApp, false);
-              }
+            if (rename) {
+              remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+            } else {
+              remoteFS.delete(remoteNodeTmpLogFileForApp, false);
             }
             return null;
           }
@@ -405,33 +413,39 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         renameTemporaryLogFileFailed = true;
         logAggregationSucceedInThisCycle = false;
       }
-
-      LogAggregationReport report =
-          Records.newRecord(LogAggregationReport.class);
-      report.setApplicationId(appId);
-      report.setDiagnosticMessage(diagnosticMessage);
-      report.setLogAggregationStatus(logAggregationSucceedInThisCycle
-          ? LogAggregationStatus.RUNNING
-          : LogAggregationStatus.RUNNING_WITH_FAILURE);
-      this.context.getLogAggregationStatusForApps().add(report);
+    } finally {
+      LogAggregationStatus logAggregationStatus =
+          logAggregationSucceedInThisCycle
+              ? LogAggregationStatus.RUNNING
+              : LogAggregationStatus.RUNNING_WITH_FAILURE;
+      sendLogAggregationReport(logAggregationStatus, diagnosticMessage);
       if (appFinished) {
         // If the app is finished, one extra final report with log aggregation
         // status SUCCEEDED/FAILED will be sent to RM to inform the RM
         // that the log aggregation in this NM is completed.
-        LogAggregationReport finalReport =
-            Records.newRecord(LogAggregationReport.class);
-        finalReport.setApplicationId(appId);
-        finalReport.setLogAggregationStatus(renameTemporaryLogFileFailed
-            ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED);
-        this.context.getLogAggregationStatusForApps().add(finalReport);
+        LogAggregationStatus finalLogAggregationStatus =
+            renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
+                ? LogAggregationStatus.FAILED
+                : LogAggregationStatus.SUCCEEDED;
+        sendLogAggregationReport(finalLogAggregationStatus, "");
       }
-    } finally {
+
       if (writer != null) {
         writer.close();
       }
     }
   }
 
+  private void sendLogAggregationReport(
+      LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
+    LogAggregationReport report =
+        Records.newRecord(LogAggregationReport.class);
+    report.setApplicationId(appId);
+    report.setDiagnosticMessage(diagnosticMessage);
+    report.setLogAggregationStatus(logAggregationStatus);
+    this.context.getLogAggregationStatusForApps().add(report);
+  }
+
   private void cleanOldLogs() {
     try {
       final FileSystem remoteFS =
@@ -669,4 +683,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   public UserGroupInformation getUgi() {
     return this.userUgi;
   }
+
+  @Private
+  @VisibleForTesting
+  public int getLogAggregationTimes() {
+    return this.logAggregationTimes;
+  }
 }

+ 59 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -2270,6 +2270,65 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.stop();
   }
 
+  @Test (timeout = 20000)
+  public void testSkipUnnecessaryNNOperationsForShortJob() throws Exception {
+    LogAggregationContext logAggregationContext =
+        Records.newRecord(LogAggregationContext.class);
+    logAggregationContext.setLogAggregationPolicyClassName(
+        FailedOrKilledContainerLogAggregationPolicy.class.getName());
+    verifySkipUnnecessaryNNOperations(logAggregationContext, 0, 2);
+  }
+
+  @Test (timeout = 20000)
+  public void testSkipUnnecessaryNNOperationsForService() throws Exception {
+    this.conf.setLong(
+        YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
+        3600);
+    LogAggregationContext contextWithAMOnly =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMOnly.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
+    contextWithAMOnly.setRolledLogsIncludePattern("sys*");
+    contextWithAMOnly.setRolledLogsExcludePattern("std_final");
+    verifySkipUnnecessaryNNOperations(contextWithAMOnly, 1, 4);
+  }
+
+  private void verifySkipUnnecessaryNNOperations(
+      LogAggregationContext logAggregationContext,
+      int expectedLogAggregationTimes, int expectedAggregationReportNum)
+      throws Exception {
+    LogAggregationService logAggregationService = new LogAggregationService(
+        dispatcher, this.context, this.delSrvc, super.dirsHandler);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId appId = createApplication();
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId, this.user,
+        null, this.acls, logAggregationContext));
+
+    // Container finishes
+    String[] logFiles = new String[] { "stdout" };
+    finishContainer(appId, logAggregationService,
+        ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
+    AppLogAggregatorImpl aggregator =
+        (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
+            .get(appId);
+    aggregator.doLogAggregationOutOfBand();
+
+    Thread.sleep(2000);
+    aggregator.doLogAggregationOutOfBand();
+    Thread.sleep(2000);
+
+    // App finishes.
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(appId));
+    logAggregationService.stop();
+
+    assertEquals(expectedLogAggregationTimes,
+        aggregator.getLogAggregationTimes());
+    assertEquals(expectedAggregationReportNum,
+        this.context.getLogAggregationStatusForApps().size());
+  }
+
   private int numOfLogsAvailable(LogAggregationService logAggregationService,
       ApplicationId appId, boolean sizeLimited, String lastLogFile)
       throws IOException {