Browse Source

YARN-221. NM should provide a way for AM to tell it not to aggregate
logs. Contributed by Ming Ma

(cherry picked from commit 37e1c3d82a96d781e1c9982988b7de4aa5242d0c)

Xuan 9 years ago
parent
commit
205b98c059
26 changed files with 1343 additions and 171 deletions
  1. 12 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
  2. 3 0
      hadoop-yarn-project/CHANGES.txt
  3. 95 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
  4. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  5. 54 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java
  6. 71 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java
  7. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  8. 40 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
  9. 8 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java
  10. 24 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  11. 2 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  12. 31 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java
  13. 35 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java
  14. 31 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java
  15. 2 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
  16. 82 49
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
  17. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java
  18. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java
  19. 10 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  20. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java
  21. 124 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java
  22. 3 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
  23. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
  24. 601 76
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
  25. 4 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
  26. 9 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

+ 12 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java

@@ -315,7 +315,18 @@ public class StringUtils {
    * @return the arraylist of the comma seperated string values
    */
   public static String[] getStrings(String str){
-    Collection<String> values = getStringCollection(str);
+    String delim = ",";
+    return getStrings(str, delim);
+  }
+
+  /**
+   * Returns an arraylist of strings.
+   * @param str the string values
+   * @param delim delimiter to separate the values
+   * @return the arraylist of the seperated string values
+   */
+  public static String[] getStrings(String str, String delim){
+    Collection<String> values = getStringCollection(str, delim);
     if(values.size() == 0) {
       return null;
     }

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -123,6 +123,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed 
     Node Label Configuration Setup. (Naganarasimha G R)
 
+    YARN-221. NM should provide a way for AM to tell it not to aggregate logs.
+    (Ming Ma via xgong)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

+ 95 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java

@@ -54,6 +54,43 @@ import org.apache.hadoop.yarn.util.Records;
  *     name matches both the include and the exclude pattern, this file
  *     will be excluded eventually.
  *   </li>
+ *   <li>
+ *     policyClassName. The policy class name that implements
+ *     ContainerLogAggregationPolicy. At runtime, nodemanager will the policy
+ *     if a given container's log should be aggregated based on the
+ *     ContainerType and other runtime state such as exit code by calling
+ *     ContainerLogAggregationPolicy#shouldDoLogAggregation.
+ *     This is useful when the app only wants to aggregate logs of a subset of
+ *     containers. Here are the available policies. Please make sure to specify
+ *     the canonical name by prefixing org.apache.hadoop.yarn.server.
+ *     nodemanager.containermanager.logaggregation.
+ *     to the class simple name below.
+ *     NoneContainerLogAggregationPolicy: skip aggregation for all containers.
+ *     AllContainerLogAggregationPolicy: aggregate all containers.
+ *     AMOrFailedContainerLogAggregationPolicy: aggregate application master
+ *         or failed containers.
+ *     FailedOrKilledContainerLogAggregationPolicy: aggregate failed or killed
+ *         containers
+ *     FailedContainerLogAggregationPolicy: aggregate failed containers
+ *     AMOnlyLogAggregationPolicy: aggregate application master containers
+ *     SampleContainerLogAggregationPolicy: sample logs of successful worker
+ *         containers, in addition to application master and failed/killed
+ *         containers.
+ *     If it isn't specified, it will use the cluster-wide default policy
+ *     defined by configuration yarn.nodemanager.log-aggregation.policy.class.
+ *     The default value of yarn.nodemanager.log-aggregation.policy.class is
+ *     AllContainerLogAggregationPolicy.
+ *   </li>
+ *   <li>
+ *     policyParameters. The parameters passed to the policy class via
+ *     ContainerLogAggregationPolicy#parseParameters during the policy object
+ *     initialization. This is optional. Some policy class might use parameters
+ *     to adjust its settings. It is up to policy class to define the scheme of
+ *     parameters.
+ *     For example, SampleContainerLogAggregationPolicy supports the format of
+ *     "SR:0.5,MIN:50", which means sample rate of 50% beyond the first 50
+ *     successful worker containers.
+ *   </li>
  * </ul>
  *
  * @see ApplicationSubmissionContext
@@ -87,6 +124,23 @@ public abstract class LogAggregationContext {
     return context;
   }
 
+  @Public
+  @Unstable
+  public static LogAggregationContext newInstance(String includePattern,
+      String excludePattern, String rolledLogsIncludePattern,
+      String rolledLogsExcludePattern, String policyClassName,
+      String policyParameters) {
+    LogAggregationContext context =
+        Records.newRecord(LogAggregationContext.class);
+    context.setIncludePattern(includePattern);
+    context.setExcludePattern(excludePattern);
+    context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
+    context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
+    context.setLogAggregationPolicyClassName(policyClassName);
+    context.setLogAggregationPolicyParameters(policyParameters);
+    return context;
+  }
+
   /**
    * Get include pattern. This includePattern only takes affect
    * on logs that exist at the time of application finish.
@@ -164,4 +218,45 @@ public abstract class LogAggregationContext {
   @Unstable
   public abstract void setRolledLogsExcludePattern(
       String rolledLogsExcludePattern);
+
+  /**
+   * Get the log aggregation policy class.
+   *
+   * @return log aggregation policy class
+   */
+  @Public
+  @Unstable
+  public abstract String getLogAggregationPolicyClassName();
+
+  /**
+   * Set the log aggregation policy class.
+   *
+   * @param className
+   */
+  @Public
+  @Unstable
+  public abstract void setLogAggregationPolicyClassName(
+      String className);
+
+  /**
+   * Get the log aggregation policy parameters.
+   *
+   * @return log aggregation policy parameters
+   */
+  @Public
+  @Unstable
+  public abstract String getLogAggregationPolicyParameters();
+
+  /**
+   * Set the log aggregation policy parameters.
+   * There is no schema defined for the parameters string.
+   * It is up to the log aggregation policy class to decide how to parse
+   * the parameters string.
+   *
+   * @param parameters
+   */
+  @Public
+  @Unstable
+  public abstract void setLogAggregationPolicyParameters(
+      String parameters);
 }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1237,6 +1237,12 @@ public class YarnConfiguration extends Configuration {
       NM_RECOVERY_PREFIX + "supervised";
   public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;
 
+  public static final String NM_LOG_AGG_POLICY_CLASS =
+      NM_PREFIX + "log-aggregation.policy.class";
+
+  public static final String NM_LOG_AGG_POLICY_CLASS_PARAMETERS = NM_PREFIX
+      + "log-aggregation.policy.parameters";
+
   ////////////////////////////////
   // Web Proxy Configs
   ////////////////////////////////

+ 54 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+
+/**
+ * This API is used by NodeManager to decide if a given container's logs
+ * should be aggregated at run time.
+ */
+@Public
+@Unstable
+public interface ContainerLogAggregationPolicy {
+
+  /**
+   * <p>
+   * The method used by the NodeManager log aggregation service
+   * to initial the policy object with parameters specified by the application
+   * or the cluster-wide setting.
+   * </p>
+   *
+   * @param parameters parameters with scheme defined by the policy class.
+   */
+  void parseParameters(String parameters);
+
+  /**
+   * <p>
+   * The method used by the NodeManager log aggregation service
+   * to ask the policy object if a given container's logs should be aggregated.
+   * </p>
+   *
+   * @param logContext ContainerLogContext
+   * @return Whether or not the container's logs should be aggregated.
+   */
+  boolean shouldDoLogAggregation(ContainerLogContext logContext);
+}

+ 71 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java

@@ -0,0 +1,71 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Context class for {@link ContainerLogAggregationPolicy}.
+ */
+@Public
+@Unstable
+public class ContainerLogContext {
+  private final ContainerId containerId;
+  private final ContainerType containerType;
+  private int exitCode;
+
+  @Public
+  @Unstable
+  public ContainerLogContext(ContainerId containerId,
+      ContainerType containerType, int exitCode) {
+    this.containerId = containerId;
+    this.containerType = containerType;
+    this.exitCode = exitCode;
+  }
+
+  /**
+   * Get {@link ContainerId} of the container.
+   *
+   * @return the container ID
+   */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * Get {@link ContainerType} the type of the container.
+   *
+   * @return the type of the container
+   */
+  public ContainerType getContainerType() {
+    return containerType;
+  }
+
+  /**
+   * Get the exit code of the container.
+   *
+   * @return the exit code
+   */
+  public int getExitCode() {
+    return exitCode;
+  }
+
+}

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -343,6 +343,8 @@ message LogAggregationContextProto {
  optional string exclude_pattern = 2 [default = ""];
  optional string rolled_logs_include_pattern = 3 [default = ""];
  optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
+ optional string log_aggregation_policy_class_name = 5;
+ optional string log_aggregation_policy_parameters = 6;
 }
 
 enum ApplicationAccessTypeProto {

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java

@@ -155,4 +155,44 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{
     }
     builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
   }
+
+  @Override
+  public String getLogAggregationPolicyClassName() {
+    LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (! p.hasLogAggregationPolicyClassName()) {
+      return null;
+    }
+    return p.getLogAggregationPolicyClassName();
+  }
+
+  @Override
+  public void setLogAggregationPolicyClassName(
+      String className) {
+    maybeInitBuilder();
+    if (className == null) {
+      builder.clearLogAggregationPolicyClassName();
+      return;
+    }
+    builder.setLogAggregationPolicyClassName(className);
+  }
+
+  @Override
+  public String getLogAggregationPolicyParameters() {
+    LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (! p.hasLogAggregationPolicyParameters()) {
+      return null;
+    }
+    return p.getLogAggregationPolicyParameters();
+  }
+
+  @Override
+  public void setLogAggregationPolicyParameters(
+      String config) {
+    maybeInitBuilder();
+    if (config == null) {
+      builder.clearLogAggregationPolicyParameters();
+      return;
+    }
+    builder.setLogAggregationPolicyParameters(config);
+  }
 }

+ 8 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java

@@ -16,14 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.logaggregation;
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 
 @Private
-/**
- * This API is not exposed to end-users yet.
- */
-public enum ContainerLogsRetentionPolicy {
-  APPLICATION_MASTER_ONLY, AM_AND_FAILED_CONTAINERS_ONLY, ALL_CONTAINERS 
-}
+public class AllContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    return true;
+  }
+}

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2226,4 +2226,28 @@
     <value>0</value>
   </property>
 
+  <property>
+    <description>
+    The default log aggregation policy class. Applications can
+    override it via LogAggregationContext. This configuration can provide
+    some cluster-side default behavior so that if the application doesn't
+    specify any policy via LogAggregationContext administrators of the cluster
+    can adjust the policy globally.
+    </description>
+    <name>yarn.nodemanager.log-aggregation.policy.class</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy</value>
+  </property>
+
+  <property>
+    <description>
+    The default parameters for the log aggregation policy. Applications can
+    override it via LogAggregationContext. This configuration can provide
+    some cluster-side default behavior so that if the application doesn't
+    specify any policy via LogAggregationContext administrators of the cluster
+    can adjust the policy globally.
+    </description>
+    <name>yarn.nodemanager.log-aggregation.policy.parameters</name>
+    <value></value>
+  </property>
+
 </configuration>

+ 2 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -242,8 +241,8 @@ public class ApplicationImpl implements Application {
       app.logAggregationContext = initEvent.getLogAggregationContext();
       app.dispatcher.getEventHandler().handle(
           new LogHandlerAppStartedEvent(app.appId, app.user,
-              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
-              app.applicationACLs, app.logAggregationContext)); 
+              app.credentials, app.applicationACLs,
+              app.logAggregationContext));
     }
   }
 

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+
+@Private
+public class AMOnlyLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+   return logContext.getContainerType() == ContainerType.APPLICATION_MASTER;
+  }
+}

+ 35 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+
+@Private
+public class AMOrFailedContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    int exitCode = logContext.getExitCode();
+    return logContext.getContainerType() == ContainerType.APPLICATION_MASTER ||
+        (exitCode != 0 && exitCode != ExitCode.FORCE_KILLED.getExitCode()
+        && exitCode != ExitCode.TERMINATED.getExitCode());
+  }
+}

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
+
+// The class provides no-op implementation for parseParameters. Polices
+// that don't need parameters can derive from this class.
+@Private
+public abstract class AbstractContainerLogAggregationPolicy implements
+    ContainerLogAggregationPolicy {
+  public void parseParameters(String parameters) {
+  }
+}

+ 2 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java

@@ -18,12 +18,11 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 
 public interface AppLogAggregator extends Runnable {
 
-  void startContainerLogAggregation(ContainerId containerId,
-      boolean wasContainerSuccessful);
+  void startContainerLogAggregation(ContainerLogContext logContext);
 
   void abortLogAggregation();
 

+ 82 - 49
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -56,9 +57,12 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -107,7 +111,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final UserGroupInformation userUgi;
   private final Path remoteNodeLogFileForApp;
   private final Path remoteNodeTmpLogFileForApp;
-  private final ContainerLogsRetentionPolicy retentionPolicy;
 
   private final BlockingQueue<ContainerId> pendingContainers;
   private final AtomicBoolean appFinishing = new AtomicBoolean();
@@ -128,12 +131,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
 
   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
       new HashMap<ContainerId, ContainerLogAggregator>();
+  private final ContainerLogAggregationPolicy logAggPolicy;
 
   public AppLogAggregatorImpl(Dispatcher dispatcher,
       DeletionService deletionService, Configuration conf,
       ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
       LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
-      ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext, Context context,
       FileContext lfs) {
@@ -146,7 +149,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.dirsHandler = dirsHandler;
     this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
     this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
-    this.retentionPolicy = retentionPolicy;
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
     this.appAcls = appAcls;
     this.lfs = lfs;
@@ -204,6 +206,66 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             || this.logAggregationContext.getRolledLogsIncludePattern() == null
             || this.logAggregationContext.getRolledLogsIncludePattern()
               .isEmpty() ? false : true;
+    this.logAggPolicy = getLogAggPolicy(conf);
+  }
+
+  private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
+    ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf);
+    String params = getLogAggPolicyParameters(conf);
+    if (params != null) {
+      policy.parseParameters(params);
+    }
+    return policy;
+  }
+
+  // Use the policy class specified in LogAggregationContext if available.
+  // Otherwise use the cluster-wide default policy class.
+  private ContainerLogAggregationPolicy getLogAggPolicyInstance(
+      Configuration conf) {
+    Class<? extends ContainerLogAggregationPolicy> policyClass = null;
+    if (this.logAggregationContext != null) {
+      String className =
+          this.logAggregationContext.getLogAggregationPolicyClassName();
+      if (className != null) {
+        try {
+          Class<?> policyFromContext = conf.getClassByName(className);
+          if (ContainerLogAggregationPolicy.class.isAssignableFrom(
+              policyFromContext)) {
+            policyClass = policyFromContext.asSubclass(
+                ContainerLogAggregationPolicy.class);
+          } else {
+            LOG.warn(this.appId + " specified invalid log aggregation policy " +
+                className);
+          }
+        } catch (ClassNotFoundException cnfe) {
+          // We don't fail the app if the policy class isn't valid.
+          LOG.warn(this.appId + " specified invalid log aggregation policy " +
+              className);
+        }
+      }
+    }
+    if (policyClass == null) {
+      policyClass = conf.getClass(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS,
+          AllContainerLogAggregationPolicy.class,
+              ContainerLogAggregationPolicy.class);
+    } else {
+      LOG.info(this.appId + " specifies ContainerLogAggregationPolicy of "
+          + policyClass);
+    }
+    return ReflectionUtils.newInstance(policyClass, conf);
+  }
+
+  // Use the policy parameters specified in LogAggregationContext if available.
+  // Otherwise use the cluster-wide default policy parameters.
+  private String getLogAggPolicyParameters(Configuration conf) {
+    String params = null;
+    if (this.logAggregationContext != null) {
+      params = this.logAggregationContext.getLogAggregationPolicyParameters();
+    }
+    if (params == null) {
+      params = conf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS);
+    }
+    return params;
   }
 
   private void uploadLogsForContainers(boolean appFinished) {
@@ -228,21 +290,22 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     // Create a set of Containers whose logs will be uploaded in this cycle.
     // It includes:
     // a) all containers in pendingContainers: those containers are finished
-    //    and satisfy the retentionPolicy.
+    //    and satisfy the ContainerLogAggregationPolicy.
     // b) some set of running containers: For all the Running containers,
-    // we have ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
-    // so simply set wasContainerSuccessful as true to
-    // bypass FAILED_CONTAINERS check and find the running containers 
-    // which satisfy the retentionPolicy.
+    //    we use exitCode of 0 to find those which satisfy the
+    //    ContainerLogAggregationPolicy.
     Set<ContainerId> pendingContainerInThisCycle = new HashSet<ContainerId>();
     this.pendingContainers.drainTo(pendingContainerInThisCycle);
     Set<ContainerId> finishedContainers =
         new HashSet<ContainerId>(pendingContainerInThisCycle);
     if (this.context.getApplications().get(this.appId) != null) {
-      for (ContainerId container : this.context.getApplications()
-        .get(this.appId).getContainers().keySet()) {
-        if (shouldUploadLogs(container, true)) {
-          pendingContainerInThisCycle.add(container);
+      for (Container container : this.context.getApplications()
+        .get(this.appId).getContainers().values()) {
+        ContainerType containerType =
+            container.getContainerTokenIdentifier().getContainerType();
+        if (shouldUploadLogs(new ContainerLogContext(
+            container.getContainerId(), containerType, 0))) {
+          pendingContainerInThisCycle.add(container.getContainerId());
         }
       }
     }
@@ -506,46 +569,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
 
   // TODO: The condition: containerId.getId() == 1 to determine an AM container
   // is not always true.
-  private boolean shouldUploadLogs(ContainerId containerId,
-      boolean wasContainerSuccessful) {
-
-    // All containers
-    if (this.retentionPolicy
-        .equals(ContainerLogsRetentionPolicy.ALL_CONTAINERS)) {
-      return true;
-    }
-
-    // AM Container only
-    if (this.retentionPolicy
-        .equals(ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY)) {
-      if ((containerId.getContainerId()
-          & ContainerId.CONTAINER_ID_BITMASK)== 1) {
-        return true;
-      }
-      return false;
-    }
-
-    // AM + Failing containers
-    if (this.retentionPolicy
-        .equals(ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY)) {
-      if ((containerId.getContainerId()
-          & ContainerId.CONTAINER_ID_BITMASK) == 1) {
-        return true;
-      } else if(!wasContainerSuccessful) {
-        return true;
-      }
-      return false;
-    }
-    return false;
+  private boolean shouldUploadLogs(ContainerLogContext logContext) {
+    return logAggPolicy.shouldDoLogAggregation(logContext);
   }
 
   @Override
-  public void startContainerLogAggregation(ContainerId containerId,
-      boolean wasContainerSuccessful) {
-    if (shouldUploadLogs(containerId, wasContainerSuccessful)) {
-      LOG.info("Considering container " + containerId
+  public void startContainerLogAggregation(ContainerLogContext logContext) {
+    if (shouldUploadLogs(logContext)) {
+      LOG.info("Considering container " + logContext.getContainerId()
           + " for log-aggregation");
-      this.pendingContainers.add(containerId);
+      this.pendingContainers.add(logContext.getContainerId());
     }
   }
 

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+
+@Private
+public class FailedContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    int exitCode = logContext.getExitCode();
+    return exitCode != 0 && exitCode != ExitCode.FORCE_KILLED.getExitCode()
+        && exitCode != ExitCode.TERMINATED.getExitCode();
+  }
+}

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+
+@Private
+public class FailedOrKilledContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    return logContext.getExitCode() != 0;
+  }
+}

+ 10 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -48,8 +48,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -314,13 +315,12 @@ public class LogAggregationService extends AbstractService implements
 
   @SuppressWarnings("unchecked")
   private void initApp(final ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
-      Map<ApplicationAccessType, String> appAcls,
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
     ApplicationEvent eventResponse;
     try {
       verifyAndCreateRemoteLogDir(getConfig());
-      initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
+      initAppAggregator(appId, user, credentials, appAcls,
           logAggregationContext);
       eventResponse = new ApplicationEvent(appId,
           ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
@@ -342,8 +342,7 @@ public class LogAggregationService extends AbstractService implements
 
 
   protected void initAppAggregator(final ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
-      Map<ApplicationAccessType, String> appAcls,
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
 
     // Get user's FileSystem credentials
@@ -357,7 +356,7 @@ public class LogAggregationService extends AbstractService implements
     final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
             getConfig(), appId, userUgi, this.nodeId, dirsHandler,
-            getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
+            getRemoteNodeLogFileForApp(appId, user),
             appAcls, logAggregationContext, this.context,
             getLocalFileContext(getConfig()));
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
@@ -420,7 +419,10 @@ public class LogAggregationService extends AbstractService implements
           + ", did it fail to start?");
       return;
     }
-    aggregator.startContainerLogAggregation(containerId, exitCode == 0);
+    ContainerType containerType = context.getContainers().get(
+        containerId).getContainerTokenIdentifier().getContainerType();
+    aggregator.startContainerLogAggregation(
+        new ContainerLogContext(containerId, containerType, exitCode));
   }
 
   private void stopApp(ApplicationId appId) {
@@ -445,7 +447,6 @@ public class LogAggregationService extends AbstractService implements
             (LogHandlerAppStartedEvent) event;
         initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
             appStartEvent.getCredentials(),
-            appStartEvent.getLogRetentionPolicy(),
             appStartEvent.getApplicationAcls(),
             appStartEvent.getLogAggregationContext());
         break;

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+
+@Private
+public class NoneContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    return false;
+  }
+}

+ 124 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java

@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import java.util.Collection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+
+/**
+ * The sample policy samples logs of successful worker containers to aggregate.
+ * It always aggregates AM container and failed/killed worker
+ * containers' logs. To make sure small applications have enough logs, it only
+ * applies sampling beyond minimal number of containers. The parameters can be
+ * configured by SAMPLE_RATE and MIN_THRESHOLD. For example if SAMPLE_RATE is
+ * 0.2 and MIN_THRESHOLD is 20, for an application with 100 successful
+ * worker containers, 20 + (100-20) * 0.2 = 36 containers's logs will be
+ * aggregated.
+ */
+@Private
+public class SampleContainerLogAggregationPolicy implements
+    ContainerLogAggregationPolicy  {
+  private static final Log LOG =
+      LogFactory.getLog(SampleContainerLogAggregationPolicy.class);
+
+  static String SAMPLE_RATE = "SR";
+  public static final float DEFAULT_SAMPLE_RATE = 0.2f;
+
+  static String MIN_THRESHOLD = "MIN";
+  public static final int DEFAULT_SAMPLE_MIN_THRESHOLD = 20;
+
+  private float sampleRate = DEFAULT_SAMPLE_RATE;
+  private int minThreshold = DEFAULT_SAMPLE_MIN_THRESHOLD;
+
+  static public String buildParameters(float sampleRate, int minThreshold) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(SAMPLE_RATE).append(":").append(sampleRate).append(",").
+        append(MIN_THRESHOLD).append(":").append(minThreshold);
+    return sb.toString();
+  }
+
+  // Parameters are comma separated properties, for example
+  // "SR:0.5,MIN:50"
+  public void parseParameters(String parameters) {
+    Collection<String> params = StringUtils.getStringCollection(parameters);
+    for(String param : params) {
+      // The first element is the property name.
+      // The second element is the property value.
+      String[] property = StringUtils.getStrings(param, ":");
+      if (property == null || property.length != 2) {
+        continue;
+      }
+      if (property[0].equals(SAMPLE_RATE)) {
+        try {
+          float sampleRate = Float.parseFloat(property[1]);
+          if (sampleRate >= 0.0 && sampleRate <= 1.0) {
+            this.sampleRate = sampleRate;
+          } else {
+            LOG.warn("The format isn't valid. Sample rate falls back to the " +
+                "default value " + DEFAULT_SAMPLE_RATE);
+          }
+        } catch (NumberFormatException nfe) {
+          LOG.warn("The format isn't valid. Sample rate falls back to the " +
+              "default value " + DEFAULT_SAMPLE_RATE);
+        }
+      } else if (property[0].equals(MIN_THRESHOLD)) {
+        try {
+          int minThreshold = Integer.parseInt(property[1]);
+          if (minThreshold >= 0) {
+            this.minThreshold = minThreshold;
+          } else {
+            LOG.warn("The format isn't valid. Min threshold falls back to " +
+                "the default value " + DEFAULT_SAMPLE_MIN_THRESHOLD);
+          }
+        } catch (NumberFormatException nfe) {
+          LOG.warn("The format isn't valid. Min threshold falls back to the " +
+              "default value " + DEFAULT_SAMPLE_MIN_THRESHOLD);
+        }
+      }
+    }
+  }
+
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    if (logContext.getContainerType() ==
+        ContainerType.APPLICATION_MASTER || logContext.getExitCode() != 0) {
+      // If it is AM or failed or killed container, enable log aggregation.
+      return true;
+    }
+
+    // Only sample log aggregation for large applications.
+    // We assume the container id is continuously allocated from number 1 and
+    // Worker containers start from id 2. So logs of worker containers with ids
+    // in [2, minThreshold + 1] will be aggregated.
+    if ((logContext.getContainerId().getContainerId() &
+        ContainerId.CONTAINER_ID_BITMASK) < minThreshold + 2) {
+      return true;
+    }
+
+    // Sample log aggregation for the rest of successful worker containers
+    return (sampleRate != 0 &&
+        logContext.getContainerId().hashCode() % (1/sampleRate) == 0);
+  }
+}

+ 3 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java

@@ -24,32 +24,27 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 
 public class LogHandlerAppStartedEvent extends LogHandlerEvent {
 
   private final ApplicationId applicationId;
-  private final ContainerLogsRetentionPolicy retentionPolicy;
   private final String user;
   private final Credentials credentials;
   private final Map<ApplicationAccessType, String> appAcls;
   private final LogAggregationContext logAggregationContext;
 
   public LogHandlerAppStartedEvent(ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
-      Map<ApplicationAccessType, String> appAcls) {
-    this(appId, user, credentials, retentionPolicy, appAcls, null);
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls) {
+    this(appId, user, credentials, appAcls, null);
   }
 
   public LogHandlerAppStartedEvent(ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
-      Map<ApplicationAccessType, String> appAcls,
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
     super(LogHandlerEventType.APPLICATION_STARTED);
     this.applicationId = appId;
     this.user = user;
     this.credentials = credentials;
-    this.retentionPolicy = retentionPolicy;
     this.appAcls = appAcls;
     this.logAggregationContext = logAggregationContext;
   }
@@ -62,10 +57,6 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
     return this.credentials;
   }
 
-  public ContainerLogsRetentionPolicy getLogRetentionPolicy() {
-    return this.retentionPolicy;
-  }
-
   public String getUser() {
     return this.user;
   }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java

@@ -87,6 +87,7 @@ public class TestAuxServices {
       this.stoppedApps = new ArrayList<Integer>();
     }
 
+    @SuppressWarnings("unchecked")
     public ArrayList<Integer> getAppIdsStopped() {
       return (ArrayList<Integer>)this.stoppedApps.clone();
     }

+ 601 - 76
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -99,11 +101,13 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -191,12 +195,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+            application1, this.user, null, this.acls));
 
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerId container11 = createContainer(appAttemptId, 1,
+        ContainerType.APPLICATION_MASTER);
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11, new String[] { "stdout",
         "stderr", "syslog" });
@@ -302,11 +306,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     LogAggregationContext context =
         LogAggregationContext.newInstance("HOST*", "sys*");
     logAggregationService.handle(new LogHandlerAppStartedEvent(app, this.user,
-        null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, context));
+        null, this.acls, context));
 
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(app, 1);
-    ContainerId cont = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerId cont = createContainer(appAttemptId, 1,
+        ContainerType.APPLICATION_MASTER);
     writeContainerLogs(appLogDir, cont, new String[] { "stdout",
         "stderr", "syslog" });
     logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 0));
@@ -337,8 +342,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+            application1, this.user, null, this.acls));
 
     logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application1));
@@ -388,13 +392,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     app1LogDir.mkdir();
     logAggregationService
         .handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+            application1, this.user, null, this.acls));
 
     ApplicationAttemptId appAttemptId1 =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
-    
+    ContainerId container11 = createContainer(appAttemptId1, 1,
+        ContainerType.APPLICATION_MASTER);
+
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11, fileNames);
     logAggregationService.handle(
@@ -407,18 +411,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     File app2LogDir =
       new File(localLogDir, ConverterUtils.toString(application2));
     app2LogDir.mkdir();
+    LogAggregationContext contextWithAMOnly =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMOnly.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
+
     logAggregationService.handle(new LogHandlerAppStartedEvent(
-        application2, this.user, null,
-        ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
+        application2, this.user, null, this.acls, contextWithAMOnly));
+
+    ContainerId container21 = createContainer(appAttemptId2, 1,
+        ContainerType.APPLICATION_MASTER);
 
-    
-    ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1);
-    
     writeContainerLogs(app2LogDir, container21, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container21, 0));
 
-    ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
+    ContainerId container12 = createContainer(appAttemptId1, 2,
+        ContainerType.TASK);
 
     writeContainerLogs(app1LogDir, container12, fileNames);
     logAggregationService.handle(
@@ -431,9 +440,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     File app3LogDir =
       new File(localLogDir, ConverterUtils.toString(application3));
     app3LogDir.mkdir();
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
+
     logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));        
+        this.user, null, this.acls, contextWithAMAndFailed));
 
     dispatcher.await();
     ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
@@ -450,22 +463,26 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID");
     reset(appEventHandler);
     
-    ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
+    ContainerId container31 = createContainer(appAttemptId3, 1,
+        ContainerType.APPLICATION_MASTER);
     writeContainerLogs(app3LogDir, container31, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container31, 0));
 
-    ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
+    ContainerId container32 = createContainer(appAttemptId3, 2,
+        ContainerType.TASK);
     writeContainerLogs(app3LogDir, container32, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container32, 1)); // Failed 
 
-    ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
+    ContainerId container22 = createContainer(appAttemptId2, 2,
+        ContainerType.TASK);
     writeContainerLogs(app2LogDir, container22, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container22, 0));
 
-    ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
+    ContainerId container33 = createContainer(appAttemptId3, 3,
+        ContainerType.TASK);
     writeContainerLogs(app3LogDir, container33, fileNames);
     logAggregationService.handle(
         new LogHandlerContainerFinishedEvent(container33, 0));
@@ -528,10 +545,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     ApplicationId appId =
         BuilderUtils.newApplicationId(System.currentTimeMillis(),
           (int) (Math.random() * 1000));
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
+
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
-        this.acls));
+        this.user, null, this.acls, contextWithAMAndFailed));
     dispatcher.await();
     
     // Verify that it failed
@@ -551,11 +571,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     File appLogDir =
         new File(localLogDir, ConverterUtils.toString(appId2));
     appLogDir.mkdir();
-    
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId2,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
-        this.acls));
+        this.user, null, this.acls, contextWithAMAndFailed));
     dispatcher.await();
     
     // Verify that it worked
@@ -627,8 +644,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         remoteRootLogDir.getAbsolutePath(), this.user));
     Path suffixDir = new Path(userDir, logSuffix);
     Path appDir = new Path(suffixDir, appId.toString());
+    LogAggregationContext contextWithAllContainers =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAllContainers.setLogAggregationPolicyClassName(
+        AllContainerLogAggregationPolicy.class.getName());
     aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+        this.acls, contextWithAllContainers));
     verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
     verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
     verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
@@ -637,7 +658,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
     Path appDir2 = new Path(suffixDir, appId2.toString());
     aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+        this.acls, contextWithAllContainers));
     verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
 
     // start another application with the app dir already created and verify
@@ -646,7 +667,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     Path appDir3 = new Path(suffixDir, appId3.toString());
     new File(appDir3.toUri().getPath()).mkdir();
     aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+        this.acls, contextWithAllContainers));
     verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
     aggSvc.stop();
     aggSvc.close();
@@ -674,13 +695,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     doThrow(new YarnRuntimeException("KABOOM!"))
       .when(logAggregationService).initAppAggregator(
           eq(appId), eq(user), any(Credentials.class),
-          any(ContainerLogsRetentionPolicy.class), anyMap(),
-          any(LogAggregationContext.class));
-
+          anyMap(), any(LogAggregationContext.class));
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
-        this.acls));
+        this.user, null, this.acls, contextWithAMAndFailed));
 
     dispatcher.await();
     ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
@@ -724,10 +745,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     doThrow(e)
       .when(logAggregationService).createAppDir(any(String.class),
           any(ApplicationId.class), any(UserGroupInformation.class));
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
-        this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));        
-    
+        this.user, null, this.acls, contextWithAMAndFailed));
+
     dispatcher.await();
     ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
         new ApplicationEvent(appId, 
@@ -765,10 +789,27 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     }
   }
 
-  private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService,
+  private LogFileStatusInLastCycle verifyContainerLogs(
+      LogAggregationService logAggregationService,
       ApplicationId appId, ContainerId[] expectedContainerIds,
-      String[] logFiles, int numOfContainerLogs, boolean multiLogs)
-      throws IOException {
+      String[] logFiles, int numOfLogsPerContainer,
+      boolean multiLogs) throws IOException {
+    return verifyContainerLogs(logAggregationService, appId,
+        expectedContainerIds, expectedContainerIds.length,
+        expectedContainerIds.length, logFiles, numOfLogsPerContainer,
+        multiLogs);
+  }
+
+  // expectedContainerIds is the minimal set of containers to check.
+  // The actual list of containers could be more than that.
+  // Verify the size of the actual list is in the range of
+  // [minNumOfContainers, maxNumOfContainers].
+  private LogFileStatusInLastCycle verifyContainerLogs(
+      LogAggregationService logAggregationService,
+      ApplicationId appId, ContainerId[] expectedContainerIds,
+      int minNumOfContainers, int maxNumOfContainers,
+      String[] logFiles, int numOfLogsPerContainer, boolean multiLogs)
+    throws IOException {
     Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
     RemoteIterator<FileStatus> nodeFiles = null;
     try {
@@ -780,6 +821,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     } catch (FileNotFoundException fnf) {
       Assert.fail("Should have log files");
     }
+    if (numOfLogsPerContainer == 0) {
+      Assert.assertTrue(!nodeFiles.hasNext());
+      return null;
+    }
 
     Assert.assertTrue(nodeFiles.hasNext());
     FileStatus targetNodeFile = null;
@@ -865,11 +910,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       }
 
       // 1 for each container
-      Assert.assertEquals(expectedContainerIds.length, logMap.size());
+      Assert.assertTrue("number of containers with logs should be at least " +
+          minNumOfContainers,logMap.size() >= minNumOfContainers);
+      Assert.assertTrue("number of containers with logs should be at most " +
+          minNumOfContainers,logMap.size() <= maxNumOfContainers);
       for (ContainerId cId : expectedContainerIds) {
         String containerStr = ConverterUtils.toString(cId);
         Map<String, String> thisContainerMap = logMap.remove(containerStr);
-        Assert.assertEquals(numOfContainerLogs, thisContainerMap.size());
+        Assert.assertEquals(numOfLogsPerContainer, thisContainerMap.size());
         for (String fileType : logFiles) {
           String expectedValue =
               containerStr + " Hello " + fileType + "!End of LogType:"
@@ -882,8 +930,15 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         }
         Assert.assertEquals(0, thisContainerMap.size());
       }
-      Assert.assertEquals(0, logMap.size());
-      return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes);
+      Assert.assertTrue("number of remaining containers should be at least " +
+          (minNumOfContainers - expectedContainerIds.length),
+          logMap.size() >= minNumOfContainers - expectedContainerIds.length);
+      Assert.assertTrue("number of remaining containers should be at most " +
+          (maxNumOfContainers - expectedContainerIds.length),
+          logMap.size() <= maxNumOfContainers - expectedContainerIds.length);
+
+      return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(),
+          fileTypes);
     } finally {
       reader.close();
     }
@@ -991,9 +1046,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.start();
 
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    LogAggregationContext contextWithAllContainers =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAllContainers.setLogAggregationPolicyClassName(
+        AllContainerLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+        application1, this.user, null, this.acls, contextWithAllContainers));
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
@@ -1015,8 +1073,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
     logAggregationService.handle(new LogHandlerAppStartedEvent(
-            application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+            application1, this.user, null, this.acls));
 
     logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
     dispatcher.await();
@@ -1216,12 +1273,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new File(localLogDir, ConverterUtils.toString(application1));
     appLogDir1.mkdir();
     logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
-      this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
+      this.user, null, this.acls,
       logAggregationContextWithIncludePatterns));
 
     ApplicationAttemptId appAttemptId1 =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container1 = BuilderUtils.newContainerId(appAttemptId1, 1);
+    ContainerId container1 = createContainer(appAttemptId1, 1,
+        ContainerType.APPLICATION_MASTER);
 
     // Simulate log-file creation
     writeContainerLogs(appLogDir1, container1, new String[] { "stdout",
@@ -1239,10 +1297,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     File app2LogDir =
         new File(localLogDir, ConverterUtils.toString(application2));
     app2LogDir.mkdir();
+    LogAggregationContextWithExcludePatterns.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application2,
-      this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
-      this.acls, LogAggregationContextWithExcludePatterns));
-    ContainerId container2 = BuilderUtils.newContainerId(appAttemptId2, 1);
+      this.user, null, this.acls, LogAggregationContextWithExcludePatterns));
+    ContainerId container2 = createContainer(appAttemptId2, 1,
+        ContainerType.APPLICATION_MASTER);
 
     writeContainerLogs(app2LogDir, container2, new String[] { "stdout",
         "stderr", "syslog" });
@@ -1262,10 +1322,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     File app3LogDir =
         new File(localLogDir, ConverterUtils.toString(application3));
     app3LogDir.mkdir();
+    context1.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
-      this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
-      this.acls, context1));
-    ContainerId container3 = BuilderUtils.newContainerId(appAttemptId3, 1);
+      this.user, null, this.acls, context1));
+    ContainerId container3 = createContainer(appAttemptId3, 1,
+        ContainerType.APPLICATION_MASTER);
     writeContainerLogs(app3LogDir, container3, new String[] { "stdout",
         "sys.log", "std.log", "out.log", "err.log", "log" });
     logAggregationService.handle(
@@ -1285,10 +1347,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     File app4LogDir =
         new File(localLogDir, ConverterUtils.toString(application4));
     app4LogDir.mkdir();
+    context2.setLogAggregationPolicyClassName(
+        AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application4,
-      this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY,
-      this.acls, context2));
-    ContainerId container4 = BuilderUtils.newContainerId(appAttemptId4, 1);
+      this.user, null, this.acls, context2));
+    ContainerId container4 = createContainer(appAttemptId4, 1,
+        ContainerType.APPLICATION_MASTER);
     writeContainerLogs(app4LogDir, container4, new String[] { "stdout",
         "sys.log", "std.log", "out.log", "err.log", "log" });
     logAggregationService.handle(
@@ -1346,6 +1410,471 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       "getApplicationID");
   }
 
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testNoneContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    // LogContext specifies policy to not aggregate any container logs
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, NoneContainerLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(appId, logAggregationService,
+        ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1 }, logFiles, 0, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testFailedContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, FailedContainerLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(
+        appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 1,
+            logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 2, 0,
+        logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 3,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1 }, logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testAMOrFailedContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, AMOrFailedContainerLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(
+        appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0,
+            logFiles);
+    ContainerId container2= finishContainer(appId,
+        logAggregationService, ContainerType.TASK, 2, 1, logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 3,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1, container2 }, logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testFailedOrKilledContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, FailedOrKilledContainerLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    finishContainer(appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0,
+        logFiles);
+    ContainerId container2 = finishContainer(appId,
+        logAggregationService, ContainerType.TASK, 2, 1, logFiles);
+    ContainerId container3 = finishContainer(appId, logAggregationService,
+        ContainerType.TASK, 3,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container2, container3 }, logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testAMOnlyContainerPolicy() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, AMOnlyLogAggregationPolicy.class, null);
+
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(appId, logAggregationService,
+        ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 2, 1,
+        logFiles);
+    finishContainer(appId, logAggregationService, ContainerType.TASK, 3, 0,
+        logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1 }, logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  // Test sample container policy with an app that has
+  // the same number of successful containers as
+  // SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD.
+  // and verify all those containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithSmallApp() throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        false);
+  }
+
+  // Test sample container policy with an app that has
+  // more successful containers than
+  // SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD.
+  // and verify some of those containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithLargeApp() throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        false);
+  }
+
+  // Test sample container policy with zero sample rate.
+  // and verify there is no sampling beyond the MIN_THRESHOLD containers.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithZeroSampleRate() throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        0, SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        false);
+  }
+
+  // Test sample container policy with 100 percent sample rate.
+  // and verify all containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWith100PercentSampleRate()
+      throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        1.0f,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        false);
+  }
+
+  // Test sample container policy with zero min threshold.
+  // and verify some containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithZeroMinThreshold()
+      throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE, 0, false);
+  }
+
+  // Test sample container policy with customized settings.
+  // and verify some containers' logs are aggregated.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testSampleContainerPolicyWithCustomizedSettings()
+      throws Exception {
+    setupAndTestSampleContainerPolicy(500, 0.5f, 50, false);
+  }
+
+  // Test cluster-wide sample container policy.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testClusterSampleContainerPolicy()
+      throws Exception {
+    setupAndTestSampleContainerPolicy(500, 0.5f, 50, true);
+  }
+
+  // Test the default cluster-wide sample container policy.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testDefaultClusterSampleContainerPolicy() throws Exception {
+    setupAndTestSampleContainerPolicy(
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD * 10,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE,
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD,
+        true);
+  }
+
+  // The application specifies invalid policy class
+  // NM should fallback to the default policy which is to aggregate all
+  // containers.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testInvalidPolicyClassName() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, "foo", null, true);
+    verifyDefaultPolicy(appId, logAggregationService);
+  }
+
+  // The application specifies LogAggregationContext, but not policy class.
+  // NM should fallback to the default policy which is to aggregate all
+  // containers.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testNullPolicyClassName() throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, null, null, true);
+    verifyDefaultPolicy(appId, logAggregationService);
+  }
+
+  // The application doesn't specifies LogAggregationContext.
+  // NM should fallback to the default policy which is to aggregate all
+  // containers.
+  @Test (timeout = 50000)
+  @SuppressWarnings("unchecked")
+  public void testDefaultPolicyWithoutLogAggregationContext()
+      throws Exception {
+    ApplicationId appId = createApplication();
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, null, null, false);
+    verifyDefaultPolicy(appId, logAggregationService);
+  }
+
+  private void verifyDefaultPolicy(ApplicationId appId,
+      LogAggregationService logAggregationService) throws Exception {
+    String[] logFiles = new String[] { "stdout" };
+    ContainerId container1 = finishContainer(
+        appId, logAggregationService, ContainerType.APPLICATION_MASTER, 1, 0,
+            logFiles);
+    ContainerId container2 = finishContainer(appId,
+        logAggregationService, ContainerType.TASK, 2, 1, logFiles);
+    ContainerId container3 = finishContainer(appId, logAggregationService,
+        ContainerType.TASK, 3,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles);
+
+    finishApplication(appId, logAggregationService);
+
+    verifyContainerLogs(logAggregationService, appId,
+        new ContainerId[] { container1, container2, container3 },
+            logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  // If enableAtClusterLevel is false, the policy is set up via
+  // LogAggregationContext at the application level. If it is true,
+  // the policy is set up via Configuration at the cluster level.
+  private void setupAndTestSampleContainerPolicy(int successfulContainers,
+      float sampleRate, int minThreshold, boolean enableAtClusterLevel)
+      throws Exception {
+    ApplicationId appId = createApplication();
+    String policyParameters = null;
+    if (sampleRate != SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_RATE
+        || minThreshold !=
+        SampleContainerLogAggregationPolicy.DEFAULT_SAMPLE_MIN_THRESHOLD) {
+      policyParameters = SampleContainerLogAggregationPolicy.buildParameters(
+          sampleRate, minThreshold);
+    }
+
+    if (enableAtClusterLevel) {
+      this.conf.set(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS,
+          SampleContainerLogAggregationPolicy.class.getName());
+      if (policyParameters != null) {
+        this.conf.set(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS,
+            policyParameters);
+      }
+    }
+    LogAggregationService logAggregationService = createLogAggregationService(
+        appId, SampleContainerLogAggregationPolicy.class.getName(),
+            policyParameters, !enableAtClusterLevel);
+
+    ArrayList<ContainerId> containerIds = new ArrayList<ContainerId>();
+    String[] logFiles = new String[] { "stdout" };
+    int cid = 1;
+    // AM container
+    containerIds.add(finishContainer(appId, logAggregationService,
+        ContainerType.APPLICATION_MASTER, cid++, 0, logFiles));
+    // Successful containers
+    // We expect the minThreshold containers will be log aggregated.
+    if (minThreshold > 0) {
+      containerIds.addAll(finishContainers(appId, logAggregationService, cid,
+          (successfulContainers > minThreshold) ? minThreshold :
+              successfulContainers, 0, logFiles));
+    }
+    cid = containerIds.size() + 1;
+    if (successfulContainers > minThreshold) {
+      List<ContainerId> restOfSuccessfulContainers = finishContainers(appId,
+          logAggregationService, cid, successfulContainers - minThreshold, 0,
+          logFiles);
+      cid += successfulContainers - minThreshold;
+      // If the sample rate is 100 percent, restOfSuccessfulContainers will be
+      // all be log aggregated.
+      if (sampleRate == 1.0) {
+        containerIds.addAll(restOfSuccessfulContainers);
+      }
+    }
+    // Failed container
+    containerIds.add(finishContainer(appId, logAggregationService,
+        ContainerType.TASK, cid++, 1, logFiles));
+    // Killed container
+    containerIds.add(finishContainer(appId, logAggregationService,
+        ContainerType.TASK, cid++,
+        ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode(), logFiles));
+
+    finishApplication(appId, logAggregationService);
+
+    // The number of containers with logs should be 3(AM + failed + killed) +
+    // DEFAULT_SAMPLE_MIN_THRESHOLD +
+    // ( successfulContainers - DEFAULT_SAMPLE_MIN_THRESHOLD ) * SAMPLE_RATE
+    // Due to the sampling nature, the exact number could vary.
+    // So we only check for a range.
+    // For the cases where successfulContainers is the same as minThreshold
+    // or sampleRate is zero, minOfContainersWithLogs and
+    // maxOfContainersWithLogs will the same.
+    int minOfContainersWithLogs = 3 + minThreshold +
+        (int)((successfulContainers - minThreshold) * sampleRate / 2);
+    int maxOfContainersWithLogs = 3 + minThreshold +
+        (int)((successfulContainers - minThreshold) * sampleRate * 2);
+    verifyContainerLogs(logAggregationService, appId,
+        containerIds.toArray(new ContainerId[containerIds.size()]),
+        minOfContainersWithLogs, maxOfContainersWithLogs,
+        logFiles, 1, false);
+
+    verifyLogAggFinishEvent(appId);
+  }
+
+  private ApplicationId createApplication() {
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+
+    ApplicationId appId = BuilderUtils.newApplicationId(1234, 1);
+    Application mockApp = mock(Application.class);
+    when(mockApp.getContainers()).thenReturn(
+        new HashMap<ContainerId, Container>());
+
+    this.context.getApplications().put(appId, mockApp);
+    return appId;
+  }
+
+  private LogAggregationService createLogAggregationService(
+      ApplicationId appId,
+      Class<? extends ContainerLogAggregationPolicy> policy,
+      String parameters) {
+    return createLogAggregationService(appId, policy.getName(), parameters,
+        true);
+  }
+
+  private LogAggregationService createLogAggregationService(
+      ApplicationId appId, String className, String parameters,
+      boolean createLogAggContext) {
+    ConcurrentHashMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<ContainerId, Container>();
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+            super.dirsHandler);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+    LogAggregationContext logAggContext = null;
+
+    if (createLogAggContext) {
+      logAggContext = Records.newRecord(LogAggregationContext.class);
+      logAggContext.setLogAggregationPolicyClassName(className);
+      if (parameters != null) {
+        logAggContext.setLogAggregationPolicyParameters(parameters);
+      }
+    }
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null, this.acls, logAggContext));
+
+    return logAggregationService;
+  }
+
+  private ContainerId createContainer(ApplicationAttemptId appAttemptId1,
+      long cId, ContainerType containerType) {
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId1,
+        cId);
+    Resource r = BuilderUtils.newResource(1024, 1);
+    ContainerTokenIdentifier containerToken = new ContainerTokenIdentifier(
+        containerId, context.getNodeId().toString(), user, r,
+        System.currentTimeMillis() + 100000L, 123, DUMMY_RM_IDENTIFIER,
+        Priority.newInstance(0), 0, null, null, containerType);
+    Container container = mock(Container.class);
+    context.getContainers().put(containerId, container);
+    when(container.getContainerTokenIdentifier()).thenReturn(containerToken);
+    when(container.getContainerId()).thenReturn(containerId);
+    return containerId;
+  }
+
+  private ContainerId finishContainer(ApplicationId application1,
+      LogAggregationService logAggregationService, ContainerType containerType,
+      long cId, int exitCode, String[] logFiles) throws IOException {
+    ApplicationAttemptId appAttemptId1 =
+        BuilderUtils.newApplicationAttemptId(application1, 1);
+    ContainerId containerId = createContainer(appAttemptId1, cId,
+        containerType);
+    // Simulate log-file creation
+    File appLogDir1 =
+        new File(localLogDir, ConverterUtils.toString(application1));
+    appLogDir1.mkdir();
+    writeContainerLogs(appLogDir1, containerId, logFiles);
+
+    logAggregationService.handle(new LogHandlerContainerFinishedEvent(
+        containerId, exitCode));
+    return containerId;
+
+  }
+
+  private List<ContainerId> finishContainers(ApplicationId appId,
+      LogAggregationService logAggregationService, long startingCid, int count,
+      int exitCode, String[] logFiles) throws IOException {
+    ArrayList<ContainerId> containerIds = new ArrayList<ContainerId>();
+    for (long cid = startingCid; cid < startingCid + count; cid++) {
+      containerIds.add(finishContainer(
+          appId, logAggregationService, ContainerType.TASK, cid, exitCode,
+              logFiles));
+    }
+    return containerIds;
+  }
+
+  private void finishApplication(ApplicationId appId,
+      LogAggregationService logAggregationService) throws Exception {
+    dispatcher.await();
+    ApplicationEvent expectedInitEvents[] =
+        new ApplicationEvent[] { new ApplicationEvent(appId,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED) };
+    checkEvents(appEventHandler, expectedInitEvents, false, "getType",
+        "getApplicationID");
+    reset(appEventHandler);
+
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(appId));
+    logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
+  }
+
+  private void verifyLogAggFinishEvent(ApplicationId appId) throws Exception {
+    dispatcher.await();
+
+    ApplicationEvent[] expectedFinishedEvents =
+        new ApplicationEvent[] { new ApplicationEvent(appId,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED) };
+    checkEvents(appEventHandler, expectedFinishedEvents, false, "getType",
+            "getApplicationID");
+  }
+
   @Test (timeout = 50000)
   public void testLogAggregationServiceWithInterval() throws Exception {
     testLogAggregationService(false);
@@ -1391,17 +1920,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     ApplicationId application = BuilderUtils.newApplicationId(123456, 1);
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(application, 1);
-    ContainerId container = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerId container = createContainer(appAttemptId, 1,
+        ContainerType.APPLICATION_MASTER);
 
-    Context context = spy(this.context);
     ConcurrentMap<ApplicationId, Application> maps =
-        new ConcurrentHashMap<ApplicationId, Application>();
+        this.context.getApplications();
     Application app = mock(Application.class);
-    Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
-    containers.put(container, mock(Container.class));
     maps.put(application, app);
-    when(app.getContainers()).thenReturn(containers);
-    when(context.getApplications()).thenReturn(maps);
+    when(app.getContainers()).thenReturn(this.context.getContainers());
 
     LogAggregationService logAggregationService =
         new LogAggregationService(dispatcher, context, this.delSrvc,
@@ -1415,8 +1941,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new File(localLogDir, ConverterUtils.toString(application));
     appLogDir.mkdir();
     logAggregationService.handle(new LogHandlerAppStartedEvent(application,
-      this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
-      logAggregationContextWithInterval));
+      this.user, null, this.acls, logAggregationContextWithInterval));
 
     LogFileStatusInLastCycle logFileStatusInLastCycle = null;
     // Simulate log-file creation
@@ -1536,7 +2061,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.init(this.conf);
     logAggregationService.start();
     logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
-      this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
+      this.user, null, this.acls,
       Records.newRecord(LogAggregationContext.class)));
 
     // Inject new token for log-aggregation after app log-aggregator init

+ 4 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java

@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -148,8 +147,7 @@ public class TestNonAggregatingLogHandler {
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
 
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
 
@@ -189,8 +187,7 @@ public class TestNonAggregatingLogHandler {
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
 
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
 
@@ -357,8 +354,7 @@ public class TestNonAggregatingLogHandler {
     logHandler.init(conf);
     logHandler.start();
 
-    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
-        ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
     logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
     logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
@@ -445,7 +441,7 @@ public class TestNonAggregatingLogHandler {
     doReturn(localLogDirPaths).when(dirsHandler).getLogDirsForCleanup();
 
     logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
-      ContainerLogsRetentionPolicy.ALL_CONTAINERS, appAcls));
+        appAcls));
 
     // test case where some dirs have the log dir to delete
     // mock some dirs throwing various exceptions

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

@@ -231,16 +231,22 @@ public class TestContainerAllocation {
         LogAggregationContext.newInstance(
           "includePattern", "excludePattern",
           "rolledLogsIncludePattern",
-          "rolledLogsExcludePattern");
+          "rolledLogsExcludePattern",
+          "policyClass",
+          "policyParameters");
     LogAggregationContext returned =
         getLogAggregationContextFromContainerToken(rm1, nm2,
           logAggregationContext);
     Assert.assertEquals("includePattern", returned.getIncludePattern());
     Assert.assertEquals("excludePattern", returned.getExcludePattern());
     Assert.assertEquals("rolledLogsIncludePattern",
-      returned.getRolledLogsIncludePattern());
+        returned.getRolledLogsIncludePattern());
     Assert.assertEquals("rolledLogsExcludePattern",
-      returned.getRolledLogsExcludePattern());
+        returned.getRolledLogsExcludePattern());
+    Assert.assertEquals("policyClass",
+        returned.getLogAggregationPolicyClassName());
+    Assert.assertEquals("policyParameters",
+        returned.getLogAggregationPolicyParameters());
     rm1.stop();
   }