|
@@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.api.records.Token;
|
|
|
import org.apache.hadoop.yarn.api.records.URL;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
-import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
@@ -87,7 +86,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
|
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
|
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
|
|
-import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
@@ -188,34 +186,6 @@ public class TestNodeManagerResync {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // This test tests new container requests are blocked when NM starts from
|
|
|
- // scratch until it register with RM AND while NM is resyncing with RM
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- @Test(timeout=60000)
|
|
|
- public void testBlockNewContainerRequestsOnStartAndResync()
|
|
|
- throws IOException, InterruptedException, YarnException {
|
|
|
- NodeManager nm = new TestNodeManager2();
|
|
|
- int port = ServerSocketUtil.getPort(49154, 10);
|
|
|
- YarnConfiguration conf = createNMConfig(port);
|
|
|
- conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
|
|
- nm.init(conf);
|
|
|
- nm.start();
|
|
|
-
|
|
|
- // Start the container in running state
|
|
|
- ContainerId cId = TestNodeManagerShutdown.createContainerId();
|
|
|
- TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
|
|
|
- processStartFile, port);
|
|
|
-
|
|
|
- nm.getNMDispatcher().getEventHandler()
|
|
|
- .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
|
|
- try {
|
|
|
- syncBarrier.await();
|
|
|
- } catch (BrokenBarrierException e) {
|
|
|
- }
|
|
|
- Assert.assertFalse(assertionFailedInThread.get());
|
|
|
- nm.stop();
|
|
|
- }
|
|
|
-
|
|
|
@SuppressWarnings("unchecked")
|
|
|
@Test(timeout=10000)
|
|
|
public void testNMshutdownWhenResyncThrowException() throws IOException,
|
|
@@ -493,135 +463,6 @@ public class TestNodeManagerResync {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- class TestNodeManager2 extends NodeManager {
|
|
|
-
|
|
|
- Thread launchContainersThread = null;
|
|
|
- @Override
|
|
|
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
|
- return new TestNodeStatusUpdaterImpl2(context, dispatcher,
|
|
|
- healthChecker, metrics);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected ContainerManagerImpl createContainerManager(Context context,
|
|
|
- ContainerExecutor exec, DeletionService del,
|
|
|
- NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
|
|
|
- LocalDirsHandlerService dirsHandler) {
|
|
|
- return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
|
|
|
- metrics, dirsHandler){
|
|
|
- @Override
|
|
|
- public void setBlockNewContainerRequests(
|
|
|
- boolean blockNewContainerRequests) {
|
|
|
- if (blockNewContainerRequests) {
|
|
|
- // start test thread right after blockNewContainerRequests is set
|
|
|
- // true
|
|
|
- super.setBlockNewContainerRequests(blockNewContainerRequests);
|
|
|
- launchContainersThread = new RejectedContainersLauncherThread();
|
|
|
- launchContainersThread.start();
|
|
|
- } else {
|
|
|
- // join the test thread right before blockNewContainerRequests is
|
|
|
- // reset
|
|
|
- try {
|
|
|
- // stop the test thread
|
|
|
- ((RejectedContainersLauncherThread) launchContainersThread)
|
|
|
- .setStopThreadFlag(true);
|
|
|
- launchContainersThread.join();
|
|
|
- ((RejectedContainersLauncherThread) launchContainersThread)
|
|
|
- .setStopThreadFlag(false);
|
|
|
- super.setBlockNewContainerRequests(blockNewContainerRequests);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater {
|
|
|
-
|
|
|
- public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher,
|
|
|
- NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
|
|
- super(context, dispatcher, healthChecker, metrics);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
|
|
|
- ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
|
|
|
- .containermanager.container.Container> containers =
|
|
|
- getNMContext().getContainers();
|
|
|
-
|
|
|
- try {
|
|
|
- // ensure that containers are empty before restart nodeStatusUpdater
|
|
|
- if (!containers.isEmpty()) {
|
|
|
- for (Container container: containers.values()) {
|
|
|
- Assert.assertEquals(ContainerState.COMPLETE,
|
|
|
- container.cloneAndGetContainerStatus().getState());
|
|
|
- }
|
|
|
- }
|
|
|
- super.rebootNodeStatusUpdaterAndRegisterWithRM();
|
|
|
- // After this point new containers are free to be launched, except
|
|
|
- // containers from previous RM
|
|
|
- // Wait here so as to sync with the main test thread.
|
|
|
- syncBarrier.await();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- } catch (BrokenBarrierException e) {
|
|
|
- } catch (AssertionError ae) {
|
|
|
- ae.printStackTrace();
|
|
|
- assertionFailedInThread.set(true);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- class RejectedContainersLauncherThread extends Thread {
|
|
|
-
|
|
|
- boolean isStopped = false;
|
|
|
- public void setStopThreadFlag(boolean isStopped) {
|
|
|
- this.isStopped = isStopped;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- int numContainers = 0;
|
|
|
- int numContainersRejected = 0;
|
|
|
- ContainerLaunchContext containerLaunchContext =
|
|
|
- recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
- try {
|
|
|
- while (!isStopped && numContainers < 10) {
|
|
|
- StartContainerRequest scRequest =
|
|
|
- StartContainerRequest.newInstance(containerLaunchContext,
|
|
|
- null);
|
|
|
- List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
|
|
- list.add(scRequest);
|
|
|
- StartContainersRequest allRequests =
|
|
|
- StartContainersRequest.newInstance(list);
|
|
|
- System.out.println("no. of containers to be launched: "
|
|
|
- + numContainers);
|
|
|
- numContainers++;
|
|
|
- try {
|
|
|
- getContainerManager().startContainers(allRequests);
|
|
|
- } catch (YarnException e) {
|
|
|
- numContainersRejected++;
|
|
|
- Assert.assertTrue(e.getMessage().contains(
|
|
|
- "Rejecting new containers as NodeManager has not" +
|
|
|
- " yet connected with ResourceManager"));
|
|
|
- Assert.assertEquals(NMNotYetReadyException.class.getName(), e
|
|
|
- .getClass().getName());
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- assertionFailedInThread.set(true);
|
|
|
- }
|
|
|
- }
|
|
|
- // no. of containers to be launched should equal to no. of
|
|
|
- // containers rejected
|
|
|
- Assert.assertEquals(numContainers, numContainersRejected);
|
|
|
- } catch (AssertionError ae) {
|
|
|
- assertionFailedInThread.set(true);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
class TestNodeManager3 extends NodeManager {
|
|
|
|
|
|
private int registrationCount = 0;
|
|
@@ -681,11 +522,6 @@ public class TestNodeManagerResync {
|
|
|
LocalDirsHandlerService dirsHandler) {
|
|
|
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
|
|
|
metrics, dirsHandler){
|
|
|
- @Override
|
|
|
- public void
|
|
|
- setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
|
|
- // do nothing
|
|
|
- }
|
|
|
|
|
|
@Override
|
|
|
protected void authorizeGetAndStopContainerRequest(
|