|
@@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
-import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
-import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
@@ -125,22 +123,20 @@ public class TestAMRMClientOnRMRestart {
|
|
|
// Phase-1 Start 1st RM
|
|
|
MyResourceManager rm1 = new MyResourceManager(conf, memStore);
|
|
|
rm1.start();
|
|
|
- DrainDispatcher dispatcher =
|
|
|
- (DrainDispatcher) rm1.getRMContext().getDispatcher();
|
|
|
|
|
|
// Submit the application
|
|
|
RMApp app = rm1.submitApp(1024);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
|
|
|
nm1.registerNode();
|
|
|
nm1.nodeHeartbeat(true); // Node heartbeat
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
ApplicationAttemptId appAttemptId =
|
|
|
app.getCurrentAppAttempt().getAppAttemptId();
|
|
|
rm1.sendAMLaunched(appAttemptId);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
|
|
|
rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
|
|
@@ -175,7 +171,7 @@ public class TestAMRMClientOnRMRestart {
|
|
|
blacklistAdditions.remove("h2");// remove from local list
|
|
|
|
|
|
AllocateResponse allocateResponse = amClient.allocate(0.1f);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
|
|
|
.getAllocatedContainers().size());
|
|
|
|
|
@@ -188,10 +184,10 @@ public class TestAMRMClientOnRMRestart {
|
|
|
// Step-2 : NM heart beat is sent.
|
|
|
// On 2nd AM allocate request, RM allocates 3 containers to AM
|
|
|
nm1.nodeHeartbeat(true); // Node heartbeat
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
allocateResponse = amClient.allocate(0.2f);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
// 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3.
|
|
|
Assert.assertEquals("No of assignments must be 0", 3, allocateResponse
|
|
|
.getAllocatedContainers().size());
|
|
@@ -206,7 +202,7 @@ public class TestAMRMClientOnRMRestart {
|
|
|
amClient.removeContainerRequest(cRequest3);
|
|
|
|
|
|
allocateResponse = amClient.allocate(0.2f);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
|
|
|
.getAllocatedContainers().size());
|
|
|
assertAsksAndReleases(4, 0, rm1);
|
|
@@ -232,13 +228,13 @@ public class TestAMRMClientOnRMRestart {
|
|
|
// request
|
|
|
nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
|
|
|
containerId.getContainerId(), ContainerState.RUNNING);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
amClient.requestContainerResourceChange(
|
|
|
container, Resource.newInstance(2048, 1));
|
|
|
it.remove();
|
|
|
|
|
|
allocateResponse = amClient.allocate(0.3f);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
|
|
|
.getAllocatedContainers().size());
|
|
|
assertAsksAndReleases(3, pendingRelease, rm1);
|
|
@@ -254,7 +250,6 @@ public class TestAMRMClientOnRMRestart {
|
|
|
rm2.start();
|
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
|
|
|
- dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
|
|
|
|
|
|
// NM should be rebooted on heartbeat, even first heartbeat for nm2
|
|
|
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
|
@@ -270,7 +265,7 @@ public class TestAMRMClientOnRMRestart {
|
|
|
Collections.singletonList(
|
|
|
containerId.getApplicationAttemptId().getApplicationId()));
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
- dispatcher.await();
|
|
|
+ rm2.drainEvents();
|
|
|
|
|
|
blacklistAdditions.add("h3");
|
|
|
amClient.updateBlacklist(blacklistAdditions, null);
|
|
@@ -292,7 +287,7 @@ public class TestAMRMClientOnRMRestart {
|
|
|
// containerRequest and blacklisted nodes.
|
|
|
// Intern RM send resync command,AMRMClient resend allocate request
|
|
|
allocateResponse = amClient.allocate(0.3f);
|
|
|
- dispatcher.await();
|
|
|
+ rm2.drainEvents();
|
|
|
|
|
|
completedContainer =
|
|
|
allocateResponse.getCompletedContainersStatuses().size();
|
|
@@ -309,7 +304,7 @@ public class TestAMRMClientOnRMRestart {
|
|
|
|
|
|
// Step-5 : Allocater after resync command
|
|
|
allocateResponse = amClient.allocate(0.5f);
|
|
|
- dispatcher.await();
|
|
|
+ rm2.drainEvents();
|
|
|
Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
|
|
|
.getAllocatedContainers().size());
|
|
|
|
|
@@ -322,10 +317,10 @@ public class TestAMRMClientOnRMRestart {
|
|
|
int count = 5;
|
|
|
while (count-- > 0) {
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
- dispatcher.await();
|
|
|
+ rm2.drainEvents();
|
|
|
|
|
|
allocateResponse = amClient.allocate(0.5f);
|
|
|
- dispatcher.await();
|
|
|
+ rm2.drainEvents();
|
|
|
noAssignedContainer += allocateResponse.getAllocatedContainers().size();
|
|
|
if (noAssignedContainer == 3) {
|
|
|
break;
|
|
@@ -354,22 +349,20 @@ public class TestAMRMClientOnRMRestart {
|
|
|
// Phase-1 Start 1st RM
|
|
|
MyResourceManager rm1 = new MyResourceManager(conf, memStore);
|
|
|
rm1.start();
|
|
|
- DrainDispatcher dispatcher =
|
|
|
- (DrainDispatcher) rm1.getRMContext().getDispatcher();
|
|
|
|
|
|
// Submit the application
|
|
|
RMApp app = rm1.submitApp(1024);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
|
|
|
nm1.registerNode();
|
|
|
nm1.nodeHeartbeat(true); // Node heartbeat
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
ApplicationAttemptId appAttemptId =
|
|
|
app.getCurrentAppAttempt().getAppAttemptId();
|
|
|
rm1.sendAMLaunched(appAttemptId);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
|
|
|
rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
|
|
@@ -389,7 +382,6 @@ public class TestAMRMClientOnRMRestart {
|
|
|
rm2.start();
|
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
|
|
|
- dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
|
|
|
|
|
|
// NM should be rebooted on heartbeat, even first heartbeat for nm2
|
|
|
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
|
@@ -405,7 +397,7 @@ public class TestAMRMClientOnRMRestart {
|
|
|
Priority.newInstance(0), 0);
|
|
|
nm1.registerNode(Arrays.asList(containerReport), null);
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
- dispatcher.await();
|
|
|
+ rm2.drainEvents();
|
|
|
|
|
|
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
|
|
null, null);
|
|
@@ -417,7 +409,6 @@ public class TestAMRMClientOnRMRestart {
|
|
|
amClient.stop();
|
|
|
rm1.stop();
|
|
|
rm2.stop();
|
|
|
-
|
|
|
}
|
|
|
|
|
|
|
|
@@ -435,22 +426,20 @@ public class TestAMRMClientOnRMRestart {
|
|
|
// start first RM
|
|
|
MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore);
|
|
|
rm1.start();
|
|
|
- DrainDispatcher dispatcher =
|
|
|
- (DrainDispatcher) rm1.getRMContext().getDispatcher();
|
|
|
Long startTime = System.currentTimeMillis();
|
|
|
// Submit the application
|
|
|
RMApp app = rm1.submitApp(1024);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
|
|
|
nm1.registerNode();
|
|
|
nm1.nodeHeartbeat(true); // Node heartbeat
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
ApplicationAttemptId appAttemptId =
|
|
|
app.getCurrentAppAttempt().getAppAttemptId();
|
|
|
rm1.sendAMLaunched(appAttemptId);
|
|
|
- dispatcher.await();
|
|
|
+ rm1.drainEvents();
|
|
|
|
|
|
AMRMTokenSecretManager amrmTokenSecretManagerForRM1 =
|
|
|
rm1.getRMContext().getAMRMTokenSecretManager();
|
|
@@ -509,7 +498,6 @@ public class TestAMRMClientOnRMRestart {
|
|
|
rm2.start();
|
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
|
|
|
- dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
|
|
|
|
|
|
AMRMTokenSecretManager amrmTokenSecretManagerForRM2 =
|
|
|
rm2.getRMContext().getAMRMTokenSecretManager();
|
|
@@ -611,11 +599,6 @@ public class TestAMRMClientOnRMRestart {
|
|
|
MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp);
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- protected Dispatcher createDispatcher() {
|
|
|
- return new DrainDispatcher();
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
|
|
// Dispatch inline for test sanity
|