|
@@ -64,8 +64,10 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
@@ -83,12 +85,15 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import com.google.common.collect.ImmutableSet;
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
|
|
public class TestSchedulerUtils {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
|
|
|
|
|
|
+ private RMContext rmContext = getMockRMContext();
|
|
|
+
|
|
|
@Test (timeout = 30000)
|
|
|
public void testNormalizeRequest() {
|
|
|
ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
|
|
@@ -206,6 +211,8 @@ public class TestSchedulerUtils {
|
|
|
// set queue accessible node labesl to [x, y]
|
|
|
queueAccessibleNodeLabels.clear();
|
|
|
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
|
|
+ rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
|
|
+ ImmutableSet.of("x","y"));
|
|
|
Resource resource = Resources.createResource(
|
|
|
0,
|
|
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
|
@@ -213,22 +220,44 @@ public class TestSchedulerUtils {
|
|
|
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
|
|
resReq.setNodeLabelExpression("x");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
|
|
|
resReq.setNodeLabelExpression("y");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
|
|
|
resReq.setNodeLabelExpression("");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
|
|
|
resReq.setNodeLabelExpression(" ");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
e.printStackTrace();
|
|
|
fail("Should be valid when request labels is a subset of queue labels");
|
|
|
+ } finally {
|
|
|
+ rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
|
|
+ Arrays.asList("x", "y"));
|
|
|
+ }
|
|
|
+
|
|
|
+ // same as above, but cluster node labels don't contains label being
|
|
|
+ // requested. should fail
|
|
|
+ try {
|
|
|
+ // set queue accessible node labesl to [x, y]
|
|
|
+ queueAccessibleNodeLabels.clear();
|
|
|
+ queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
|
|
+ Resource resource = Resources.createResource(
|
|
|
+ 0,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
|
|
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
|
|
|
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
|
|
+ resReq.setNodeLabelExpression("x");
|
|
|
+ SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
+ scheduler, rmContext);
|
|
|
+
|
|
|
+ fail("Should fail");
|
|
|
+ } catch (InvalidResourceRequestException e) {
|
|
|
}
|
|
|
|
|
|
// queue has labels, failed cases (when ask a label not included by queue)
|
|
@@ -236,6 +265,8 @@ public class TestSchedulerUtils {
|
|
|
// set queue accessible node labesl to [x, y]
|
|
|
queueAccessibleNodeLabels.clear();
|
|
|
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
|
|
+ rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
|
|
+ ImmutableSet.of("x","y"));
|
|
|
|
|
|
Resource resource = Resources.createResource(
|
|
|
0,
|
|
@@ -244,9 +275,12 @@ public class TestSchedulerUtils {
|
|
|
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
|
|
resReq.setNodeLabelExpression("z");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
fail("Should fail");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
+ } finally {
|
|
|
+ rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
|
|
+ Arrays.asList("x", "y"));
|
|
|
}
|
|
|
|
|
|
// we don't allow specify more than two node labels in a single expression
|
|
@@ -255,6 +289,8 @@ public class TestSchedulerUtils {
|
|
|
// set queue accessible node labesl to [x, y]
|
|
|
queueAccessibleNodeLabels.clear();
|
|
|
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
|
|
+ rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
|
|
+ ImmutableSet.of("x","y"));
|
|
|
|
|
|
Resource resource = Resources.createResource(
|
|
|
0,
|
|
@@ -263,9 +299,12 @@ public class TestSchedulerUtils {
|
|
|
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
|
|
resReq.setNodeLabelExpression("x && y");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
fail("Should fail");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
+ } finally {
|
|
|
+ rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
|
|
+ Arrays.asList("x", "y"));
|
|
|
}
|
|
|
|
|
|
// queue doesn't have label, succeed (when request no label)
|
|
@@ -280,15 +319,15 @@ public class TestSchedulerUtils {
|
|
|
ResourceRequest resReq = BuilderUtils.newResourceRequest(
|
|
|
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
|
|
|
resReq.setNodeLabelExpression("");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
|
|
|
resReq.setNodeLabelExpression(" ");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
e.printStackTrace();
|
|
|
fail("Should be valid when request labels is empty");
|
|
@@ -299,6 +338,9 @@ public class TestSchedulerUtils {
|
|
|
// set queue accessible node labels to empty
|
|
|
queueAccessibleNodeLabels.clear();
|
|
|
|
|
|
+ rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
|
|
+ ImmutableSet.of("x"));
|
|
|
+
|
|
|
Resource resource = Resources.createResource(
|
|
|
0,
|
|
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
|
@@ -306,9 +348,12 @@ public class TestSchedulerUtils {
|
|
|
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
|
|
resReq.setNodeLabelExpression("x");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
fail("Should fail");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
+ } finally {
|
|
|
+ rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
|
|
+ Arrays.asList("x"));
|
|
|
}
|
|
|
|
|
|
// queue is "*", always succeeded
|
|
@@ -317,6 +362,9 @@ public class TestSchedulerUtils {
|
|
|
queueAccessibleNodeLabels.clear();
|
|
|
queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY);
|
|
|
|
|
|
+ rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
|
|
+ ImmutableSet.of("x","y", "z"));
|
|
|
+
|
|
|
Resource resource = Resources.createResource(
|
|
|
0,
|
|
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
|
@@ -324,18 +372,39 @@ public class TestSchedulerUtils {
|
|
|
mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
|
|
resReq.setNodeLabelExpression("x");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
|
|
|
resReq.setNodeLabelExpression("y");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
|
|
|
resReq.setNodeLabelExpression("z");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
e.printStackTrace();
|
|
|
fail("Should be valid when queue can access any labels");
|
|
|
+ } finally {
|
|
|
+ rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
|
|
+ Arrays.asList("x", "y", "z"));
|
|
|
+ }
|
|
|
+
|
|
|
+ // same as above, but cluster node labels don't contains label, should fail
|
|
|
+ try {
|
|
|
+ // set queue accessible node labels to empty
|
|
|
+ queueAccessibleNodeLabels.clear();
|
|
|
+ queueAccessibleNodeLabels.add(RMNodeLabelsManager.ANY);
|
|
|
+
|
|
|
+ Resource resource = Resources.createResource(
|
|
|
+ 0,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
|
|
+ ResourceRequest resReq = BuilderUtils.newResourceRequest(
|
|
|
+ mock(Priority.class), ResourceRequest.ANY, resource, 1);
|
|
|
+ resReq.setNodeLabelExpression("x");
|
|
|
+ SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
+ scheduler, rmContext);
|
|
|
+ fail("Should fail");
|
|
|
+ } catch (InvalidResourceRequestException e) {
|
|
|
}
|
|
|
|
|
|
// we don't allow resource name other than ANY and specify label
|
|
@@ -343,6 +412,9 @@ public class TestSchedulerUtils {
|
|
|
// set queue accessible node labesl to [x, y]
|
|
|
queueAccessibleNodeLabels.clear();
|
|
|
queueAccessibleNodeLabels.addAll(Arrays.asList("x", "y"));
|
|
|
+ rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
|
|
+ ImmutableSet.of("x", "y"));
|
|
|
+
|
|
|
|
|
|
Resource resource = Resources.createResource(
|
|
|
0,
|
|
@@ -351,9 +423,12 @@ public class TestSchedulerUtils {
|
|
|
mock(Priority.class), "rack", resource, 1);
|
|
|
resReq.setNodeLabelExpression("x");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
fail("Should fail");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
+ } finally {
|
|
|
+ rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
|
|
+ Arrays.asList("x", "y"));
|
|
|
}
|
|
|
|
|
|
// we don't allow resource name other than ANY and specify label even if
|
|
@@ -363,6 +438,8 @@ public class TestSchedulerUtils {
|
|
|
queueAccessibleNodeLabels.clear();
|
|
|
queueAccessibleNodeLabels.addAll(Arrays
|
|
|
.asList(CommonNodeLabelsManager.ANY));
|
|
|
+ rmContext.getNodeLabelManager().addToCluserNodeLabels(
|
|
|
+ ImmutableSet.of("x"));
|
|
|
|
|
|
Resource resource = Resources.createResource(
|
|
|
0,
|
|
@@ -371,9 +448,12 @@ public class TestSchedulerUtils {
|
|
|
mock(Priority.class), "rack", resource, 1);
|
|
|
resReq.setNodeLabelExpression("x");
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, "queue",
|
|
|
- scheduler);
|
|
|
+ scheduler, rmContext);
|
|
|
fail("Should fail");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
+ } finally {
|
|
|
+ rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
|
|
|
+ Arrays.asList("x"));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -395,7 +475,7 @@ public class TestSchedulerUtils {
|
|
|
BuilderUtils.newResourceRequest(mock(Priority.class),
|
|
|
ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
|
|
- mockScheduler);
|
|
|
+ mockScheduler, rmContext);
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
fail("Zero memory should be accepted");
|
|
|
}
|
|
@@ -409,7 +489,7 @@ public class TestSchedulerUtils {
|
|
|
BuilderUtils.newResourceRequest(mock(Priority.class),
|
|
|
ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
|
|
- mockScheduler);
|
|
|
+ mockScheduler, rmContext);
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
fail("Zero vcores should be accepted");
|
|
|
}
|
|
@@ -424,7 +504,7 @@ public class TestSchedulerUtils {
|
|
|
BuilderUtils.newResourceRequest(mock(Priority.class),
|
|
|
ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
|
|
- mockScheduler);
|
|
|
+ mockScheduler, rmContext);
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
fail("Max memory should be accepted");
|
|
|
}
|
|
@@ -439,7 +519,7 @@ public class TestSchedulerUtils {
|
|
|
BuilderUtils.newResourceRequest(mock(Priority.class),
|
|
|
ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
|
|
- mockScheduler);
|
|
|
+ mockScheduler, rmContext);
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
fail("Max vcores should not be accepted");
|
|
|
}
|
|
@@ -453,7 +533,7 @@ public class TestSchedulerUtils {
|
|
|
BuilderUtils.newResourceRequest(mock(Priority.class),
|
|
|
ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
|
|
- mockScheduler);
|
|
|
+ mockScheduler, rmContext);
|
|
|
fail("Negative memory should not be accepted");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
// expected
|
|
@@ -468,7 +548,7 @@ public class TestSchedulerUtils {
|
|
|
BuilderUtils.newResourceRequest(mock(Priority.class),
|
|
|
ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
|
|
- mockScheduler);
|
|
|
+ mockScheduler, rmContext);
|
|
|
fail("Negative vcores should not be accepted");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
// expected
|
|
@@ -484,7 +564,7 @@ public class TestSchedulerUtils {
|
|
|
BuilderUtils.newResourceRequest(mock(Priority.class),
|
|
|
ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
|
|
- mockScheduler);
|
|
|
+ mockScheduler, rmContext);
|
|
|
fail("More than max memory should not be accepted");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
// expected
|
|
@@ -501,7 +581,7 @@ public class TestSchedulerUtils {
|
|
|
BuilderUtils.newResourceRequest(mock(Priority.class),
|
|
|
ResourceRequest.ANY, resource, 1);
|
|
|
SchedulerUtils.normalizeAndvalidateRequest(resReq, maxResource, null,
|
|
|
- mockScheduler);
|
|
|
+ mockScheduler, rmContext);
|
|
|
fail("More than max vcores should not be accepted");
|
|
|
} catch (InvalidResourceRequestException e) {
|
|
|
// expected
|
|
@@ -632,4 +712,12 @@ public class TestSchedulerUtils {
|
|
|
Assert.assertNull(applications.get(appId));
|
|
|
return app;
|
|
|
}
|
|
|
+
|
|
|
+ private static RMContext getMockRMContext() {
|
|
|
+ RMContext rmContext = mock(RMContext.class);
|
|
|
+ RMNodeLabelsManager nlm = new MemoryRMNodeLabelsManager();
|
|
|
+ nlm.init(new Configuration(false));
|
|
|
+ when(rmContext.getNodeLabelManager()).thenReturn(nlm);
|
|
|
+ return rmContext;
|
|
|
+ }
|
|
|
}
|