|
@@ -24,12 +24,14 @@ import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
@@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
|
|
@@ -83,6 +86,9 @@ import org.apache.log4j.Logger;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
+
|
|
|
+import static org.junit.Assert.fail;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
@@ -216,10 +222,14 @@ public class TestApplicationMasterLauncher {
|
|
|
// kick the scheduling
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
|
|
|
- int waitCount = 0;
|
|
|
- while (containerManager.launched == false && waitCount++ < 20) {
|
|
|
- LOG.info("Waiting for AM Launch to happen..");
|
|
|
- Thread.sleep(1000);
|
|
|
+ try {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override public Boolean get() {
|
|
|
+ return containerManager.launched;
|
|
|
+ }
|
|
|
+ }, 100, 200 * 100);
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ fail("timed out while waiting for AM Launch to happen.");
|
|
|
}
|
|
|
Assert.assertTrue(containerManager.launched);
|
|
|
|
|
@@ -233,7 +243,7 @@ public class TestApplicationMasterLauncher {
|
|
|
.getMasterContainer().getId()
|
|
|
.toString(), containerManager.containerIdAtContainerManager);
|
|
|
Assert.assertEquals(nm1.getNodeId().toString(),
|
|
|
- containerManager.nmHostAtContainerManager);
|
|
|
+ containerManager.nmHostAtContainerManager);
|
|
|
Assert.assertEquals(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
|
|
|
containerManager.maxAppAttempts);
|
|
|
|
|
@@ -246,10 +256,14 @@ public class TestApplicationMasterLauncher {
|
|
|
nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
|
|
|
|
|
- waitCount = 0;
|
|
|
- while (containerManager.cleanedup == false && waitCount++ < 20) {
|
|
|
- LOG.info("Waiting for AM Cleanup to happen..");
|
|
|
- Thread.sleep(1000);
|
|
|
+ try {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override public Boolean get() {
|
|
|
+ return containerManager.cleanedup;
|
|
|
+ }
|
|
|
+ }, 100, 200 * 100);
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ fail("timed out while waiting for AM cleanup to happen.");
|
|
|
}
|
|
|
Assert.assertTrue(containerManager.cleanedup);
|
|
|
|
|
@@ -257,6 +271,48 @@ public class TestApplicationMasterLauncher {
|
|
|
rm.stop();
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testAMCleanupBeforeLaunch() throws Exception {
|
|
|
+ MockRM rm = new MockRM();
|
|
|
+ rm.start();
|
|
|
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
|
|
|
+ RMApp app = rm.submitApp(2000);
|
|
|
+ // kick the scheduling
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
|
|
|
+
|
|
|
+ try {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override public Boolean get() {
|
|
|
+ return attempt.getMasterContainer() != null;
|
|
|
+ }
|
|
|
+ }, 10, 200 * 100);
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ fail("timed out while waiting for AM Launch to happen.");
|
|
|
+ }
|
|
|
+
|
|
|
+ //send kill before launch
|
|
|
+ rm.killApp(app.getApplicationId());
|
|
|
+ rm.waitForState(app.getApplicationId(), RMAppState.KILLED);
|
|
|
+ //Launch after kill
|
|
|
+ AMLauncher launcher = new AMLauncher(rm.getRMContext(),
|
|
|
+ attempt, AMLauncherEventType.LAUNCH, rm.getConfig()) {
|
|
|
+ @Override
|
|
|
+ public void onAMLaunchFailed(ContainerId containerId, Exception e) {
|
|
|
+ Assert.assertFalse("NullPointerException happens "
|
|
|
+ + " while launching " + containerId,
|
|
|
+ e instanceof NullPointerException);
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ protected ContainerManagementProtocol getContainerMgrProxy(
|
|
|
+ ContainerId containerId) {
|
|
|
+ return new MyContainerManagerImpl();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ launcher.run();
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testRetriesOnFailures() throws Exception {
|
|
|
final ContainerManagementProtocol mockProxy =
|
|
@@ -303,7 +359,7 @@ public class TestApplicationMasterLauncher {
|
|
|
rm.drainEvents();
|
|
|
|
|
|
MockRM.waitForState(app.getCurrentAppAttempt(),
|
|
|
- RMAppAttemptState.LAUNCHED, 500);
|
|
|
+ RMAppAttemptState.LAUNCHED, 500);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -337,9 +393,9 @@ public class TestApplicationMasterLauncher {
|
|
|
|
|
|
AllocateResponse amrs = null;
|
|
|
try {
|
|
|
- amrs = am.allocate(new ArrayList<ResourceRequest>(),
|
|
|
+ amrs = am.allocate(new ArrayList<ResourceRequest>(),
|
|
|
new ArrayList<ContainerId>());
|
|
|
- Assert.fail();
|
|
|
+ Assert.fail();
|
|
|
} catch (ApplicationMasterNotRegisteredException e) {
|
|
|
}
|
|
|
|