Kaynağa Gözat

YARN-3505. Node's Log Aggregation Report with SUCCEED should not be cached in RMApps. Contributed by Xuan Gong.

Junping Du 10 yıl önce
ebeveyn
işleme
15ccd967ee
19 değiştirilmiş dosya ile 469 ekleme ve 279 silme
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java
  3. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  5. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  6. 1 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
  7. 3 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
  8. 0 40
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
  9. 41 41
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
  10. 17 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
  11. 4 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  12. 6 40
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  13. 14 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
  14. 184 44
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  15. 5 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  16. 4 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
  17. 10 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java
  18. 25 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
  19. 131 50
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -111,6 +111,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation.
     (Jonathan Eagles via zjshen)
 
+    YARN-3505. Node's Log Aggregation Report with SUCCEED should not be cached
+    in RMApps. (Xuan Gong via junping_du)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationStatus.java

@@ -34,6 +34,8 @@ public enum LogAggregationStatus {
   /** Log Aggregation is Running. */
   RUNNING,
 
+  /** Log Aggregation is Running, but has failures in previous cycles. */
+  RUNNING_WITH_FAILURE,
   /**
    * Log Aggregation is Succeeded. All of the logs have been aggregated
    * successfully.

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -718,6 +718,16 @@ public class YarnConfiguration extends Configuration {
       + "proxy-user-privileges.enabled";
   public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
 
+  /**
+   * How many diagnostics/failure messages can be saved in RM for
+   * log aggregation. It also defines the number of diagnostics/failure
+   * messages that can be shown in the log aggregation web UI.
+   */
+  public static final String RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
+      RM_PREFIX + "max-log-aggregation-diagnostics-in-memory";
+  public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
+      10;
+
   /** Whether to enable log aggregation */
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -204,6 +204,7 @@ enum LogAggregationStatusProto {
   LOG_SUCCEEDED = 4;
   LOG_FAILED = 5;
   LOG_TIME_OUT = 6;
+  LOG_RUNNING_WITH_FAILURE = 7;
 }
 
 message ApplicationAttemptReportProto {

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -674,6 +674,14 @@
     <value>10</value>
   </property>
 
+  <property>
+    <description>Number of diagnostics/failure messages that can be saved in
+    RM for log aggregation. It also defines the number of diagnostics/failure
+    messages that can be shown in the log aggregation web UI.</description>
+    <name>yarn.resourcemanager.max-log-aggregation-diagnostics-in-memory</name>
+    <value>10</value>
+  </property>
+
   <!-- Node Manager Configs -->
   <property>
     <description>The hostname of the NM.</description>

+ 1 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java

@@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.util.Records;
  * It includes details such as:
  * <ul>
  *   <li>{@link ApplicationId} of the application.</li>
- *   <li>{@link NodeId} of the NodeManager.</li>
  *   <li>{@link LogAggregationStatus}</li>
  *   <li>Diagnostic information</li>
  * </ul>
@@ -45,7 +43,7 @@ public abstract class LogAggregationReport {
   @Public
   @Unstable
   public static LogAggregationReport newInstance(ApplicationId appId,
-      NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) {
+      LogAggregationStatus status, String diagnosticMessage) {
     LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
     report.setApplicationId(appId);
     report.setLogAggregationStatus(status);
@@ -65,18 +63,6 @@ public abstract class LogAggregationReport {
   @Unstable
   public abstract void setApplicationId(ApplicationId appId);
 
-  /**
-   * Get the <code>NodeId</code>.
-   * @return <code>NodeId</code>
-   */
-  @Public
-  @Unstable
-  public abstract NodeId getNodeId();
-
-  @Public
-  @Unstable
-  public abstract void setNodeId(NodeId nodeId);
-
   /**
    * Get the <code>LogAggregationStatus</code>.
    * @return <code>LogAggregationStatus</code>

+ 3 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java

@@ -18,10 +18,9 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
@@ -54,9 +53,9 @@ public abstract class NodeHeartbeatRequest {
   public abstract Set<String> getNodeLabels();
   public abstract void setNodeLabels(Set<String> nodeLabels);
 
-  public abstract Map<ApplicationId, LogAggregationReport>
+  public abstract List<LogAggregationReport>
       getLogAggregationReportsForApps();
 
   public abstract void setLogAggregationReportsForApps(
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps);
+      List<LogAggregationReport> logAggregationReportsForApps);
 }

+ 0 - 40
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java

@@ -22,13 +22,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationStatusProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -45,7 +42,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
   boolean viaProto = false;
 
   private ApplicationId applicationId;
-  private NodeId nodeId;
 
   public LogAggregationReportPBImpl() {
     builder = LogAggregationReportProto.newBuilder();
@@ -89,12 +85,6 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
           builder.getApplicationId())) {
       builder.setApplicationId(convertToProtoFormat(this.applicationId));
     }
-
-    if (this.nodeId != null
-        && !((NodeIdPBImpl) this.nodeId).getProto().equals(
-          builder.getNodeId())) {
-      builder.setNodeId(convertToProtoFormat(this.nodeId));
-    }
   }
 
   private void mergeLocalToProto() {
@@ -191,34 +181,4 @@ public class LogAggregationReportPBImpl extends LogAggregationReport {
     }
     builder.setDiagnostics(diagnosticMessage);
   }
-
-  @Override
-  public NodeId getNodeId() {
-    if (this.nodeId != null) {
-      return this.nodeId;
-    }
-
-    LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasNodeId()) {
-      return null;
-    }
-    this.nodeId = convertFromProtoFormat(p.getNodeId());
-    return this.nodeId;
-  }
-
-  @Override
-  public void setNodeId(NodeId nodeId) {
-    maybeInitBuilder();
-    if (nodeId == null)
-      builder.clearNodeId();
-    this.nodeId = nodeId;
-  }
-
-  private NodeIdProto convertToProtoFormat(NodeId t) {
-    return ((NodeIdPBImpl) t).getProto();
-  }
-
-  private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
-    return new NodeIdPBImpl(nodeId);
-  }
 }

+ 41 - 41
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java

@@ -18,21 +18,16 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -51,9 +46,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private MasterKey lastKnownContainerTokenMasterKey = null;
   private MasterKey lastKnownNMTokenMasterKey = null;
   private Set<String> labels = null;
-  private Map<ApplicationId, LogAggregationReport>
-      logAggregationReportsForApps = null;
-  
+  private List<LogAggregationReport> logAggregationReportsForApps = null;
+
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
   }
@@ -110,12 +104,35 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private void addLogAggregationStatusForAppsToProto() {
     maybeInitBuilder();
     builder.clearLogAggregationReportsForApps();
-    for (Entry<ApplicationId, LogAggregationReport> entry : logAggregationReportsForApps
-      .entrySet()) {
-      builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto
-        .newBuilder().setAppId(convertToProtoFormat(entry.getKey()))
-        .setLogAggregationReport(convertToProtoFormat(entry.getValue())));
+    if (this.logAggregationReportsForApps == null) {
+      return;
     }
+    Iterable<LogAggregationReportProto> it =
+        new Iterable<LogAggregationReportProto>() {
+          @Override
+          public Iterator<LogAggregationReportProto> iterator() {
+            return new Iterator<LogAggregationReportProto>() {
+              private Iterator<LogAggregationReport> iter =
+                  logAggregationReportsForApps.iterator();
+
+              @Override
+              public boolean hasNext() {
+                return iter.hasNext();
+              }
+
+              @Override
+              public LogAggregationReportProto next() {
+                return convertToProtoFormat(iter.next());
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+            };
+          }
+        };
+    builder.addAllLogAggregationReportsForApps(it);
   }
 
   private LogAggregationReportProto convertToProtoFormat(
@@ -246,17 +263,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     labels = new HashSet<String>(nodeLabels.getElementsList());
   }
 
-  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
-    return new ApplicationIdPBImpl(p);
-  }
-
-  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
-    return ((ApplicationIdPBImpl) t).getProto();
-  }
-
   @Override
-  public Map<ApplicationId, LogAggregationReport>
-      getLogAggregationReportsForApps() {
+  public List<LogAggregationReport> getLogAggregationReportsForApps() {
     if (this.logAggregationReportsForApps != null) {
       return this.logAggregationReportsForApps;
     }
@@ -266,15 +274,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
 
   private void initLogAggregationReportsForApps() {
     NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
-    List<LogAggregationReportsForAppsProto> list =
+    List<LogAggregationReportProto> list =
         p.getLogAggregationReportsForAppsList();
-    this.logAggregationReportsForApps =
-        new HashMap<ApplicationId, LogAggregationReport>();
-    for (LogAggregationReportsForAppsProto c : list) {
-      ApplicationId appId = convertFromProtoFormat(c.getAppId());
-      LogAggregationReport report =
-          convertFromProtoFormat(c.getLogAggregationReport());
-      this.logAggregationReportsForApps.put(appId, report);
+    this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>();
+    for (LogAggregationReportProto c : list) {
+      this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
     }
   }
 
@@ -285,14 +289,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
 
   @Override
   public void setLogAggregationReportsForApps(
-      Map<ApplicationId, LogAggregationReport> logAggregationStatusForApps) {
-    if (logAggregationStatusForApps == null
-        || logAggregationStatusForApps.isEmpty()) {
-      return;
+      List<LogAggregationReport> logAggregationStatusForApps) {
+    if(logAggregationStatusForApps == null) {
+      builder.clearLogAggregationReportsForApps();
     }
-    maybeInitBuilder();
-    this.logAggregationReportsForApps =
-        new HashMap<ApplicationId, LogAggregationReport>();
-    this.logAggregationReportsForApps.putAll(logAggregationStatusForApps);
+    this.logAggregationReportsForApps = logAggregationStatusForApps;
   }
 }  

+ 17 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
@@ -192,8 +193,17 @@ public class AppBlock extends HtmlBlock {
             : "ApplicationMaster");
     if (webUiType != null
         && webUiType.equals(YarnWebParams.RM_WEB_UI)) {
-      overviewTable._("Log Aggregation Status",
-        root_url("logaggregationstatus", app.getAppId()), "Status");
+      LogAggregationStatus status = getLogAggregationStatus();
+      if (status == null) {
+        overviewTable._("Log Aggregation Status", "N/A");
+      } else if (status == LogAggregationStatus.DISABLED
+          || status == LogAggregationStatus.NOT_START
+          || status == LogAggregationStatus.SUCCEEDED) {
+        overviewTable._("Log Aggregation Status", status.name());
+      } else {
+        overviewTable._("Log Aggregation Status",
+            root_url("logaggregationstatus", app.getAppId()), status.name());
+      }
     }
     overviewTable._("Diagnostics:",
         app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo());
@@ -342,4 +352,9 @@ public class AppBlock extends HtmlBlock {
   protected void createApplicationMetricsTable(Block html) {
 
   }
+
+  // This will be overridden in RMAppBlock
+  protected LogAggregationStatus getLogAggregationStatus() {
+    return null;
+  }
 }

+ 4 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -50,19 +50,13 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_container_token_master_key = 2;
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional StringArrayProto nodeLabels = 4;
-  repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5;
-}
-
-message LogAggregationReportsForAppsProto {
-  optional ApplicationIdProto appId = 1;
-  optional LogAggregationReportProto log_aggregation_report = 2;
+  repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
 }
 
 message LogAggregationReportProto {
-optional ApplicationIdProto application_id = 1;
-optional NodeIdProto node_id = 2;
-optional LogAggregationStatusProto log_aggregation_status = 3;
-optional string diagnostics = 4 [default = "N/A"];
+  optional ApplicationIdProto application_id = 1;
+  optional LogAggregationStatusProto log_aggregation_status = 2;
+  optional string diagnostics = 3 [default = "N/A"];
 }
 
 message NodeHeartbeatResponseProto {

+ 6 - 40
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -666,7 +665,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
             if (logAggregationEnabled) {
               // pull log aggregation status for application running in this NM
-              Map<ApplicationId, LogAggregationReport> logAggregationReports =
+              List<LogAggregationReport> logAggregationReports =
                   getLogAggregationReportsForApps(context
                     .getLogAggregationStatusForApps());
               if (logAggregationReports != null
@@ -810,47 +809,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     statusUpdater.start();
   }
 
-  private Map<ApplicationId, LogAggregationReport>
-      getLogAggregationReportsForApps(
-          ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
-    Map<ApplicationId, LogAggregationReport> latestLogAggregationReports =
-        new HashMap<ApplicationId, LogAggregationReport>();
+  private List<LogAggregationReport> getLogAggregationReportsForApps(
+      ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
     LogAggregationReport status;
     while ((status = lastestLogAggregationStatus.poll()) != null) {
       this.logAggregationReportForAppsTempList.add(status);
     }
-    for (LogAggregationReport logAggregationReport
-        : this.logAggregationReportForAppsTempList) {
-      LogAggregationReport report = null;
-      if (latestLogAggregationReports.containsKey(logAggregationReport
-        .getApplicationId())) {
-        report =
-            latestLogAggregationReports.get(logAggregationReport
-              .getApplicationId());
-        report.setLogAggregationStatus(logAggregationReport
-          .getLogAggregationStatus());
-        String message = report.getDiagnosticMessage();
-        if (logAggregationReport.getDiagnosticMessage() != null
-            && !logAggregationReport.getDiagnosticMessage().isEmpty()) {
-          if (message != null) {
-            message += logAggregationReport.getDiagnosticMessage();
-          } else {
-            message = logAggregationReport.getDiagnosticMessage();
-          }
-          report.setDiagnosticMessage(message);
-        }
-      } else {
-        report = Records.newRecord(LogAggregationReport.class);
-        report.setApplicationId(logAggregationReport.getApplicationId());
-        report.setNodeId(this.nodeId);
-        report.setLogAggregationStatus(logAggregationReport
-          .getLogAggregationStatus());
-        report
-          .setDiagnosticMessage(logAggregationReport.getDiagnosticMessage());
-      }
-      latestLogAggregationReports.put(logAggregationReport.getApplicationId(),
-        report);
-    }
-    return latestLogAggregationReports;
+    List<LogAggregationReport> reports = new ArrayList<LogAggregationReport>();
+    reports.addAll(logAggregationReportForAppsTempList);
+    return reports;
   }
 }

+ 14 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -306,6 +306,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
                     + currentTime);
 
       String diagnosticMessage = "";
+      boolean logAggregationSucceedInThisCycle = true;
       final boolean rename = uploadedLogsInThisCycle;
       try {
         userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -338,20 +339,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
                 + LogAggregationUtils.getNodeString(nodeId) + " at "
                 + Times.format(currentTime) + "\n";
         renameTemporaryLogFileFailed = true;
+        logAggregationSucceedInThisCycle = false;
       }
 
       LogAggregationReport report =
           Records.newRecord(LogAggregationReport.class);
       report.setApplicationId(appId);
-      report.setNodeId(nodeId);
       report.setDiagnosticMessage(diagnosticMessage);
+      report.setLogAggregationStatus(logAggregationSucceedInThisCycle
+          ? LogAggregationStatus.RUNNING
+          : LogAggregationStatus.RUNNING_WITH_FAILURE);
+      this.context.getLogAggregationStatusForApps().add(report);
       if (appFinished) {
-        report.setLogAggregationStatus(renameTemporaryLogFileFailed
+        // If the app is finished, one extra final report with log aggregation
+        // status SUCCEEDED/FAILED will be sent to RM to inform the RM
+        // that the log aggregation in this NM is completed.
+        LogAggregationReport finalReport =
+            Records.newRecord(LogAggregationReport.class);
+        finalReport.setApplicationId(appId);
+        finalReport.setLogAggregationStatus(renameTemporaryLogFileFailed
             ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED);
-      } else {
-        report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
+        this.context.getLogAggregationStatusForApps().add(report);
       }
-      this.context.getLogAggregationStatusForApps().add(report);
     } finally {
       if (writer != null) {
         writer.close();

+ 184 - 44
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -22,12 +22,15 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -36,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -152,6 +156,13 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final Map<NodeId, LogAggregationReport> logAggregationStatus =
       new HashMap<NodeId, LogAggregationReport>();
   private LogAggregationStatus logAggregationStatusForAppReport;
+  private int logAggregationSucceed = 0;
+  private int logAggregationFailed = 0;
+  private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
+      new HashMap<NodeId, List<String>>();
+  private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
+      new HashMap<NodeId, List<String>>();
+  private final int maxLogAggregationDiagnosticsInMemory;
 
   // These states stored are only valid when app is at killing or final_saving.
   private RMAppState stateBeforeKilling;
@@ -437,6 +448,14 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.logAggregationEnabled =
         conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
           YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+    if (this.logAggregationEnabled) {
+      this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
+    } else {
+      this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
+    }
+    maxLogAggregationDiagnosticsInMemory = conf.getInt(
+        YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+        YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
   }
 
   @Override
@@ -834,10 +853,9 @@ public class RMAppImpl implements RMApp, Recoverable {
 
       if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
         app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
-          LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
-            .getNodeId(), app.logAggregationEnabled
-              ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
-            ""));
+          LogAggregationReport.newInstance(app.applicationId,
+            app.logAggregationEnabled ? LogAggregationStatus.NOT_START
+                : LogAggregationStatus.DISABLED, ""));
       }
     };
   }
@@ -1401,18 +1419,20 @@ public class RMAppImpl implements RMApp, Recoverable {
       Map<NodeId, LogAggregationReport> outputs =
           new HashMap<NodeId, LogAggregationReport>();
       outputs.putAll(logAggregationStatus);
-      for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
-        if (!output.getValue().getLogAggregationStatus()
-          .equals(LogAggregationStatus.TIME_OUT)
-            && !output.getValue().getLogAggregationStatus()
-              .equals(LogAggregationStatus.SUCCEEDED)
-            && !output.getValue().getLogAggregationStatus()
-              .equals(LogAggregationStatus.FAILED)
-            && isAppInFinalState(this)
-            && System.currentTimeMillis() > this.logAggregationStartTime
-                + this.logAggregationStatusTimeout) {
-          output.getValue().setLogAggregationStatus(
-            LogAggregationStatus.TIME_OUT);
+      if (!isLogAggregationFinished()) {
+        for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
+          if (!output.getValue().getLogAggregationStatus()
+            .equals(LogAggregationStatus.TIME_OUT)
+              && !output.getValue().getLogAggregationStatus()
+                .equals(LogAggregationStatus.SUCCEEDED)
+              && !output.getValue().getLogAggregationStatus()
+                .equals(LogAggregationStatus.FAILED)
+              && isAppInFinalState(this)
+              && System.currentTimeMillis() > this.logAggregationStartTime
+                  + this.logAggregationStatusTimeout) {
+            output.getValue().setLogAggregationStatus(
+              LogAggregationStatus.TIME_OUT);
+          }
         }
       }
       return outputs;
@@ -1424,32 +1444,46 @@ public class RMAppImpl implements RMApp, Recoverable {
   public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
     try {
       this.writeLock.lock();
-      if (this.logAggregationEnabled) {
+      if (this.logAggregationEnabled && !isLogAggregationFinished()) {
         LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+        boolean stateChangedToFinal = false;
         if (curReport == null) {
           this.logAggregationStatus.put(nodeId, report);
+          if (isLogAggregationFinishedForNM(report)) {
+            stateChangedToFinal = true;
+          }
         } else {
-          if (curReport.getLogAggregationStatus().equals(
-            LogAggregationStatus.TIME_OUT)) {
-            if (report.getLogAggregationStatus().equals(
-              LogAggregationStatus.SUCCEEDED)
-                || report.getLogAggregationStatus().equals(
-                  LogAggregationStatus.FAILED)) {
-              curReport.setLogAggregationStatus(report
-                .getLogAggregationStatus());
+          if (isLogAggregationFinishedForNM(report)) {
+            if (!isLogAggregationFinishedForNM(curReport)) {
+              stateChangedToFinal = true;
             }
-          } else {
-            curReport.setLogAggregationStatus(report.getLogAggregationStatus());
           }
-
-          if (report.getDiagnosticMessage() != null
-              && !report.getDiagnosticMessage().isEmpty()) {
-            curReport
-              .setDiagnosticMessage(curReport.getDiagnosticMessage() == null
-                  ? report.getDiagnosticMessage() : curReport
-                    .getDiagnosticMessage() + report.getDiagnosticMessage());
+          if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
+              || curReport.getLogAggregationStatus() !=
+                  LogAggregationStatus.RUNNING_WITH_FAILURE) {
+            if (curReport.getLogAggregationStatus()
+                == LogAggregationStatus.TIME_OUT
+                && report.getLogAggregationStatus()
+                    == LogAggregationStatus.RUNNING) {
+            // If the latest NM heartbeat reports RUNNING while the
+            // cached log aggregation status for this NM is TIME_OUT,
+            // reset the cached status: use RUNNING_WITH_FAILURE when
+            // failure messages have already been recorded for this NM,
+            // and RUNNING otherwise.
+              if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
+                  !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
+                report.setLogAggregationStatus(
+                    LogAggregationStatus.RUNNING_WITH_FAILURE);
+              }
+            }
+            curReport.setLogAggregationStatus(report
+              .getLogAggregationStatus());
           }
         }
+        updateLogAggregationDiagnosticMessages(nodeId, report);
+        if (isAppInFinalState(this) && stateChangedToFinal) {
+          updateLogAggregationStatus(nodeId);
+        }
       }
     } finally {
       this.writeLock.unlock();
@@ -1458,29 +1492,32 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   @Override
   public LogAggregationStatus getLogAggregationStatusForAppReport() {
-    if (!logAggregationEnabled) {
-      return LogAggregationStatus.DISABLED;
-    }
-    if (this.logAggregationStatusForAppReport == LogAggregationStatus.FAILED
-        || this.logAggregationStatusForAppReport == LogAggregationStatus.SUCCEEDED) {
-      return this.logAggregationStatusForAppReport;
-    }
     try {
       this.readLock.lock();
+      if (! logAggregationEnabled) {
+        return LogAggregationStatus.DISABLED;
+      }
+      if (isLogAggregationFinished()) {
+        return this.logAggregationStatusForAppReport;
+      }
       Map<NodeId, LogAggregationReport> reports =
           getLogAggregationReportsForApp();
       if (reports.size() == 0) {
-        return null;
+        return this.logAggregationStatusForAppReport;
       }
       int logNotStartCount = 0;
       int logCompletedCount = 0;
       int logTimeOutCount = 0;
       int logFailedCount = 0;
+      int logRunningWithFailure = 0;
       for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
         switch (report.getValue().getLogAggregationStatus()) {
           case NOT_START:
             logNotStartCount++;
             break;
+          case RUNNING_WITH_FAILURE:
+            logRunningWithFailure ++;
+            break;
           case SUCCEEDED:
             logCompletedCount++;
             break;
@@ -1506,19 +1543,122 @@ public class RMAppImpl implements RMApp, Recoverable {
         // the log aggregation is finished. And the log aggregation status will
         // not be updated anymore.
         if (logFailedCount > 0 && isAppInFinalState(this)) {
-          this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
           return LogAggregationStatus.FAILED;
         } else if (logTimeOutCount > 0) {
           return LogAggregationStatus.TIME_OUT;
         }
         if (isAppInFinalState(this)) {
-          this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
           return LogAggregationStatus.SUCCEEDED;
         }
+      } else if (logRunningWithFailure > 0) {
+        return LogAggregationStatus.RUNNING_WITH_FAILURE;
       }
       return LogAggregationStatus.RUNNING;
     } finally {
       this.readLock.unlock();
     }
   }
+
+  private boolean isLogAggregationFinished() {
+    return this.logAggregationStatusForAppReport
+      .equals(LogAggregationStatus.SUCCEEDED)
+        || this.logAggregationStatusForAppReport
+          .equals(LogAggregationStatus.FAILED);
+
+  }
+
+  private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
+    return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
+        || report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
+  }
+
+  private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
+      LogAggregationReport report) {
+    if (report.getDiagnosticMessage() != null
+        && !report.getDiagnosticMessage().isEmpty()) {
+      if (report.getLogAggregationStatus()
+          == LogAggregationStatus.RUNNING ) {
+        List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
+        if (diagnostics == null) {
+          diagnostics = new ArrayList<String>();
+          logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
+        } else {
+          if (diagnostics.size()
+              == maxLogAggregationDiagnosticsInMemory) {
+            diagnostics.remove(0);
+          }
+        }
+        diagnostics.add(report.getDiagnosticMessage());
+        this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
+          StringUtils.join(diagnostics, "\n"));
+      } else if (report.getLogAggregationStatus()
+          == LogAggregationStatus.RUNNING_WITH_FAILURE) {
+        List<String> failureMessages =
+            logAggregationFailureMessagesForNMs.get(nodeId);
+        if (failureMessages == null) {
+          failureMessages = new ArrayList<String>();
+          logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
+        } else {
+          if (failureMessages.size()
+              == maxLogAggregationDiagnosticsInMemory) {
+            failureMessages.remove(0);
+          }
+        }
+        failureMessages.add(report.getDiagnosticMessage());
+      }
+    }
+  }
+
+  private void updateLogAggregationStatus(NodeId nodeId) {
+    LogAggregationStatus status =
+        this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
+    if (status.equals(LogAggregationStatus.SUCCEEDED)) {
+      this.logAggregationSucceed++;
+    } else if (status.equals(LogAggregationStatus.FAILED)) {
+      this.logAggregationFailed++;
+    }
+    if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
+      this.logAggregationStatusForAppReport =
+          LogAggregationStatus.SUCCEEDED;
+      // Since the log aggregation status for this application for all NMs
+      // is SUCCEEDED, it means all logs are aggregated successfully.
+      // We could remove all the cached log aggregation reports
+      this.logAggregationStatus.clear();
+      this.logAggregationDiagnosticsForNMs.clear();
+      this.logAggregationFailureMessagesForNMs.clear();
+    } else if (this.logAggregationSucceed + this.logAggregationFailed
+        == this.logAggregationStatus.size()) {
+      this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
+      // We have collected the log aggregation status for all NMs.
+      // The log aggregation status is FAILED which means the log
+      // aggregation fails in some NMs. We are only interested in the
+      // nodes where the log aggregation is failed. So we could remove
+      // the log aggregation details for those succeeded NMs
+      for (Iterator<Map.Entry<NodeId, LogAggregationReport>> it =
+          this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<NodeId, LogAggregationReport> entry = it.next();
+        if (entry.getValue().getLogAggregationStatus()
+          .equals(LogAggregationStatus.SUCCEEDED)) {
+          it.remove();
+        }
+      }
+      // Log aggregation has finished (with failures on some NMs),
+      // and the status will not be updated anymore.
+      this.logAggregationDiagnosticsForNMs.clear();
+    }
+  }
+
+  public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
+    try {
+      this.readLock.lock();
+      List<String> failureMessages =
+          this.logAggregationFailureMessagesForNMs.get(nodeId);
+      if (failureMessages == null || failureMessages.isEmpty()) {
+        return StringUtils.EMPTY;
+      }
+      return StringUtils.join(failureMessages, "\n");
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

+ 5 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -22,8 +22,6 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -777,7 +775,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
 
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps =
+      List<LogAggregationReport> logAggregationReportsForApps =
           statusEvent.getLogAggregationReportsForApps();
       if (logAggregationReportsForApps != null
           && !logAggregationReportsForApps.isEmpty()) {
@@ -915,12 +913,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   private void handleLogAggregationStatus(
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
-    for (Entry<ApplicationId, LogAggregationReport> report :
-        logAggregationReportsForApps.entrySet()) {
-      RMApp rmApp = this.context.getRMApps().get(report.getKey());
+      List<LogAggregationReport> logAggregationReportsForApps) {
+    for (LogAggregationReport report : logAggregationReportsForApps) {
+      RMApp rmApp = this.context.getRMApps().get(report.getApplicationId());
       if (rmApp != null) {
-        ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue());
+        ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report);
       }
     }
   }

+ 4 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java

@@ -19,8 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -34,7 +32,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   private final List<ContainerStatus> containersCollection;
   private final NodeHeartbeatResponse latestResponse;
   private final List<ApplicationId> keepAliveAppIds;
-  private Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps;
+  private List<LogAggregationReport> logAggregationReportsForApps;
 
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
@@ -50,7 +48,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
       NodeHeartbeatResponse latestResponse,
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+      List<LogAggregationReport> logAggregationReportsForApps) {
     super(nodeId, RMNodeEventType.STATUS_UPDATE);
     this.nodeHealthStatus = nodeHealthStatus;
     this.containersCollection = collection;
@@ -75,13 +73,12 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.keepAliveAppIds;
   }
 
-  public Map<ApplicationId, LogAggregationReport>
-      getLogAggregationReportsForApps() {
+  public List<LogAggregationReport> getLogAggregationReportsForApps() {
     return this.logAggregationReportsForApps;
   }
 
   public void setLogAggregationReportsForApps(
-      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+      List<LogAggregationReport> logAggregationReportsForApps) {
     this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
 }

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppBlock.java

@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -34,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.AppBlock;
-import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
@@ -170,4 +170,13 @@ public class RMAppBlock extends AppBlock{
 
     tbody._()._();
   }
+
+  @Override
+  protected LogAggregationStatus getLogAggregationStatus() {
+    RMApp rmApp = this.rm.getRMContext().getRMApps().get(appID);
+    if (rmApp == null) {
+      return null;
+    }
+    return rmApp.getLogAggregationStatusForAppReport();
+  }
 }

+ 25 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
@@ -93,6 +94,9 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
       .td("Log Aggregation does not Start.")._();
     table_description.tr().td(LogAggregationStatus.RUNNING.name())
       .td("Log Aggregation is Running.")._();
+    table_description.tr().td(LogAggregationStatus.RUNNING_WITH_FAILURE.name())
+      .td("Log Aggregation is Running, but has failures "
+          + "in previous cycles")._();
     table_description.tr().td(LogAggregationStatus.SUCCEEDED.name())
       .td("Log Aggregation is Succeeded. All of the logs have been "
           + "aggregated successfully.")._();
@@ -106,24 +110,29 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
     table_description._();
     div_description._();
 
-    boolean logAggregationEnabled =
-        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
-          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+    RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
     // Application Log aggregation status Table
     DIV<Hamlet> div = html.div(_INFO_WRAP);
     TABLE<DIV<Hamlet>> table =
         div.h3(
           "Log Aggregation: "
-              + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+              + (rmApp == null ? "N/A" : rmApp
+                .getLogAggregationStatusForAppReport() == null ? "N/A" : rmApp
+                .getLogAggregationStatusForAppReport().name())).table(
           "#LogAggregationStatus");
-    table.
-      tr().
-        th(_TH, "NodeId").
-        th(_TH, "Log Aggregation Status").
-        th(_TH, "Diagnostis Message").
-      _();
 
-    RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
+    int maxLogAggregationDiagnosticsInMemory = conf.getInt(
+      YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
+      YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
+    table
+      .tr()
+      .th(_TH, "NodeId")
+      .th(_TH, "Log Aggregation Status")
+      .th(_TH, "Last "
+          + maxLogAggregationDiagnosticsInMemory + " Diagnostic Messages")
+      .th(_TH, "Last "
+          + maxLogAggregationDiagnosticsInMemory + " Failure Messages")._();
+
     if (rmApp != null) {
       Map<NodeId, LogAggregationReport> logAggregationReports =
           rmApp.getLogAggregationReportsForApp();
@@ -136,10 +145,14 @@ public class RMAppLogAggregationStatusBlock extends HtmlBlock {
           String message =
               report.getValue() == null ? null : report.getValue()
                 .getDiagnosticMessage();
+          String failureMessage =
+              report.getValue() == null ? null : ((RMAppImpl)rmApp)
+                  .getLogAggregationFailureMessagesForNM(report.getKey());
           table.tr()
             .td(report.getKey().toString())
             .td(status == null ? "N/A" : status.toString())
-            .td(message == null ? "N/A" : message)._();
+            .td(message == null ? "N/A" : message)
+            .td(failureMessage == null ? "N/A" : failureMessage)._();
         }
       }
     }

+ 131 - 50
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java

@@ -23,7 +23,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -155,26 +155,26 @@ public class TestRMAppLogAggregationStatus {
         .getLogAggregationStatus());
     }
 
-    Map<ApplicationId, LogAggregationReport> node1ReportForApp =
-        new HashMap<ApplicationId, LogAggregationReport>();
+    List<LogAggregationReport> node1ReportForApp =
+        new ArrayList<LogAggregationReport>();
     String messageForNode1_1 =
         "node1 logAggregation status updated at " + System.currentTimeMillis();
     LogAggregationReport report1 =
-        LogAggregationReport.newInstance(appId, nodeId1,
-          LogAggregationStatus.RUNNING, messageForNode1_1);
-    node1ReportForApp.put(appId, report1);
+        LogAggregationReport.newInstance(appId, LogAggregationStatus.RUNNING,
+          messageForNode1_1);
+    node1ReportForApp.add(report1);
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
       null, node1ReportForApp));
 
-    Map<ApplicationId, LogAggregationReport> node2ReportForApp =
-        new HashMap<ApplicationId, LogAggregationReport>();
+    List<LogAggregationReport> node2ReportForApp =
+        new ArrayList<LogAggregationReport>();
     String messageForNode2_1 =
         "node2 logAggregation status updated at " + System.currentTimeMillis();
     LogAggregationReport report2 =
-        LogAggregationReport.newInstance(appId, nodeId2,
+        LogAggregationReport.newInstance(appId,
           LogAggregationStatus.RUNNING, messageForNode2_1);
-    node2ReportForApp.put(appId, report2);
+    node2ReportForApp.add(report2);
     node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
       null, node2ReportForApp));
@@ -205,14 +205,14 @@ public class TestRMAppLogAggregationStatus {
     }
 
     // node1 updates its log aggregation status again
-    Map<ApplicationId, LogAggregationReport> node1ReportForApp2 =
-        new HashMap<ApplicationId, LogAggregationReport>();
+    List<LogAggregationReport> node1ReportForApp2 =
+        new ArrayList<LogAggregationReport>();
     String messageForNode1_2 =
         "node1 logAggregation status updated at " + System.currentTimeMillis();
     LogAggregationReport report1_2 =
-        LogAggregationReport.newInstance(appId, nodeId1,
+        LogAggregationReport.newInstance(appId,
           LogAggregationStatus.RUNNING, messageForNode1_2);
-    node1ReportForApp2.put(appId, report1_2);
+    node1ReportForApp2.add(report1_2);
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
       null, node1ReportForApp2));
@@ -230,8 +230,9 @@ public class TestRMAppLogAggregationStatus {
       if (report.getKey().equals(node1.getNodeID())) {
         Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
           .getLogAggregationStatus());
-        Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
-          .getValue().getDiagnosticMessage());
+        Assert.assertEquals(
+          messageForNode1_1 + "\n" + messageForNode1_2, report
+            .getValue().getDiagnosticMessage());
       } else if (report.getKey().equals(node2.getNodeID())) {
         Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
           .getLogAggregationStatus());
@@ -268,15 +269,19 @@ public class TestRMAppLogAggregationStatus {
     // Finally, node1 finished its log aggregation and sent out its final
     // log aggregation status. The log aggregation status for node1 should
     // be changed from TIME_OUT to SUCCEEDED
-    Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
-        new HashMap<ApplicationId, LogAggregationReport>();
-    String messageForNode1_3 =
-        "node1 final logAggregation status updated at "
-            + System.currentTimeMillis();
-    LogAggregationReport report1_3 =
-        LogAggregationReport.newInstance(appId, nodeId1,
-          LogAggregationStatus.SUCCEEDED, messageForNode1_3);
-    node1ReportForApp3.put(appId, report1_3);
+    List<LogAggregationReport> node1ReportForApp3 =
+        new ArrayList<LogAggregationReport>();
+    LogAggregationReport report1_3;
+    for (int i = 0; i < 10 ; i ++) {
+      report1_3 =
+          LogAggregationReport.newInstance(appId,
+            LogAggregationStatus.RUNNING, "test_message_" + i);
+      node1ReportForApp3.add(report1_3);
+    }
+    node1ReportForApp3.add(LogAggregationReport.newInstance(appId,
+      LogAggregationStatus.SUCCEEDED, ""));
+    // For every logAggregationReport cached in memory, we can only save at most
+    // 10 diagnostic messages/failure messages
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
       null, node1ReportForApp3));
@@ -290,8 +295,14 @@ public class TestRMAppLogAggregationStatus {
       if (report.getKey().equals(node1.getNodeID())) {
         Assert.assertEquals(LogAggregationStatus.SUCCEEDED, report.getValue()
           .getLogAggregationStatus());
-        Assert.assertEquals(messageForNode1_1 + messageForNode1_2
-            + messageForNode1_3, report.getValue().getDiagnosticMessage());
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < 9; i ++) {
+          builder.append("test_message_" + i);
+          builder.append("\n");
+        }
+        builder.append("test_message_" + 9);
+        Assert.assertEquals(builder.toString(), report.getValue()
+          .getDiagnosticMessage());
       } else if (report.getKey().equals(node2.getNodeID())) {
         Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
           .getLogAggregationStatus());
@@ -301,6 +312,32 @@ public class TestRMAppLogAggregationStatus {
           .fail("should not contain log aggregation report for other nodes");
       }
     }
+
+    // Update the log aggregation status for node2 to FAILED,
+    // so the log aggregation status for the app will become FAILED.
+    // We only keep the log aggregation reports whose status is FAILED,
+    // so the log aggregation report for node1 will be removed.
+    List<LogAggregationReport> node2ReportForApp2 =
+        new ArrayList<LogAggregationReport>();
+    LogAggregationReport report2_2 =
+        LogAggregationReport.newInstance(appId,
+          LogAggregationStatus.RUNNING_WITH_FAILURE, "Fail_Message");
+    LogAggregationReport report2_3 =
+        LogAggregationReport.newInstance(appId,
+          LogAggregationStatus.FAILED, "");
+    node2ReportForApp2.add(report2_2);
+    node2ReportForApp2.add(report2_3);
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node2ReportForApp2));
+    Assert.assertEquals(LogAggregationStatus.FAILED,
+      rmApp.getLogAggregationStatusForAppReport());
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertTrue(logAggregationStatus.size() == 1);
+    Assert.assertTrue(logAggregationStatus.containsKey(node2.getNodeID()));
+    Assert.assertTrue(!logAggregationStatus.containsKey(node1.getNodeID()));
+    Assert.assertEquals("Fail_Message",
+      ((RMAppImpl)rmApp).getLogAggregationFailureMessagesForNM(nodeId2));
   }
 
   @Test (timeout = 10000)
@@ -317,9 +354,11 @@ public class TestRMAppLogAggregationStatus {
     // Enable the log aggregation
     conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
     rmApp = (RMAppImpl)createRMApp(conf);
-    // If we do not know any NodeManagers for this application ,
-    // the log aggregation status will return null
-    Assert.assertNull(rmApp.getLogAggregationStatusForAppReport());
+    // If we do not know of any NodeManagers for this application and
+    // log aggregation is enabled, the log aggregation status will
+    // be NOT_START.
+    Assert.assertEquals(LogAggregationStatus.NOT_START,
+      rmApp.getLogAggregationStatusForAppReport());
 
     NodeId nodeId1 = NodeId.newInstance("localhost", 1111);
     NodeId nodeId2 = NodeId.newInstance("localhost", 2222);
@@ -329,24 +368,24 @@ public class TestRMAppLogAggregationStatus {
     // If the log aggregation status for all NMs are NOT_START,
     // the log aggregation status for this app will return NOT_START
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     Assert.assertEquals(LogAggregationStatus.NOT_START,
       rmApp.getLogAggregationStatusForAppReport());
 
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.NOT_START, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.RUNNING, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     Assert.assertEquals(LogAggregationStatus.RUNNING,
       rmApp.getLogAggregationStatusForAppReport());
 
@@ -357,13 +396,13 @@ public class TestRMAppLogAggregationStatus {
     // others are SUCCEEDED, the log aggregation status for this app will
     // return TIME_OUT
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     Assert.assertEquals(LogAggregationStatus.TIME_OUT,
       rmApp.getLogAggregationStatusForAppReport());
 
@@ -371,17 +410,59 @@ public class TestRMAppLogAggregationStatus {
     // is at the final state, the log aggregation status for this app will
     // return SUCCEEDED
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     Assert.assertEquals(LogAggregationStatus.SUCCEEDED,
       rmApp.getLogAggregationStatusForAppReport());
 
     rmApp = (RMAppImpl)createRMApp(conf);
+    // If the log aggregation status for at least one of the NMs is RUNNING,
+    // the log aggregation status for this app will return RUNNING
+    rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+    rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    Assert.assertEquals(LogAggregationStatus.RUNNING,
+      rmApp.getLogAggregationStatusForAppReport());
+
+    // If the log aggregation status for at least one of the NMs
+    // is RUNNING_WITH_FAILURE, the log aggregation status
+    // for this app will return RUNNING_WITH_FAILURE
+    rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+    rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING_WITH_FAILURE,
+      ""));
+    Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
+      rmApp.getLogAggregationStatusForAppReport());
+
+    // For node4, the previous log aggregation status is RUNNING_WITH_FAILURE;
+    // it will not be changed even if it gets a new log aggregation status
+    // of RUNNING
+    rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+    rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.NOT_START, ""));
+    rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
+      rmApp.getApplicationId(), LogAggregationStatus.RUNNING, ""));
+    Assert.assertEquals(LogAggregationStatus.RUNNING_WITH_FAILURE,
+      rmApp.getLogAggregationStatusForAppReport());
+
     rmApp.handle(new RMAppEvent(rmApp.getApplicationId(), RMAppEventType.KILL));
     Assert.assertTrue(RMAppImpl.isAppInFinalState(rmApp));
     // If at least of one log aggregation status for one NM is FAILED,
@@ -389,13 +470,13 @@ public class TestRMAppLogAggregationStatus {
     // at the final state, the log aggregation status for this app
     // will return FAILED
     rmApp.aggregateLogReport(nodeId1, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.SUCCEEDED, ""));
     rmApp.aggregateLogReport(nodeId2, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.TIME_OUT, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.TIME_OUT, ""));
     rmApp.aggregateLogReport(nodeId3, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.FAILED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
     rmApp.aggregateLogReport(nodeId4, LogAggregationReport.newInstance(
-      rmApp.getApplicationId(), nodeId1, LogAggregationStatus.SUCCEEDED, ""));
+      rmApp.getApplicationId(), LogAggregationStatus.FAILED, ""));
     Assert.assertEquals(LogAggregationStatus.FAILED,
       rmApp.getLogAggregationStatusForAppReport());