|
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
@@ -90,6 +91,12 @@ public class TestAMRMRPCNodeUpdates {
|
|
|
rm.drainEvents();
|
|
|
}
|
|
|
|
|
|
+ private void syncNodeRecommissioning(MockNM nm) throws Exception { // Test helper: recommission nm and wait until it is RUNNING.
|
|
|
+ rm.sendNodeEvent(nm, RMNodeEventType.RECOMMISSION); // deliver a RECOMMISSION node event for nm through the mock RM
|
|
|
+ rm.waitForState(nm.getNodeId(), NodeState.RUNNING); // wait for nm's node to reach NodeState.RUNNING
|
|
|
+ rm.drainEvents(); // presumably flushes pending RM events so the node update is visible to the next allocate call; confirm against MockRM
|
|
|
+ }
|
|
|
+
|
|
|
private AllocateResponse allocate(final ApplicationAttemptId attemptId,
|
|
|
final AllocateRequest req) throws Exception {
|
|
|
UserGroupInformation ugi =
|
|
@@ -139,6 +146,53 @@ public class TestAMRMRPCNodeUpdates {
|
|
|
NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testAMRMRecommissioningNodes() throws Exception { // Checks the AM sees NODE_DECOMMISSIONING, then NODE_USABLE, for a recommissioned node.
|
|
|
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
|
|
|
+ MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
|
|
|
+ rm.drainEvents();
|
|
|
+
|
|
|
+ RMApp app1 = rm.submitApp(2000);
|
|
|
+
|
|
|
+ // Heartbeat from nm1 triggers scheduling so the AM gets 'launched' on nm1
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+
|
|
|
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
|
|
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
|
|
+
|
|
|
+ // Register the AM; no unusable node is expected at this point (the
|
|
|
+ am1.registerAppAttempt(); // register response itself is not asserted here)
|
|
|
+
|
|
|
+ // Gracefully decommission nm2 with a timeout of 600 (presumably seconds; confirm)
|
|
|
+ Integer decommissioningTimeout = 600;
|
|
|
+ syncNodeGracefulDecommission(nm2, decommissioningTimeout);
|
|
|
+
|
|
|
+ AllocateRequest allocateRequest1 =
|
|
|
+ AllocateRequest.newInstance(0, 0F, null, null, null);
|
|
|
+ AllocateResponse response1 =
|
|
|
+ allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
|
+ List<NodeReport> updatedNodes = response1.getUpdatedNodes();
|
|
|
+ Assert.assertEquals(1, updatedNodes.size()); // exactly one node update is expected after the decommission
|
|
|
+ NodeReport nr = updatedNodes.iterator().next();
|
|
|
+ Assert.assertEquals(
|
|
|
+ decommissioningTimeout, nr.getDecommissioningTimeout()); // report must carry the timeout passed above
|
|
|
+ Assert.assertEquals(
|
|
|
+ NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
|
|
|
+
|
|
|
+ // Recommission nm2 and wait for it to be RUNNING again
|
|
|
+ syncNodeRecommissioning(nm2);
|
|
|
+
|
|
|
+ AllocateRequest allocateRequest2 = AllocateRequest
|
|
|
+ .newInstance(response1.getResponseId(), 0F, null, null, null); // continue from the response id returned by the first allocate
|
|
|
+ AllocateResponse response2 =
|
|
|
+ allocate(attempt1.getAppAttemptId(), allocateRequest2);
|
|
|
+ List<NodeReport> updatedNodes2 = response2.getUpdatedNodes();
|
|
|
+ Assert.assertEquals(1, updatedNodes2.size()); // exactly one node update is expected after the recommission
|
|
|
+ NodeReport nr2 = updatedNodes2.iterator().next();
|
|
|
+ Assert.assertEquals(
|
|
|
+ NodeUpdateType.NODE_USABLE, nr2.getNodeUpdateType()); // the update must be reported as usable again
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testAMRMUnusableNodes() throws Exception {
|
|
|
|