|
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.NodeHealthCheckerService;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.yarn.YarnException;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
@@ -52,9 +54,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
|
|
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
|
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|
|
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
|
|
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
|
|
+import org.apache.hadoop.yarn.service.Service;
|
|
|
import org.apache.hadoop.yarn.service.Service.STATE;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
@@ -63,24 +68,38 @@ import org.junit.Test;
|
|
|
|
|
|
public class TestNodeStatusUpdater {
|
|
|
|
|
|
+ // temp fix until metrics system can auto-detect itself running in unit test:
|
|
|
+ static {
|
|
|
+ DefaultMetricsSystem.setMiniClusterMode(true);
|
|
|
+ }
|
|
|
+
|
|
|
static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
|
|
|
static final Path basedir =
|
|
|
new Path("target", TestNodeStatusUpdater.class.getName());
|
|
|
- private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
+ private static final RecordFactory recordFactory = RecordFactoryProvider
|
|
|
+ .getRecordFactory(null);
|
|
|
|
|
|
int heartBeatID = 0;
|
|
|
volatile Error nmStartError = null;
|
|
|
+ private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void tearDown() {
|
|
|
+ this.registeredNodes.clear();
|
|
|
+ DefaultMetricsSystem.shutdown();
|
|
|
+ }
|
|
|
|
|
|
private class MyResourceTracker implements ResourceTracker {
|
|
|
|
|
|
- private Context context;
|
|
|
+ private final Context context;
|
|
|
|
|
|
public MyResourceTracker(Context context) {
|
|
|
this.context = context;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
|
|
|
+ public RegisterNodeManagerResponse registerNodeManager(
|
|
|
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
|
|
|
NodeId nodeId = request.getNodeId();
|
|
|
Resource resource = request.getResource();
|
|
|
LOG.info("Registering " + nodeId.toString());
|
|
@@ -91,17 +110,24 @@ public class TestNodeStatusUpdater {
|
|
|
Assert.fail(e.getMessage());
|
|
|
}
|
|
|
Assert.assertEquals(5 * 1024, resource.getMemory());
|
|
|
- RegistrationResponse regResponse = recordFactory.newRecordInstance(RegistrationResponse.class);
|
|
|
-
|
|
|
- RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
|
|
|
+ registeredNodes.add(nodeId);
|
|
|
+ RegistrationResponse regResponse = recordFactory
|
|
|
+ .newRecordInstance(RegistrationResponse.class);
|
|
|
+
|
|
|
+ RegisterNodeManagerResponse response = recordFactory
|
|
|
+ .newRecordInstance(RegisterNodeManagerResponse.class);
|
|
|
response.setRegistrationResponse(regResponse);
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
- ApplicationId applicationID = recordFactory.newRecordInstance(ApplicationId.class);
|
|
|
- ApplicationAttemptId appAttemptID = recordFactory.newRecordInstance(ApplicationAttemptId.class);
|
|
|
- ContainerId firstContainerID = recordFactory.newRecordInstance(ContainerId.class);
|
|
|
- ContainerId secondContainerID = recordFactory.newRecordInstance(ContainerId.class);
|
|
|
+ ApplicationId applicationID = recordFactory
|
|
|
+ .newRecordInstance(ApplicationId.class);
|
|
|
+ ApplicationAttemptId appAttemptID = recordFactory
|
|
|
+ .newRecordInstance(ApplicationAttemptId.class);
|
|
|
+ ContainerId firstContainerID = recordFactory
|
|
|
+ .newRecordInstance(ContainerId.class);
|
|
|
+ ContainerId secondContainerID = recordFactory
|
|
|
+ .newRecordInstance(ContainerId.class);
|
|
|
|
|
|
private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap(
|
|
|
List<ContainerStatus> containers) {
|
|
@@ -118,8 +144,10 @@ public class TestNodeStatusUpdater {
|
|
|
}
|
|
|
return map;
|
|
|
}
|
|
|
+
|
|
|
@Override
|
|
|
- public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException {
|
|
|
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
NodeStatus nodeStatus = request.getNodeStatus();
|
|
|
LOG.info("Got heartbeat number " + heartBeatID);
|
|
|
nodeStatus.setResponseId(heartBeatID++);
|
|
@@ -134,7 +162,8 @@ public class TestNodeStatusUpdater {
|
|
|
firstContainerID.setAppId(applicationID);
|
|
|
firstContainerID.setAppAttemptId(appAttemptID);
|
|
|
firstContainerID.setId(heartBeatID);
|
|
|
- ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
+ ContainerLaunchContext launchContext = recordFactory
|
|
|
+ .newRecordInstance(ContainerLaunchContext.class);
|
|
|
launchContext.setContainerId(firstContainerID);
|
|
|
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
|
|
|
launchContext.getResource().setMemory(2);
|
|
@@ -158,7 +187,8 @@ public class TestNodeStatusUpdater {
|
|
|
secondContainerID.setAppId(applicationID);
|
|
|
secondContainerID.setAppAttemptId(appAttemptID);
|
|
|
secondContainerID.setId(heartBeatID);
|
|
|
- ContainerLaunchContext launchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
+ ContainerLaunchContext launchContext = recordFactory
|
|
|
+ .newRecordInstance(ContainerLaunchContext.class);
|
|
|
launchContext.setContainerId(secondContainerID);
|
|
|
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
|
|
|
launchContext.getResource().setMemory(3);
|
|
@@ -176,10 +206,12 @@ public class TestNodeStatusUpdater {
|
|
|
this.context.getContainers();
|
|
|
Assert.assertEquals(2, activeContainers.size());
|
|
|
}
|
|
|
- HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
|
|
|
+ HeartbeatResponse response = recordFactory
|
|
|
+ .newRecordInstance(HeartbeatResponse.class);
|
|
|
response.setResponseId(heartBeatID);
|
|
|
|
|
|
- NodeHeartbeatResponse nhResponse = recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
|
|
|
+ NodeHeartbeatResponse nhResponse = recordFactory
|
|
|
+ .newRecordInstance(NodeHeartbeatResponse.class);
|
|
|
nhResponse.setHeartbeatResponse(response);
|
|
|
return nhResponse;
|
|
|
}
|
|
@@ -189,8 +221,10 @@ public class TestNodeStatusUpdater {
|
|
|
private Context context;
|
|
|
|
|
|
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
|
|
|
- NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
|
|
- super(context, dispatcher, healthChecker, metrics);
|
|
|
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
|
|
+ ContainerTokenSecretManager containerTokenSecretManager) {
|
|
|
+ super(context, dispatcher, healthChecker, metrics,
|
|
|
+ containerTokenSecretManager);
|
|
|
this.context = context;
|
|
|
}
|
|
|
|
|
@@ -216,21 +250,23 @@ public class TestNodeStatusUpdater {
|
|
|
final NodeManager nm = new NodeManager() {
|
|
|
@Override
|
|
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
|
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
|
|
+ ContainerTokenSecretManager containerTokenSecretManager) {
|
|
|
return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
|
|
|
- metrics);
|
|
|
+ metrics, containerTokenSecretManager);
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- YarnConfiguration conf = new YarnConfiguration();
|
|
|
- conf.setInt(YarnConfiguration.NM_VMEM_GB, 5); // 5GB
|
|
|
- conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
|
|
|
- conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
|
|
|
- conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri().getPath());
|
|
|
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir, "remotelogs")
|
|
|
- .toUri().getPath());
|
|
|
- conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0").toUri().getPath());
|
|
|
+ YarnConfiguration conf = createNMConfig();
|
|
|
nm.init(conf);
|
|
|
+
|
|
|
+ // verify that the last service is the nodeStatusUpdater (ie registration
|
|
|
+ // with RM)
|
|
|
+ Object[] services = nm.getServices().toArray();
|
|
|
+ Object lastService = services[services.length-1];
|
|
|
+ Assert.assertTrue("last service is NOT the node status updater",
|
|
|
+ lastService instanceof NodeStatusUpdater);
|
|
|
+
|
|
|
new Thread() {
|
|
|
public void run() {
|
|
|
try {
|
|
@@ -260,7 +296,75 @@ public class TestNodeStatusUpdater {
|
|
|
while (heartBeatID <= 3) {
|
|
|
Thread.sleep(500);
|
|
|
}
|
|
|
+ Assert.assertEquals("Number of registered NMs is wrong!!", 1,
|
|
|
+ this.registeredNodes.size());
|
|
|
|
|
|
nm.stop();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verifies that if for some reason NM fails to start ContainerManager RPC
|
|
|
+ * server, RM is oblivious to NM's presence. The behaviour is like this
|
|
|
+ * because otherwise, NM will report to RM even if all its servers are not
|
|
|
+ * started properly, RM will think that the NM is alive and will retire the NM
|
|
|
+ * only after NM_EXPIRY interval. See MAPREDUCE-2749.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testNoRegistrationWhenNMServicesFail() {
|
|
|
+
|
|
|
+ final NodeManager nm = new NodeManager() {
|
|
|
+ @Override
|
|
|
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
|
|
+ ContainerTokenSecretManager containerTokenSecretManager) {
|
|
|
+ return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
|
|
|
+ metrics, containerTokenSecretManager);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ContainerManagerImpl createContainerManager(Context context,
|
|
|
+ ContainerExecutor exec, DeletionService del,
|
|
|
+ NodeStatusUpdater nodeStatusUpdater,
|
|
|
+ ContainerTokenSecretManager containerTokenSecretManager) {
|
|
|
+ return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
|
|
|
+ metrics, containerTokenSecretManager) {
|
|
|
+ @Override
|
|
|
+ public void start() {
|
|
|
+ // Simulating failure of starting RPC server
|
|
|
+ throw new YarnException("Starting of RPC Server failed");
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ YarnConfiguration conf = createNMConfig();
|
|
|
+ nm.init(conf);
|
|
|
+ try {
|
|
|
+ nm.start();
|
|
|
+ Assert.fail("NM should have failed to start. Didn't get exception!!");
|
|
|
+ } catch (Exception e) {
|
|
|
+ Assert.assertEquals("Starting of RPC Server failed", e.getCause()
|
|
|
+ .getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
|
|
|
+ .getServiceState());
|
|
|
+
|
|
|
+ Assert.assertEquals("Number of registered nodes is wrong!", 0,
|
|
|
+ this.registeredNodes.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ private YarnConfiguration createNMConfig() {
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ conf.setInt(YarnConfiguration.NM_VMEM_GB, 5); // 5GB
|
|
|
+ conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
|
|
|
+ conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
|
|
|
+ conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
|
|
|
+ .getPath());
|
|
|
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new Path(basedir,
|
|
|
+ "remotelogs").toUri().getPath());
|
|
|
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, new Path(basedir, "nm0")
|
|
|
+ .toUri().getPath());
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
}
|