|
@@ -20,55 +20,96 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
|
-import junit.framework.TestCase;
|
|
|
-
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.Event;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.junit.After;
|
|
|
+import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
|
|
|
-public class TestRMNMRPCResponseId extends TestCase {
|
|
|
+public class TestRMNMRPCResponseId {
|
|
|
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
ResourceTrackerService resourceTrackerService;
|
|
|
ContainerTokenSecretManager containerTokenSecretManager =
|
|
|
new ContainerTokenSecretManager();
|
|
|
- private NodeId nodeid;
|
|
|
+ private NodeId nodeId;
|
|
|
+
|
|
|
+ private class InlineDispatcher extends AsyncDispatcher {
|
|
|
+ private class InlineEventHandler implements EventHandler {
|
|
|
+ private final InlineDispatcher dispatcher;
|
|
|
+ public InlineEventHandler(InlineDispatcher dispatcher) {
|
|
|
+ this.dispatcher = dispatcher;
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public void handle(Event event) {
|
|
|
+ this.dispatcher.dispatch(event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public void dispatch(Event event) {
|
|
|
+ super.dispatch(event);
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public EventHandler getEventHandler() {
|
|
|
+ return new InlineEventHandler(this);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
@Before
|
|
|
public void setUp() {
|
|
|
- Dispatcher dispatcher = new AsyncDispatcher();
|
|
|
+ // Dispatcher that processes events inline
|
|
|
+ Dispatcher dispatcher = new InlineDispatcher();
|
|
|
+ dispatcher.register(SchedulerEventType.class, new EventHandler<Event>() {
|
|
|
+ @Override
|
|
|
+ public void handle(Event event) {
|
|
|
+ ; // ignore
|
|
|
+ }
|
|
|
+ });
|
|
|
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
|
|
null);
|
|
|
- resourceTrackerService = new ResourceTrackerService(context, null, null,
|
|
|
+ dispatcher.register(RMNodeEventType.class,
|
|
|
+ new ResourceManager.NodeEventDispatcher(context));
|
|
|
+ NodesListManager nodesListManager = new NodesListManager();
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ nodesListManager.init(conf);
|
|
|
+ resourceTrackerService = new ResourceTrackerService(context,
|
|
|
+ nodesListManager, new NMLivelinessMonitor(dispatcher),
|
|
|
containerTokenSecretManager);
|
|
|
- resourceTrackerService.init(new Configuration());
|
|
|
+ resourceTrackerService.init(conf);
|
|
|
}
|
|
|
|
|
|
@After
|
|
|
public void tearDown() {
|
|
|
/* do nothing */
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ @Test
|
|
|
public void testRPCResponseId() throws IOException {
|
|
|
String node = "localhost";
|
|
|
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
|
|
RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
- NodeId nodeId = Records.newRecord(NodeId.class);
|
|
|
+ nodeId = Records.newRecord(NodeId.class);
|
|
|
nodeId.setHost(node);
|
|
|
nodeId.setPort(1234);
|
|
|
request.setNodeId(nodeId);
|
|
@@ -84,7 +125,7 @@ public class TestRMNMRPCResponseId extends TestCase {
|
|
|
|
|
|
org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = recordFactory.
|
|
|
newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
|
|
|
- nodeStatus.setNodeId(nodeid);
|
|
|
+ nodeStatus.setNodeId(nodeId);
|
|
|
NodeHealthStatus nodeHealthStatus = recordFactory.newRecordInstance(NodeHealthStatus.class);
|
|
|
nodeHealthStatus.setIsNodeHealthy(true);
|
|
|
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
|
|
@@ -95,21 +136,21 @@ public class TestRMNMRPCResponseId extends TestCase {
|
|
|
nodeStatus.setResponseId(0);
|
|
|
HeartbeatResponse response = resourceTrackerService.nodeHeartbeat(
|
|
|
nodeHeartBeatRequest).getHeartbeatResponse();
|
|
|
- assertTrue(response.getResponseId() == 1);
|
|
|
+ Assert.assertTrue(response.getResponseId() == 1);
|
|
|
|
|
|
nodeStatus.setResponseId(response.getResponseId());
|
|
|
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
|
|
|
.getHeartbeatResponse();
|
|
|
- assertTrue(response.getResponseId() == 2);
|
|
|
+ Assert.assertTrue(response.getResponseId() == 2);
|
|
|
|
|
|
/* try calling with less response id */
|
|
|
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
|
|
|
.getHeartbeatResponse();
|
|
|
- assertTrue(response.getResponseId() == 2);
|
|
|
+ Assert.assertTrue(response.getResponseId() == 2);
|
|
|
|
|
|
nodeStatus.setResponseId(0);
|
|
|
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
|
|
|
.getHeartbeatResponse();
|
|
|
- assertTrue(response.getReboot() == true);
|
|
|
+ Assert.assertTrue(response.getReboot() == true);
|
|
|
}
|
|
|
}
|