|
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
|
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
|
@@ -65,6 +66,7 @@ public class TestNodeManagerResync {
|
|
private FileContext localFS;
|
|
private FileContext localFS;
|
|
private CyclicBarrier syncBarrier;
|
|
private CyclicBarrier syncBarrier;
|
|
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
|
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
|
|
|
+ private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
|
|
|
|
|
|
@Before
|
|
@Before
|
|
public void setup() throws UnsupportedFileSystemException {
|
|
public void setup() throws UnsupportedFileSystemException {
|
|
@@ -137,6 +139,30 @@ public class TestNodeManagerResync {
|
|
Assert.assertFalse(assertionFailedInThread.get());
|
|
Assert.assertFalse(assertionFailedInThread.get());
|
|
nm.stop();
|
|
nm.stop();
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
|
+ @Test(timeout=10000)
|
|
|
|
+ public void testNMshutdownWhenResyncThrowException() throws IOException,
|
|
|
|
+ InterruptedException, YarnException {
|
|
|
|
+ NodeManager nm = new TestNodeManager3();
|
|
|
|
+ YarnConfiguration conf = createNMConfig();
|
|
|
|
+ nm.init(conf);
|
|
|
|
+ nm.start();
|
|
|
|
+ Assert.assertEquals(1, ((TestNodeManager3) nm).getNMRegistrationCount());
|
|
|
|
+ nm.getNMDispatcher().getEventHandler()
|
|
|
|
+ .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));
|
|
|
|
+
|
|
|
|
+ synchronized (isNMShutdownCalled) {
|
|
|
|
+ while (isNMShutdownCalled.get() == false) {
|
|
|
|
+ try {
|
|
|
|
+ isNMShutdownCalled.wait();
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get());
|
|
|
|
+ }
|
|
|
|
|
|
private YarnConfiguration createNMConfig() {
|
|
private YarnConfiguration createNMConfig() {
|
|
YarnConfiguration conf = new YarnConfiguration();
|
|
YarnConfiguration conf = new YarnConfiguration();
|
|
@@ -322,4 +348,44 @@ public class TestNodeManagerResync {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ class TestNodeManager3 extends NodeManager {
|
|
|
|
+
|
|
|
|
+ private int registrationCount = 0;
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
|
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
|
|
+ return new TestNodeStatusUpdaterImpl3(context, dispatcher, healthChecker,
|
|
|
|
+ metrics);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public int getNMRegistrationCount() {
|
|
|
|
+ return registrationCount;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void shutDown() {
|
|
|
|
+ synchronized (isNMShutdownCalled) {
|
|
|
|
+ isNMShutdownCalled.set(true);
|
|
|
|
+ isNMShutdownCalled.notify();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ class TestNodeStatusUpdaterImpl3 extends MockNodeStatusUpdater {
|
|
|
|
+
|
|
|
|
+ public TestNodeStatusUpdaterImpl3(Context context, Dispatcher dispatcher,
|
|
|
|
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
|
|
|
+ super(context, dispatcher, healthChecker, metrics);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void registerWithRM() throws YarnException, IOException {
|
|
|
|
+ super.registerWithRM();
|
|
|
|
+ registrationCount++;
|
|
|
|
+ if (registrationCount > 1) {
|
|
|
|
+ throw new YarnRuntimeException("Registration with RM failed.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }}
|
|
}
|
|
}
|