|
@@ -21,7 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
|
|
|
+import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
|
|
+import org.junit.Before;
|
|
|
|
+import org.junit.Test;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -31,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
@@ -39,9 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
-import org.junit.After;
|
|
|
|
-import org.junit.Before;
|
|
|
|
-import org.junit.Test;
|
|
|
|
|
|
|
|
public class TestAMRMRPCNodeUpdates {
|
|
public class TestAMRMRPCNodeUpdates {
|
|
private MockRM rm;
|
|
private MockRM rm;
|
|
@@ -53,8 +54,8 @@ public class TestAMRMRPCNodeUpdates {
|
|
@Override
|
|
@Override
|
|
public void init(Configuration conf) {
|
|
public void init(Configuration conf) {
|
|
conf.set(
|
|
conf.set(
|
|
- CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
|
|
|
- "1.0");
|
|
|
|
|
|
+ CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
|
|
|
+ "1.0");
|
|
super.init(conf);
|
|
super.init(conf);
|
|
}
|
|
}
|
|
};
|
|
};
|
|
@@ -62,19 +63,19 @@ public class TestAMRMRPCNodeUpdates {
|
|
rm.start();
|
|
rm.start();
|
|
amService = rm.getApplicationMasterService();
|
|
amService = rm.getApplicationMasterService();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@After
|
|
@After
|
|
public void tearDown() {
|
|
public void tearDown() {
|
|
if (rm != null) {
|
|
if (rm != null) {
|
|
this.rm.stop();
|
|
this.rm.stop();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception {
|
|
private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception {
|
|
nm.nodeHeartbeat(health);
|
|
nm.nodeHeartbeat(health);
|
|
rm.drainEvents();
|
|
rm.drainEvents();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
private void syncNodeLost(MockNM nm) throws Exception {
|
|
private void syncNodeLost(MockNM nm) throws Exception {
|
|
rm.sendNodeStarted(nm);
|
|
rm.sendNodeStarted(nm);
|
|
rm.waitForState(nm.getNodeId(), NodeState.RUNNING);
|
|
rm.waitForState(nm.getNodeId(), NodeState.RUNNING);
|
|
@@ -82,13 +83,20 @@ public class TestAMRMRPCNodeUpdates {
|
|
rm.drainEvents();
|
|
rm.drainEvents();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void syncNodeGracefulDecommission(
|
|
|
|
+ MockNM nm, int timeout) throws Exception {
|
|
|
|
+ rm.sendNodeGracefulDecommission(nm, timeout);
|
|
|
|
+ rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
|
|
|
|
+ rm.drainEvents();
|
|
|
|
+ }
|
|
|
|
+
|
|
private AllocateResponse allocate(final ApplicationAttemptId attemptId,
|
|
private AllocateResponse allocate(final ApplicationAttemptId attemptId,
|
|
final AllocateRequest req) throws Exception {
|
|
final AllocateRequest req) throws Exception {
|
|
UserGroupInformation ugi =
|
|
UserGroupInformation ugi =
|
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
|
UserGroupInformation.createRemoteUser(attemptId.toString());
|
|
Token<AMRMTokenIdentifier> token =
|
|
Token<AMRMTokenIdentifier> token =
|
|
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
|
|
rm.getRMContext().getRMApps().get(attemptId.getApplicationId())
|
|
- .getRMAppAttempt(attemptId).getAMRMToken();
|
|
|
|
|
|
+ .getRMAppAttempt(attemptId).getAMRMToken();
|
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
|
ugi.addTokenIdentifier(token.decodeIdentifier());
|
|
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
|
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
|
@Override
|
|
@Override
|
|
@@ -98,9 +106,42 @@ public class TestAMRMRPCNodeUpdates {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testAMRMDecommissioningNodes() throws Exception {
|
|
|
|
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
|
|
|
|
+ MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
|
|
|
|
+ rm.drainEvents();
|
|
|
|
+
|
|
|
|
+ RMApp app1 = rm.submitApp(2000);
|
|
|
|
+
|
|
|
|
+ // Trigger the scheduling so the AM gets 'launched' on nm1
|
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
|
+
|
|
|
|
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
|
|
|
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
|
|
|
+
|
|
|
|
+ // register AM returns no unusable node
|
|
|
|
+ am1.registerAppAttempt();
|
|
|
|
+
|
|
|
|
+ Integer decommissioningTimeout = 600;
|
|
|
|
+ syncNodeGracefulDecommission(nm2, decommissioningTimeout);
|
|
|
|
+
|
|
|
|
+ AllocateRequest allocateRequest1 =
|
|
|
|
+ AllocateRequest.newInstance(0, 0F, null, null, null);
|
|
|
|
+ AllocateResponse response1 =
|
|
|
|
+ allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
|
|
+ List<NodeReport> updatedNodes = response1.getUpdatedNodes();
|
|
|
|
+ Assert.assertEquals(1, updatedNodes.size());
|
|
|
|
+ NodeReport nr = updatedNodes.iterator().next();
|
|
|
|
+ Assert.assertEquals(
|
|
|
|
+ decommissioningTimeout, nr.getDecommissioningTimeout());
|
|
|
|
+ Assert.assertEquals(
|
|
|
|
+ NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testAMRMUnusableNodes() throws Exception {
|
|
public void testAMRMUnusableNodes() throws Exception {
|
|
-
|
|
|
|
|
|
+
|
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
|
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
|
|
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
|
|
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
|
|
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 10000);
|
|
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 10000);
|
|
@@ -114,7 +155,7 @@ public class TestAMRMRPCNodeUpdates {
|
|
|
|
|
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
|
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
|
-
|
|
|
|
|
|
+
|
|
// register AM returns no unusable node
|
|
// register AM returns no unusable node
|
|
am1.registerAppAttempt();
|
|
am1.registerAppAttempt();
|
|
|
|
|
|
@@ -127,18 +168,20 @@ public class TestAMRMRPCNodeUpdates {
|
|
Assert.assertEquals(0, updatedNodes.size());
|
|
Assert.assertEquals(0, updatedNodes.size());
|
|
|
|
|
|
syncNodeHeartbeat(nm4, false);
|
|
syncNodeHeartbeat(nm4, false);
|
|
-
|
|
|
|
|
|
+
|
|
// allocate request returns updated node
|
|
// allocate request returns updated node
|
|
allocateRequest1 =
|
|
allocateRequest1 =
|
|
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
|
|
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
|
|
- null);
|
|
|
|
|
|
+ null);
|
|
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
updatedNodes = response1.getUpdatedNodes();
|
|
updatedNodes = response1.getUpdatedNodes();
|
|
Assert.assertEquals(1, updatedNodes.size());
|
|
Assert.assertEquals(1, updatedNodes.size());
|
|
NodeReport nr = updatedNodes.iterator().next();
|
|
NodeReport nr = updatedNodes.iterator().next();
|
|
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
|
|
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
|
|
-
|
|
|
|
|
|
+ Assert.assertNull(nr.getDecommissioningTimeout());
|
|
|
|
+ Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
|
|
|
|
+
|
|
// resending the allocate request returns the same result
|
|
// resending the allocate request returns the same result
|
|
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
updatedNodes = response1.getUpdatedNodes();
|
|
updatedNodes = response1.getUpdatedNodes();
|
|
@@ -146,30 +189,34 @@ public class TestAMRMRPCNodeUpdates {
|
|
nr = updatedNodes.iterator().next();
|
|
nr = updatedNodes.iterator().next();
|
|
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
|
|
Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
|
|
|
|
+ Assert.assertNull(nr.getDecommissioningTimeout());
|
|
|
|
+ Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
|
|
|
|
|
|
syncNodeLost(nm3);
|
|
syncNodeLost(nm3);
|
|
-
|
|
|
|
|
|
+
|
|
// subsequent allocate request returns delta
|
|
// subsequent allocate request returns delta
|
|
allocateRequest1 =
|
|
allocateRequest1 =
|
|
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
|
|
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
|
|
- null);
|
|
|
|
|
|
+ null);
|
|
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
updatedNodes = response1.getUpdatedNodes();
|
|
updatedNodes = response1.getUpdatedNodes();
|
|
Assert.assertEquals(1, updatedNodes.size());
|
|
Assert.assertEquals(1, updatedNodes.size());
|
|
nr = updatedNodes.iterator().next();
|
|
nr = updatedNodes.iterator().next();
|
|
Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(NodeState.LOST, nr.getNodeState());
|
|
Assert.assertEquals(NodeState.LOST, nr.getNodeState());
|
|
-
|
|
|
|
|
|
+ Assert.assertNull(nr.getDecommissioningTimeout());
|
|
|
|
+ Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
|
|
|
|
+
|
|
// registering another AM gives it the complete failed list
|
|
// registering another AM gives it the complete failed list
|
|
RMApp app2 = rm.submitApp(2000);
|
|
RMApp app2 = rm.submitApp(2000);
|
|
// Trigger nm2 heartbeat so that AM gets launched on it
|
|
// Trigger nm2 heartbeat so that AM gets launched on it
|
|
nm2.nodeHeartbeat(true);
|
|
nm2.nodeHeartbeat(true);
|
|
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
|
|
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
|
|
MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
|
|
MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
|
|
-
|
|
|
|
|
|
+
|
|
// register AM returns all unusable nodes
|
|
// register AM returns all unusable nodes
|
|
am2.registerAppAttempt();
|
|
am2.registerAppAttempt();
|
|
-
|
|
|
|
|
|
+
|
|
// allocate request returns no updated node
|
|
// allocate request returns no updated node
|
|
AllocateRequest allocateRequest2 =
|
|
AllocateRequest allocateRequest2 =
|
|
AllocateRequest.newInstance(0, 0F, null, null, null);
|
|
AllocateRequest.newInstance(0, 0F, null, null, null);
|
|
@@ -177,39 +224,43 @@ public class TestAMRMRPCNodeUpdates {
|
|
allocate(attempt2.getAppAttemptId(), allocateRequest2);
|
|
allocate(attempt2.getAppAttemptId(), allocateRequest2);
|
|
updatedNodes = response2.getUpdatedNodes();
|
|
updatedNodes = response2.getUpdatedNodes();
|
|
Assert.assertEquals(0, updatedNodes.size());
|
|
Assert.assertEquals(0, updatedNodes.size());
|
|
-
|
|
|
|
|
|
+
|
|
syncNodeHeartbeat(nm4, true);
|
|
syncNodeHeartbeat(nm4, true);
|
|
-
|
|
|
|
|
|
+
|
|
// both AM's should get delta updated nodes
|
|
// both AM's should get delta updated nodes
|
|
allocateRequest1 =
|
|
allocateRequest1 =
|
|
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
|
|
AllocateRequest.newInstance(response1.getResponseId(), 0F, null, null,
|
|
- null);
|
|
|
|
|
|
+ null);
|
|
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
|
|
updatedNodes = response1.getUpdatedNodes();
|
|
updatedNodes = response1.getUpdatedNodes();
|
|
Assert.assertEquals(1, updatedNodes.size());
|
|
Assert.assertEquals(1, updatedNodes.size());
|
|
nr = updatedNodes.iterator().next();
|
|
nr = updatedNodes.iterator().next();
|
|
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
|
|
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
|
|
-
|
|
|
|
|
|
+ Assert.assertNull(nr.getDecommissioningTimeout());
|
|
|
|
+ Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
|
|
|
|
+
|
|
allocateRequest2 =
|
|
allocateRequest2 =
|
|
AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
|
|
AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
|
|
- null);
|
|
|
|
|
|
+ null);
|
|
response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
|
|
response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
|
|
updatedNodes = response2.getUpdatedNodes();
|
|
updatedNodes = response2.getUpdatedNodes();
|
|
Assert.assertEquals(1, updatedNodes.size());
|
|
Assert.assertEquals(1, updatedNodes.size());
|
|
nr = updatedNodes.iterator().next();
|
|
nr = updatedNodes.iterator().next();
|
|
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
|
|
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
|
|
Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
|
|
|
|
+ Assert.assertNull(nr.getDecommissioningTimeout());
|
|
|
|
+ Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
|
|
|
|
|
|
// subsequent allocate calls should return no updated nodes
|
|
// subsequent allocate calls should return no updated nodes
|
|
allocateRequest2 =
|
|
allocateRequest2 =
|
|
AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
|
|
AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
|
|
- null);
|
|
|
|
|
|
+ null);
|
|
response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
|
|
response2 = allocate(attempt2.getAppAttemptId(), allocateRequest2);
|
|
updatedNodes = response2.getUpdatedNodes();
|
|
updatedNodes = response2.getUpdatedNodes();
|
|
Assert.assertEquals(0, updatedNodes.size());
|
|
Assert.assertEquals(0, updatedNodes.size());
|
|
-
|
|
|
|
|
|
+
|
|
// how to do the above for LOST node
|
|
// how to do the above for LOST node
|
|
-
|
|
|
|
|
|
+
|
|
}
|
|
}
|
|
-}
|
|
|
|
|
|
+}
|