Browse Source

YARN-8002. Support NOT_SELF and ALL namespace types for allocation tag. (Weiwei Yang via wangda)

Change-Id: I63b4e4192a95bf7ded98c54e46a2871c72869700
Wangda Tan 7 years ago
parent
commit
a08921ca6c
20 changed files with 1031 additions and 372 deletions
  1. 0 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
  2. 0 50
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java
  3. 55 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
  4. 0 34
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java
  5. 45 69
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java
  6. 82 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java
  7. 112 34
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
  8. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java
  9. 15 40
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
  10. 3 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java
  11. 13 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
  12. 8 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
  13. 10 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
  14. 36 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
  15. 126 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
  16. 231 41
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
  17. 19 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java
  18. 246 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java
  19. 25 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java
  20. 4 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java

+ 0 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java

@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.yarn.api.records;
 
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
-
-import java.util.Arrays;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 /**
  * Class to describe all supported forms of namespaces for an allocation tag.
  */
@@ -44,29 +38,6 @@ public enum AllocationTagNamespaceType {
     return this.typeKeyword;
   }
 
-  /**
-   * Parses the namespace type from a given string.
-   * @param prefix namespace prefix.
-   * @return namespace type.
-   * @throws InvalidAllocationTagException
-   */
-  public static AllocationTagNamespaceType fromString(String prefix) throws
-      InvalidAllocationTagException {
-    for (AllocationTagNamespaceType type :
-        AllocationTagNamespaceType.values()) {
-      if(type.getTypeKeyword().equals(prefix)) {
-        return type;
-      }
-    }
-
-    Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
-        .map(AllocationTagNamespaceType::toString)
-        .collect(Collectors.toSet());
-    throw new InvalidAllocationTagException(
-        "Invalid namespace prefix: " + prefix
-            + ", valid values are: " + String.join(",", values));
-  }
-
   @Override
   public String toString() {
     return this.getTypeKeyword();

+ 0 - 50
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java

@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.api.records;
-
-import java.util.Set;
-
-/**
- * Allocation tags under same namespace.
- */
-public class AllocationTags {
-
-  private AllocationTagNamespace ns;
-  private Set<String> tags;
-
-  public AllocationTags(AllocationTagNamespace namespace,
-      Set<String> allocationTags) {
-    this.ns = namespace;
-    this.tags = allocationTags;
-  }
-
-  /**
-   * @return the namespace of these tags.
-   */
-  public AllocationTagNamespace getNamespace() {
-    return this.ns;
-  }
-
-  /**
-   * @return the allocation tags.
-   */
-  public Set<String> getTags() {
-    return this.tags;
-  }
-}

+ 55 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java

@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@@ -107,6 +107,25 @@ public final class PlacementConstraints {
         PlacementTargets.allocationTag(allocationTags));
   }
 
+  /**
+   * Similar to {@link #cardinality(String, int, int, String...)}, but let you
+   * attach a namespace to the given allocation tags.
+   *
+   * @param scope the scope of the constraint
+   * @param namespace the namespace of the allocation tags
+   * @param minCardinality determines the minimum number of allocations within
+   *                       the scope
+   * @param maxCardinality determines the maximum number of allocations within
+   *                       the scope
+   * @param allocationTags allocation tags
+   * @return the resulting placement constraint
+   */
+  public static AbstractConstraint cardinality(String scope, String namespace,
+      int minCardinality, int maxCardinality, String... allocationTags) {
+    return new SingleConstraint(scope, minCardinality, maxCardinality,
+        PlacementTargets.allocationTagWithNamespace(namespace, allocationTags));
+  }
+
   /**
    * Similar to {@link #cardinality(String, int, int, String...)}, but
    * determines only the minimum cardinality (the maximum cardinality is
@@ -124,6 +143,23 @@ public final class PlacementConstraints {
         allocationTags);
   }
 
+  /**
+   * Similar to {@link #minCardinality(String, int, String...)}, but let you
+   * attach a namespace to the allocation tags.
+   *
+   * @param scope the scope of the constraint
+   * @param namespace the namespace of these tags
+   * @param minCardinality determines the minimum number of allocations within
+   *                       the scope
+   * @param allocationTags the constraint targets allocations with these tags
+   * @return the resulting placement constraint
+   */
+  public static AbstractConstraint minCardinality(String scope,
+      String namespace, int minCardinality, String... allocationTags) {
+    return cardinality(scope, namespace, minCardinality, Integer.MAX_VALUE,
+        allocationTags);
+  }
+
   /**
    * Similar to {@link #cardinality(String, int, int, String...)}, but
    * determines only the maximum cardinality (the minimum cardinality is 0).
@@ -139,6 +175,23 @@ public final class PlacementConstraints {
     return cardinality(scope, 0, maxCardinality, allocationTags);
   }
 
+  /**
+   * Similar to {@link #maxCardinality(String, int, String...)}, but let you
+   * specify a namespace for the tags, see supported namespaces in
+   * {@link AllocationTagNamespaceType}.
+   *
+   * @param scope the scope of the constraint
+   * @param tagNamespace the namespace of these tags
+   * @param maxCardinality determines the maximum number of allocations within
+   *          the scope
+   * @param allocationTags allocation tags
+   * @return the resulting placement constraint
+   */
+  public static AbstractConstraint maxCardinality(String scope,
+      String tagNamespace, int maxCardinality, String... allocationTags) {
+    return cardinality(scope, tagNamespace, 0, maxCardinality, allocationTags);
+  }
+
   /**
    * This constraint generalizes the cardinality and target constraints.
    *
@@ -242,9 +295,8 @@ public final class PlacementConstraints {
      */
     public static TargetExpression allocationTagToIntraApp(
         String... allocationTags) {
-      AllocationTagNamespace selfNs = new AllocationTagNamespace.Self();
       return new TargetExpression(TargetType.ALLOCATION_TAG,
-          selfNs.toString(), allocationTags);
+          AllocationTagNamespaceType.SELF.toString(), allocationTags);
     }
   }
 

+ 0 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java

@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.exceptions;
-
-/**
- * This exception is thrown by
- * {@link
- * org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)}
- * when it fails to parse a namespace.
- */
-public class InvalidAllocationTagException extends YarnException {
-
-  private static final long serialVersionUID = 1L;
-
-  public InvalidAllocationTagException(String message) {
-    super(message);
-  }
-}

+ 45 - 69
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java

@@ -16,22 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.api.records;
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
 import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
 import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL;
 import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
 import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;
-import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString;
 
 /**
  * Class to describe the namespace of an allocation tag.
@@ -69,8 +71,6 @@ public abstract class AllocationTagNamespace implements
 
   /**
    * Get the scope of the namespace, in form of a set of applications.
-   * Before calling this method, {@link #evaluate(TargetApplications)}
-   * must be called in prior to ensure the scope is proper evaluated.
    *
    * @return a set of applications.
    */
@@ -83,51 +83,20 @@ public abstract class AllocationTagNamespace implements
     return this.nsScope;
   }
 
-  @Override
-  public abstract void evaluate(TargetApplications target)
-      throws InvalidAllocationTagException;
-
   /**
-   * @return true if the namespace is effective in all applications
-   * in this cluster. Specifically the namespace prefix should be
-   * "all".
-   */
-  public boolean isGlobal() {
-    return AllocationTagNamespaceType.ALL.equals(getNamespaceType());
-  }
-
-  /**
-   * @return true if the namespace is effective within a single application
-   * by its application ID, the namespace prefix should be "app-id";
-   * false otherwise.
-   */
-  public boolean isSingleInterApp() {
-    return AllocationTagNamespaceType.APP_ID.equals(getNamespaceType());
-  }
-
-  /**
-   * @return true if the namespace is effective to the application itself,
-   * the namespace prefix should be "self"; false otherwise.
-   */
-  public boolean isIntraApp() {
-    return AllocationTagNamespaceType.SELF.equals(getNamespaceType());
-  }
-
-  /**
-   * @return true if the namespace is effective to all applications except
-   * itself, the namespace prefix should be "not-self"; false otherwise.
-   */
-  public boolean isNotSelf() {
-    return AllocationTagNamespaceType.NOT_SELF.equals(getNamespaceType());
-  }
-
-  /**
-   * @return true if the namespace is effective to a group of applications
-   * identified by a application label, the namespace prefix should be
-   * "app-label"; false otherwise.
+   * Evaluate the namespace against given target applications
+   * if it is necessary. Only self/not-self/app-label namespace types
+   * require this evaluation step, because they are not binding to a
+   * specific scope during initiating. So we do lazy binding for them
+   * in this method.
+   *
+   * @param target a generic type target that impacts this evaluation.
+   * @throws InvalidAllocationTagsQueryException
    */
-  public boolean isAppLabel() {
-    return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType());
+  @Override
+  public void evaluate(TargetApplications target)
+      throws InvalidAllocationTagsQueryException {
+    // Sub-class needs to override this when it requires the eval step.
   }
 
   @Override
@@ -146,9 +115,9 @@ public abstract class AllocationTagNamespace implements
 
     @Override
     public void evaluate(TargetApplications target)
-        throws InvalidAllocationTagException {
+        throws InvalidAllocationTagsQueryException {
       if (target == null || target.getCurrentApplicationId() == null) {
-        throw new InvalidAllocationTagException("Namespace Self must"
+        throw new InvalidAllocationTagsQueryException("Namespace Self must"
             + " be evaluated against a single application ID.");
       }
       ApplicationId applicationId = target.getCurrentApplicationId();
@@ -196,12 +165,6 @@ public abstract class AllocationTagNamespace implements
     public All() {
       super(ALL);
     }
-
-    @Override
-    public void evaluate(TargetApplications target) {
-      Set<ApplicationId> allAppIds = target.getAllApplicationIds();
-      setScopeIfNotNull(allAppIds);
-    }
   }
 
   /**
@@ -229,10 +192,6 @@ public abstract class AllocationTagNamespace implements
     public AppID(ApplicationId applicationId) {
       super(APP_ID);
       this.targetAppId = applicationId;
-    }
-
-    @Override
-    public void evaluate(TargetApplications target) {
       setScopeIfNotNull(ImmutableSet.of(targetAppId));
     }
 
@@ -248,11 +207,11 @@ public abstract class AllocationTagNamespace implements
    *
    * @param namespaceStr namespace string.
    * @return an instance of {@link AllocationTagNamespace}.
-   * @throws InvalidAllocationTagException
+   * @throws InvalidAllocationTagsQueryException
    * if given string is not in valid format
    */
   public static AllocationTagNamespace parse(String namespaceStr)
-      throws InvalidAllocationTagException {
+      throws InvalidAllocationTagsQueryException {
     // Return the default namespace if no valid string is given.
     if (Strings.isNullOrEmpty(namespaceStr)) {
       return new Self();
@@ -273,7 +232,7 @@ public abstract class AllocationTagNamespace implements
       return new All();
     case APP_ID:
       if (nsValues.size() != 2) {
-        throw new InvalidAllocationTagException(
+        throw new InvalidAllocationTagsQueryException(
             "Missing the application ID in the namespace string: "
                 + namespaceStr);
       }
@@ -282,18 +241,35 @@ public abstract class AllocationTagNamespace implements
     case APP_LABEL:
       return new AppLabel();
     default:
-      throw new InvalidAllocationTagException(
+      throw new InvalidAllocationTagsQueryException(
           "Invalid namespace string " + namespaceStr);
     }
   }
 
+  private static AllocationTagNamespaceType fromString(String prefix) throws
+      InvalidAllocationTagsQueryException {
+    for (AllocationTagNamespaceType type :
+        AllocationTagNamespaceType.values()) {
+      if(type.getTypeKeyword().equals(prefix)) {
+        return type;
+      }
+    }
+
+    Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
+        .map(AllocationTagNamespaceType::toString)
+        .collect(Collectors.toSet());
+    throw new InvalidAllocationTagsQueryException(
+        "Invalid namespace prefix: " + prefix
+            + ", valid values are: " + String.join(",", values));
+  }
+
   private static AllocationTagNamespace parseAppID(String appIDStr)
-      throws InvalidAllocationTagException {
+      throws InvalidAllocationTagsQueryException {
     try {
       ApplicationId applicationId = ApplicationId.fromString(appIDStr);
       return new AppID(applicationId);
     } catch (IllegalArgumentException e) {
-      throw new InvalidAllocationTagException(
+      throw new InvalidAllocationTagsQueryException(
           "Invalid application ID for "
               + APP_ID.getTypeKeyword() + ": " + appIDStr);
     }
@@ -307,11 +283,11 @@ public abstract class AllocationTagNamespace implements
    *
    * @param namespaceStr namespace string.
    * @return a list of parsed strings.
-   * @throws InvalidAllocationTagException
+   * @throws InvalidAllocationTagsQueryException
    * if namespace format is unexpected.
    */
   private static List<String> normalize(String namespaceStr)
-      throws InvalidAllocationTagException {
+      throws InvalidAllocationTagsQueryException {
     List<String> result = new ArrayList<>();
     if (namespaceStr == null) {
       return result;
@@ -326,7 +302,7 @@ public abstract class AllocationTagNamespace implements
 
     // Currently we only allow 1 or 2 values for a namespace string
     if (result.size() == 0 || result.size() > 2) {
-      throw new InvalidAllocationTagException("Invalid namespace string: "
+      throw new InvalidAllocationTagsQueryException("Invalid namespace string: "
           + namespaceStr + ", the syntax is <namespace_prefix> or"
           + " <namespace_prefix>/<namespace_value>");
     }

+ 82 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import java.util.Set;
+
+/**
+ * Allocation tags under same namespace.
+ */
+public final class AllocationTags {
+
+  private AllocationTagNamespace ns;
+  private Set<String> tags;
+
+  private AllocationTags(AllocationTagNamespace namespace,
+      Set<String> allocationTags) {
+    this.ns = namespace;
+    this.tags = allocationTags;
+  }
+
+  /**
+   * @return the namespace of these tags.
+   */
+  public AllocationTagNamespace getNamespace() {
+    return this.ns;
+  }
+
+  /**
+   * @return the allocation tags.
+   */
+  public Set<String> getTags() {
+    return this.tags;
+  }
+
+  @VisibleForTesting
+  public static AllocationTags createSingleAppAllocationTags(
+      ApplicationId appId, Set<String> tags) {
+    AllocationTagNamespace namespace = new AllocationTagNamespace.AppID(appId);
+    return new AllocationTags(namespace, tags);
+  }
+
+  @VisibleForTesting
+  public static AllocationTags createGlobalAllocationTags(Set<String> tags) {
+    AllocationTagNamespace namespace = new AllocationTagNamespace.All();
+    return new AllocationTags(namespace, tags);
+  }
+
+  @VisibleForTesting
+  public static AllocationTags createOtherAppAllocationTags(
+      ApplicationId currentApp, Set<ApplicationId> allIds, Set<String> tags)
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagNamespace namespace = new AllocationTagNamespace.NotSelf();
+    TargetApplications ta = new TargetApplications(currentApp, allIds);
+    namespace.evaluate(ta);
+    return new AllocationTags(namespace, tags);
+  }
+
+  public static AllocationTags newAllocationTags(
+      AllocationTagNamespace namespace, Set<String> tags) {
+    return new AllocationTags(namespace, tags);
+  }
+}

+ 112 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java

@@ -22,9 +22,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.log4j.Logger;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -75,6 +78,12 @@ public class AllocationTagsManager {
     // Map<Type, Map<Tag, Count>>
     private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
 
+    public TypeToCountedTags() {}
+
+    private TypeToCountedTags(Map<T, Map<String, Long>> tags) {
+      this.typeToTagsWithCount = tags;
+    }
+
     // protected by external locks
     private void addTags(T type, Set<String> tags) {
       Map<String, Long> innerMap =
@@ -206,6 +215,52 @@ public class AllocationTagsManager {
     public Map<T, Map<String, Long>> getTypeToTagsWithCount() {
       return typeToTagsWithCount;
     }
+
+    /**
+     * Absorbs the given {@link TypeToCountedTags} to current mapping,
+     * this will aggregate the count of the tags with same name.
+     *
+     * @param target a {@link TypeToCountedTags} to merge with.
+     */
+    protected void absorb(final TypeToCountedTags<T> target) {
+      // No opt if the given target is null.
+      if (target == null || target.getTypeToTagsWithCount() == null) {
+        return;
+      }
+
+      // Merge the target.
+      Map<T, Map<String, Long>> targetMap = target.getTypeToTagsWithCount();
+      for (Map.Entry<T, Map<String, Long>> targetEntry :
+          targetMap.entrySet()) {
+        // Get a mutable copy, do not modify the target reference.
+        Map<String, Long> copy = Maps.newHashMap(targetEntry.getValue());
+
+        // If the target type doesn't exist in the current mapping,
+        // add as a new entry.
+        Map<String, Long> existingMapping =
+            this.typeToTagsWithCount.putIfAbsent(targetEntry.getKey(), copy);
+        // There was a mapping for this target type,
+        // do proper merging on the operator.
+        if (existingMapping != null) {
+          Map<String, Long> localMap =
+              this.typeToTagsWithCount.get(targetEntry.getKey());
+          // Merge the target map to the inner map.
+          Map<String, Long> targetValue = targetEntry.getValue();
+          for (Map.Entry<String, Long> entry : targetValue.entrySet()) {
+            localMap.merge(entry.getKey(), entry.getValue(),
+                (a, b) -> Long.sum(a, b));
+          }
+        }
+      }
+    }
+
+    /**
+     * @return an immutable copy of current instance.
+     */
+    protected TypeToCountedTags immutableCopy() {
+      return new TypeToCountedTags(
+          Collections.unmodifiableMap(this.typeToTagsWithCount));
+    }
   }
 
   @VisibleForTesting
@@ -235,6 +290,34 @@ public class AllocationTagsManager {
     rmContext = context;
   }
 
+  /**
+   * Aggregates multiple {@link TypeToCountedTags} to a single one based on
+   * a given set of application IDs, the values are properly merged.
+   *
+   * @param appIds a set of application IDs.
+   * @return an aggregated {@link TypeToCountedTags}.
+   */
+  private TypeToCountedTags aggregateAllocationTags(Set<ApplicationId> appIds,
+      Map<ApplicationId, TypeToCountedTags> mapping) {
+    TypeToCountedTags result = new TypeToCountedTags();
+    if (appIds != null) {
+      if (appIds.size() == 1) {
+        // If there is only one app, we simply return the mapping
+        // without any extra computation.
+        return mapping.get(appIds.iterator().next());
+      }
+
+      for (ApplicationId applicationId : appIds) {
+        TypeToCountedTags appIdTags = mapping.get(applicationId);
+        if (appIdTags != null) {
+          // Make sure ATM state won't be changed.
+          result.absorb(appIdTags.immutableCopy());
+        }
+      }
+    }
+    return result;
+  }
+
   /**
    * Notify container allocated on a node.
    *
@@ -458,9 +541,8 @@ public class AllocationTagsManager {
    * to implement customized logic.
    *
    * @param nodeId        nodeId, required.
-   * @param applicationId applicationId. When null is specified, return
-   *                      aggregated cardinality among all applications.
-   * @param tags          allocation tags, see
+   * @param tags          {@link AllocationTags}, allocation tags under a
+   *                      specific namespace. See
    *                      {@link SchedulingRequest#getAllocationTags()},
    *                      When multiple tags specified. Returns cardinality
    *                      depends on op. If a specified tag doesn't exist, 0
@@ -474,29 +556,28 @@ public class AllocationTagsManager {
    * @throws InvalidAllocationTagsQueryException when illegal query
    *                                            parameter specified
    */
-  public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
-      Set<String> tags, LongBinaryOperator op)
-      throws InvalidAllocationTagsQueryException {
+  public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags,
+      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
     readLock.lock();
-
     try {
-      if (nodeId == null || op == null) {
+      if (nodeId == null || op == null || tags == null) {
         throw new InvalidAllocationTagsQueryException(
             "Must specify nodeId/tags/op to query cardinality");
       }
 
       TypeToCountedTags mapping;
-      if (applicationId != null) {
-        mapping = perAppNodeMappings.get(applicationId);
-      } else {
+      if (AllocationTagNamespaceType.ALL.equals(
+          tags.getNamespace().getNamespaceType())) {
         mapping = globalNodeMapping;
+      } else {
+        // Aggregate app tags cardinality by applications.
+        mapping = aggregateAllocationTags(
+            tags.getNamespace().getNamespaceScope(),
+            perAppNodeMappings);
       }
 
-      if (mapping == null) {
-        return 0;
-      }
-
-      return mapping.getCardinality(nodeId, tags, op);
+      return mapping == null ? 0 :
+          mapping.getCardinality(nodeId, tags.getTags(), op);
     } finally {
       readLock.unlock();
     }
@@ -507,9 +588,8 @@ public class AllocationTagsManager {
    * to implement customized logic.
    *
    * @param rack          rack, required.
-   * @param applicationId applicationId. When null is specified, return
-   *                      aggregated cardinality among all applications.
-   * @param tags          allocation tags, see
+   * @param tags          {@link AllocationTags}, allocation tags under a
+   *                      specific namespace. See
    *                      {@link SchedulingRequest#getAllocationTags()},
    *                      When multiple tags specified. Returns cardinality
    *                      depends on op. If a specified tag doesn't exist, 0
@@ -523,30 +603,28 @@ public class AllocationTagsManager {
    * @throws InvalidAllocationTagsQueryException when illegal query
    *                                            parameter specified
    */
-  @SuppressWarnings("unchecked")
-  public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
-      Set<String> tags, LongBinaryOperator op)
-      throws InvalidAllocationTagsQueryException {
+  public long getRackCardinalityByOp(String rack, AllocationTags tags,
+      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
     readLock.lock();
-
     try {
-      if (rack == null || op == null) {
+      if (rack == null || op == null || tags == null) {
         throw new InvalidAllocationTagsQueryException(
-            "Must specify rack/tags/op to query cardinality");
+            "Must specify nodeId/tags/op to query cardinality");
       }
 
       TypeToCountedTags mapping;
-      if (applicationId != null) {
-        mapping = perAppRackMappings.get(applicationId);
-      } else {
+      if (AllocationTagNamespaceType.ALL.equals(
+          tags.getNamespace().getNamespaceType())) {
         mapping = globalRackMapping;
+      } else {
+        // Aggregates cardinality by rack.
+        mapping = aggregateAllocationTags(
+            tags.getNamespace().getNamespaceScope(),
+            perAppRackMappings);
       }
 
-      if (mapping == null) {
-        return 0;
-      }
-
-      return mapping.getCardinality(rack, tags, op);
+      return mapping == null ? 0 :
+          mapping.getCardinality(rack, tags.getTags(), op);
     } finally {
       readLock.unlock();
     }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.api.records;
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
 

+ 15 - 40
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java

@@ -24,11 +24,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
 import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.records.TargetApplications;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
@@ -38,7 +36,6 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
@@ -70,43 +67,25 @@ public final class PlacementConstraintsUtil {
    */
   private static AllocationTagNamespace getAllocationTagNamespace(
       ApplicationId currentAppId, String targetKey, AllocationTagsManager atm)
-      throws InvalidAllocationTagException{
+      throws InvalidAllocationTagsQueryException {
     // Parse to a valid namespace.
     AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey);
 
-    // TODO remove such check once we support all forms of namespaces
-    if (!namespace.isIntraApp() && !namespace.isSingleInterApp()) {
-      throw new InvalidAllocationTagException(
-          "Only support " + AllocationTagNamespaceType.SELF.toString()
-              + " and "+ AllocationTagNamespaceType.APP_ID + " now,"
-              + namespace.toString() + " is not supported yet!");
+    // TODO Complete remove this check once we support app-label.
+    if (AllocationTagNamespaceType.APP_LABEL
+        .equals(namespace.getNamespaceType())) {
+      throw new InvalidAllocationTagsQueryException(
+          namespace.toString() + " is not supported yet!");
     }
 
     // Evaluate the namespace according to the given target
     // before it can be consumed.
-    TargetApplications ta = new TargetApplications(currentAppId,
-        atm.getAllApplicationIds());
+    TargetApplications ta =
+        new TargetApplications(currentAppId, atm.getAllApplicationIds());
     namespace.evaluate(ta);
     return namespace;
   }
 
-  // We return a single app Id now, because at present,
-  // only self and app-id namespace is supported. But moving on,
-  // this will return a set of application IDs.
-  // TODO support other forms of namespaces
-  private static ApplicationId getNamespaceScope(
-      AllocationTagNamespace namespace)
-      throws InvalidAllocationTagException {
-    if (namespace.getNamespaceScope() == null
-        || namespace.getNamespaceScope().size() != 1) {
-      throw new InvalidAllocationTagException(
-          "Invalid allocation tag namespace " + namespace.toString()
-              + ", expecting it is not null and only 1 application"
-              + " ID in the scope.");
-    }
-    return namespace.getNamespaceScope().iterator().next();
-  }
-
   /**
    * Returns true if <b>single</b> placement constraint with associated
    * allocationTags and scope is satisfied by a specific scheduler Node.
@@ -128,14 +107,10 @@ public final class PlacementConstraintsUtil {
     // Parse the allocation tag's namespace from the given target key,
     // then evaluate the namespace and get its scope,
     // which is represented by one or more application IDs.
-    ApplicationId effectiveAppID;
-    try {
-      AllocationTagNamespace namespace = getAllocationTagNamespace(
+    AllocationTagNamespace namespace = getAllocationTagNamespace(
           targetApplicationId, te.getTargetKey(), tm);
-      effectiveAppID = getNamespaceScope(namespace);
-    } catch (InvalidAllocationTagException e) {
-      throw new InvalidAllocationTagsQueryException(e);
-    }
+    AllocationTags allocationTags = AllocationTags
+        .newAllocationTags(namespace, te.getTargetValues());
 
     long minScopeCardinality = 0;
     long maxScopeCardinality = 0;
@@ -149,20 +124,20 @@ public final class PlacementConstraintsUtil {
     if (sc.getScope().equals(PlacementConstraints.NODE)) {
       if (checkMinCardinality) {
         minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
-            effectiveAppID, te.getTargetValues(), Long::max);
+            allocationTags, Long::max);
       }
       if (checkMaxCardinality) {
         maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
-            effectiveAppID, te.getTargetValues(), Long::min);
+            allocationTags, Long::min);
       }
     } else if (sc.getScope().equals(PlacementConstraints.RACK)) {
       if (checkMinCardinality) {
         minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
-            effectiveAppID, te.getTargetValues(), Long::max);
+            allocationTags, Long::max);
       }
       if (checkMaxCardinality) {
         maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
-            effectiveAppID, te.getTargetValues(), Long::min);
+            allocationTags, Long::min);
       }
     }
 

+ 3 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java

@@ -16,7 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.api.records;
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -37,10 +39,6 @@ public class TargetApplications {
     this.allAppIds = allApplicationIds;
   }
 
-  public Set<ApplicationId> getAllApplicationIds() {
-    return this.allAppIds;
-  }
-
   public ApplicationId getCurrentApplicationId() {
     return this.currentAppId;
   }

+ 13 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -139,29 +140,27 @@ class LocalAllocationTagsManager extends AllocationTagsManager {
   }
 
   @Override
-  public long getRackCardinality(String rack, ApplicationId applicationId,
-      String tag) throws InvalidAllocationTagsQueryException {
-    return tagsManager.getRackCardinality(rack, applicationId, tag);
+  public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags,
+      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
+    return tagsManager.getNodeCardinalityByOp(nodeId, tags, op);
   }
 
   @Override
-  public boolean allocationTagExistsOnNode(NodeId nodeId,
-      ApplicationId applicationId, String tag)
-      throws InvalidAllocationTagsQueryException {
-    return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
+  public long getRackCardinality(String rack, ApplicationId applicationId,
+      String tag) throws InvalidAllocationTagsQueryException {
+    return tagsManager.getRackCardinality(rack, applicationId, tag);
   }
 
   @Override
-  public long getNodeCardinalityByOp(NodeId nodeId,
-      ApplicationId applicationId, Set<String> tags, LongBinaryOperator op)
-      throws InvalidAllocationTagsQueryException {
-    return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op);
+  public long getRackCardinalityByOp(String rack, AllocationTags tags,
+      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
+    return tagsManager.getRackCardinalityByOp(rack, tags, op);
   }
 
   @Override
-  public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
-      Set<String> tags, LongBinaryOperator op)
+  public boolean allocationTagExistsOnNode(NodeId nodeId,
+      ApplicationId applicationId, String tag)
       throws InvalidAllocationTagsQueryException {
-    return tagsManager.getRackCardinalityByOp(rack, applicationId, tags, op);
+    return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
   }
 }

+ 8 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java

@@ -23,7 +23,7 @@ import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace;
 import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
 import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -339,18 +338,18 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
         try {
           AllocationTagNamespace tagNS =
               AllocationTagNamespace.parse(targetExpression.getTargetKey());
-          if (!AllocationTagNamespaceType.SELF
+          if (AllocationTagNamespaceType.APP_LABEL
               .equals(tagNS.getNamespaceType())) {
             throwExceptionWithMetaInfo(
-                "As of now, the only accepted target key for targetKey of "
-                    + "allocation_tag target expression is: ["
-                    + AllocationTagNamespaceType.SELF.toString()
-                    + "]. Please make changes to placement constraints "
-                    + "accordingly. If this is null, it will be set to "
+                "As of now, allocation tag namespace ["
+                    + AllocationTagNamespaceType.APP_LABEL.toString()
+                    + "] is not supported. Please make changes to placement "
+                    + "constraints accordingly. If this is null, it will be "
+                    + "set to "
                     + AllocationTagNamespaceType.SELF.toString()
                     + " by default.");
           }
-        } catch (InvalidAllocationTagException e) {
+        } catch (InvalidAllocationTagsQueryException e) {
           throwExceptionWithMetaInfo(
               "Invalid allocation tag namespace, message: " + e.getMessage());
         }

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java

@@ -305,6 +305,14 @@ public class MockAM {
   public AllocateResponse allocateIntraAppAntiAffinity(
       ResourceSizing resourceSizing, Priority priority, long allocationId,
       Set<String> allocationTags, String... targetTags) throws Exception {
+    return allocateAppAntiAffinity(resourceSizing, priority, allocationId,
+        null, allocationTags, targetTags);
+  }
+
+  public AllocateResponse allocateAppAntiAffinity(
+      ResourceSizing resourceSizing, Priority priority, long allocationId,
+      String namespace, Set<String> allocationTags, String... targetTags)
+      throws Exception {
     return this.allocate(null,
         Arrays.asList(SchedulingRequest.newBuilder().executionType(
             ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
@@ -313,7 +321,8 @@ public class MockAM {
                 PlacementConstraints
                     .targetNotIn(PlacementConstraints.NODE,
                         PlacementConstraints.PlacementTargets
-                            .allocationTagToIntraApp(targetTags)).build())
+                            .allocationTagWithNamespace(namespace, targetTags))
+                    .build())
             .resourceSizing(resourceSizing).build()), null);
   }
 

+ 36 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java

@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -428,20 +430,27 @@ public class TestRMContainerImpl {
     rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1), null),
+            Long::max));
 
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
 
     Assert.assertEquals(1,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
         .newInstance(containerId, ContainerState.COMPLETE, "", 0),
         RMContainerEventType.KILL));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     /* Second container: ACQUIRED -> FINISHED */
     rmContainer = new RMContainerImpl(container,
@@ -449,14 +458,18 @@ public class TestRMContainerImpl {
         nodeId, "user", rmContext);
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
 
     Assert.assertEquals(1,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.handle(
         new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
@@ -466,7 +479,9 @@ public class TestRMContainerImpl {
         RMContainerEventType.FINISHED));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     /* Third container: RUNNING -> FINISHED */
     rmContainer = new RMContainerImpl(container,
@@ -475,13 +490,17 @@ public class TestRMContainerImpl {
     rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
 
     Assert.assertEquals(1,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.handle(
         new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
@@ -494,7 +513,9 @@ public class TestRMContainerImpl {
         RMContainerEventType.FINISHED));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     /* Fourth container: NEW -> RECOVERED */
     rmContainer = new RMContainerImpl(container,
@@ -503,7 +524,9 @@ public class TestRMContainerImpl {
     rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     NMContainerStatus containerStatus = NMContainerStatus
         .newInstance(containerId, 0, ContainerState.NEW,
@@ -514,6 +537,8 @@ public class TestRMContainerImpl {
         .handle(new RMContainerRecoverEvent(containerId, containerStatus));
 
     Assert.assertEquals(1,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
   }
 }

+ 126 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -224,6 +225,131 @@ public class TestSchedulingRequestContainerAllocation {
     rm1.close();
   }
 
+  /**
+   * This UT covers some basic end-to-end inter-app anti-affinity
+   * constraint tests. For comprehensive tests over different namespace
+   * types, see more in TestPlacementConstraintsUtil.
+   * @throws Exception
+   */
+  @Test
+  public void testInterAppAntiAffinity() throws Exception {
+    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
+        new Configuration());
+    csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    // 4 NMs.
+    MockNM[] nms = new MockNM[4];
+    RMNode[] rmNodes = new RMNode[4];
+    for (int i = 0; i < 4; i++) {
+      nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
+    }
+
+    // app1 -> c
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
+
+    // app1 asks for 3 anti-affinity containers for the same app. It should
+    // only get 3 containers allocated to 3 different nodes..
+    am1.allocateIntraAppAntiAffinity(
+        ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
+        Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    System.out.println("Mappers on HOST0: "
+        + rmNodes[0].getAllocationTagsWithCount().get("mapper"));
+    System.out.println("Mappers on HOST1: "
+        + rmNodes[1].getAllocationTagsWithCount().get("mapper"));
+    System.out.println("Mappers on HOST2: "
+        + rmNodes[2].getAllocationTagsWithCount().get("mapper"));
+
+    // App1 should get 4 containers allocated (1 AM + 3 mappers).
+    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
+
+    // app2 -> c
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]);
+
+    // App2 asks for 3 containers that anti-affinity with any mapper,
+    // since 3 out of 4 nodes already have mapper containers, all 3
+    // containers will be allocated on the other node.
+    AllocationTagNamespace.All allNs = new AllocationTagNamespace.All();
+    am2.allocateAppAntiAffinity(
+        ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
+        Priority.newInstance(1), 1L, allNs.toString(),
+        ImmutableSet.of("foo"), "mapper");
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App2 should get 4 containers allocated (1 AM + 3 container).
+    Assert.assertEquals(4, schedulerApp2.getLiveContainers().size());
+
+    // The allocated node should not have mapper tag.
+    Assert.assertTrue(schedulerApp2.getLiveContainers()
+        .stream().allMatch(rmContainer -> {
+          // except the nm host
+          if (!rmContainer.getContainer().getNodeId().equals(rmNodes[0])) {
+            return !rmContainer.getAllocationTags().contains("mapper");
+          }
+          return true;
+        }));
+
+    // app3 -> c
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nms[0]);
+
+    // App3 asks for 3 containers that anti-affinity with any mapper.
+    // Unlike the former case, since app3 source tags are also mapper,
+    // it will anti-affinity with itself too. So there will be only 1
+    // container be allocated.
+    am3.allocateAppAntiAffinity(
+        ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
+        Priority.newInstance(1), 1L, allNs.toString(),
+        ImmutableSet.of("mapper"), "mapper");
+
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 4; j++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
+      }
+    }
+
+    FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
+        am3.getApplicationAttemptId());
+
+    // App3 should get 2 containers allocated (1 AM + 1 container).
+    Assert.assertEquals(2, schedulerApp3.getLiveContainers().size());
+
+    rm1.close();
+  }
+
   @Test
   public void testSchedulingRequestDisabledByDefault() throws Exception {
     Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(

+ 231 - 41
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java

@@ -21,6 +21,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -96,7 +97,9 @@ public class TestAllocationTagsManager {
     // Get Node Cardinality of app1 on node1, with tag "mapper"
     Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper")),
             Long::max));
 
     // Get Rack Cardinality of app1 on rack0, with tag "mapper"
@@ -106,20 +109,26 @@ public class TestAllocationTagsManager {
     // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
     Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1),
-            ImmutableSet.of("mapper", "reducer"), Long::min));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper", "reducer")),
+            Long::min));
 
     // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
     Assert.assertEquals(2,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1),
-            ImmutableSet.of("mapper", "reducer"), Long::max));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper", "reducer")),
+            Long::max));
 
     // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
     Assert.assertEquals(3,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1),
-            ImmutableSet.of("mapper", "reducer"), Long::sum));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper", "reducer")),
+            Long::sum));
 
     // Get Node Cardinality by passing single tag.
     Assert.assertEquals(1,
@@ -134,38 +143,52 @@ public class TestAllocationTagsManager {
     // op=min
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1),
-            ImmutableSet.of("no_existed", "reducer"), Long::min));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("no_existed", "reducer")),
+            Long::min));
 
     // Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
     // (Expect this returns #containers from app1 on node2)
     Assert.assertEquals(2,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1), null, Long::max));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1), null),
+            Long::max));
 
     // Get Node Cardinality of app1 on node2, with empty tag set, op=max
     Assert.assertEquals(2,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1), null, Long::max));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1), null),
+            Long::max));
 
     // Get Cardinality of app1 on node2, with empty tag set, op=max
     Assert.assertEquals(2,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1), ImmutableSet.of()),
+            Long::max));
 
     // Get Node Cardinality of all apps on node2, with empty tag set, op=sum
     Assert.assertEquals(4, atm.getNodeCardinalityByOp(
-        NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
+        NodeId.fromString("host2:123"),
+        AllocationTags.createGlobalAllocationTags(ImmutableSet.of()),
+        Long::sum));
 
     // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
     Assert.assertEquals(3,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1), ImmutableSet.of()),
+            Long::sum));
 
     // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
     Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(2), ImmutableSet.of()),
+            Long::sum));
 
     // Finish all containers:
     atm.removeContainer(NodeId.fromString("host1:123"),
@@ -189,33 +212,42 @@ public class TestAllocationTagsManager {
     // Get Cardinality of app1 on node1, with tag "mapper"
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper")),
             Long::max));
 
     // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1),
-            ImmutableSet.of("mapper", "reducer"), Long::min));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper", "reducer")),
+            Long::min));
 
     // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1),
-            ImmutableSet.of("mapper", "reducer"), Long::max));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper", "reducer")),
+            Long::max));
 
     // Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1),
-            ImmutableSet.of("mapper", "reducer"), Long::sum));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper", "reducer")),
+            Long::sum));
 
     // Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
     // (Expect this returns #containers from app1 on node2)
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1),
-            ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of(TestUtils.getMockApplicationId(1).toString())),
             Long::max));
 
     Assert.assertEquals(0,
@@ -226,21 +258,32 @@ public class TestAllocationTagsManager {
     // Get Node Cardinality of app1 on node2, with empty tag set, op=max
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of()),
+            Long::max));
 
     // Get Node Cardinality of all apps on node2, with empty tag set, op=sum
     Assert.assertEquals(0, atm.getNodeCardinalityByOp(
-        NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
+        NodeId.fromString("host2:123"),
+        AllocationTags.createGlobalAllocationTags(ImmutableSet.of()),
+        Long::sum));
 
     // Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of()),
+            Long::sum));
 
     // Get Node Cardinality of app_2 on node2, with empty tag set, op=sum
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of()),
+            Long::sum));
   }
 
 
@@ -296,20 +339,26 @@ public class TestAllocationTagsManager {
 
     // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max
     Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
-        TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
+        AllocationTags.createSingleAppAllocationTags(
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of()),
+        Long::max));
 
     // Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min
     Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
-        TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::min));
+        AllocationTags.createSingleAppAllocationTags(
+            TestUtils.getMockApplicationId(1),
+            ImmutableSet.of()),
+        Long::min));
 
     // Get Rack Cardinality of all apps on rack0, with empty tag set, op=min
-    Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0", null,
-        ImmutableSet.of(), Long::max));
+    Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0",
+        AllocationTags.createGlobalAllocationTags(ImmutableSet.of()),
+        Long::max));
   }
 
   @Test
-  public void testAllocationTagsManagerMemoryAfterCleanup()
-      throws InvalidAllocationTagsQueryException {
+  public void testAllocationTagsManagerMemoryAfterCleanup() {
     /**
      * Make sure YARN cleans up all memory once container/app finishes.
      */
@@ -362,8 +411,7 @@ public class TestAllocationTagsManager {
   }
 
   @Test
-  public void testQueryCardinalityWithIllegalParameters()
-      throws InvalidAllocationTagsQueryException {
+  public void testQueryCardinalityWithIllegalParameters() {
     /**
      * Make sure YARN cleans up all memory once container/app finishes.
      */
@@ -391,9 +439,12 @@ public class TestAllocationTagsManager {
     // No node-id
     boolean caughtException = false;
     try {
-      atm.getNodeCardinalityByOp(null, TestUtils.getMockApplicationId(2),
-          ImmutableSet.of("mapper"), Long::min);
-    } catch (InvalidAllocationTagsQueryException e) {
+      atm.getNodeCardinalityByOp(null,
+          AllocationTags.createSingleAppAllocationTags(
+              TestUtils.getMockApplicationId(2),
+              ImmutableSet.of("mapper")),
+          Long::min);
+    } catch (InvalidAllocationTagsQueryException e1) {
       caughtException = true;
     }
     Assert.assertTrue("should fail because of nodeId specified",
@@ -403,11 +454,150 @@ public class TestAllocationTagsManager {
     caughtException = false;
     try {
       atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-          TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null);
-    } catch (InvalidAllocationTagsQueryException e) {
+          AllocationTags.createSingleAppAllocationTags(
+              TestUtils.getMockApplicationId(2),
+              ImmutableSet.of("mapper")),
+          null);
+    } catch (InvalidAllocationTagsQueryException e1) {
       caughtException = true;
     }
     Assert.assertTrue("should fail because of nodeId specified",
         caughtException);
   }
+
+  @Test
+  public void testNodeAllocationTagsAggregation()
+      throws InvalidAllocationTagsQueryException {
+
+    AllocationTagsManager atm = new AllocationTagsManager(rmContext);
+    ApplicationId app1 = TestUtils.getMockApplicationId(1);
+    ApplicationId app2 = TestUtils.getMockApplicationId(2);
+    ApplicationId app3 = TestUtils.getMockApplicationId(3);
+    NodeId host1 = NodeId.fromString("host1:123");
+    NodeId host2 = NodeId.fromString("host2:123");
+    NodeId host3 = NodeId.fromString("host3:123");
+
+    /**
+     * Node1 (rack0)
+     *   app1/A(2)
+     *   app1/B(1)
+     *   app2/A(3)
+     *   app3/A(1)
+     *
+     * Node2 (rack0)
+     *   app2/A(1)
+     *   app2/B(2)
+     *   app1/C(1)
+     *   app3/B(1)
+     *
+     * Node3 (rack1):
+     *   app2/D(1)
+     *   app3/D(1)
+     */
+    atm.addContainer(host1, TestUtils.getMockContainerId(1, 1),
+        ImmutableSet.of("A", "B"));
+    atm.addContainer(host1, TestUtils.getMockContainerId(1, 2),
+        ImmutableSet.of("A"));
+    atm.addContainer(host1, TestUtils.getMockContainerId(2, 1),
+        ImmutableSet.of("A"));
+    atm.addContainer(host1, TestUtils.getMockContainerId(2, 2),
+        ImmutableSet.of("A"));
+    atm.addContainer(host1, TestUtils.getMockContainerId(2, 3),
+        ImmutableSet.of("A"));
+    atm.addContainer(host1, TestUtils.getMockContainerId(3, 1),
+        ImmutableSet.of("A"));
+
+    atm.addContainer(host2, TestUtils.getMockContainerId(1, 3),
+        ImmutableSet.of("C"));
+    atm.addContainer(host2, TestUtils.getMockContainerId(2, 4),
+        ImmutableSet.of("A"));
+    atm.addContainer(host2, TestUtils.getMockContainerId(2, 5),
+        ImmutableSet.of("B"));
+    atm.addContainer(host2, TestUtils.getMockContainerId(2, 6),
+        ImmutableSet.of("B"));
+    atm.addContainer(host2, TestUtils.getMockContainerId(3, 2),
+        ImmutableSet.of("B"));
+
+    atm.addContainer(host3, TestUtils.getMockContainerId(2, 7),
+        ImmutableSet.of("D"));
+    atm.addContainer(host3, TestUtils.getMockContainerId(3, 3),
+        ImmutableSet.of("D"));
+
+    // Target applications, current app: app1
+    // all apps: app1, app2, app3
+    TargetApplications ta = new TargetApplications(app1,
+        ImmutableSet.of(app1, app2, app3));
+
+    //********************************
+    // 1) self (app1)
+    //********************************
+    AllocationTags tags = AllocationTags
+        .createSingleAppAllocationTags(app1, ImmutableSet.of("A", "C"));
+    Assert.assertEquals(2, atm.getNodeCardinalityByOp(host1, tags, Long::max));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min));
+    Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::max));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host2, tags, Long::min));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::max));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::min));
+
+    //********************************
+    // 2) not-self (app2, app3)
+    //********************************
+    /**
+     * Verify max/min cardinality of tag "A" on host1 from all applications
+     * other than app1. This returns the max/min cardinality of tag "A" of
+     * app2 or app3 on this node.
+     *
+     * Node1 (rack0)
+     *   app1/A(1)
+     *   app1/B(1)
+     *   app2/A(3)
+     *   app3/A(1)
+     *
+     *   app2_app3/A(4)
+     *   app2_app3/B(0)
+     *
+     * expecting to return max=3, min=1
+     *
+     */
+    tags = AllocationTags.createOtherAppAllocationTags(app1,
+        ImmutableSet.of(app1, app2, app3), ImmutableSet.of("A", "B"));
+
+    Assert.assertEquals(4, atm.getNodeCardinalityByOp(host1, tags, Long::max));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min));
+    Assert.assertEquals(4, atm.getNodeCardinalityByOp(host1, tags, Long::sum));
+
+    //********************************
+    // 3) app-id/app2 (app2)
+    //********************************
+    tags = AllocationTags
+        .createSingleAppAllocationTags(app2, ImmutableSet.of("A", "B"));
+    Assert.assertEquals(3, atm.getNodeCardinalityByOp(host1, tags, Long::max));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min));
+    Assert.assertEquals(2, atm.getNodeCardinalityByOp(host2, tags, Long::max));
+    Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::min));
+    Assert.assertEquals(3, atm.getNodeCardinalityByOp(host2, tags, Long::sum));
+
+
+    //********************************
+    // 4) all (app1, app2, app3)
+    //********************************
+    tags = AllocationTags
+        .createGlobalAllocationTags(ImmutableSet.of("A"));
+    Assert.assertEquals(6, atm.getNodeCardinalityByOp(host1, tags, Long::sum));
+    Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::sum));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::sum));
+
+    tags = AllocationTags
+        .createGlobalAllocationTags(ImmutableSet.of("A", "B"));
+    Assert.assertEquals(7, atm.getNodeCardinalityByOp(host1, tags, Long::sum));
+    Assert.assertEquals(4, atm.getNodeCardinalityByOp(host2, tags, Long::sum));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::sum));
+    Assert.assertEquals(6, atm.getNodeCardinalityByOp(host1, tags, Long::max));
+    Assert.assertEquals(3, atm.getNodeCardinalityByOp(host2, tags, Long::max));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::max));
+    Assert.assertEquals(1, atm.getNodeCardinalityByOp(host1, tags, Long::min));
+    Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::min));
+    Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::min));
+  }
 }

+ 19 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java

@@ -16,10 +16,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; /**
  * limitations under the License.
  */
 import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.TargetApplications;
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -29,29 +27,34 @@ import org.junit.Test;
 public class TestAllocationTagsNamespace {
 
   @Test
-  public void testNamespaceParse() throws InvalidAllocationTagException {
+  public void testNamespaceParse() throws InvalidAllocationTagsQueryException {
     AllocationTagNamespace namespace;
 
     String namespaceStr = "self";
     namespace = AllocationTagNamespace.parse(namespaceStr);
-    Assert.assertTrue(namespace.isIntraApp());
+    Assert.assertEquals(AllocationTagNamespaceType.SELF,
+        namespace.getNamespaceType());
 
     namespaceStr = "not-self";
     namespace = AllocationTagNamespace.parse(namespaceStr);
-    Assert.assertTrue(namespace.isNotSelf());
+    Assert.assertEquals(AllocationTagNamespaceType.NOT_SELF,
+        namespace.getNamespaceType());
 
     namespaceStr = "all";
     namespace = AllocationTagNamespace.parse(namespaceStr);
-    Assert.assertTrue(namespace.isGlobal());
+    Assert.assertEquals(AllocationTagNamespaceType.ALL,
+        namespace.getNamespaceType());
 
     namespaceStr = "app-label";
     namespace = AllocationTagNamespace.parse(namespaceStr);
-    Assert.assertTrue(namespace.isAppLabel());
+    Assert.assertEquals(AllocationTagNamespaceType.APP_LABEL,
+        namespace.getNamespaceType());
 
     ApplicationId applicationId = ApplicationId.newInstance(12345, 1);
     namespaceStr = "app-id/" + applicationId.toString();
     namespace = AllocationTagNamespace.parse(namespaceStr);
-    Assert.assertTrue(namespace.isSingleInterApp());
+    Assert.assertEquals(AllocationTagNamespaceType.APP_ID,
+        namespace.getNamespaceType());
 
     // Invalid app-id namespace syntax, invalid app ID.
     try {
@@ -59,7 +62,7 @@ public class TestAllocationTagsNamespace {
       AllocationTagNamespace.parse(namespaceStr);
       Assert.fail("Parsing should fail as the given app ID is invalid");
     } catch (Exception e) {
-      Assert.assertTrue(e instanceof InvalidAllocationTagException);
+      Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
       Assert.assertTrue(e.getMessage().startsWith(
           "Invalid application ID for app-id"));
     }
@@ -71,7 +74,7 @@ public class TestAllocationTagsNamespace {
       Assert.fail("Parsing should fail as the given namespace"
           + " is missing application ID");
     } catch (Exception e) {
-      Assert.assertTrue(e instanceof InvalidAllocationTagException);
+      Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
       Assert.assertTrue(e.getMessage().startsWith(
           "Missing the application ID in the namespace string"));
     }
@@ -82,14 +85,15 @@ public class TestAllocationTagsNamespace {
       AllocationTagNamespace.parse(namespaceStr);
       Assert.fail("Parsing should fail as the giving type is not supported.");
     } catch (Exception e) {
-      Assert.assertTrue(e instanceof InvalidAllocationTagException);
+      Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
       Assert.assertTrue(e.getMessage().startsWith(
           "Invalid namespace prefix"));
     }
   }
 
   @Test
-  public void testNamespaceEvaluation() throws InvalidAllocationTagException {
+  public void testNamespaceEvaluation() throws
+      InvalidAllocationTagsQueryException {
     AllocationTagNamespace namespace;
     TargetApplications targetApplications;
     ApplicationId app1 = ApplicationId.newInstance(10000, 1);
@@ -131,10 +135,8 @@ public class TestAllocationTagsNamespace {
 
     namespaceStr = "all";
     namespace = AllocationTagNamespace.parse(namespaceStr);
-    targetApplications = new TargetApplications(null,
-        ImmutableSet.of(app1, app2));
-    namespace.evaluate(targetApplications);
-    Assert.assertEquals(2, namespace.getNamespaceScope().size());
+    Assert.assertEquals(AllocationTagNamespaceType.ALL,
+        namespace.getNamespaceType());
 
     namespaceStr = "app-id/" + app2.toString();
     namespace = AllocationTagNamespace.parse(namespaceStr);

+ 246 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java

@@ -41,7 +41,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -512,6 +511,252 @@ public class TestPlacementConstraintsUtil {
         createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
   }
 
+  @Test
+  public void testGlobalAppConstraints()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    rmContext.setAllocationTagsManager(tm);
+    rmContext.setPlacementConstraintManager(pcm);
+
+    long ts = System.currentTimeMillis();
+    ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100);
+    ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101);
+    ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102);
+
+    // Register App1 with anti-affinity constraint map.
+    RMNode n0r1 = rmNodes.get(0);
+    RMNode n1r1 = rmNodes.get(1);
+    RMNode n2r2 = rmNodes.get(2);
+    RMNode n3r2 = rmNodes.get(3);
+
+    /**
+     * Place container:
+     *  n0: app1/A(1), app2/A(1)
+     *  n1: app3/A(3)
+     *  n2: app1/A(2)
+     *  n3: ""
+     */
+    tm.addContainer(n0r1.getNodeID(),
+        newContainerId(application1), ImmutableSet.of("A"));
+    tm.addContainer(n0r1.getNodeID(),
+        newContainerId(application2), ImmutableSet.of("A"));
+    tm.addContainer(n1r1.getNodeID(),
+        newContainerId(application3), ImmutableSet.of("A"));
+    tm.addContainer(n1r1.getNodeID(),
+        newContainerId(application3), ImmutableSet.of("A"));
+    tm.addContainer(n1r1.getNodeID(),
+        newContainerId(application3), ImmutableSet.of("A"));
+    tm.addContainer(n2r2.getNodeID(),
+        newContainerId(application1), ImmutableSet.of("A"));
+    tm.addContainer(n2r2.getNodeID(),
+        newContainerId(application1), ImmutableSet.of("A"));
+
+    SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
+        n0r1.getRackName(), n0r1.getNodeID());
+    SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(),
+        n1r1.getRackName(), n1r1.getNodeID());
+    SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(),
+        n2r2.getRackName(), n2r2.getNodeID());
+    SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
+        n3r2.getRackName(), n3r2.getNodeID());
+
+    AllocationTagNamespace namespaceAll =
+        new AllocationTagNamespace.All();
+
+    //***************************
+    // 1) all, anti-affinity
+    //***************************
+    // Anti-affinity with "A" from any application including itself.
+    PlacementConstraint constraint1 = PlacementConstraints.targetNotIn(
+        NODE, allocationTagWithNamespace(namespaceAll.toString(), "A"))
+        .build();
+    Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
+    Set<String> srcTags1 = ImmutableSet.of("A");
+    constraintMap.put(srcTags1, constraint1);
+    pcm.registerApplication(application1, constraintMap);
+
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags1),
+        schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags1),
+        schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags1),
+        schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags1),
+        schedulerNode3, pcm, tm));
+
+    pcm.unregisterApplication(application1);
+
+    //***************************
+    // 2) all, max cardinality
+    //***************************
+    PlacementConstraint constraint2 = PlacementConstraints
+        .maxCardinality(NODE, namespaceAll.toString(), 2, "A")
+        .build();
+    constraintMap.clear();
+    Set<String> srcTags2 = ImmutableSet.of("foo");
+    constraintMap.put(srcTags2, constraint2);
+    pcm.registerApplication(application2, constraintMap);
+
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application2, createSchedulingRequest(srcTags2),
+        schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application2, createSchedulingRequest(srcTags2),
+        schedulerNode1, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application2, createSchedulingRequest(srcTags2),
+        schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application2, createSchedulingRequest(srcTags2),
+        schedulerNode3, pcm, tm));
+
+    pcm.unregisterApplication(application2);
+
+    //***************************
+    // 3) all, min cardinality
+    //***************************
+    PlacementConstraint constraint3 = PlacementConstraints
+        .minCardinality(NODE, namespaceAll.toString(), 3, "A")
+        .build();
+    constraintMap.clear();
+    Set<String> srcTags3 = ImmutableSet.of("foo");
+    constraintMap.put(srcTags3, constraint3);
+    pcm.registerApplication(application3, constraintMap);
+
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application3, createSchedulingRequest(srcTags3),
+        schedulerNode0, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application3, createSchedulingRequest(srcTags3),
+        schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application3, createSchedulingRequest(srcTags3),
+        schedulerNode2, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application3, createSchedulingRequest(srcTags3),
+        schedulerNode3, pcm, tm));
+
+    pcm.unregisterApplication(application3);
+  }
+
+  @Test
+  public void testNotSelfAppConstraints()
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagsManager tm = new AllocationTagsManager(rmContext);
+    PlacementConstraintManagerService pcm =
+        new MemoryPlacementConstraintManager();
+    rmContext.setAllocationTagsManager(tm);
+    rmContext.setPlacementConstraintManager(pcm);
+
+    long ts = System.currentTimeMillis();
+    ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100);
+    ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101);
+    ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102);
+
+    // Register App1 with anti-affinity constraint map.
+    RMNode n0r1 = rmNodes.get(0);
+    RMNode n1r1 = rmNodes.get(1);
+    RMNode n2r2 = rmNodes.get(2);
+    RMNode n3r2 = rmNodes.get(3);
+
+    /**
+     * Place container:
+     *  n0: app1/A(1), app2/A(1)
+     *  n1: app3/A(3)
+     *  n2: app1/A(2)
+     *  n3: ""
+     */
+    tm.addContainer(n0r1.getNodeID(),
+        newContainerId(application1), ImmutableSet.of("A"));
+    tm.addContainer(n0r1.getNodeID(),
+        newContainerId(application2), ImmutableSet.of("A"));
+    tm.addContainer(n1r1.getNodeID(),
+        newContainerId(application3), ImmutableSet.of("A"));
+    tm.addContainer(n1r1.getNodeID(),
+        newContainerId(application3), ImmutableSet.of("A"));
+    tm.addContainer(n1r1.getNodeID(),
+        newContainerId(application3), ImmutableSet.of("A"));
+    tm.addContainer(n2r2.getNodeID(),
+        newContainerId(application1), ImmutableSet.of("A"));
+    tm.addContainer(n2r2.getNodeID(),
+        newContainerId(application1), ImmutableSet.of("A"));
+
+    SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
+        n0r1.getRackName(), n0r1.getNodeID());
+    SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(),
+        n1r1.getRackName(), n1r1.getNodeID());
+    SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(),
+        n2r2.getRackName(), n2r2.getNodeID());
+    SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
+        n3r2.getRackName(), n3r2.getNodeID());
+
+    AllocationTagNamespace notSelf =
+        new AllocationTagNamespace.NotSelf();
+
+    //***************************
+    // 1) not-self, app1
+    //***************************
+    // Anti-affinity with "A" from app2 and app3,
+    // n0 and n1 both have tag "A" from either app2 or app3, so they are
+    // not qualified for the placement.
+    PlacementConstraint constraint1 = PlacementConstraints.targetNotIn(
+        NODE, allocationTagWithNamespace(notSelf.toString(), "A"))
+        .build();
+    Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
+    Set<String> srcTags1 = ImmutableSet.of("A");
+    constraintMap.put(srcTags1, constraint1);
+    pcm.registerApplication(application1, constraintMap);
+
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags1),
+        schedulerNode0, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags1),
+        schedulerNode1, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags1),
+        schedulerNode2, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags1),
+        schedulerNode3, pcm, tm));
+
+    pcm.unregisterApplication(application1);
+
+    //***************************
+    // 2) not-self, app1
+    //***************************
+    // Affinity with "A" from app2 and app3,
+    // N0 and n1 are qualified for the placement.
+    PlacementConstraint constraint2 = PlacementConstraints.targetIn(
+        NODE, allocationTagWithNamespace(notSelf.toString(), "A"))
+        .build();
+    Map<Set<String>, PlacementConstraint> cm2 = new HashMap<>();
+    Set<String> srcTags2 = ImmutableSet.of("A");
+    cm2.put(srcTags2, constraint2);
+    pcm.registerApplication(application1, cm2);
+
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags2),
+        schedulerNode0, pcm, tm));
+    Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags2),
+        schedulerNode1, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags2),
+        schedulerNode2, pcm, tm));
+    Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
+        application1, createSchedulingRequest(srcTags2),
+        schedulerNode3, pcm, tm));
+
+    pcm.unregisterApplication(application1);
+  }
+
   @Test
   public void testInterAppConstraintsByAppID()
       throws InvalidAllocationTagsQueryException {

+ 25 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
 import org.junit.Assert;
@@ -85,46 +86,62 @@ public class TestLocalAllocationTagsManager {
     // Expect tag mappings to be present including temp Tags
     Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper")),
             Long::sum));
 
     Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("service")),
             Long::sum));
 
     Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(2),
+                ImmutableSet.of("service")),
             Long::sum));
 
     // Do a temp Tag cleanup on app2
     ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(2));
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(2),
+                ImmutableSet.of("service")),
             Long::sum));
     // Expect app1 to be unaffected
     Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper")),
             Long::sum));
     // Do a cleanup on app1 as well
     ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(1));
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("mapper")),
             Long::sum));
 
     // Non temp-tags should be unaffected
     Assert.assertEquals(1,
         atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
-            TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1),
+                ImmutableSet.of("service")),
             Long::sum));
 
     Assert.assertEquals(0,
         atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
-            TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(2),
+                ImmutableSet.of("service")),
             Long::sum));
 
     // Expect app2 with no containers, and app1 with 2 containers across 2 nodes

+ 4 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -366,8 +367,7 @@ public class TestSingleConstraintAppPlacementAllocator {
     allocator.canAllocate(NodeType.NODE_LOCAL,
         TestUtils.getMockNode("host1", "/rack1", 123, 1024));
     verify(spyAllocationTagsManager, Mockito.times(1)).getNodeCardinalityByOp(
-        eq(NodeId.fromString("host1:123")), eq(TestUtils.getMockApplicationId(1)),
-        eq(ImmutableSet.of("mapper", "reducer")),
+        eq(NodeId.fromString("host1:123")), any(AllocationTags.class),
         any(LongBinaryOperator.class));
 
     allocator = new SingleConstraintAppPlacementAllocator();
@@ -388,9 +388,8 @@ public class TestSingleConstraintAppPlacementAllocator {
     allocator.canAllocate(NodeType.NODE_LOCAL,
         TestUtils.getMockNode("host1", "/rack1", 123, 1024));
     verify(spyAllocationTagsManager, Mockito.atLeast(1)).getNodeCardinalityByOp(
-        eq(NodeId.fromString("host1:123")),
-        eq(TestUtils.getMockApplicationId(1)), eq(ImmutableSet
-            .of("mapper", "reducer")), any(LongBinaryOperator.class));
+        eq(NodeId.fromString("host1:123")), any(AllocationTags.class),
+        any(LongBinaryOperator.class));
 
     SchedulerNode node1 = mock(SchedulerNode.class);
     when(node1.getPartition()).thenReturn("x");