YARN-1376. NM need to notify the log aggregation status to RM through Node heartbeat. Contributed by Xuan Gong.

Junping Du 10 years ago
parent
commit
92431c9617
26 changed files with 1291 additions and 11 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 104 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java
  4. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
  5. 227 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java
  6. 81 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
  7. 31 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java
  8. 11 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java
  9. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
  10. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  11. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  12. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  13. 70 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  14. 33 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
  15. 9 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  16. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  17. 88 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  18. 23 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  19. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
  20. 41 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java
  21. 148 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java
  22. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
  23. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
  24. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  25. 318 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
  26. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -60,6 +60,9 @@ Release 2.8.0 - UNRELEASED
     container-executor for outbound network traffic control. (Sidharta Seethana
     via vinodkv)
 
+    YARN-1376. NM need to notify the log aggregation status to RM through 
+    heartbeat. (Xuan Gong via junping_du)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -741,6 +741,17 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "log-aggregation.retain-check-interval-seconds";
   public static final long DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1;
 
+  /**
+   * How long the ResourceManager waits for a NodeManager to report its
+   * log aggregation status. If a NodeManager's log aggregation status is
+   * not reported within the configured time, the RM reports the log
+   * aggregation status for that NodeManager as TIME_OUT.
+   */
+  public static final String LOG_AGGREGATION_STATUS_TIME_OUT_MS =
+      YARN_PREFIX + "log-aggregation-status.time-out.ms";
+  public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS
+      = 10 * 60 * 1000;
+
   /**
    * Number of seconds to retain logs on the NodeManager. Only applicable if Log
    * aggregation is disabled
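
For reference, a minimal sketch (not part of this commit) of how the new timeout property can be read against its default; RMAppImpl further down does essentially this and falls back to the default when a non-positive value is configured:

    Configuration conf = new YarnConfiguration();
    // 10 minutes unless yarn.log-aggregation-status.time-out.ms is set
    long timeoutMs = conf.getLong(
        YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);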

+ 104 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@code LogAggregationReport} is the per-NodeManager report of an
+ * application's log aggregation status.
+ * <p>
+ * It includes details such as:
+ * <ul>
+ *   <li>{@link ApplicationId} of the application.</li>
+ *   <li>{@link NodeId} of the NodeManager.</li>
+ *   <li>{@link LogAggregationStatus}</li>
+ *   <li>Diagnostic information</li>
+ * </ul>
+ *
+ */
+@Public
+@Unstable
+public abstract class LogAggregationReport {
+
+  @Public
+  @Unstable
+  public static LogAggregationReport newInstance(ApplicationId appId,
+      NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) {
+    LogAggregationReport report = Records.newRecord(LogAggregationReport.class);
+    report.setApplicationId(appId);
+    report.setLogAggregationStatus(status);
+    report.setDiagnosticMessage(diagnosticMessage);
+    return report;
+  }
+
+  /**
+   * Get the <code>ApplicationId</code> of the application.
+   * @return <code>ApplicationId</code> of the application
+   */
+  @Public
+  @Unstable
+  public abstract ApplicationId getApplicationId();
+
+  @Public
+  @Unstable
+  public abstract void setApplicationId(ApplicationId appId);
+
+  /**
+   * Get the <code>NodeId</code>.
+   * @return <code>NodeId</code>
+   */
+  @Public
+  @Unstable
+  public abstract NodeId getNodeId();
+
+  @Public
+  @Unstable
+  public abstract void setNodeId(NodeId nodeId);
+
+  /**
+   * Get the <code>LogAggregationStatus</code>.
+   * @return <code>LogAggregationStatus</code>
+   */
+  @Public
+  @Unstable
+  public abstract LogAggregationStatus getLogAggregationStatus();
+
+  @Public
+  @Unstable
+  public abstract void setLogAggregationStatus(
+      LogAggregationStatus logAggregationStatus);
+
+  /**
+   * Get the <em>diagnostic information</em> of this log aggregation.
+   * @return <em>diagnostic information</em> of this log aggregation
+   */
+  @Public
+  @Unstable
+  public abstract String getDiagnosticMessage();
+
+  @Public
+  @Unstable
+  public abstract void setDiagnosticMessage(String diagnosticMessage);
+}
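
As a usage sketch (hypothetical values, not part of the patch), a NodeManager-side component can build a per-node report through the new factory method:

    ApplicationId appId = ApplicationId.newInstance(1430000000000L, 1);
    NodeId nodeId = NodeId.newInstance("nm-host.example.com", 8041);
    LogAggregationReport report = LogAggregationReport.newInstance(
        appId, nodeId, LogAggregationStatus.RUNNING,
        "Log uploaded successfully for Application: " + appId + "\n");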

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java

@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
@@ -51,4 +53,10 @@ public abstract class NodeHeartbeatRequest {
   
   public abstract Set<String> getNodeLabels();
   public abstract void setNodeLabels(Set<String> nodeLabels);
+
+  public abstract Map<ApplicationId, LogAggregationReport>
+      getLogAggregationReportsForApps();
+
+  public abstract void setLogAggregationReportsForApps(
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps);
 }
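
A hedged sketch of how such reports could be attached to a heartbeat request, given a LogAggregationReport like the one sketched above (the real wiring added by this patch lives in NodeStatusUpdaterImpl further down):

    Map<ApplicationId, LogAggregationReport> reports =
        new HashMap<ApplicationId, LogAggregationReport>();
    reports.put(report.getApplicationId(), report);
    NodeHeartbeatRequest request = Records.newRecord(NodeHeartbeatRequest.class);
    request.setLogAggregationReportsForApps(reports);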

+ 227 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java

@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.LogAggregationStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class LogAggregationReportPBImpl extends LogAggregationReport {
+
+  LogAggregationReportProto proto = LogAggregationReportProto
+    .getDefaultInstance();
+  LogAggregationReportProto.Builder builder = null;
+  boolean viaProto = false;
+
+  private static final String LOGAGGREGATION_STATUS_PREFIX = "LOG_";
+
+  private ApplicationId applicationId;
+  private NodeId nodeId;
+
+  public LogAggregationReportPBImpl() {
+    builder = LogAggregationReportProto.newBuilder();
+  }
+
+  public LogAggregationReportPBImpl(LogAggregationReportProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public LogAggregationReportProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.applicationId != null
+        && !((ApplicationIdPBImpl) this.applicationId).getProto().equals(
+          builder.getApplicationId())) {
+      builder.setApplicationId(convertToProtoFormat(this.applicationId));
+    }
+
+    if (this.nodeId != null
+        && !((NodeIdPBImpl) this.nodeId).getProto().equals(
+          builder.getNodeId())) {
+      builder.setNodeId(convertToProtoFormat(this.nodeId));
+    }
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto)
+      maybeInitBuilder();
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = LogAggregationReportProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    if (this.applicationId != null) {
+      return this.applicationId;
+    }
+
+    LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasApplicationId()) {
+      return null;
+    }
+    this.applicationId = convertFromProtoFormat(p.getApplicationId());
+    return this.applicationId;
+  }
+
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (appId == null)
+      builder.clearApplicationId();
+    this.applicationId = appId;
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(
+      ApplicationIdProto applicationId) {
+    return new ApplicationIdPBImpl(applicationId);
+  }
+
+  @Override
+  public LogAggregationStatus getLogAggregationStatus() {
+    LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasLogAggregationStatus()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getLogAggregationStatus());
+  }
+
+  @Override
+  public void
+      setLogAggregationStatus(LogAggregationStatus logAggregationStatus) {
+    maybeInitBuilder();
+    if (logAggregationStatus == null) {
+      builder.clearLogAggregationStatus();
+      return;
+    }
+    builder.setLogAggregationStatus(convertToProtoFormat(logAggregationStatus));
+  }
+
+  private LogAggregationStatus convertFromProtoFormat(
+      LogAggregationStatusProto s) {
+    return LogAggregationStatus.valueOf(s.name().replace(
+      LOGAGGREGATION_STATUS_PREFIX, ""));
+  }
+
+  private LogAggregationStatusProto
+      convertToProtoFormat(LogAggregationStatus s) {
+    return LogAggregationStatusProto.valueOf(LOGAGGREGATION_STATUS_PREFIX
+        + s.name());
+  }
+
+  @Override
+  public String getDiagnosticMessage() {
+    LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasDiagnostics()) {
+      return null;
+    }
+    return p.getDiagnostics();
+  }
+
+  @Override
+  public void setDiagnosticMessage(String diagnosticMessage) {
+    maybeInitBuilder();
+    if (diagnosticMessage == null) {
+      builder.clearDiagnostics();
+      return;
+    }
+    builder.setDiagnostics(diagnosticMessage);
+  }
+
+  @Override
+  public NodeId getNodeId() {
+    if (this.nodeId != null) {
+      return this.nodeId;
+    }
+
+    LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodeId()) {
+      return null;
+    }
+    this.nodeId = convertFromProtoFormat(p.getNodeId());
+    return this.nodeId;
+  }
+
+  @Override
+  public void setNodeId(NodeId nodeId) {
+    maybeInitBuilder();
+    if (nodeId == null)
+      builder.clearNodeId();
+    this.nodeId = nodeId;
+  }
+
+  private NodeIdProto convertToProtoFormat(NodeId t) {
+    return ((NodeIdPBImpl) t).getProto();
+  }
+
+  private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) {
+    return new NodeIdPBImpl(nodeId);
+  }
+}
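
The PB implementation maps between the Java enum and the proto enum by adding and stripping the "LOG_" prefix, so a round trip preserves the status. A small sketch (not from the patch):

    LogAggregationReportPBImpl pbImpl = new LogAggregationReportPBImpl();
    pbImpl.setLogAggregationStatus(LogAggregationStatus.RUNNING);
    // serialized as LOG_RUNNING in LogAggregationStatusProto
    LogAggregationReportProto proto = pbImpl.getProto();
    LogAggregationReport copy = new LogAggregationReportPBImpl(proto);
    // copy.getLogAggregationStatus() == LogAggregationStatus.RUNNING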

+ 81 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java

@@ -18,15 +18,24 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -42,6 +51,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private MasterKey lastKnownContainerTokenMasterKey = null;
   private MasterKey lastKnownNMTokenMasterKey = null;
   private Set<String> labels = null;
+  private Map<ApplicationId, LogAggregationReport>
+      logAggregationReportsForApps = null;
   
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
@@ -91,6 +102,25 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       builder.setNodeLabels(StringArrayProto.newBuilder()
           .addAllElements(this.labels).build());
     }
+    if (this.logAggregationReportsForApps != null) {
+      addLogAggregationStatusForAppsToProto();
+    }
+  }
+
+  private void addLogAggregationStatusForAppsToProto() {
+    maybeInitBuilder();
+    builder.clearLogAggregationReportsForApps();
+    for (Entry<ApplicationId, LogAggregationReport> entry : logAggregationReportsForApps
+      .entrySet()) {
+      builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto
+        .newBuilder().setAppId(convertToProtoFormat(entry.getKey()))
+        .setLogAggregationReport(convertToProtoFormat(entry.getValue())));
+    }
+  }
+
+  private LogAggregationReportProto convertToProtoFormat(
+      LogAggregationReport value) {
+    return ((LogAggregationReportPBImpl) value).getProto();
   }
 
   private void mergeLocalToProto() {
@@ -215,4 +245,54 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     StringArrayProto nodeLabels = p.getNodeLabels();
     labels = new HashSet<String>(nodeLabels.getElementsList());
   }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  @Override
+  public Map<ApplicationId, LogAggregationReport>
+      getLogAggregationReportsForApps() {
+    if (this.logAggregationReportsForApps != null) {
+      return this.logAggregationReportsForApps;
+    }
+    initLogAggregationReportsForApps();
+    return logAggregationReportsForApps;
+  }
+
+  private void initLogAggregationReportsForApps() {
+    NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<LogAggregationReportsForAppsProto> list =
+        p.getLogAggregationReportsForAppsList();
+    this.logAggregationReportsForApps =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    for (LogAggregationReportsForAppsProto c : list) {
+      ApplicationId appId = convertFromProtoFormat(c.getAppId());
+      LogAggregationReport report =
+          convertFromProtoFormat(c.getLogAggregationReport());
+      this.logAggregationReportsForApps.put(appId, report);
+    }
+  }
+
+  private LogAggregationReport convertFromProtoFormat(
+      LogAggregationReportProto logAggregationReport) {
+    return new LogAggregationReportPBImpl(logAggregationReport);
+  }
+
+  @Override
+  public void setLogAggregationReportsForApps(
+      Map<ApplicationId, LogAggregationReport> logAggregationStatusForApps) {
+    if (logAggregationStatusForApps == null
+        || logAggregationStatusForApps.isEmpty()) {
+      return;
+    }
+    maybeInitBuilder();
+    this.logAggregationReportsForApps =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    this.logAggregationReportsForApps.putAll(logAggregationStatusForApps);
+  }
 }  

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+/**
+ * <p>Status of Log aggregation.</p>
+ */
+public enum LogAggregationStatus {
+  DISABLED,
+  NOT_START,
+  RUNNING,
+  FINISHED,
+  FAILED,
+  TIME_OUT
+}

+ 11 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java

@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.webapp;
 import static org.apache.hadoop.yarn.util.StringHelper.join;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.WEB_UI_TYPE;
+
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.ResponseInfo;
 import org.apache.hadoop.yarn.webapp.YarnWebParams;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
@@ -154,7 +157,7 @@ public class AppBlock extends HtmlBlock {
       html.script().$type("text/javascript")._(script.toString())._();
     }
 
-    info("Application Overview")
+    ResponseInfo overviewTable = info("Application Overview")
       ._("User:", app.getUser())
       ._("Name:", app.getName())
       ._("Application Type:", app.getType())
@@ -181,8 +184,13 @@ public class AppBlock extends HtmlBlock {
           .getAppState() == YarnApplicationState.FINISHED
             || app.getAppState() == YarnApplicationState.FAILED
             || app.getAppState() == YarnApplicationState.KILLED ? "History"
-            : "ApplicationMaster")
-      ._("Diagnostics:",
+            : "ApplicationMaster");
+    if (webUiType != null
+        && webUiType.equals(YarnWebParams.RM_WEB_UI)) {
+      overviewTable._("Log Aggregation Status",
+        root_url("logaggregationstatus", app.getAppId()), "Status");
+    }
+    overviewTable._("Diagnostics:",
         app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo());
 
     Collection<ApplicationAttemptReport> attempts;

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto

@@ -54,3 +54,11 @@ message VersionProto {
   optional int32 minor_version = 2;
 }
 
+enum LogAggregationStatusProto {
+  LOG_DISABLED = 1;
+  LOG_NOT_START = 2;
+  LOG_RUNNING = 3;
+  LOG_FINISHED = 4;
+  LOG_TIME_OUT = 5;
+}
+

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -50,6 +50,19 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_container_token_master_key = 2;
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional StringArrayProto nodeLabels = 4;
+  repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5;
+}
+
+message LogAggregationReportsForAppsProto {
+  optional ApplicationIdProto appId = 1;
+  optional LogAggregationReportProto log_aggregation_report = 2;
+}
+
+message LogAggregationReportProto {
+  optional ApplicationIdProto application_id = 1;
+  optional NodeIdProto node_id = 2;
+  optional LogAggregationStatusProto log_aggregation_status = 3;
+  optional string diagnostics = 4 [default = "N/A"];
 }
 
 message NodeHeartbeatResponseProto {

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.security.Credentials;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -77,4 +79,7 @@ public interface Context {
   boolean getDecommissioned();
 
   void setDecommissioned(boolean isDecommissioned);
+
+  ConcurrentLinkedQueue<LogAggregationReport>
+      getLogAggregationStatusForApps();
 }
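
This queue is the hand-off point between the per-application log aggregators and the heartbeat thread. A producer-side sketch (AppLogAggregatorImpl below does the equivalent with Records.newRecord):

    // inside an NM component holding a Context reference
    context.getLogAggregationStatusForApps().add(
        LogAggregationReport.newInstance(appId, nodeId,
            LogAggregationStatus.RUNNING, "one upload cycle finished\n"));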

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -384,6 +386,8 @@ public class NodeManager extends CompositeService
         .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
     private final NMStateStoreService stateStore;
     private boolean isDecommissioned = false;
+    private final ConcurrentLinkedQueue<LogAggregationReport>
+        logAggregationReportForApps;
 
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
@@ -397,6 +401,8 @@ public class NodeManager extends CompositeService
       this.nodeHealthStatus.setHealthReport("Healthy");
       this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
       this.stateStore = stateStore;
+      this.logAggregationReportForApps = new ConcurrentLinkedQueue<
+          LogAggregationReport>();
     }
 
     /**
@@ -488,6 +494,12 @@ public class NodeManager extends CompositeService
         Map<ApplicationId, Credentials> systemCredentials) {
       this.systemCredentials = systemCredentials;
     }
+
+    @Override
+    public ConcurrentLinkedQueue<LogAggregationReport>
+        getLogAggregationStatusForApps() {
+      return this.logAggregationReportForApps;
+    }
   }
 
 

+ 70 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.Random;
 import java.util.Set;
 
@@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -115,6 +118,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   // Duration for which to track recently stopped container.
   private long durationToTrackStoppedContainers;
 
+  private boolean logAggregationEnabled;
+
+  private final List<LogAggregationReport> logAggregationReportForAppsTempList;
+
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
 
@@ -144,6 +151,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
     this.pendingCompletedContainers =
         new HashMap<ContainerId, ContainerStatus>();
+    this.logAggregationReportForAppsTempList =
+        new ArrayList<LogAggregationReport>();
   }
 
   @Override
@@ -193,6 +202,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     LOG.info("Initialized nodemanager for " + nodeId + ":" +
         " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
         " virtual-cores=" + virtualCores);
+
+    this.logAggregationEnabled =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
   }
 
   @Override
@@ -649,6 +662,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                     NodeStatusUpdaterImpl.this.context
                         .getNMTokenSecretManager().getCurrentKey(),
                     nodeLabelsForHeartbeat);
+
+            if (logAggregationEnabled) {
+              // pull log aggregation status for applications running on this NM
+              Map<ApplicationId, LogAggregationReport> logAggregationReports =
+                  getLogAggregationReportsForApps(context
+                    .getLogAggregationStatusForApps());
+              if (logAggregationReports != null
+                  && !logAggregationReports.isEmpty()) {
+                request.setLogAggregationReportsForApps(logAggregationReports);
+              }
+            }
+
             response = resourceTracker.nodeHeartbeat(request);
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -698,6 +723,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             removeOrTrackCompletedContainersFromContext(response
                   .getContainersToBeRemovedFromNM());
 
+            logAggregationReportForAppsTempList.clear();
             lastHeartbeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
                 .getContainersToCleanup();
@@ -782,6 +808,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         new Thread(statusUpdaterRunnable, "Node Status Updater");
     statusUpdater.start();
   }
-  
-  
+
+  private Map<ApplicationId, LogAggregationReport>
+      getLogAggregationReportsForApps(
+          ConcurrentLinkedQueue<LogAggregationReport> latestLogAggregationStatus) {
+    Map<ApplicationId, LogAggregationReport> latestLogAggregationReports =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    LogAggregationReport status;
+    while ((status = latestLogAggregationStatus.poll()) != null) {
+      this.logAggregationReportForAppsTempList.add(status);
+    }
+    for (LogAggregationReport logAggregationReport
+        : this.logAggregationReportForAppsTempList) {
+      LogAggregationReport report = null;
+      if (latestLogAggregationReports.containsKey(logAggregationReport
+        .getApplicationId())) {
+        report =
+            latestLogAggregationReports.get(logAggregationReport
+              .getApplicationId());
+        report.setLogAggregationStatus(logAggregationReport
+          .getLogAggregationStatus());
+        String message = report.getDiagnosticMessage();
+        if (logAggregationReport.getDiagnosticMessage() != null
+            && !logAggregationReport.getDiagnosticMessage().isEmpty()) {
+          if (message != null) {
+            message += logAggregationReport.getDiagnosticMessage();
+          } else {
+            message = logAggregationReport.getDiagnosticMessage();
+          }
+          report.setDiagnosticMessage(message);
+        }
+      } else {
+        report = Records.newRecord(LogAggregationReport.class);
+        report.setApplicationId(logAggregationReport.getApplicationId());
+        report.setNodeId(this.nodeId);
+        report.setLogAggregationStatus(logAggregationReport
+          .getLogAggregationStatus());
+        report
+          .setDiagnosticMessage(logAggregationReport.getDiagnosticMessage());
+      }
+      latestLogAggregationReports.put(logAggregationReport.getApplicationId(),
+        report);
+    }
+    return latestLogAggregationReports;
+  }
 }

+ 33 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -57,12 +57,16 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.Times;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
@@ -120,6 +124,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   // This variable is only for testing
   private final AtomicBoolean waiting = new AtomicBoolean(false);
 
+  private boolean renameTemporaryLogFileFailed = false;
+
   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
       new HashMap<ContainerId, ContainerLogAggregator>();
 
@@ -292,12 +298,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         writer.close();
       }
 
+      long currentTime = System.currentTimeMillis();
       final Path renamedPath = this.rollingMonitorInterval <= 0
               ? remoteNodeLogFileForApp : new Path(
                 remoteNodeLogFileForApp.getParent(),
                 remoteNodeLogFileForApp.getName() + "_"
-                    + System.currentTimeMillis());
+                    + currentTime);
 
+      String diagnosticMessage = "";
       final boolean rename = uploadedLogsInThisCycle;
       try {
         userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -314,12 +322,36 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             return null;
           }
         });
+        diagnosticMessage =
+            "Log uploaded successfully for Application: " + appId
+                + " in NodeManager: "
+                + LogAggregationUtils.getNodeString(nodeId) + " at "
+                + Times.format(currentTime) + "\n";
       } catch (Exception e) {
         LOG.error(
           "Failed to move temporary log file to final location: ["
               + remoteNodeTmpLogFileForApp + "] to ["
               + renamedPath + "]", e);
+        diagnosticMessage =
+            "Log upload failed for Application: " + appId
+                + " in NodeManager: "
+                + LogAggregationUtils.getNodeString(nodeId) + " at "
+                + Times.format(currentTime) + "\n";
+        renameTemporaryLogFileFailed = true;
+      }
+
+      LogAggregationReport report =
+          Records.newRecord(LogAggregationReport.class);
+      report.setApplicationId(appId);
+      report.setNodeId(nodeId);
+      report.setDiagnosticMessage(diagnosticMessage);
+      if (appFinished) {
+        report.setLogAggregationStatus(renameTemporaryLogFileFailed
+            ? LogAggregationStatus.FAILED : LogAggregationStatus.FINISHED);
+      } else {
+        report.setLogAggregationStatus(LogAggregationStatus.RUNNING);
       }
+      this.context.getLogAggregationStatusForApps().add(report);
     } finally {
       if (writer != null) {
         writer.close();

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -458,10 +458,16 @@ public class ResourceTrackerService extends AbstractService implements
     }
 
     // 4. Send status to RMNode, saving the latest response.
-    this.rmContext.getDispatcher().getEventHandler().handle(
+    RMNodeStatusEvent nodeStatusEvent =
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
-            remoteNodeStatus.getContainersStatuses(), 
-            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+          remoteNodeStatus.getContainersStatuses(),
+          remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse);
+    if (request.getLogAggregationReportsForApps() != null
+        && !request.getLogAggregationReportsForApps().isEmpty()) {
+      nodeStatusEvent.setLogAggregationReportsForApps(request
+        .getLogAggregationReportsForApps());
+    }
+    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);
 
     // 5. Update node's labels to RM's NodeLabelManager.
     if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -242,4 +243,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
   ReservationId getReservationId();
   
   ResourceRequest getAMResourceRequest();
+
+  Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();
 }
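
An RM-side consumer can then read the per-node view off the application, which is how the new RMAppLogAggregationStatusBlock can render per-node status; a sketch assuming an RMContext reference:

    RMApp rmApp = rmContext.getRMApps().get(appId);
    Map<NodeId, LogAggregationReport> reportsPerNode =
        rmApp.getLogAggregationReportsForApp();
    for (Map.Entry<NodeId, LogAggregationReport> e : reportsPerNode.entrySet()) {
      System.out.println(e.getKey() + " -> "
          + e.getValue().getLogAggregationStatus());
    }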

+ 88 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -25,9 +25,11 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -61,6 +63,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
@@ -142,6 +146,12 @@ public class RMAppImpl implements RMApp, Recoverable {
       new AppFinishedTransition();
   private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
 
+  private final boolean logAggregationEnabled;
+  private long logAggregationStartTime = 0;
+  private final long logAggregationStatusTimeout;
+  private final Map<NodeId, LogAggregationReport> logAggregationStatus =
+      new HashMap<NodeId, LogAggregationReport>();
+
   // These states stored are only valid when app is at killing or final_saving.
   private RMAppState stateBeforeKilling;
   private RMAppState stateBeforeFinalSaving;
@@ -413,6 +423,19 @@ public class RMAppImpl implements RMApp, Recoverable {
 
     rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
     rmContext.getSystemMetricsPublisher().appCreated(this, startTime);
+
+    long localLogAggregationStatusTimeout =
+        conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
+    if (localLogAggregationStatusTimeout <= 0) {
+      this.logAggregationStatusTimeout =
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
+    } else {
+      this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
+    }
+    this.logAggregationEnabled =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
   }
 
   @Override
@@ -803,6 +826,12 @@ public class RMAppImpl implements RMApp, Recoverable {
       
       // otherwise, add it to ranNodes for further process
       app.ranNodes.add(nodeAddedEvent.getNodeId());
+
+      app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
+        LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent
+          .getNodeId(), app.logAggregationEnabled
+            ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED,
+          ""));
     };
   }
 
@@ -1153,6 +1182,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
 
     public void transition(RMAppImpl app, RMAppEvent event) {
+      app.logAggregationStartTime = System.currentTimeMillis();
       for (NodeId nodeId : app.getRanNodes()) {
         app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
@@ -1356,4 +1386,62 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
     return credentials;
   }
+
+  @Override
+  public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+    try {
+      this.readLock.lock();
+      Map<NodeId, LogAggregationReport> outputs =
+          new HashMap<NodeId, LogAggregationReport>();
+      outputs.putAll(logAggregationStatus);
+      for (Entry<NodeId, LogAggregationReport> output : outputs.entrySet()) {
+        if (!output.getValue().getLogAggregationStatus()
+          .equals(LogAggregationStatus.TIME_OUT)
+            && !output.getValue().getLogAggregationStatus()
+              .equals(LogAggregationStatus.FINISHED)
+            && isAppInFinalState(this)
+            && System.currentTimeMillis() > this.logAggregationStartTime
+                + this.logAggregationStatusTimeout) {
+          output.getValue().setLogAggregationStatus(
+            LogAggregationStatus.TIME_OUT);
+        }
+      }
+      return outputs;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
+    try {
+      this.writeLock.lock();
+      if (this.logAggregationEnabled) {
+        LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
+        if (curReport == null) {
+          this.logAggregationStatus.put(nodeId, report);
+        } else {
+          if (curReport.getLogAggregationStatus().equals(
+            LogAggregationStatus.TIME_OUT)) {
+            if (report.getLogAggregationStatus().equals(
+              LogAggregationStatus.FINISHED)) {
+              curReport.setLogAggregationStatus(report
+                .getLogAggregationStatus());
+            }
+          } else {
+            curReport.setLogAggregationStatus(report.getLogAggregationStatus());
+          }
+
+          if (report.getDiagnosticMessage() != null
+              && !report.getDiagnosticMessage().isEmpty()) {
+            curReport
+              .setDiagnosticMessage(curReport.getDiagnosticMessage() == null
+                  ? report.getDiagnosticMessage() : curReport
+                    .getDiagnosticMessage() + report.getDiagnosticMessage());
+          }
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

+ 23 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -243,7 +247,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     this.stateMachine = stateMachineFactory.make(this);
     
-    this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();  
+    this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
   }
 
   @Override
@@ -773,6 +777,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
 
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps =
+          statusEvent.getLogAggregationReportsForApps();
+      if (logAggregationReportsForApps != null
+          && !logAggregationReportsForApps.isEmpty()) {
+        rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
+      }
+
       if(rmNode.nextHeartBeat) {
         rmNode.nextHeartBeat = false;
         rmNode.context.getDispatcher().getEventHandler().handle(
@@ -903,4 +914,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  private void handleLogAggregationStatus(
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+    for (Entry<ApplicationId, LogAggregationReport> report :
+        logAggregationReportsForApps.entrySet()) {
+      RMApp rmApp = this.context.getRMApps().get(report.getKey());
+      if (rmApp != null) {
+        ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue());
+      }
+    }
+  }
+
  }

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java

@@ -19,10 +19,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 
@@ -32,6 +34,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   private final List<ContainerStatus> containersCollection;
   private final NodeHeartbeatResponse latestResponse;
   private final List<ApplicationId> keepAliveAppIds;
+  private Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps;
 
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
       List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
@@ -41,6 +44,19 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     this.containersCollection = collection;
     this.keepAliveAppIds = keepAliveAppIds;
     this.latestResponse = latestResponse;
+    this.logAggregationReportsForApps = null;
+  }
+
+  public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
+      List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
+      NodeHeartbeatResponse latestResponse,
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+    super(nodeId, RMNodeEventType.STATUS_UPDATE);
+    this.nodeHealthStatus = nodeHealthStatus;
+    this.containersCollection = collection;
+    this.keepAliveAppIds = keepAliveAppIds;
+    this.latestResponse = latestResponse;
+    this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
 
   public NodeHealthStatus getNodeHealthStatus() {
@@ -58,4 +74,14 @@ public class RMNodeStatusEvent extends RMNodeEvent {
   public List<ApplicationId> getKeepAliveAppIds() {
     return this.keepAliveAppIds;
   }
+
+  public Map<ApplicationId, LogAggregationReport>
+      getLogAggregationReportsForApps() {
+    return this.logAggregationReportsForApps;
+  }
+
+  public void setLogAggregationReportsForApps(
+      Map<ApplicationId, LogAggregationReport> logAggregationReportsForApps) {
+    this.logAggregationReportsForApps = logAggregationReportsForApps;
+  }
 }

+ 41 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+
+public class AppLogAggregationStatusPage extends RmView {
+
+  @Override
+  protected void preHead(Page.HTML<_> html) {
+    commonPreHead(html);
+    String appId = $(YarnWebParams.APPLICATION_ID);
+    set(
+      TITLE,
+      appId.isEmpty() ? "Bad request: missing application ID" : join(
+        "Application ", appId));
+  }
+
+  @Override
+  protected Class<? extends SubView> content() {
+    return RMAppLogAggregationStatusBlock.class;
+  }
+}

+ 148 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java

@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+public class RMAppLogAggregationStatusBlock extends HtmlBlock {
+
+  private static final Log LOG = LogFactory
+    .getLog(RMAppLogAggregationStatusBlock.class);
+  private final ResourceManager rm;
+  private final Configuration conf;
+
+  @Inject
+  RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm,
+      Configuration conf) {
+    super(ctx);
+    this.rm = rm;
+    this.conf = conf;
+  }
+
+  @Override
+  protected void render(Block html) {
+    String aid = $(APPLICATION_ID);
+    if (aid.isEmpty()) {
+      puts("Bad request: requires Application ID");
+      return;
+    }
+
+    ApplicationId appId;
+    try {
+      appId = Apps.toAppID(aid);
+    } catch (Exception e) {
+      puts("Invalid Application ID: " + aid);
+      return;
+    }
+
+    setTitle(join("Application ", aid));
+
+    // Add LogAggregationStatus description table
+    // to explain the meaning of different LogAggregationStatus
+    DIV<Hamlet> div_description = html.div(_INFO_WRAP);
+    TABLE<DIV<Hamlet>> table_description =
+        div_description.table("#LogAggregationStatusDescription");
+    table_description.
+      tr().
+        th(_TH, "Log Aggregation Status").
+        th(_TH, "Description").
+      _();
+    table_description.tr().td(LogAggregationStatus.DISABLED.name())
+      .td("Log Aggregation is Disabled.")._();
+    table_description.tr().td(LogAggregationStatus.NOT_START.name())
+      .td("Log Aggregation has not started.")._();
+    table_description.tr().td(LogAggregationStatus.RUNNING.name())
+      .td("Log Aggregation is Running.")._();
+    table_description.tr().td(LogAggregationStatus.FINISHED.name())
+      .td("Log Aggregation is Finished. All of the logs have been "
+          + "aggregated successfully.")._();
+    table_description.tr().td(LogAggregationStatus.FAILED.name())
+      .td("Log Aggregation has failed. At least one of the logs "
+          + "has not been aggregated.")._();
+    table_description.tr().td(LogAggregationStatus.TIME_OUT.name())
+      .td("No log aggregation status has been reported for a long time, "
+          + "so the current Log Aggregation Status is unknown.")._();
+    table_description._();
+    div_description._();
+
+    boolean logAggregationEnabled =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+    // Application Log aggregation status Table
+    DIV<Hamlet> div = html.div(_INFO_WRAP);
+    TABLE<DIV<Hamlet>> table =
+        div.h3(
+          "Log Aggregation: "
+              + (logAggregationEnabled ? "Enabled" : "Disabled")).table(
+          "#LogAggregationStatus");
+    table.
+      tr().
+        th(_TH, "NodeId").
+        th(_TH, "Log Aggregation Status").
+        th(_TH, "Diagnostics Message").
+      _();
+
+    RMApp rmApp = rm.getRMContext().getRMApps().get(appId);
+    if (rmApp != null) {
+      Map<NodeId, LogAggregationReport> logAggregationReports =
+          rmApp.getLogAggregationReportsForApp();
+      if (logAggregationReports != null && !logAggregationReports.isEmpty()) {
+        for (Entry<NodeId, LogAggregationReport> report :
+            logAggregationReports.entrySet()) {
+          LogAggregationStatus status =
+              report.getValue() == null ? null : report.getValue()
+                .getLogAggregationStatus();
+          String message =
+              report.getValue() == null ? null : report.getValue()
+                .getDiagnosticMessage();
+          table.tr()
+            .td(report.getKey().toString())
+            .td(status == null ? "N/A" : status.toString())
+            .td(message == null ? "N/A" : message)._();
+        }
+      }
+    }
+    table._();
+    div._();
+  }
+}
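
As a quick illustration of the consumer side, here is a sketch (duplicating the null handling the block above already performs, purely for orientation) of reading the per-node reports that RMApp now exposes; rmApp is assumed to be a non-null RMApp looked up from the RMContext:

    // Sketch: print per-node log aggregation state for one application.
    Map<NodeId, LogAggregationReport> reports =
        rmApp.getLogAggregationReportsForApp();
    if (reports != null) {
      for (Map.Entry<NodeId, LogAggregationReport> e : reports.entrySet()) {
        LogAggregationReport r = e.getValue();
        String status = (r == null || r.getLogAggregationStatus() == null)
            ? "N/A" : r.getLogAggregationStatus().toString();
        String message = (r == null || r.getDiagnosticMessage() == null)
            ? "N/A" : r.getDiagnosticMessage();
        System.out.println(e.getKey() + "\t" + status + "\t" + message);
      }
    }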

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java

@@ -68,6 +68,8 @@ public class RMWebApp extends WebApp implements YarnWebParams {
       "appattempt");
     route(pajoin("/container", CONTAINER_ID), RmController.class, "container");
     route("/errors-and-warnings", RmController.class, "errorsAndWarnings");
+    route(pajoin("/logaggregationstatus", APPLICATION_ID),
+      RmController.class, "logaggregationstatus");
   }
 
   @Override

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java

@@ -109,4 +109,8 @@ public class RmController extends Controller {
   public void errorsAndWarnings() {
     render(RMErrorsAndWarningsPage.class);
   }
+
+  public void logaggregationstatus() {
+    render(AppLogAggregationStatusPage.class);
+  }
 }
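
Together with the route registered in RMWebApp above, this action makes the new page addressable by application ID. A purely illustrative sketch of the resulting URL shape follows; the "/cluster" prefix is an assumption based on the RM web app's usual mount point and is not shown in this excerpt:

    // Hypothetical example of the path served by the new route.
    String appId = "application_1430000000000_0001";   // made-up application ID
    String page = "/cluster/logaggregationstatus/" + appId;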

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -190,6 +191,11 @@ public abstract class MockAsm extends MockApps {
     public ResourceRequest getAMResourceRequest() {
       return this.amReq; 
     }
+
+    @Override
+    public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
   }
 
   public static RMApp newApplication(int i) {

+ 318 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java

@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.logaggregationstatus;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestRMAppLogAggregationStatus {
+
+  private RMContext rmContext;
+  private YarnScheduler scheduler;
+
+  private SchedulerEventType eventType;
+
+  private ApplicationId appId;
+
+
+  private final class TestSchedulerEventDispatcher implements
+      EventHandler<SchedulerEvent> {
+    @Override
+    public void handle(SchedulerEvent event) {
+      scheduler.handle(event);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    InlineDispatcher rmDispatcher = new InlineDispatcher();
+
+    rmContext =
+        new RMContextImpl(rmDispatcher, null, null, null,
+          null, null, null, null, null,
+          new RMApplicationHistoryWriter());
+    rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
+
+    scheduler = mock(YarnScheduler.class);
+    doAnswer(
+        new Answer<Void>() {
+
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
+            eventType = event.getType();
+            if (eventType == SchedulerEventType.NODE_UPDATE) {
+              //DO NOTHING
+            }
+            return null;
+          }
+        }
+        ).when(scheduler).handle(any(SchedulerEvent.class));
+
+    rmDispatcher.register(SchedulerEventType.class,
+        new TestSchedulerEventDispatcher());
+
+    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testLogAggregationStatus() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, 1500);
+    RMApp rmApp = createRMApp(conf);
+    this.rmContext.getRMApps().put(appId, rmApp);
+    rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.START));
+    rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_NEW_SAVED));
+    rmApp.handle(new RMAppEvent(this.appId, RMAppEventType.APP_ACCEPTED));
+
+    // This application will be running on two nodes
+    NodeId nodeId1 = NodeId.newInstance("localhost", 1234);
+    Resource capability = Resource.newInstance(4096, 4);
+    RMNodeImpl node1 =
+        new RMNodeImpl(nodeId1, rmContext, null, 0, 0, null, capability, null);
+    node1.handle(new RMNodeStartedEvent(nodeId1, null, null));
+    rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId1));
+
+    NodeId nodeId2 = NodeId.newInstance("localhost", 2345);
+    RMNodeImpl node2 =
+        new RMNodeImpl(nodeId2, rmContext, null, 0, 0, null, capability, null);
+    node2.handle(new RMNodeStartedEvent(node2.getNodeID(), null, null));
+    rmApp.handle(new RMAppRunningOnNodeEvent(this.appId, nodeId2));
+
+    // The initial log aggregation status for these two nodes
+    // should be NOT_START
+    Map<NodeId, LogAggregationReport> logAggregationStatus =
+        rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      Assert.assertEquals(LogAggregationStatus.NOT_START, report.getValue()
+        .getLogAggregationStatus());
+    }
+
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    String messageForNode1_1 =
+        "node1 logAggregation status updated at " + System.currentTimeMillis();
+    LogAggregationReport report1 =
+        LogAggregationReport.newInstance(appId, nodeId1,
+          LogAggregationStatus.RUNNING, messageForNode1_1);
+    node1ReportForApp.put(appId, report1);
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node1ReportForApp));
+
+    Map<ApplicationId, LogAggregationReport> node2ReportForApp =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    String messageForNode2_1 =
+        "node2 logAggregation status updated at " + System.currentTimeMillis();
+    LogAggregationReport report2 =
+        LogAggregationReport.newInstance(appId, nodeId2,
+          LogAggregationStatus.RUNNING, messageForNode2_1);
+    node2ReportForApp.put(appId, report2);
+    node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node2ReportForApp));
+    // node1 and node2 have updated their log aggregation status;
+    // verify that the log aggregation status for node1 and node2
+    // has been updated accordingly
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      if (report.getKey().equals(node1.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode1_1, report.getValue()
+          .getDiagnosticMessage());
+      } else if (report.getKey().equals(node2.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode2_1, report.getValue()
+          .getDiagnosticMessage());
+      } else {
+        // should not contain log aggregation report for other nodes
+        Assert
+          .fail("should not contain log aggregation report for other nodes");
+      }
+    }
+
+    // node1 updates its log aggregation status again
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp2 =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    String messageForNode1_2 =
+        "node1 logAggregation status updated at " + System.currentTimeMillis();
+    LogAggregationReport report1_2 =
+        LogAggregationReport.newInstance(appId, nodeId1,
+          LogAggregationStatus.RUNNING, messageForNode1_2);
+    node1ReportForApp2.put(appId, report1_2);
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node1ReportForApp2));
+
+    // verify that the log aggregation report for node1
+    // has been updated, while the report for node2
+    // remains unchanged since node2 did not send
+    // a new report in this heartbeat
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      if (report.getKey().equals(node1.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode1_1 + messageForNode1_2, report
+          .getValue().getDiagnosticMessage());
+      } else if (report.getKey().equals(node2.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.RUNNING, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode2_1, report.getValue()
+          .getDiagnosticMessage());
+      } else {
+        // should not contain log aggregation report for other nodes
+        Assert
+          .fail("should not contain log aggregation report for other nodes");
+      }
+    }
+
+    // kill the application
+    rmApp.handle(new RMAppEvent(appId, RMAppEventType.KILL));
+    rmApp.handle(new RMAppEvent(appId, RMAppEventType.ATTEMPT_KILLED));
+    rmApp.handle(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED));
+    Assert.assertEquals(RMAppState.KILLED, rmApp.getState());
+
+    // wait for 1500 ms
+    Thread.sleep(1500);
+
+    // the log aggregation status for both nodes should be changed
+    // to TIME_OUT
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
+        .getLogAggregationStatus());
+    }
+
+    // Finally, node1 finishes its log aggregation and sends out its final
+    // log aggregation status. The log aggregation status for node1 should
+    // change from TIME_OUT to FINISHED
+    Map<ApplicationId, LogAggregationReport> node1ReportForApp3 =
+        new HashMap<ApplicationId, LogAggregationReport>();
+    String messageForNode1_3 =
+        "node1 final logAggregation status updated at "
+            + System.currentTimeMillis();
+    LogAggregationReport report1_3 =
+        LogAggregationReport.newInstance(appId, nodeId1,
+          LogAggregationStatus.FINISHED, messageForNode1_3);
+    node1ReportForApp3.put(appId, report1_3);
+    node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
+      .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
+      null, node1ReportForApp3));
+
+    logAggregationStatus = rmApp.getLogAggregationReportsForApp();
+    Assert.assertEquals(2, logAggregationStatus.size());
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId1));
+    Assert.assertTrue(logAggregationStatus.containsKey(nodeId2));
+    for (Entry<NodeId, LogAggregationReport> report : logAggregationStatus
+      .entrySet()) {
+      if (report.getKey().equals(node1.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.FINISHED, report.getValue()
+          .getLogAggregationStatus());
+        Assert.assertEquals(messageForNode1_1 + messageForNode1_2
+            + messageForNode1_3, report.getValue().getDiagnosticMessage());
+      } else if (report.getKey().equals(node2.getNodeID())) {
+        Assert.assertEquals(LogAggregationStatus.TIME_OUT, report.getValue()
+          .getLogAggregationStatus());
+      } else {
+        // should not contain log aggregation report for other nodes
+        Assert
+          .fail("should not contain log aggregation report for other nodes");
+      }
+    }
+  }
+
+  private RMApp createRMApp(Configuration conf) {
+    ApplicationSubmissionContext submissionContext =
+        ApplicationSubmissionContext.newInstance(appId, "test", "default",
+          Priority.newInstance(0), null, false, true,
+          2, Resource.newInstance(10, 2), "test");
+    return new RMAppImpl(this.appId, this.rmContext,
+      conf, "test", "test", "default", submissionContext,
+      this.rmContext.getScheduler(),
+      this.rmContext.getApplicationMasterService(),
+      System.currentTimeMillis(), "test",
+      null, null);
+  }
+}

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
@@ -271,4 +272,9 @@ public class MockRMApp implements RMApp {
   public ResourceRequest getAMResourceRequest() {
     return this.amReq; 
   }
+
+  @Override
+  public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }