|
@@ -31,6 +31,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
+import java.util.concurrent.CyclicBarrier;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -51,9 +52,11 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
+import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
@@ -167,6 +170,10 @@ public class TestNodeStatusUpdater {
|
|
|
throws YarnRemoteException {
|
|
|
NodeStatus nodeStatus = request.getNodeStatus();
|
|
|
LOG.info("Got heartbeat number " + heartBeatID);
|
|
|
+ NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
|
|
|
+ Dispatcher mockDispatcher = mock(Dispatcher.class);
|
|
|
+ EventHandler mockEventHandler = mock(EventHandler.class);
|
|
|
+ when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
|
|
|
nodeStatus.setResponseId(heartBeatID++);
|
|
|
Map<ApplicationId, List<ContainerStatus>> appToContainers =
|
|
|
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
|
|
@@ -183,7 +190,8 @@ public class TestNodeStatusUpdater {
|
|
|
launchContext.setContainerId(firstContainerID);
|
|
|
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
|
|
|
launchContext.getResource().setMemory(2);
|
|
|
- Container container = new ContainerImpl(conf , null, launchContext, null, null);
|
|
|
+ Container container = new ContainerImpl(conf , mockDispatcher,
|
|
|
+ launchContext, null, mockMetrics);
|
|
|
this.context.getContainers().put(firstContainerID, container);
|
|
|
} else if (heartBeatID == 2) {
|
|
|
// Checks on the RM end
|
|
@@ -207,7 +215,8 @@ public class TestNodeStatusUpdater {
|
|
|
launchContext.setContainerId(secondContainerID);
|
|
|
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
|
|
|
launchContext.getResource().setMemory(3);
|
|
|
- Container container = new ContainerImpl(conf, null, launchContext, null, null);
|
|
|
+ Container container = new ContainerImpl(conf, mockDispatcher,
|
|
|
+ launchContext, null, mockMetrics);
|
|
|
this.context.getContainers().put(secondContainerID, container);
|
|
|
} else if (heartBeatID == 3) {
|
|
|
// Checks on the RM end
|
|
@@ -229,13 +238,14 @@ public class TestNodeStatusUpdater {
|
|
|
}
|
|
|
|
|
|
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
|
|
- public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
|
|
|
+ public ResourceTracker resourceTracker;
|
|
|
private Context context;
|
|
|
|
|
|
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
|
|
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
|
|
super(context, dispatcher, healthChecker, metrics);
|
|
|
this.context = context;
|
|
|
+ resourceTracker = new MyResourceTracker(this.context);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -312,6 +322,21 @@ public class TestNodeStatusUpdater {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
|
|
|
+ private ResourceTracker resourceTracker;
|
|
|
+
|
|
|
+ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
|
|
|
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
|
|
+ super(context, dispatcher, healthChecker, metrics);
|
|
|
+ resourceTracker = new MyResourceTracker5();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ResourceTracker getRMClient() {
|
|
|
+ return resourceTracker;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private class MyNodeManager extends NodeManager {
|
|
|
|
|
|
private MyNodeStatusUpdater3 nodeStatusUpdater;
|
|
@@ -328,6 +353,32 @@ public class TestNodeStatusUpdater {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private class MyNodeManager2 extends NodeManager {
|
|
|
+ public boolean isStopped = false;
|
|
|
+ private NodeStatusUpdater nodeStatusUpdater;
|
|
|
+ private CyclicBarrier syncBarrier;
|
|
|
+ public MyNodeManager2 (CyclicBarrier syncBarrier) {
|
|
|
+ this.syncBarrier = syncBarrier;
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
|
+ nodeStatusUpdater =
|
|
|
+ new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
|
|
|
+ metrics);
|
|
|
+ return nodeStatusUpdater;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void stop() {
|
|
|
+ super.stop();
|
|
|
+ isStopped = true;
|
|
|
+ try {
|
|
|
+ syncBarrier.await();
|
|
|
+ } catch (Exception e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
//
|
|
|
private class MyResourceTracker2 implements ResourceTracker {
|
|
|
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
|
|
@@ -505,6 +556,26 @@ public class TestNodeStatusUpdater {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private class MyResourceTracker5 implements ResourceTracker {
|
|
|
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
|
|
|
+ @Override
|
|
|
+ public RegisterNodeManagerResponse registerNodeManager(
|
|
|
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
|
|
|
+
|
|
|
+ RegisterNodeManagerResponse response = recordFactory
|
|
|
+ .newRecordInstance(RegisterNodeManagerResponse.class);
|
|
|
+ response.setNodeAction(registerNodeAction );
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
+ heartBeatID++;
|
|
|
+ throw RPCUtil.getRemoteException("NodeHeartbeat exception");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Before
|
|
|
public void clearError() {
|
|
|
nmStartError = null;
|
|
@@ -883,6 +954,30 @@ public class TestNodeStatusUpdater {
|
|
|
nm.stop();
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 20000)
|
|
|
+ public void testNodeStatusUpdaterRetryAndNMShutdown()
|
|
|
+ throws InterruptedException {
|
|
|
+ final long connectionWaitSecs = 1;
|
|
|
+ final long connectionRetryIntervalSecs = 1;
|
|
|
+ YarnConfiguration conf = createNMConfig();
|
|
|
+ conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
|
|
|
+ connectionWaitSecs);
|
|
|
+ conf.setLong(YarnConfiguration
|
|
|
+ .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
|
|
|
+ connectionRetryIntervalSecs);
|
|
|
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
|
|
+ nm = new MyNodeManager2(syncBarrier);
|
|
|
+ nm.init(conf);
|
|
|
+ nm.start();
|
|
|
+ try {
|
|
|
+ syncBarrier.await();
|
|
|
+ } catch (Exception e) {
|
|
|
+ }
|
|
|
+ Assert.assertTrue(((MyNodeManager2) nm).isStopped);
|
|
|
+ Assert.assertTrue("calculate heartBeatCount based on" +
|
|
|
+ " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
|
|
|
+ }
|
|
|
+
|
|
|
private class MyNMContext extends NMContext {
|
|
|
ConcurrentMap<ContainerId, Container> containers =
|
|
|
new ConcurrentSkipListMap<ContainerId, Container>();
|