@@ -19,34 +19,30 @@
 package org.apache.hadoop.yarn.server.nodemanager.scheduler;

 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords
-    .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
-import org.apache.hadoop.yarn.server.nodemanager.security
-    .NMContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.security
-    .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -63,27 +59,30 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;

-public class TestLocalScheduler {
+/**
+ * Test cases for {@link DistributedScheduler}.
+ */
+public class TestDistributedScheduler {

   @Test
-  public void testLocalScheduler() throws Exception {
+  public void testDistributedScheduler() throws Exception {

     Configuration conf = new Configuration();
-    LocalScheduler localScheduler = new LocalScheduler();
+    DistributedScheduler distributedScheduler = new DistributedScheduler();

-    RequestInterceptor finalReqIntcptr = setup(conf, localScheduler);
+    RequestInterceptor finalReqIntcptr = setup(conf, distributedScheduler);

-    registerAM(localScheduler, finalReqIntcptr, Arrays.asList(
+    registerAM(distributedScheduler, finalReqIntcptr, Arrays.asList(
         NodeId.newInstance("a", 1), NodeId.newInstance("b", 2)));

-    final AtomicBoolean flipFlag = new AtomicBoolean(false);
+    final AtomicBoolean flipFlag = new AtomicBoolean(true);
     Mockito.when(
         finalReqIntcptr.allocateForDistributedScheduling(
-            Mockito.any(DistSchedAllocateRequest.class)))
-        .thenAnswer(new Answer<DistSchedAllocateResponse>() {
+            Mockito.any(DistributedSchedulingAllocateRequest.class)))
+        .thenAnswer(new Answer<DistributedSchedulingAllocateResponse>() {
           @Override
-          public DistSchedAllocateResponse answer(InvocationOnMock
-              invocationOnMock) throws Throwable {
+          public DistributedSchedulingAllocateResponse answer(
+              InvocationOnMock invocationOnMock) throws Throwable {
             flipFlag.set(!flipFlag.get());
             if (flipFlag.get()) {
               return createAllocateResponse(Arrays.asList(
@@ -101,15 +100,15 @@ public class TestLocalScheduler {

     ResourceRequest opportunisticReq =
         createResourceRequest(ExecutionType.OPPORTUNISTIC, 4, "*");
+
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));

     // Verify 4 containers were allocated
     AllocateResponse allocateResponse =
-        localScheduler.allocate(allocateRequest);
+        distributedScheduler.allocate(allocateRequest);
     Assert.assertEquals(4, allocateResponse.getAllocatedContainers().size());

-    // Verify equal distribution on hosts a and b
-    // And None on c and d
+    // Verify equal distribution on hosts a and b, and none on c or d
     Map<NodeId, List<ContainerId>> allocs = mapAllocs(allocateResponse, 4);
     Assert.assertEquals(2, allocs.get(NodeId.newInstance("a", 1)).size());
     Assert.assertEquals(2, allocs.get(NodeId.newInstance("b", 2)).size());
@@ -123,18 +122,18 @@ public class TestLocalScheduler {
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));

     // Verify 6 containers were allocated
-    allocateResponse = localScheduler.allocate(allocateRequest);
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
     Assert.assertEquals(6, allocateResponse.getAllocatedContainers().size());

-    // Verify New containers are equally distribution on hosts c and d
-    // And None on a and b
+    // Verify new containers are equally distributed on hosts c and d,
+    // and none on a or b
     allocs = mapAllocs(allocateResponse, 6);
     Assert.assertEquals(3, allocs.get(NodeId.newInstance("c", 3)).size());
     Assert.assertEquals(3, allocs.get(NodeId.newInstance("d", 4)).size());
     Assert.assertNull(allocs.get(NodeId.newInstance("a", 1)));
     Assert.assertNull(allocs.get(NodeId.newInstance("b", 2)));

-    // Ensure the LocalScheduler respects the list order..
+    // Ensure the DistributedScheduler respects the list order.
     // The first request should be allocated to "d" since it is ranked higher
     // The second request should be allocated to "c" since the ranking is
     // flipped on every allocate response.
@@ -142,7 +141,7 @@ public class TestLocalScheduler {
     opportunisticReq =
         createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
     allocs = mapAllocs(allocateResponse, 1);
     Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());

@@ -150,7 +149,7 @@ public class TestLocalScheduler {
     opportunisticReq =
         createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
     allocs = mapAllocs(allocateResponse, 1);
     Assert.assertEquals(1, allocs.get(NodeId.newInstance("c", 3)).size());

@@ -158,22 +157,23 @@ public class TestLocalScheduler {
     opportunisticReq =
         createResourceRequest(ExecutionType.OPPORTUNISTIC, 1, "*");
     allocateRequest.setAskList(Arrays.asList(guaranteedReq, opportunisticReq));
-    allocateResponse = localScheduler.allocate(allocateRequest);
+    allocateResponse = distributedScheduler.allocate(allocateRequest);
     allocs = mapAllocs(allocateResponse, 1);
     Assert.assertEquals(1, allocs.get(NodeId.newInstance("d", 4)).size());
   }

-  private void registerAM(LocalScheduler localScheduler, RequestInterceptor
-      finalReqIntcptr, List<NodeId> nodeList) throws Exception {
-    DistSchedRegisterResponse distSchedRegisterResponse =
-        Records.newRecord(DistSchedRegisterResponse.class);
+  private void registerAM(DistributedScheduler distributedScheduler,
+      RequestInterceptor finalReqIntcptr, List<NodeId> nodeList)
+      throws Exception {
+    RegisterDistributedSchedulingAMResponse distSchedRegisterResponse =
+        Records.newRecord(RegisterDistributedSchedulingAMResponse.class);
     distSchedRegisterResponse.setRegisterResponse(
         Records.newRecord(RegisterApplicationMasterResponse.class));
     distSchedRegisterResponse.setContainerTokenExpiryInterval(12345);
     distSchedRegisterResponse.setContainerIdStart(0);
-    distSchedRegisterResponse.setMaxAllocatableCapabilty(
+    distSchedRegisterResponse.setMaxContainerResource(
         Resource.newInstance(1024, 4));
-    distSchedRegisterResponse.setMinAllocatableCapabilty(
+    distSchedRegisterResponse.setMinContainerResource(
         Resource.newInstance(512, 2));
     distSchedRegisterResponse.setNodesForScheduling(nodeList);
     Mockito.when(
@@ -181,12 +181,12 @@ public class TestLocalScheduler {
             Mockito.any(RegisterApplicationMasterRequest.class)))
         .thenReturn(distSchedRegisterResponse);

-    localScheduler.registerApplicationMaster(
+    distributedScheduler.registerApplicationMaster(
         Records.newRecord(RegisterApplicationMasterRequest.class));
   }

-  private RequestInterceptor setup(Configuration conf, LocalScheduler
-      localScheduler) {
+  private RequestInterceptor setup(Configuration conf,
+      DistributedScheduler distributedScheduler) {
     NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
     Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
     Context context = Mockito.mock(Context.class);
@@ -215,12 +215,12 @@ public class TestLocalScheduler {
     NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
         new NMTokenSecretManagerInNM();
     nmTokenSecretManagerInNM.setMasterKey(mKey);
-    localScheduler.initLocal(
+    distributedScheduler.initLocal(
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
         containerAllocator, nmTokenSecretManagerInNM, "test");

     RequestInterceptor finalReqIntcptr = Mockito.mock(RequestInterceptor.class);
-    localScheduler.setNextInterceptor(finalReqIntcptr);
+    distributedScheduler.setNextInterceptor(finalReqIntcptr);
     return finalReqIntcptr;
   }

@@ -237,17 +237,18 @@ public class TestLocalScheduler {
     return opportunisticReq;
   }

-  private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
-    DistSchedAllocateResponse distSchedAllocateResponse = Records.newRecord
-        (DistSchedAllocateResponse.class);
-    distSchedAllocateResponse.setAllocateResponse(
-        Records.newRecord(AllocateResponse.class));
+  private DistributedSchedulingAllocateResponse createAllocateResponse(
+      List<NodeId> nodes) {
+    DistributedSchedulingAllocateResponse distSchedAllocateResponse =
+        Records.newRecord(DistributedSchedulingAllocateResponse.class);
+    distSchedAllocateResponse
+        .setAllocateResponse(Records.newRecord(AllocateResponse.class));
     distSchedAllocateResponse.setNodesForScheduling(nodes);
     return distSchedAllocateResponse;
   }

-  private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
-      allocateResponse, int expectedSize) throws Exception {
+  private Map<NodeId, List<ContainerId>> mapAllocs(
+      AllocateResponse allocateResponse, int expectedSize) throws Exception {
     Assert.assertEquals(expectedSize,
         allocateResponse.getAllocatedContainers().size());
     Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
@@ -266,5 +267,4 @@ public class TestLocalScheduler {
     }
     return allocs;
   }
-
 }