Browse Source

YARN-9252. Allocation Tag Namespace support in Distributed Shell. Contributed by Prabhu Joseph.

Weiwei Yang 6 years ago
parent
commit
2b7f828d46

+ 43 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java

@@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.util.constraint;
 import com.google.common.base.Strings;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
 
 import java.util.ArrayList;
 import java.util.Map;
@@ -35,6 +38,7 @@ import java.util.List;
 import java.util.Stack;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.HashSet;
 import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -51,6 +55,7 @@ public final class PlacementConstraintParser {
   private static final char KV_SPLIT_DELIM = '=';
   private static final char BRACKET_START = '(';
   private static final char BRACKET_END = ')';
+  private static final char NAMESPACE_DELIM = '/';
   private static final String KV_NE_DELIM = "!=";
   private static final String IN = "in";
   private static final String NOT_IN = "notin";
@@ -104,6 +109,25 @@ public final class PlacementConstraintParser {
       }
     }
 
+    TargetExpression parseNameSpace(String targetTag)
+        throws PlacementConstraintParseException {
+      int i = targetTag.lastIndexOf(NAMESPACE_DELIM);
+      if (i != -1) {
+        String namespace = targetTag.substring(0, i);
+        for (AllocationTagNamespaceType type :
+            AllocationTagNamespaceType.values()) {
+          if (type.getTypeKeyword().equals(namespace)) {
+            return PlacementTargets.allocationTagWithNamespace(
+                namespace, targetTag.substring(i+1));
+          }
+        }
+        throw new PlacementConstraintParseException(
+            "Invalid namespace prefix: " + namespace);
+      } else {
+        return PlacementTargets.allocationTag(targetTag);
+      }
+    }
+
     String parseScope(String scopeString)
         throws PlacementConstraintParseException {
       if (scopeString.equalsIgnoreCase(SCOPE_NODE)) {
@@ -392,12 +416,11 @@ public final class PlacementConstraintParser {
                 + constraintEntities);
       }
 
-      PlacementConstraint.TargetExpression target = null;
+      TargetExpression target = null;
       if (!constraintEntities.isEmpty()) {
-        target = PlacementConstraints.PlacementTargets
-            .nodeAttribute(attributeName,
-                constraintEntities
-                    .toArray(new String[constraintEntities.size()]));
+        target = PlacementTargets.nodeAttribute(attributeName,
+            constraintEntities
+            .toArray(new String[constraintEntities.size()]));
       }
 
       placementConstraints = PlacementConstraints
@@ -457,23 +480,20 @@ public final class PlacementConstraintParser {
         String scope = nextToken();
         scope = parseScope(scope);
 
-        Set<String> constraintEntities = new TreeSet<>();
+        Set<TargetExpression> targetExpressions = new HashSet<>();
         while(hasMoreTokens()) {
           String tag = nextToken();
-          constraintEntities.add(tag);
-        }
-        PlacementConstraint.TargetExpression target = null;
-        if(!constraintEntities.isEmpty()) {
-          target = PlacementConstraints.PlacementTargets.allocationTag(
-              constraintEntities
-                  .toArray(new String[constraintEntities.size()]));
+          TargetExpression t = parseNameSpace(tag);
+          targetExpressions.add(t);
         }
+        TargetExpression[] targetArr = targetExpressions.toArray(
+            new TargetExpression[targetExpressions.size()]);
         if (op.equalsIgnoreCase(IN)) {
           placementConstraints = PlacementConstraints
-              .targetIn(scope, target);
+              .targetIn(scope, targetArr);
         } else {
           placementConstraints = PlacementConstraints
-              .targetNotIn(scope, target);
+              .targetNotIn(scope, targetArr);
         }
       } else {
         throw new PlacementConstraintParseException(
@@ -527,13 +547,16 @@ public final class PlacementConstraintParser {
       String minCardinalityStr = resetElements.pop();
       int min = toInt(minCardinalityStr);
 
-      ArrayList<String> targetTags = new ArrayList<>();
+      Set<TargetExpression> targetExpressions = new HashSet<>();
       while (!resetElements.empty()) {
-        targetTags.add(resetElements.pop());
+        String tag = resetElements.pop();
+        TargetExpression t = parseNameSpace(tag);
+        targetExpressions.add(t);
       }
+      TargetExpression[] targetArr = targetExpressions.toArray(
+          new TargetExpression[targetExpressions.size()]);
 
-      return PlacementConstraints.cardinality(scope, min, max,
-          targetTags.toArray(new String[targetTags.size()]));
+      return PlacementConstraints.targetCardinality(scope, min, max, targetArr);
     }
   }
 
@@ -744,4 +767,4 @@ public final class PlacementConstraintParser {
     }
     return result;
   }
-}
+}

+ 65 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java

@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.Constrai
 
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
@@ -110,11 +109,13 @@ public class TestPlacementConstraintParser {
     Assert.assertEquals("node", single.getScope());
     Assert.assertEquals(0, single.getMinCardinality());
     Assert.assertEquals(0, single.getMaxCardinality());
-    Assert.assertEquals(1, single.getTargetExpressions().size());
-    TargetExpression exp =
-        single.getTargetExpressions().iterator().next();
-    Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
-    Assert.assertEquals(3, exp.getTargetValues().size());
+    Assert.assertEquals(3, single.getTargetExpressions().size());
+    Set<TargetExpression> expectedTargetExpressions = Sets.newHashSet(
+        PlacementTargets.allocationTag("foo"),
+        PlacementTargets.allocationTag("bar"),
+        PlacementTargets.allocationTag("exp"));
+    Assert.assertTrue(Sets.difference(expectedTargetExpressions,
+        single.getTargetExpressions()).isEmpty());
     verifyConstraintToString(expressionStr, constraint);
 
     // Invalid OP
@@ -161,13 +162,13 @@ public class TestPlacementConstraintParser {
     Assert.assertEquals("rack", single.getScope());
     Assert.assertEquals(0, single.getMinCardinality());
     Assert.assertEquals(1, single.getMaxCardinality());
-    Assert.assertEquals(1, single.getTargetExpressions().size());
-    exp = single.getTargetExpressions().iterator().next();
-    Assert.assertEquals("ALLOCATION_TAG", exp.getTargetType().toString());
-    Assert.assertEquals(3, exp.getTargetValues().size());
-    Set<String> expectedTags = Sets.newHashSet("foo", "bar", "moo");
-    Assert.assertTrue(Sets.difference(expectedTags, exp.getTargetValues())
-        .isEmpty());
+    Assert.assertEquals(3, single.getTargetExpressions().size());
+    Set<TargetExpression> expectedTargetExpressions = Sets.newHashSet(
+        PlacementTargets.allocationTag("foo"),
+        PlacementTargets.allocationTag("bar"),
+        PlacementTargets.allocationTag("moo"));
+    Assert.assertTrue(Sets.difference(expectedTargetExpressions,
+        single.getTargetExpressions()).isEmpty());
     verifyConstraintToString(expressionExpr, constraint);
 
     // Invalid scope string
@@ -376,7 +377,11 @@ public class TestPlacementConstraintParser {
     tag1 = result.keySet().iterator().next();
     Assert.assertEquals("foo", tag1.getTag());
     Assert.assertEquals(10, tag1.getNumOfAllocations());
-    expectedPc1 = cardinality("node", 0, 100, "foo", "bar").build();
+    TargetExpression[] targetExpressions = new TargetExpression[] {
+        PlacementTargets.allocationTag("foo"),
+        PlacementTargets.allocationTag("bar")};
+    expectedPc1 = PlacementConstraints.targetCardinality("node", 0,
+        100, targetExpressions).build();
     Assert.assertEquals(expectedPc1, result.values().iterator().next());
 
     // Two constraint expressions
@@ -524,4 +529,50 @@ public class TestPlacementConstraintParser {
       Assert.assertTrue(e instanceof PlacementConstraintParseException);
     }
   }
+
+  @Test
+  public void testParseAllocationTagNameSpace()
+      throws PlacementConstraintParseException {
+    Map<SourceTags, PlacementConstraint> result;
+
+    // Constraint with Two Different NameSpaces
+    result = PlacementConstraintParser
+        .parsePlacementSpec("foo=2,notin,node,not-self/bar,all/moo");
+    Assert.assertEquals(1, result.size());
+    Set<TargetExpression> expectedTargetExpressions = Sets.newHashSet(
+        PlacementTargets.allocationTagWithNamespace("not-self", "bar"),
+        PlacementTargets.allocationTagWithNamespace("all", "moo"));
+    AbstractConstraint constraint = result.values().iterator().next().
+        getConstraintExpr();
+    Assert.assertTrue(constraint instanceof SingleConstraint);
+    SingleConstraint single = (SingleConstraint) constraint;
+    Assert.assertEquals(2, single.getTargetExpressions().size());
+    Assert.assertTrue(Sets.difference(expectedTargetExpressions,
+        single.getTargetExpressions()).isEmpty());
+
+    // Constraint With Default NameSpace SELF
+    result = PlacementConstraintParser
+        .parsePlacementSpec("foo=2,notin,node,moo");
+    Assert.assertEquals(1, result.size());
+    TargetExpression expectedTargetExpression = PlacementTargets.
+        allocationTagWithNamespace("self", "moo");
+    constraint = result.values().iterator().next().getConstraintExpr();
+    Assert.assertTrue(constraint instanceof SingleConstraint);
+    single = (SingleConstraint) constraint;
+    Assert.assertEquals(1, single.getTargetExpressions().size());
+    Assert.assertEquals(expectedTargetExpression,
+        single.getTargetExpressions().iterator().next());
+
+    // Constraint With Invalid NameSpace
+    boolean caughtException = false;
+    try {
+      result = PlacementConstraintParser
+          .parsePlacementSpec("foo=2,notin,node,bar/moo");
+    } catch(PlacementConstraintParseException e) {
+      caughtException = true;
+    }
+    Assert.assertTrue("PlacementConstraintParseException is expected",
+        caughtException);
+  }
+
 }

+ 145 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java

@@ -19,18 +19,23 @@
 package org.apache.hadoop.yarn.applications.distributedshell;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,6 +57,11 @@ public class TestDSWithMultipleNodeManager {
     distShellTest.setupInternal(NUM_NMS);
   }
 
+  @After
+  public void tearDown() throws Exception {
+    distShellTest.tearDown();
+  }
+
   private void initializeNodeLabels() throws IOException {
     RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
 
@@ -179,6 +189,141 @@ public class TestDSWithMultipleNodeManager {
     Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
   }
 
+  @Test(timeout = 90000)
+  public void testDistributedShellWithAllocationTagNamespace()
+      throws Exception {
+    NMContainerMonitor mon = new NMContainerMonitor();
+    Thread monitorThread = new Thread(mon);
+    monitorThread.start();
+
+    String[] argsA = {
+        "--jar",
+        distShellTest.APPMASTER_JAR,
+        "--shell_command",
+        distShellTest.getSleepCommand(30),
+        "--placement_spec",
+        "bar=1,notin,node,bar"
+    };
+    final Client clientA =
+        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
+    clientA.init(argsA);
+    final AtomicBoolean resultA = new AtomicBoolean(false);
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          resultA.set(clientA.run());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    t.start();
+
+    NodeId masterContainerNodeIdA;
+    NodeId taskContainerNodeIdA;
+    ConcurrentMap<ApplicationId, RMApp> apps;
+    RMApp appA;
+
+    int expectedNM1Count = 0;
+    int expectedNM2Count = 0;
+    while (true) {
+      if ((expectedNM1Count + expectedNM2Count) < 2) {
+        expectedNM1Count = distShellTest.yarnCluster.getNodeManager(0).
+            getNMContext().getContainers().size();
+        expectedNM2Count = distShellTest.yarnCluster.getNodeManager(1).
+            getNMContext().getContainers().size();
+        continue;
+      }
+      apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
+          getRMApps();
+      if (apps.isEmpty()) {
+        Thread.sleep(10);
+        continue;
+      }
+      appA = apps.values().iterator().next();
+      if (appA.getAppAttempts().isEmpty()) {
+        Thread.sleep(10);
+        continue;
+      }
+      RMAppAttempt appAttemptA = appA.getAppAttempts().values().iterator().
+          next();
+      if (appAttemptA.getMasterContainer() == null) {
+        Thread.sleep(10);
+        continue;
+      }
+      masterContainerNodeIdA = appAttemptA.getMasterContainer().getNodeId();
+      break;
+    }
+
+    NodeId nodeA = distShellTest.yarnCluster.getNodeManager(0).getNMContext().
+        getNodeId();
+    NodeId nodeB = distShellTest.yarnCluster.getNodeManager(1).getNMContext().
+        getNodeId();
+    Assert.assertEquals(2, (expectedNM1Count + expectedNM2Count));
+
+    if (expectedNM1Count != expectedNM2Count) {
+      taskContainerNodeIdA = masterContainerNodeIdA;
+    } else {
+      taskContainerNodeIdA = masterContainerNodeIdA.equals(nodeA) ? nodeB :
+          nodeA;
+    }
+
+    String[] argsB = {
+        "--jar",
+        distShellTest.APPMASTER_JAR,
+        "1",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--placement_spec",
+        "foo=3,notin,node,all/bar"
+    };
+    final Client clientB = new Client(new Configuration(distShellTest.
+        yarnCluster.getConfig()));
+    clientB.init(argsB);
+    boolean resultB = clientB.run();
+    Assert.assertTrue(resultB);
+
+    monitorThread.interrupt();
+    apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
+        getRMApps();
+    Iterator<RMApp> it = apps.values().iterator();
+    RMApp appB = it.next();
+    if (appA.equals(appB)) {
+      appB = it.next();
+    }
+    LOG.info("Allocation Tag NameSpace Applications are=" + appA.
+        getApplicationId() + " and " + appB.getApplicationId());
+
+    RMAppAttempt appAttemptB = appB.getAppAttempts().values().iterator().
+        next();
+    NodeId masterContainerNodeIdB = appAttemptB.getMasterContainer().
+        getNodeId();
+
+    if (nodeA.equals(masterContainerNodeIdB)) {
+      expectedNM1Count += 1;
+    } else {
+      expectedNM2Count += 1;
+    }
+    if (nodeA.equals(taskContainerNodeIdA)) {
+      expectedNM2Count += 3;
+    } else {
+      expectedNM1Count += 3;
+    }
+    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
+    Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
+    Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
+
+    try {
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(new Configuration(distShellTest.yarnCluster.
+          getConfig()));
+      yarnClient.start();
+      yarnClient.killApplication(appA.getApplicationId());
+    } catch (Exception e) {
+     // Ignore Exception while killing a job
+    }
+  }
+
   /**
    * Monitor containers running on NMs
    */