فهرست منبع

YARN-8984. AMRMClient#OutstandingSchedRequests leaks when AllocationTags is null or empty. Contributed by Yang Wang.

(cherry picked from commit 176bb3f812e49b0fe3abddf54eebfc7219b5d718)
Weiwei Yang 6 سال پیش
والد
کامیت
01477108d9

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -1037,6 +1037,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     return remoteRequests.get(Long.valueOf(allocationRequestId));
   }
 
+  @VisibleForTesting
+  Map<Set<String>, List<SchedulingRequest>> getOutstandingSchedRequests() {
+    return outstandingSchedRequests;
+  }
+
   RemoteRequestsTable<T> putTable(long allocationRequestId,
       RemoteRequestsTable<T> table) {
     return remoteRequests.put(Long.valueOf(allocationRequestId), table);

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java

@@ -196,7 +196,9 @@ public class BaseAMRMClientTest {
 
   @After
   public void teardown() throws YarnException, IOException {
-    yarnClient.killApplication(attemptId.getApplicationId());
+    if (yarnClient != null) {
+      yarnClient.killApplication(attemptId.getApplicationId());
+    }
     attemptId = null;
 
     if (yarnClient != null &&

+ 185 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -58,66 +59,46 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.Placement
  */
 public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
 
+  private List<Container> allocatedContainers = null;
+  private List<RejectedSchedulingRequest> rejectedSchedulingRequests = null;
+  private Map<Set<String>, PlacementConstraint> pcMapping = null;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+    allocatedContainers = new ArrayList<>();
+    rejectedSchedulingRequests = new ArrayList<>();
+    pcMapping = new HashMap<>();
+    pcMapping.put(Collections.singleton("foo"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+    pcMapping.put(Collections.singleton("bar"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("bar"))));
+  }
+
   @Test(timeout=60000)
-  public void testAMRMClientWithPlacementConstraints()
+  public void testAMRMClientWithPlacementConstraintsByPlacementProcessor()
       throws Exception {
     // we have to create a new instance of MiniYARNCluster to avoid SASL qop
     // mismatches between client and server
-    teardown();
-    conf = new YarnConfiguration();
     conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
         YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
     createClusterAndStartApplication(conf);
 
+    allocatedContainers.clear();
+    rejectedSchedulingRequests.clear();
     AMRMClient<AMRMClient.ContainerRequest> amClient =
         AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
     amClient.setNMTokenCache(new NMTokenCache());
     //asserting we are not using the singleton instance cache
     Assert.assertNotSame(NMTokenCache.getSingleton(),
         amClient.getNMTokenCache());
-
-    final List<Container> allocatedContainers = new ArrayList<>();
-    final List<RejectedSchedulingRequest> rejectedSchedulingRequests =
-        new ArrayList<>();
-    AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, 1000,
-        new AMRMClientAsync.AbstractCallbackHandler() {
-          @Override
-          public void onContainersAllocated(List<Container> containers) {
-            allocatedContainers.addAll(containers);
-          }
-
-          @Override
-          public void onRequestsRejected(
-              List<RejectedSchedulingRequest> rejReqs) {
-            rejectedSchedulingRequests.addAll(rejReqs);
-          }
-
-          @Override
-          public void onContainersCompleted(List<ContainerStatus> statuses) {}
-          @Override
-          public void onContainersUpdated(List<UpdatedContainer> containers) {}
-          @Override
-          public void onShutdownRequest() {}
-          @Override
-          public void onNodesUpdated(List<NodeReport> updatedNodes) {}
-          @Override
-          public void onError(Throwable e) {}
-
-          @Override
-          public float getProgress() {
-            return 0.1f;
-          }
-        });
-
+    AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient,
+        1000, new TestCallbackHandler());
     asyncClient.init(conf);
     asyncClient.start();
-    Map<Set<String>, PlacementConstraint> pcMapping = new HashMap<>();
-    pcMapping.put(Collections.singleton("foo"),
-        PlacementConstraints.build(
-            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
-    pcMapping.put(Collections.singleton("bar"),
-        PlacementConstraints.build(
-            PlacementConstraints.targetNotIn(NODE, allocationTag("bar"))));
+
     asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping);
 
     // Send two types of requests - 4 with source tag "foo" have numAlloc = 1
@@ -144,6 +125,15 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
         allocatedContainers.stream().collect(
             Collectors.groupingBy(Container::getNodeId));
 
+    Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
+        ((AMRMClientImpl)amClient).getOutstandingSchedRequests();
+    // Check the outstanding SchedulingRequests
+    Assert.assertEquals(2, outstandingSchedRequests.size());
+    Assert.assertEquals(1, outstandingSchedRequests.get(
+        new HashSet<>(Collections.singletonList("foo"))).size());
+    Assert.assertEquals(1, outstandingSchedRequests.get(
+        new HashSet<>(Collections.singletonList("bar"))).size());
+
     // Ensure 2 containers allocated per node.
     // Each node should have a "foo" and a "bar" container.
     Assert.assertEquals(3, containersPerNode.entrySet().size());
@@ -169,6 +159,140 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
     asyncClient.stop();
   }
 
+  @Test(timeout=60000)
+  public void testAMRMClientWithPlacementConstraintsByScheduler()
+      throws Exception {
+    // we have to create a new instance of MiniYARNCluster to avoid SASL qop
+    // mismatches between client and server
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    createClusterAndStartApplication(conf);
+
+    allocatedContainers.clear();
+    rejectedSchedulingRequests.clear();
+    AMRMClient<AMRMClient.ContainerRequest> amClient =
+        AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
+    amClient.setNMTokenCache(new NMTokenCache());
+    //asserting we are not using the singleton instance cache
+    Assert.assertNotSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+    AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient,
+        1000, new TestCallbackHandler());
+    asyncClient.init(conf);
+    asyncClient.start();
+
+    asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping);
+
+    // Send two types of requests - 4 with source tag "foo" have numAlloc = 1
+    // and 1 with source tag "bar" and has numAlloc = 4. Both should be
+    // handled similarly. i.e: Since there are only 3 nodes,
+    // 2 schedulingRequests - 1 with source tag "foo" on one with source
+    // tag "bar" should get rejected.
+    asyncClient.addSchedulingRequests(
+        Arrays.asList(
+            // 4 reqs with numAlloc = 1
+            schedulingRequest(1, 1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 1, 4, 1, 512, "foo"),
+            // 1 req with numAlloc = 4
+            schedulingRequest(4, 1, 5, 1, 512, "bar"),
+            // 1 empty tag
+            schedulingRequest(1, 1, 6, 1, 512, new HashSet<>())));
+
+    // kick the scheduler
+    waitForContainerAllocation(allocatedContainers,
+        rejectedSchedulingRequests, 7, 0);
+
+    Assert.assertEquals(7, allocatedContainers.size());
+    Map<NodeId, List<Container>> containersPerNode =
+        allocatedContainers.stream().collect(
+            Collectors.groupingBy(Container::getNodeId));
+
+    Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
+        ((AMRMClientImpl)amClient).getOutstandingSchedRequests();
+    // Check the outstanding SchedulingRequests
+    Assert.assertEquals(3, outstandingSchedRequests.size());
+    Assert.assertEquals(1, outstandingSchedRequests.get(
+        new HashSet<>(Collections.singletonList("foo"))).size());
+    Assert.assertEquals(1, outstandingSchedRequests.get(
+        new HashSet<>(Collections.singletonList("bar"))).size());
+    Assert.assertEquals(0, outstandingSchedRequests.get(
+        new HashSet<String>()).size());
+
+    // Each node should have a "foo" and a "bar" container.
+    Assert.assertEquals(3, containersPerNode.entrySet().size());
+    HashSet<String> srcTags = new HashSet<>(Arrays.asList("foo", "bar"));
+    containersPerNode.entrySet().forEach(
+        x ->
+          Assert.assertEquals(
+              srcTags,
+              x.getValue()
+                  .stream()
+                  .filter(y -> !y.getAllocationTags().isEmpty())
+                  .map(y -> y.getAllocationTags().iterator().next())
+                  .collect(Collectors.toSet()))
+    );
+
+    // The rejected requests were not set by scheduler
+    Assert.assertEquals(0, rejectedSchedulingRequests.size());
+
+    asyncClient.stop();
+  }
+
+
+  @Test
+  /*
+   * Three cases of empty HashSet key of outstandingSchedRequests
+   * 1. Not set any tags
+   * 2. Set a empty set, e.g ImmutableSet.of(), new HashSet<>()
+   * 3. Set tag as null
+   */
+  public void testEmptyKeyOfOutstandingSchedRequests() {
+    AMRMClient<AMRMClient.ContainerRequest> amClient =
+        AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
+    HashSet<String> schedRequest = null;
+    amClient.addSchedulingRequests(Arrays.asList(
+        schedulingRequest(1, 1, 1, 1, 512, ExecutionType.GUARANTEED),
+        schedulingRequest(1, 1, 2, 1, 512, new HashSet<>()),
+        schedulingRequest(1, 1, 3, 1, 512, schedRequest)));
+    Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
+        ((AMRMClientImpl)amClient).getOutstandingSchedRequests();
+    Assert.assertEquals(1, outstandingSchedRequests.size());
+    Assert.assertEquals(3, outstandingSchedRequests
+        .get(new HashSet<String>()).size());
+  }
+
+  private class TestCallbackHandler extends
+      AMRMClientAsync.AbstractCallbackHandler {
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+      allocatedContainers.addAll(containers);
+    }
+
+    @Override
+    public void onRequestsRejected(
+        List<RejectedSchedulingRequest> rejReqs) {
+      rejectedSchedulingRequests.addAll(rejReqs);
+    }
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {}
+    @Override
+    public void onContainersUpdated(List<UpdatedContainer> containers) {}
+    @Override
+    public void onShutdownRequest() {}
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+    @Override
+    public void onError(Throwable e) {}
+
+    @Override
+    public float getProgress() {
+      return 0.1f;
+    }
+  }
+
   private static void waitForContainerAllocation(
       List<Container> allocatedContainers,
       List<RejectedSchedulingRequest> rejectedRequests,
@@ -186,16 +310,30 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
   private static SchedulingRequest schedulingRequest(int numAllocations,
       int priority, long allocReqId, int cores, int mem, String... tags) {
     return schedulingRequest(numAllocations, priority, allocReqId, cores, mem,
-        ExecutionType.GUARANTEED, tags);
+        ExecutionType.GUARANTEED, new HashSet<>(Arrays.asList(tags)));
+  }
+
+  private static SchedulingRequest schedulingRequest(int numAllocations,
+      int priority, long allocReqId, int cores, int mem, Set<String> tags) {
+    return schedulingRequest(numAllocations,
+        priority, allocReqId, cores, mem, ExecutionType.GUARANTEED, tags);
+  }
+
+  private static SchedulingRequest schedulingRequest(int numAllocations,
+      int priority, long allocReqId, int cores, int mem,
+      ExecutionType execType, Set<String> tags) {
+    SchedulingRequest schedRequest = schedulingRequest(numAllocations,
+        priority, allocReqId, cores, mem, execType);
+    schedRequest.setAllocationTags(tags);
+    return schedRequest;
   }
 
   private static SchedulingRequest schedulingRequest(int numAllocations,
       int priority, long allocReqId, int cores, int mem,
-      ExecutionType execType, String... tags) {
+      ExecutionType execType) {
     return SchedulingRequest.newBuilder()
         .priority(Priority.newInstance(priority))
         .allocationRequestId(allocReqId)
-        .allocationTags(new HashSet<>(Arrays.asList(tags)))
         .executionType(ExecutionTypeRequest.newInstance(execType, true))
         .resourceSizing(
             ResourceSizing.newInstance(numAllocations,

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java

@@ -197,8 +197,7 @@ public final class AMRMClientUtils {
       return;
     }
     for (Container container : containers) {
-      if (container.getAllocationTags() != null
-          && !container.getAllocationTags().isEmpty()) {
+      if (container.getAllocationTags() != null) {
         List<SchedulingRequest> schedReqs =
             outstandingSchedRequests.get(container.getAllocationTags());
         if (schedReqs != null && !schedReqs.isEmpty()) {