Browse Source

YARN-4389. Allow application to enable or disable am blacklisting. (Sunil G via rohithsharmaks)

rohithsharmaks 9 years ago
parent
commit
f7736f464f

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -1255,6 +1255,10 @@ Release 2.8.0 - UNRELEASED
     YARN-4534. Remove the redundant symbol in yarn rmadmin help msg.
     YARN-4534. Remove the redundant symbol in yarn rmadmin help msg.
     (Lin Yiqun via aajisaka)
     (Lin Yiqun via aajisaka)
 
 
+    YARN-4389. "yarn.am.blacklisting.enabled" and "yarn.am.blacklisting.disable-
+    failure-threshold" should be app specific rather than a setting for whole 
+    YARN cluster. (Sunil G via rohithsharmaks)
+
 Release 2.7.3 - UNRELEASED
 Release 2.7.3 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMBlackListingRequest.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Specific AMBlacklistingRequest from AM to enable/disable blacklisting.
+ */
+@Public
+@Evolving
+public abstract class AMBlackListingRequest {
+
+  @Private
+  @Unstable
+  public static AMBlackListingRequest newInstance(
+      boolean isAMBlackListingEnabled, float disableFailureThreshold) {
+    AMBlackListingRequest blackListRequest = Records
+        .newRecord(AMBlackListingRequest.class);
+    blackListRequest.setBlackListingEnabled(isAMBlackListingEnabled);
+    blackListRequest
+        .setBlackListingDisableFailureThreshold(disableFailureThreshold);
+    return blackListRequest;
+  }
+
+  /**
+   * @return AM Blacklisting is enabled.
+   */
+  @Public
+  @Evolving
+  public abstract boolean isAMBlackListingEnabled();
+
+  /**
+   * @return AM Blacklisting disable failure threshold
+   */
+  @Public
+  @Evolving
+  public abstract float getBlackListingDisableFailureThreshold();
+
+  @Private
+  @Unstable
+  public abstract void setBlackListingEnabled(boolean isAMBlackListingEnabled);
+
+  @Private
+  @Unstable
+  public abstract void setBlackListingDisableFailureThreshold(
+      float disableFailureThreshold);
+}

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java

@@ -535,4 +535,27 @@ public abstract class ApplicationSubmissionContext {
   @Public
   @Public
   @Unstable
   @Unstable
   public abstract void setReservationID(ReservationId reservationID);
   public abstract void setReservationID(ReservationId reservationID);
+
+  /**
+   * Get AM Blacklisting request object to know whether application needs any
+   * specific blacklisting for AM Nodes.
+   *
+   * @return AMBlackListingRequest object which has blacklisting information.
+   */
+  @Public
+  @Unstable
+  public abstract AMBlackListingRequest getAMBlackListRequest();
+
+  /**
+   * Get AM Blacklisting request object to know whether application needs any
+   * specific blacklisting for AM Nodes.
+   *
+   * @param blackListRequest
+   *          object which has blacklisting information such as
+   *          "enable/disable AM blacklisting" and "disable failure threshold".
+   */
+  @Public
+  @Unstable
+  public abstract void setAMBlackListRequest(
+      AMBlackListingRequest blackListRequest);
 }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -289,6 +289,10 @@ enum ExecutionTypeProto {
   OPPORTUNISTIC = 2;
   OPPORTUNISTIC = 2;
 }
 }
 
 
+message AMBlackListingRequestProto {
+  optional bool blacklisting_enabled = 1 [default = false];
+  optional float blacklisting_failure_threshold = 2;
+}
 ////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////
 ////// From AM_RM_Protocol /////////////////////////////////////////////
 ////// From AM_RM_Protocol /////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////
@@ -354,6 +358,7 @@ message ApplicationSubmissionContextProto {
   optional ReservationIdProto reservation_id = 15;
   optional ReservationIdProto reservation_id = 15;
   optional string node_label_expression = 16;
   optional string node_label_expression = 16;
   optional ResourceRequestProto am_container_resource_request = 17;
   optional ResourceRequestProto am_container_resource_request = 17;
+  optional AMBlackListingRequestProto am_blacklisting_request = 18;
 }
 }
 
 
 message LogAggregationContextProto {
 message LogAggregationContextProto {

+ 104 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMBlackListingRequestPBImpl.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
+import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class AMBlackListingRequestPBImpl extends AMBlackListingRequest {
+  AMBlackListingRequestProto proto = AMBlackListingRequestProto
+      .getDefaultInstance();
+  AMBlackListingRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public AMBlackListingRequestPBImpl() {
+    builder = AMBlackListingRequestProto.newBuilder();
+  }
+
+  public AMBlackListingRequestPBImpl(AMBlackListingRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public AMBlackListingRequestProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = AMBlackListingRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public boolean isAMBlackListingEnabled() {
+    AMBlackListingRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getBlacklistingEnabled();
+  }
+
+  @Override
+  public float getBlackListingDisableFailureThreshold() {
+    AMBlackListingRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getBlacklistingFailureThreshold();
+  }
+
+  @Override
+  public void setBlackListingEnabled(boolean isAMBlackListingEnabled) {
+    maybeInitBuilder();
+    builder.setBlacklistingEnabled(isAMBlackListingEnabled);
+  }
+
+  @Override
+  public void setBlackListingDisableFailureThreshold(
+      float disableFailureThreshold) {
+    maybeInitBuilder();
+    builder.setBlacklistingFailureThreshold(disableFailureThreshold);
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java

@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
@@ -63,6 +65,7 @@ extends ApplicationSubmissionContext {
   private ResourceRequest amResourceRequest = null;
   private ResourceRequest amResourceRequest = null;
   private LogAggregationContext logAggregationContext = null;
   private LogAggregationContext logAggregationContext = null;
   private ReservationId reservationId = null;
   private ReservationId reservationId = null;
+  private AMBlackListingRequest amBlackListRequest = null;
 
 
   public ApplicationSubmissionContextPBImpl() {
   public ApplicationSubmissionContextPBImpl() {
     builder = ApplicationSubmissionContextProto.newBuilder();
     builder = ApplicationSubmissionContextProto.newBuilder();
@@ -131,6 +134,10 @@ extends ApplicationSubmissionContext {
     if (this.reservationId != null) {
     if (this.reservationId != null) {
       builder.setReservationId(convertToProtoFormat(this.reservationId));
       builder.setReservationId(convertToProtoFormat(this.reservationId));
     }
     }
+    if (this.amBlackListRequest != null) {
+      builder.setAmBlacklistingRequest(
+          convertToProtoFormat(this.amBlackListRequest));
+    }
   }
   }
 
 
   private void mergeLocalToProto() {
   private void mergeLocalToProto() {
@@ -413,6 +420,29 @@ extends ApplicationSubmissionContext {
     return p.getKeepContainersAcrossApplicationAttempts();
     return p.getKeepContainersAcrossApplicationAttempts();
   }
   }
 
 
+  @Override
+  public AMBlackListingRequest getAMBlackListRequest() {
+    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (amBlackListRequest != null) {
+      return amBlackListRequest;
+    }
+    if (!p.hasAmBlacklistingRequest()) {
+      return null;
+    }
+    amBlackListRequest = convertFromProtoFormat(p.getAmBlacklistingRequest());
+    return amBlackListRequest;
+  }
+
+  @Override
+  public void setAMBlackListRequest(AMBlackListingRequest amBlackListRequest) {
+    maybeInitBuilder();
+    if (amBlackListRequest == null) {
+      builder.clearAmBlacklistingRequest();
+      return;
+    }
+    this.amBlackListRequest = amBlackListRequest;
+  }
+
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
   private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
     return new PriorityPBImpl(p);
     return new PriorityPBImpl(p);
   }
   }
@@ -455,6 +485,16 @@ extends ApplicationSubmissionContext {
     return ((ResourcePBImpl)t).getProto();
     return ((ResourcePBImpl)t).getProto();
   }
   }
 
 
+  private AMBlackListingRequestPBImpl convertFromProtoFormat(
+      AMBlackListingRequestProto a) {
+    return new AMBlackListingRequestPBImpl(a);
+  }
+
+  private AMBlackListingRequestProto convertToProtoFormat(
+      AMBlackListingRequest a) {
+    return ((AMBlackListingRequestPBImpl) a).getProto();
+  }
+
   @Override
   @Override
   public String getNodeLabelExpression() {
   public String getNodeLabelExpression() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java

@@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersReso
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -148,6 +149,7 @@ import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.api.records.impl.pb.AMBlackListingRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptReportPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptReportPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
@@ -181,6 +183,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl
 import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.URLPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.YarnClusterMetricsPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.YarnClusterMetricsPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.AMBlackListingRequestProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptReportProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptReportProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
@@ -498,6 +501,7 @@ public class TestPBImplRecords {
     generateByNewInstance(ReservationRequests.class);
     generateByNewInstance(ReservationRequests.class);
     generateByNewInstance(ReservationDefinition.class);
     generateByNewInstance(ReservationDefinition.class);
     generateByNewInstance(ResourceUtilization.class);
     generateByNewInstance(ResourceUtilization.class);
+    generateByNewInstance(AMBlackListingRequest.class);
   }
   }
 
 
   private class GetSetPair {
   private class GetSetPair {
@@ -1319,4 +1323,10 @@ public class TestPBImplRecords {
     validatePBImplRecord(CheckForDecommissioningNodesResponsePBImpl.class,
     validatePBImplRecord(CheckForDecommissioningNodesResponsePBImpl.class,
         CheckForDecommissioningNodesResponseProto.class);
         CheckForDecommissioningNodesResponseProto.class);
   }
   }
+
+  @Test
+  public void testAMBlackListingRequestPBImpl() throws Exception {
+    validatePBImplRecord(AMBlackListingRequestPBImpl.class,
+        AMBlackListingRequestProto.class);
+  }
 }
 }

+ 51 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -138,8 +138,8 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final Set<String> applicationTags;
   private final Set<String> applicationTags;
 
 
   private final long attemptFailuresValidityInterval;
   private final long attemptFailuresValidityInterval;
-  private final boolean amBlacklistingEnabled;
-  private final float blacklistDisableThreshold;
+  private boolean amBlacklistingEnabled = false;
+  private float blacklistDisableThreshold;
 
 
   private Clock systemClock;
   private Clock systemClock;
 
 
@@ -389,7 +389,9 @@ public class RMAppImpl implements RMApp, Recoverable {
                                                                  stateMachine;
                                                                  stateMachine;
 
 
   private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
   private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
-  
+  private static final float MINIMUM_THRESHOLD_VALUE = 0.0f;
+  private static final float MAXIMUM_THRESHOLD_VALUE = 1.0f;
+
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
       Configuration config, String name, String user, String queue,
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
       ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
@@ -467,16 +469,43 @@ public class RMAppImpl implements RMApp, Recoverable {
         YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
         YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
         YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
         YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
 
 
-    amBlacklistingEnabled = conf.getBoolean(
-        YarnConfiguration.AM_BLACKLISTING_ENABLED,
-        YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
-
-    if (amBlacklistingEnabled) {
-      blacklistDisableThreshold = conf.getFloat(
-          YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
-          YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
-    } else {
+    // amBlacklistingEnabled can be configured globally and by each
+    // application.
+    // Case 1: If AMBlackListRequest is available in submission context, we
+    // will consider only app level request (RM level configuration will be
+    // skipped).
+    // Case 2: AMBlackListRequest is available in submission context and
+    // amBlacklisting is disabled. In this case, AM blacklisting wont be
+    // enabled for this app even if this feature is enabled in RM level.
+    // Case 3: AMBlackListRequest is not available through submission context.
+    // RM level AM black listing configuration will be considered.
+    if (null != submissionContext.getAMBlackListRequest()) {
+      amBlacklistingEnabled = submissionContext.getAMBlackListRequest()
+          .isAMBlackListingEnabled();
       blacklistDisableThreshold = 0.0f;
       blacklistDisableThreshold = 0.0f;
+      if (amBlacklistingEnabled) {
+        blacklistDisableThreshold = submissionContext.getAMBlackListRequest()
+            .getBlackListingDisableFailureThreshold();
+
+        // Verify whether blacklistDisableThreshold is valid. And for invalid
+        // threshold, reset to global level blacklistDisableThreshold
+        // configured.
+        if (blacklistDisableThreshold < MINIMUM_THRESHOLD_VALUE
+            || blacklistDisableThreshold > MAXIMUM_THRESHOLD_VALUE) {
+          blacklistDisableThreshold = conf.getFloat(
+              YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
+              YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
+        }
+      }
+    } else {
+      amBlacklistingEnabled = conf.getBoolean(
+          YarnConfiguration.AM_BLACKLISTING_ENABLED,
+          YarnConfiguration.DEFAULT_AM_BLACKLISTING_ENABLED);
+      if (amBlacklistingEnabled) {
+        blacklistDisableThreshold = conf.getFloat(
+            YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
+            YarnConfiguration.DEFAULT_AM_BLACKLISTING_DISABLE_THRESHOLD);
+      }
     }
     }
   }
   }
 
 
@@ -1755,4 +1784,14 @@ public class RMAppImpl implements RMApp, Recoverable {
     rmContext.getRMApplicationHistoryWriter().applicationStarted(app);
     rmContext.getRMApplicationHistoryWriter().applicationStarted(app);
     rmContext.getSystemMetricsPublisher().appCreated(app, startTime);
     rmContext.getSystemMetricsPublisher().appCreated(app, startTime);
   }
   }
+
+  @VisibleForTesting
+  public boolean isAmBlacklistingEnabled() {
+    return amBlacklistingEnabled;
+  }
+
+  @VisibleForTesting
+  public float getAmBlacklistingDisableThreshold() {
+    return blacklistDisableThreshold;
+  }
 }
 }

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.AMBlackListingRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -1027,6 +1028,63 @@ public class TestRMAppTransitions {
             + "/"));
             + "/"));
   }
   }
 
 
+  @Test
+  public void testAMBlackListConfigFromApp() {
+    // Scenario 1: Application enables AM blacklisting
+    float disableThreshold = 0.9f;
+    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, false);
+    ApplicationSubmissionContext submissionContext =
+        new ApplicationSubmissionContextPBImpl();
+    submissionContext.setAMBlackListRequest(AMBlackListingRequest.newInstance(
+        true, disableThreshold));
+    RMAppImpl application = (RMAppImpl) createNewTestApp(submissionContext);
+
+    Assert.assertTrue(application.isAmBlacklistingEnabled());
+    Assert.assertEquals(disableThreshold,
+        application.getAmBlacklistingDisableThreshold(), 1e-8);
+
+    // Scenario 2: Application disables AM blacklisting
+    float globalThreshold = 0.9f;
+    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
+    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
+        globalThreshold);
+    ApplicationSubmissionContext submissionContext2 =
+        new ApplicationSubmissionContextPBImpl();
+    submissionContext2.setAMBlackListRequest(AMBlackListingRequest.newInstance(
+        false, disableThreshold));
+    RMAppImpl application2 = (RMAppImpl) createNewTestApp(submissionContext2);
+
+    // Am blacklisting will be disabled eventhough its enabled in RM.
+    Assert.assertFalse(application2.isAmBlacklistingEnabled());
+
+    // Scenario 3: Application updates invalid AM threshold
+    float invalidDisableThreshold = -0.5f;
+    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
+    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
+        globalThreshold);
+    ApplicationSubmissionContext submissionContext3 =
+        new ApplicationSubmissionContextPBImpl();
+    submissionContext3.setAMBlackListRequest(AMBlackListingRequest.newInstance(
+        true, invalidDisableThreshold));
+    RMAppImpl application3 = (RMAppImpl) createNewTestApp(submissionContext3);
+
+    Assert.assertTrue(application3.isAmBlacklistingEnabled());
+    Assert.assertEquals(globalThreshold,
+        application3.getAmBlacklistingDisableThreshold(), 1e-8);
+
+    // Scenario 4: Empty AMBlackListingRequest in Submission Context
+    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
+    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD,
+        globalThreshold);
+    ApplicationSubmissionContext submissionContext4 =
+        new ApplicationSubmissionContextPBImpl();
+    RMAppImpl application4 = (RMAppImpl) createNewTestApp(submissionContext4);
+
+    Assert.assertTrue(application4.isAmBlacklistingEnabled());
+    Assert.assertEquals(globalThreshold,
+        application4.getAmBlacklistingDisableThreshold(), 1e-8);
+  }
+
   private void verifyApplicationFinished(RMAppState state) {
   private void verifyApplicationFinished(RMAppState state) {
     ArgumentCaptor<RMAppState> finalState =
     ArgumentCaptor<RMAppState> finalState =
         ArgumentCaptor.forClass(RMAppState.class);
         ArgumentCaptor.forClass(RMAppState.class);