|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
@@ -26,6 +27,7 @@ import java.util.Map;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
@@ -43,11 +45,18 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
|
|
import org.apache.hadoop.yarn.api.records.Token;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
|
|
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
|
+import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
@@ -58,6 +67,10 @@ import org.apache.log4j.Logger;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
+
|
|
|
public class TestApplicationMasterLauncher {
|
|
|
|
|
|
private static final Log LOG = LogFactory
|
|
@@ -177,8 +190,62 @@ public class TestApplicationMasterLauncher {
|
|
|
am.waitForState(RMAppAttemptState.FINISHED);
|
|
|
rm.stop();
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testRetriesOnFailures() throws Exception {
|
|
|
+ final ContainerManagementProtocol mockProxy =
|
|
|
+ mock(ContainerManagementProtocol.class);
|
|
|
+ final StartContainersResponse mockResponse =
|
|
|
+ mock(StartContainersResponse.class);
|
|
|
+ when(mockProxy.startContainers(any(StartContainersRequest.class)))
|
|
|
+ .thenThrow(new NMNotYetReadyException("foo")).thenReturn(mockResponse);
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
|
+ conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1);
|
|
|
+ final DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
+ MockRM rm = new MockRMWithCustomAMLauncher(conf, null) {
|
|
|
+ @Override
|
|
|
+ protected ApplicationMasterLauncher createAMLauncher() {
|
|
|
+ return new ApplicationMasterLauncher(getRMContext()) {
|
|
|
+ @Override
|
|
|
+ protected Runnable createRunnableLauncher(RMAppAttempt application,
|
|
|
+ AMLauncherEventType event) {
|
|
|
+ return new AMLauncher(context, application, event, getConfig()) {
|
|
|
+ @Override
|
|
|
+ protected YarnRPC getYarnRPC() {
|
|
|
+ YarnRPC mockRpc = mock(YarnRPC.class);
|
|
|
+
|
|
|
+ when(mockRpc.getProxy(
|
|
|
+ any(Class.class),
|
|
|
+ any(InetSocketAddress.class),
|
|
|
+ any(Configuration.class)))
|
|
|
+ .thenReturn(mockProxy);
|
|
|
+ return mockRpc;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm.start();
|
|
|
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120);
|
|
|
+
|
|
|
+ RMApp app = rm.submitApp(2000);
|
|
|
+ final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
|
|
+ .getAppAttemptId();
|
|
|
+
|
|
|
+ // kick the scheduling
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500);
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("unused")
|
|
|
@Test(timeout = 100000)
|
|
|
public void testallocateBeforeAMRegistration() throws Exception {
|