Browse Source

YARN-6339. Improve performance for createAndGetApplicationReport. (Yunjiong Zhao via wangda)

Wangda Tan 8 years ago
parent
commit
cd014d57aa

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java

@@ -296,6 +296,8 @@ public class ProtoUtils {
    * Log Aggregation Status
    */
   private static final String LOG_AGGREGATION_STATUS_PREFIX = "LOG_";
+  private static final int LOG_AGGREGATION_STATUS_PREFIX_LEN =
+      LOG_AGGREGATION_STATUS_PREFIX.length();
   public static LogAggregationStatusProto convertToProtoFormat(
       LogAggregationStatus e) {
     return LogAggregationStatusProto.valueOf(LOG_AGGREGATION_STATUS_PREFIX
@@ -304,8 +306,8 @@ public class ProtoUtils {
 
   public static LogAggregationStatus convertFromProtoFormat(
       LogAggregationStatusProto e) {
-    return LogAggregationStatus.valueOf(e.name().replace(
-      LOG_AGGREGATION_STATUS_PREFIX, ""));
+    return LogAggregationStatus.valueOf(e.name().substring(
+        LOG_AGGREGATION_STATUS_PREFIX_LEN));
   }
 
   /*

+ 19 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -177,8 +178,8 @@ public class RMAppImpl implements RMApp, Recoverable {
   private long logAggregationStartTime = 0;
   private final long logAggregationStatusTimeout;
   private final Map<NodeId, LogAggregationReport> logAggregationStatus =
-      new HashMap<NodeId, LogAggregationReport>();
-  private LogAggregationStatus logAggregationStatusForAppReport;
+      new ConcurrentHashMap<NodeId, LogAggregationReport>();
+  private volatile LogAggregationStatus logAggregationStatusForAppReport;
   private int logAggregationSucceed = 0;
   private int logAggregationFailed = 0;
   private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
@@ -1697,26 +1698,23 @@ public class RMAppImpl implements RMApp, Recoverable {
   public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
     try {
       this.readLock.lock();
-      Map<NodeId, LogAggregationReport> outputs =
-          new HashMap<NodeId, LogAggregationReport>();
-      outputs.putAll(logAggregationStatus);
-      if (!isLogAggregationFinished()) {
-        for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
+      if (!isLogAggregationFinished() && isAppInFinalState(this) &&
+          System.currentTimeMillis() > this.logAggregationStartTime
+          + this.logAggregationStatusTimeout) {
+        for (Entry<NodeId, LogAggregationReport> output :
+            logAggregationStatus.entrySet()) {
           if (!output.getValue().getLogAggregationStatus()
             .equals(LogAggregationStatus.TIME_OUT)
               && !output.getValue().getLogAggregationStatus()
                 .equals(LogAggregationStatus.SUCCEEDED)
               && !output.getValue().getLogAggregationStatus()
-                .equals(LogAggregationStatus.FAILED)
-              && isAppInFinalState(this)
-              && System.currentTimeMillis() > this.logAggregationStartTime
-                  + this.logAggregationStatusTimeout) {
+                .equals(LogAggregationStatus.FAILED)) {
             output.getValue().setLogAggregationStatus(
               LogAggregationStatus.TIME_OUT);
           }
         }
       }
-      return outputs;
+      return Collections.unmodifiableMap(logAggregationStatus);
     } finally {
       this.readLock.unlock();
     }
@@ -1824,11 +1822,17 @@ public class RMAppImpl implements RMApp, Recoverable {
         // the log aggregation is finished. And the log aggregation status will
         // not be updated anymore.
         if (logFailedCount > 0 && isAppInFinalState(this)) {
+          this.logAggregationStatusForAppReport =
+              LogAggregationStatus.FAILED;
           return LogAggregationStatus.FAILED;
         } else if (logTimeOutCount > 0) {
+          this.logAggregationStatusForAppReport =
+              LogAggregationStatus.TIME_OUT;
           return LogAggregationStatus.TIME_OUT;
         }
         if (isAppInFinalState(this)) {
+          this.logAggregationStatusForAppReport =
+              LogAggregationStatus.SUCCEEDED;
           return LogAggregationStatus.SUCCEEDED;
         }
       } else if (logRunningWithFailure > 0) {
@@ -1844,7 +1848,9 @@ public class RMAppImpl implements RMApp, Recoverable {
     return this.logAggregationStatusForAppReport
       .equals(LogAggregationStatus.SUCCEEDED)
         || this.logAggregationStatusForAppReport
-          .equals(LogAggregationStatus.FAILED);
+          .equals(LogAggregationStatus.FAILED)
+        || this.logAggregationStatusForAppReport
+          .equals(LogAggregationStatus.TIME_OUT);
 
   }
 

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java

@@ -413,6 +413,8 @@ public class TestRMAppLogAggregationStatus {
     Assert.assertEquals(LogAggregationStatus.TIME_OUT,
       rmApp.getLogAggregationStatusForAppReport());
 
+    rmApp = (RMAppImpl)createRMApp(conf);
+    rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
     // If the log aggregation status for all NMs are SUCCEEDED and Application
     // is at the final state, the log aggregation status for this app will
     // return SUCCEEDED