|
@@ -18,13 +18,16 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.client.api.impl;
|
|
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
@@ -97,6 +100,7 @@ public class TestAMRMClient {
|
|
|
static String rack;
|
|
|
static String[] nodes;
|
|
|
static String[] racks;
|
|
|
+ private final static int DEFAULT_ITERATION = 3;
|
|
|
|
|
|
@BeforeClass
|
|
|
public static void setup() throws Exception {
|
|
@@ -476,6 +480,144 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test (timeout=60000)
|
|
|
+ public void testAllocationWithBlacklist() throws YarnException, IOException {
|
|
|
+ AMRMClientImpl<ContainerRequest> amClient = null;
|
|
|
+ try {
|
|
|
+ // start am rm client
|
|
|
+ amClient =
|
|
|
+ (AMRMClientImpl<ContainerRequest>) AMRMClient
|
|
|
+ .<ContainerRequest> createAMRMClient();
|
|
|
+ amClient.init(conf);
|
|
|
+ amClient.start();
|
|
|
+ amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
+
|
|
|
+ assertTrue(amClient.ask.size() == 0);
|
|
|
+ assertTrue(amClient.release.size() == 0);
|
|
|
+
|
|
|
+ ContainerRequest storedContainer1 =
|
|
|
+ new ContainerRequest(capability, nodes, racks, priority);
|
|
|
+ amClient.addContainerRequest(storedContainer1);
|
|
|
+ assertTrue(amClient.ask.size() == 3);
|
|
|
+ assertTrue(amClient.release.size() == 0);
|
|
|
+
|
|
|
+ List<String> localNodeBlacklist = new ArrayList<String>();
|
|
|
+ localNodeBlacklist.add(node);
|
|
|
+
|
|
|
+ // put node in black list, so no container assignment
|
|
|
+ amClient.updateBlacklist(localNodeBlacklist, null);
|
|
|
+
|
|
|
+ int allocatedContainerCount = getAllocatedContainersNumber(amClient,
|
|
|
+ DEFAULT_ITERATION);
|
|
|
+ // the only node is in blacklist, so no allocation
|
|
|
+ assertTrue(allocatedContainerCount == 0);
|
|
|
+
|
|
|
+ // Remove node from blacklist, so get assigned with 2
|
|
|
+ amClient.updateBlacklist(null, localNodeBlacklist);
|
|
|
+ ContainerRequest storedContainer2 =
|
|
|
+ new ContainerRequest(capability, nodes, racks, priority);
|
|
|
+ amClient.addContainerRequest(storedContainer2);
|
|
|
+ allocatedContainerCount = getAllocatedContainersNumber(amClient,
|
|
|
+ DEFAULT_ITERATION);
|
|
|
+ assertEquals(allocatedContainerCount, 2);
|
|
|
+
|
|
|
+ // Test in case exception in allocate(), blacklist is kept
|
|
|
+ assertTrue(amClient.blacklistAdditions.isEmpty());
|
|
|
+ assertTrue(amClient.blacklistRemovals.isEmpty());
|
|
|
+
|
|
|
+ // create a invalid ContainerRequest - memory value is minus
|
|
|
+ ContainerRequest invalidContainerRequest =
|
|
|
+ new ContainerRequest(Resource.newInstance(-1024, 1),
|
|
|
+ nodes, racks, priority);
|
|
|
+ amClient.addContainerRequest(invalidContainerRequest);
|
|
|
+ amClient.updateBlacklist(localNodeBlacklist, null);
|
|
|
+ try {
|
|
|
+ // allocate() should complain as ContainerRequest is invalid.
|
|
|
+ amClient.allocate(0.1f);
|
|
|
+ fail("there should be an exception here.");
|
|
|
+ } catch (Exception e) {
|
|
|
+ assertEquals(amClient.blacklistAdditions.size(), 1);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
|
|
+ amClient.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (timeout=60000)
|
|
|
+ public void testAMRMClientWithBlacklist() throws YarnException, IOException {
|
|
|
+ AMRMClientImpl<ContainerRequest> amClient = null;
|
|
|
+ try {
|
|
|
+ // start am rm client
|
|
|
+ amClient =
|
|
|
+ (AMRMClientImpl<ContainerRequest>) AMRMClient
|
|
|
+ .<ContainerRequest> createAMRMClient();
|
|
|
+ amClient.init(conf);
|
|
|
+ amClient.start();
|
|
|
+ amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
+ String[] nodes = {"node1", "node2", "node3"};
|
|
|
+
|
|
|
+ // Add nodes[0] and nodes[1]
|
|
|
+ List<String> nodeList01 = new ArrayList<String>();
|
|
|
+ nodeList01.add(nodes[0]);
|
|
|
+ nodeList01.add(nodes[1]);
|
|
|
+ amClient.updateBlacklist(nodeList01, null);
|
|
|
+ assertEquals(amClient.blacklistAdditions.size(),2);
|
|
|
+ assertEquals(amClient.blacklistRemovals.size(),0);
|
|
|
+
|
|
|
+ // Add nodes[0] again, verify it is not added duplicated.
|
|
|
+ List<String> nodeList02 = new ArrayList<String>();
|
|
|
+ nodeList02.add(nodes[0]);
|
|
|
+ nodeList02.add(nodes[2]);
|
|
|
+ amClient.updateBlacklist(nodeList02, null);
|
|
|
+ assertEquals(amClient.blacklistAdditions.size(),3);
|
|
|
+ assertEquals(amClient.blacklistRemovals.size(),0);
|
|
|
+
|
|
|
+ // Add nodes[1] and nodes[2] to removal list,
|
|
|
+ // Verify addition list remove these two nodes.
|
|
|
+ List<String> nodeList12 = new ArrayList<String>();
|
|
|
+ nodeList12.add(nodes[1]);
|
|
|
+ nodeList12.add(nodes[2]);
|
|
|
+ amClient.updateBlacklist(null, nodeList12);
|
|
|
+ assertEquals(amClient.blacklistAdditions.size(),1);
|
|
|
+ assertEquals(amClient.blacklistRemovals.size(),2);
|
|
|
+
|
|
|
+ // Add nodes[1] again to addition list,
|
|
|
+ // Verify removal list will remove this node.
|
|
|
+ List<String> nodeList1 = new ArrayList<String>();
|
|
|
+ nodeList1.add(nodes[1]);
|
|
|
+ amClient.updateBlacklist(nodeList1, null);
|
|
|
+ assertEquals(amClient.blacklistAdditions.size(),2);
|
|
|
+ assertEquals(amClient.blacklistRemovals.size(),1);
|
|
|
+ } finally {
|
|
|
+ if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
|
|
+ amClient.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private int getAllocatedContainersNumber(
|
|
|
+ AMRMClientImpl<ContainerRequest> amClient, int iterationsLeft)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ int allocatedContainerCount = 0;
|
|
|
+ while (iterationsLeft-- > 0) {
|
|
|
+ Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
|
|
|
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
|
+ assertTrue(amClient.ask.size() == 0);
|
|
|
+ assertTrue(amClient.release.size() == 0);
|
|
|
+
|
|
|
+ assertTrue(nodeCount == amClient.getClusterNodeCount());
|
|
|
+ allocatedContainerCount += allocResponse.getAllocatedContainers().size();
|
|
|
+
|
|
|
+ if(allocatedContainerCount == 0) {
|
|
|
+ // sleep to let NM's heartbeat to RM and trigger allocations
|
|
|
+ sleep(100);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return allocatedContainerCount;
|
|
|
+ }
|
|
|
|
|
|
@Test (timeout=60000)
|
|
|
public void testAMRMClient() throws YarnException, IOException {
|