|
@@ -18,175 +18,177 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
|
|
|
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
|
-import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
|
|
+import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
public class TestNMExpiry {
|
|
|
-// private static final Log LOG = LogFactory.getLog(TestNMExpiry.class);
|
|
|
-// private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
-//
|
|
|
-// ResourceTrackerService resourceTrackerService;
|
|
|
-// ContainerTokenSecretManager containerTokenSecretManager =
|
|
|
-// new ContainerTokenSecretManager();
|
|
|
-// AtomicInteger test = new AtomicInteger();
|
|
|
-// AtomicInteger notify = new AtomicInteger();
|
|
|
-//
|
|
|
-// private static class VoidResourceListener implements ResourceListener {
|
|
|
-//
|
|
|
-// @Override
|
|
|
-// public void removeNode(RMNode node) {
|
|
|
-// }
|
|
|
-// @Override
|
|
|
-// public void nodeUpdate(RMNode nodeInfo,
|
|
|
-// Map<String, List<Container>> containers) {
|
|
|
-//
|
|
|
-// }
|
|
|
-// @Override
|
|
|
-// public void addNode(RMNode nodeInfo) {
|
|
|
-//
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
|
|
|
-// public TestNmLivelinessMonitor(RMContext context) {
|
|
|
-// super(context);
|
|
|
-// }
|
|
|
-//
|
|
|
-// @Override
|
|
|
-// protected void expireNodes(List<NodeId> ids) {
|
|
|
-// for (NodeId id: ids) {
|
|
|
-// LOG.info("Expired " + id);
|
|
|
-// if (test.addAndGet(1) == 2) {
|
|
|
-// try {
|
|
|
-// /* delay atleast 2 seconds to make sure the 3rd one does not expire
|
|
|
-// *
|
|
|
-// */
|
|
|
-// Thread.sleep(2000);
|
|
|
-// } catch(InterruptedException ie){}
|
|
|
-// synchronized(notify) {
|
|
|
-// notify.addAndGet(1);
|
|
|
-// notify.notifyAll();
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// @Before
|
|
|
-// public void setUp() {
|
|
|
-// Configuration conf = new Configuration();
|
|
|
-// RMContext context = new RMContextImpl(new MemStore());
|
|
|
-// NMLivelinessMonitor nmLivelinessMonitror = new TestNmLivelinessMonitor(
|
|
|
-// context);
|
|
|
-// nmLivelinessMonitror.start();
|
|
|
-// resourceTrackerService = new ResourceTrackerService(context,
|
|
|
-// nmLivelinessMonitror, containerTokenSecretManager);
|
|
|
-// context.getNodesCollection().addListener(new VoidResourceListener());
|
|
|
-//
|
|
|
-// conf.setLong(RMConfig.NM_EXPIRY_INTERVAL, 1000);
|
|
|
-// resourceTrackerService.init(conf);
|
|
|
-// resourceTrackerService.start();
|
|
|
-// }
|
|
|
-//
|
|
|
-// private class ThirdNodeHeartBeatThread extends Thread {
|
|
|
-// public void run() {
|
|
|
-// int lastResponseID = 0;
|
|
|
-// while (!stopT) {
|
|
|
-// try {
|
|
|
-// org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =
|
|
|
-// recordFactory
|
|
|
-// .newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
|
|
|
-// nodeStatus.setNodeId(thirdNodeRegResponse.getNodeId());
|
|
|
-// nodeStatus.setResponseId(lastResponseID);
|
|
|
-// nodeStatus.setNodeHealthStatus(recordFactory.newRecordInstance(NodeHealthStatus.class));
|
|
|
-// nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true);
|
|
|
-//
|
|
|
-// NodeHeartbeatRequest request = recordFactory
|
|
|
-// .newRecordInstance(NodeHeartbeatRequest.class);
|
|
|
-// request.setNodeStatus(nodeStatus);
|
|
|
-// lastResponseID = resourceTrackerService.nodeHeartbeat(request)
|
|
|
-// .getHeartbeatResponse().getResponseId();
|
|
|
-//
|
|
|
-// } catch(Exception e) {
|
|
|
-// LOG.info("failed to heartbeat ", e);
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// boolean stopT = false;
|
|
|
-// RegistrationResponse thirdNodeRegResponse;
|
|
|
-//
|
|
|
-// @Test
|
|
|
-// public void testNMExpiry() throws Exception {
|
|
|
-// String hostname1 = "localhost1";
|
|
|
-// String hostname2 = "localhost2";
|
|
|
-// String hostname3 = "localhost3";
|
|
|
-// Resource capability = recordFactory.newRecordInstance(Resource.class);
|
|
|
-//
|
|
|
-// RegisterNodeManagerRequest request1 = recordFactory
|
|
|
-// .newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
-// request1.setContainerManagerPort(0);
|
|
|
-// request1.setHost(hostname1);
|
|
|
-// request1.setHttpPort(0);
|
|
|
-// request1.setResource(capability);
|
|
|
-// resourceTrackerService.registerNodeManager(request1);
|
|
|
-//
|
|
|
-// RegisterNodeManagerRequest request2 = recordFactory
|
|
|
-// .newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
-// request2.setContainerManagerPort(0);
|
|
|
-// request2.setHost(hostname2);
|
|
|
-// request2.setHttpPort(0);
|
|
|
-// request2.setResource(capability);
|
|
|
-// resourceTrackerService.registerNodeManager(request2);
|
|
|
-//
|
|
|
-// RegisterNodeManagerRequest request3 = recordFactory
|
|
|
-// .newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
-// request3.setContainerManagerPort(0);
|
|
|
-// request3.setHost(hostname3);
|
|
|
-// request3.setHttpPort(0);
|
|
|
-// request3.setResource(capability);
|
|
|
-// thirdNodeRegResponse = resourceTrackerService.registerNodeManager(
|
|
|
-// request3).getRegistrationResponse();
|
|
|
-//
|
|
|
-// /* test to see if hostanme 3 does not expire */
|
|
|
-// stopT = false;
|
|
|
-// new ThirdNodeHeartBeatThread().start();
|
|
|
-// int timeOut = 0;
|
|
|
-// synchronized (notify) {
|
|
|
-// while (notify.get() == 0 && timeOut++ < 30) {
|
|
|
-// notify.wait(1000);
|
|
|
-// }
|
|
|
-// }
|
|
|
-// Assert.assertEquals(2, test.get());
|
|
|
-//
|
|
|
-// stopT = true;
|
|
|
-// }
|
|
|
+ private static final Log LOG = LogFactory.getLog(TestNMExpiry.class);
|
|
|
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
+
|
|
|
+ ResourceTrackerService resourceTrackerService;
|
|
|
+ ContainerTokenSecretManager containerTokenSecretManager =
|
|
|
+ new ContainerTokenSecretManager();
|
|
|
+ AtomicInteger test = new AtomicInteger();
|
|
|
+ AtomicInteger notify = new AtomicInteger();
|
|
|
+
|
|
|
+ private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
|
|
|
+ public TestNmLivelinessMonitor(Dispatcher dispatcher) {
|
|
|
+ super(dispatcher);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void init(Configuration conf) {
|
|
|
+ conf.setLong(RMConfig.NM_EXPIRY_INTERVAL, 1000);
|
|
|
+ super.init(conf);
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ protected void expire(NodeId id) {
|
|
|
+ LOG.info("Expired " + id);
|
|
|
+ if (test.addAndGet(1) == 2) {
|
|
|
+ try {
|
|
|
+ /* delay atleast 2 seconds to make sure the 3rd one does not expire
|
|
|
+ *
|
|
|
+ */
|
|
|
+ Thread.sleep(2000);
|
|
|
+ } catch(InterruptedException ie){}
|
|
|
+ synchronized(notify) {
|
|
|
+ notify.addAndGet(1);
|
|
|
+ notify.notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void setUp() {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ // Dispatcher that processes events inline
|
|
|
+ Dispatcher dispatcher = new InlineDispatcher();
|
|
|
+ dispatcher.register(SchedulerEventType.class,
|
|
|
+ new InlineDispatcher.EmptyEventHandler());
|
|
|
+ dispatcher.register(RMNodeEventType.class,
|
|
|
+ new InlineDispatcher.EmptyEventHandler());
|
|
|
+ RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
|
|
+ null);
|
|
|
+ NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor(
|
|
|
+ dispatcher);
|
|
|
+ nmLivelinessMonitor.init(conf);
|
|
|
+ nmLivelinessMonitor.start();
|
|
|
+ NodesListManager nodesListManager = new NodesListManager();
|
|
|
+ nodesListManager.init(conf);
|
|
|
+ resourceTrackerService = new ResourceTrackerService(context,
|
|
|
+ nodesListManager, nmLivelinessMonitor, containerTokenSecretManager);
|
|
|
+
|
|
|
+ resourceTrackerService.init(conf);
|
|
|
+ resourceTrackerService.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ private class ThirdNodeHeartBeatThread extends Thread {
|
|
|
+ public void run() {
|
|
|
+ int lastResponseID = 0;
|
|
|
+ while (!stopT) {
|
|
|
+ try {
|
|
|
+ org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =
|
|
|
+ recordFactory
|
|
|
+ .newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
|
|
|
+ nodeStatus.setNodeId(request3.getNodeId());
|
|
|
+ nodeStatus.setResponseId(lastResponseID);
|
|
|
+ nodeStatus.setNodeHealthStatus(recordFactory.newRecordInstance(NodeHealthStatus.class));
|
|
|
+ nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true);
|
|
|
+
|
|
|
+ NodeHeartbeatRequest request = recordFactory
|
|
|
+ .newRecordInstance(NodeHeartbeatRequest.class);
|
|
|
+ request.setNodeStatus(nodeStatus);
|
|
|
+ lastResponseID = resourceTrackerService.nodeHeartbeat(request)
|
|
|
+ .getHeartbeatResponse().getResponseId();
|
|
|
+
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch(Exception e) {
|
|
|
+ LOG.info("failed to heartbeat ", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean stopT = false;
|
|
|
+ RegisterNodeManagerRequest request3;
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testNMExpiry() throws Exception {
|
|
|
+ String hostname1 = "localhost1";
|
|
|
+ String hostname2 = "localhost2";
|
|
|
+ String hostname3 = "localhost3";
|
|
|
+ Resource capability = recordFactory.newRecordInstance(Resource.class);
|
|
|
+
|
|
|
+ RegisterNodeManagerRequest request1 = recordFactory
|
|
|
+ .newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
+ NodeId nodeId1 = Records.newRecord(NodeId.class);
|
|
|
+ nodeId1.setPort(0);
|
|
|
+ nodeId1.setHost(hostname1);
|
|
|
+ request1.setNodeId(nodeId1);
|
|
|
+ request1.setHttpPort(0);
|
|
|
+ request1.setResource(capability);
|
|
|
+ resourceTrackerService.registerNodeManager(request1);
|
|
|
+
|
|
|
+ RegisterNodeManagerRequest request2 = recordFactory
|
|
|
+ .newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
+ NodeId nodeId2 = Records.newRecord(NodeId.class);
|
|
|
+ nodeId2.setPort(0);
|
|
|
+ nodeId2.setHost(hostname2);
|
|
|
+ request2.setNodeId(nodeId2);
|
|
|
+ request2.setHttpPort(0);
|
|
|
+ request2.setResource(capability);
|
|
|
+ resourceTrackerService.registerNodeManager(request2);
|
|
|
+
|
|
|
+ request3 = recordFactory
|
|
|
+ .newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
+ NodeId nodeId3 = Records.newRecord(NodeId.class);
|
|
|
+ nodeId3.setPort(0);
|
|
|
+ nodeId3.setHost(hostname3);
|
|
|
+ request3.setNodeId(nodeId3);
|
|
|
+ request3.setHttpPort(0);
|
|
|
+ request3.setResource(capability);
|
|
|
+ RegistrationResponse thirdNodeRegResponse = resourceTrackerService
|
|
|
+ .registerNodeManager(request3).getRegistrationResponse();
|
|
|
+
|
|
|
+ /* test to see if hostanme 3 does not expire */
|
|
|
+ stopT = false;
|
|
|
+ new ThirdNodeHeartBeatThread().start();
|
|
|
+ int timeOut = 0;
|
|
|
+ synchronized (notify) {
|
|
|
+ while (notify.get() == 0 && timeOut++ < 30) {
|
|
|
+ notify.wait(1000);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertEquals(2, test.get());
|
|
|
+
|
|
|
+ stopT = true;
|
|
|
+ }
|
|
|
}
|