|
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import com.google.common.collect.ImmutableMap;
|
|
|
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
@@ -227,6 +228,93 @@ public class TestPlacementConstraintsUtil {
|
|
|
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testMultiTagsPlacementConstraints()
|
|
|
+ throws InvalidAllocationTagsQueryException {
|
|
|
+ PlacementConstraintManagerService pcm =
|
|
|
+ new MemoryPlacementConstraintManager();
|
|
|
+ AllocationTagsManager tm = new AllocationTagsManager(rmContext);
|
|
|
+ rmContext.setAllocationTagsManager(tm);
|
|
|
+ rmContext.setPlacementConstraintManager(pcm);
|
|
|
+
|
|
|
+ HashSet<String> st1 = new HashSet<>(Arrays.asList("X"));
|
|
|
+ HashSet<String> st2 = new HashSet<>(Arrays.asList("Y"));
|
|
|
+
|
|
|
+ // X anti-affinity with A and B
|
|
|
+ PlacementConstraint pc1 = PlacementConstraints.build(
|
|
|
+ targetNotIn(NODE, allocationTag("A", "B")));
|
|
|
+ // Y affinity with A and B
|
|
|
+ PlacementConstraint pc2 = PlacementConstraints.build(
|
|
|
+ targetIn(NODE, allocationTag("A", "B")));
|
|
|
+ Map<Set<String>, PlacementConstraint> constraintMap =
|
|
|
+ ImmutableMap.of(st1, pc1, st2, pc2);
|
|
|
+ // Register App1 with affinity constraint map
|
|
|
+ pcm.registerApplication(appId1, constraintMap);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Now place container:
|
|
|
+ * n0: A(1)
|
|
|
+ * n1: B(1)
|
|
|
+ * n2:
|
|
|
+ * n3:
|
|
|
+ */
|
|
|
+ RMNode n0_r1 = rmNodes.get(0);
|
|
|
+ RMNode n1_r1 = rmNodes.get(1);
|
|
|
+ RMNode n2_r2 = rmNodes.get(2);
|
|
|
+ RMNode n3_r2 = rmNodes.get(3);
|
|
|
+ SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
|
|
|
+ n0_r1.getRackName(), n0_r1.getNodeID());
|
|
|
+ SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
|
|
|
+ n1_r1.getRackName(), n1_r1.getNodeID());
|
|
|
+ SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
|
|
|
+ n2_r2.getRackName(), n2_r2.getNodeID());
|
|
|
+ SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
|
|
|
+ n3_r2.getRackName(), n3_r2.getNodeID());
|
|
|
+
|
|
|
+ ContainerId ca = ContainerId
|
|
|
+ .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
|
|
+ tm.addContainer(n0_r1.getNodeID(), ca, ImmutableSet.of("A"));
|
|
|
+
|
|
|
+ ContainerId cb = ContainerId
|
|
|
+ .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
|
|
+ tm.addContainer(n1_r1.getNodeID(), cb, ImmutableSet.of("B"));
|
|
|
+
|
|
|
+ // n0 and n1 has A/B so they cannot satisfy the PC
|
|
|
+ // n2 and n3 doesn't have A or B, so they can satisfy the PC
|
|
|
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
|
|
+ createSchedulingRequest(st1), schedulerNode0, pcm, tm));
|
|
|
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
|
|
+ createSchedulingRequest(st1), schedulerNode1, pcm, tm));
|
|
|
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
|
|
+ createSchedulingRequest(st1), schedulerNode2, pcm, tm));
|
|
|
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
|
|
+ createSchedulingRequest(st1), schedulerNode3, pcm, tm));
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Now place container:
|
|
|
+ * n0: A(1)
|
|
|
+ * n1: B(1)
|
|
|
+ * n2: A(1), B(1)
|
|
|
+ * n3:
|
|
|
+ */
|
|
|
+ ContainerId ca1 = ContainerId
|
|
|
+ .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
|
|
+ tm.addContainer(n2_r2.getNodeID(), ca1, ImmutableSet.of("A"));
|
|
|
+ ContainerId cb1 = ContainerId
|
|
|
+ .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
|
|
+ tm.addContainer(n2_r2.getNodeID(), cb1, ImmutableSet.of("B"));
|
|
|
+
|
|
|
+ // Only n2 has both A and B so only it can satisfy the PC
|
|
|
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
|
|
+ createSchedulingRequest(st2), schedulerNode0, pcm, tm));
|
|
|
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
|
|
+ createSchedulingRequest(st2), schedulerNode1, pcm, tm));
|
|
|
+ Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
|
|
+ createSchedulingRequest(st2), schedulerNode2, pcm, tm));
|
|
|
+ Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
|
|
+ createSchedulingRequest(st2), schedulerNode3, pcm, tm));
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testRackAffinityAssignment()
|
|
|
throws InvalidAllocationTagsQueryException {
|