|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
|
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Matchers.anyLong;
|
|
|
import static org.mockito.Matchers.eq;
|
|
@@ -28,17 +29,23 @@ import static org.mockito.Mockito.spy;
|
|
|
import static org.mockito.Mockito.times;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
|
|
|
+import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileContext;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
|
import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.security.Credentials;
|
|
@@ -46,14 +53,22 @@ import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|
|
import org.apache.hadoop.yarn.MockApps;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
|
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
|
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
|
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.URL;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
@@ -251,7 +266,113 @@ public class TestRMAppTransitions {
|
|
|
rmDispatcher.start();
|
|
|
}
|
|
|
|
|
|
- protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) {
|
|
|
+ private ByteBuffer getTokens() throws IOException {
|
|
|
+ Credentials ts = new Credentials();
|
|
|
+ DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
+ ts.writeTokenStorageToStream(dob);
|
|
|
+ ByteBuffer securityTokens =
|
|
|
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
|
+ return securityTokens;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ByteBuffer getTokensConf() throws IOException {
|
|
|
+
|
|
|
+ DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
+ Configuration appConf = new Configuration(false);
|
|
|
+ appConf.clear();
|
|
|
+ appConf.set("dfs.nameservices", "mycluster1,mycluster2");
|
|
|
+ appConf.set("dfs.namenode.rpc-address.mycluster2.nn1",
|
|
|
+ "123.0.0.1");
|
|
|
+ appConf.set("dfs.namenode.rpc-address.mycluster3.nn2",
|
|
|
+ "123.0.0.2");
|
|
|
+ appConf.write(dob);
|
|
|
+ ByteBuffer tokenConf =
|
|
|
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
|
+ return tokenConf;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, LocalResource> getLocalResources()
|
|
|
+ throws UnsupportedFileSystemException {
|
|
|
+ FileContext localFS = FileContext.getLocalFSFileContext();
|
|
|
+ File tmpDir = new File("target");
|
|
|
+ File scriptFile = new File(tmpDir, "scriptFile.sh");
|
|
|
+ URL resourceURL =
|
|
|
+ URL.fromPath(localFS
|
|
|
+ .makeQualified(new Path(scriptFile.getAbsolutePath())));
|
|
|
+ LocalResource localRes =
|
|
|
+ Records.newRecord(LocalResource.class);
|
|
|
+ localRes.setResource(resourceURL);
|
|
|
+ localRes.setSize(-1);
|
|
|
+ localRes.setVisibility(LocalResourceVisibility.APPLICATION);
|
|
|
+ localRes.setType(LocalResourceType.FILE);
|
|
|
+ localRes.setTimestamp(scriptFile.lastModified());
|
|
|
+ String destinationFile = "dest_file";
|
|
|
+ Map<String, LocalResource> localResources =
|
|
|
+ new HashMap<String, LocalResource>();
|
|
|
+ localResources.put(destinationFile, localRes);
|
|
|
+ return localResources;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, String> getEnvironment() {
|
|
|
+ Map<String, String> userSetEnv = new HashMap<String, String>();
|
|
|
+ userSetEnv.put(Environment.CONTAINER_ID.name(), "user_set_container_id");
|
|
|
+ userSetEnv.put(Environment.NM_HOST.name(), "user_set_NM_HOST");
|
|
|
+ userSetEnv.put(Environment.NM_PORT.name(), "user_set_NM_PORT");
|
|
|
+ userSetEnv.put(Environment.NM_HTTP_PORT.name(), "user_set_NM_HTTP_PORT");
|
|
|
+ userSetEnv.put(Environment.LOCAL_DIRS.name(), "user_set_LOCAL_DIR");
|
|
|
+ userSetEnv.put(Environment.USER.key(), "user_set_" +
|
|
|
+ Environment.USER.key());
|
|
|
+ userSetEnv.put(Environment.LOGNAME.name(), "user_set_LOGNAME");
|
|
|
+ userSetEnv.put(Environment.PWD.name(), "user_set_PWD");
|
|
|
+ userSetEnv.put(Environment.HOME.name(), "user_set_HOME");
|
|
|
+ return userSetEnv;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ContainerRetryContext getContainerRetryContext() {
|
|
|
+ ContainerRetryContext containerRetryContext = ContainerRetryContext
|
|
|
+ .newInstance(
|
|
|
+ ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
|
|
|
+ new HashSet<>(Arrays.asList(Integer.valueOf(111))), 0, 0);
|
|
|
+ return containerRetryContext;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, ByteBuffer> getServiceData() {
|
|
|
+ Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
|
|
|
+ String serviceName = "non_exist_auxService";
|
|
|
+ serviceData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes()));
|
|
|
+ return serviceData;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ContainerLaunchContext prepareContainerLaunchContext()
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ ContainerLaunchContext clc =
|
|
|
+ Records.newRecord(ContainerLaunchContext.class);
|
|
|
+ clc.setCommands(Arrays.asList("/bin/sleep 5"));
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ clc.setTokens(getTokens());
|
|
|
+ clc.setTokensConf(getTokensConf());
|
|
|
+ }
|
|
|
+ clc.setLocalResources(getLocalResources());
|
|
|
+ clc.setEnvironment(getEnvironment());
|
|
|
+ clc.setContainerRetryContext(getContainerRetryContext());
|
|
|
+ clc.setServiceData(getServiceData());
|
|
|
+ return clc;
|
|
|
+ }
|
|
|
+
|
|
|
+ private LogAggregationContext getLogAggregationContext() {
|
|
|
+ LogAggregationContext logAggregationContext =
|
|
|
+ LogAggregationContext.newInstance(
|
|
|
+ "includePattern", "excludePattern",
|
|
|
+ "rolledLogsIncludePattern",
|
|
|
+ "rolledLogsExcludePattern",
|
|
|
+ "policyClass",
|
|
|
+ "policyParameters");
|
|
|
+ return logAggregationContext;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected RMApp createNewTestApp(ApplicationSubmissionContext
|
|
|
+ submissionContext) throws IOException {
|
|
|
ApplicationId applicationId = MockApps.newAppID(appId++);
|
|
|
String user = MockApps.newUserName();
|
|
|
String name = MockApps.newAppName();
|
|
@@ -270,7 +391,9 @@ public class TestRMAppTransitions {
|
|
|
// but applicationId is still set for safety
|
|
|
submissionContext.setApplicationId(applicationId);
|
|
|
submissionContext.setPriority(Priority.newInstance(0));
|
|
|
- submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
|
|
|
+ submissionContext.setAMContainerSpec(prepareContainerLaunchContext());
|
|
|
+ submissionContext.setLogAggregationContext(getLogAggregationContext());
|
|
|
+
|
|
|
RMApp application = new RMAppImpl(applicationId, rmContext, conf, name,
|
|
|
user, queue, submissionContext, scheduler, masterService,
|
|
|
System.currentTimeMillis(), "YARN", null,
|
|
@@ -405,6 +528,7 @@ public class TestRMAppTransitions {
|
|
|
// verify sendATSCreateEvent() is get called during
|
|
|
// AddApplicationToSchedulerTransition.
|
|
|
verify(publisher).appCreated(eq(application), anyLong());
|
|
|
+ verifyRMAppFieldsForNonFinalTransitions(application);
|
|
|
return application;
|
|
|
}
|
|
|
|
|
@@ -422,6 +546,7 @@ public class TestRMAppTransitions {
|
|
|
application.handle(event);
|
|
|
assertStartTimeSet(application);
|
|
|
assertAppState(RMAppState.SUBMITTED, application);
|
|
|
+ verifyRMAppFieldsForNonFinalTransitions(application);
|
|
|
return application;
|
|
|
}
|
|
|
|
|
@@ -530,6 +655,7 @@ public class TestRMAppTransitions {
|
|
|
assertFailed(application,
|
|
|
".*Unmanaged application.*Failing the application.*");
|
|
|
assertAppFinalStateSaved(application);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -539,6 +665,7 @@ public class TestRMAppTransitions {
|
|
|
RMApp application = testCreateAppFinished(null, diagMsg);
|
|
|
Assert.assertTrue("Finished application missing diagnostics",
|
|
|
application.getDiagnostics().indexOf(diagMsg) != -1);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@@ -546,15 +673,7 @@ public class TestRMAppTransitions {
|
|
|
LOG.info("--- START: testAppRecoverPath ---");
|
|
|
ApplicationSubmissionContext sub =
|
|
|
Records.newRecord(ApplicationSubmissionContext.class);
|
|
|
- ContainerLaunchContext clc =
|
|
|
- Records.newRecord(ContainerLaunchContext.class);
|
|
|
- Credentials credentials = new Credentials();
|
|
|
- DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
- credentials.writeTokenStorageToStream(dob);
|
|
|
- ByteBuffer securityTokens =
|
|
|
- ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
|
- clc.setTokens(securityTokens);
|
|
|
- sub.setAMContainerSpec(clc);
|
|
|
+ sub.setAMContainerSpec(prepareContainerLaunchContext());
|
|
|
testCreateAppSubmittedRecovery(sub);
|
|
|
}
|
|
|
|
|
@@ -577,6 +696,7 @@ public class TestRMAppTransitions {
|
|
|
assertAppFinalStateNotSaved(application);
|
|
|
verifyApplicationFinished(RMAppState.KILLED);
|
|
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -594,6 +714,7 @@ public class TestRMAppTransitions {
|
|
|
assertFailed(application, rejectedText);
|
|
|
assertAppFinalStateSaved(application);
|
|
|
verifyApplicationFinished(RMAppState.FAILED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@@ -611,6 +732,7 @@ public class TestRMAppTransitions {
|
|
|
assertFailed(application, rejectedText);
|
|
|
assertAppFinalStateSaved(application);
|
|
|
verifyApplicationFinished(RMAppState.FAILED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
rmContext.getStateStore().removeApplication(application);
|
|
|
}
|
|
|
|
|
@@ -633,6 +755,7 @@ public class TestRMAppTransitions {
|
|
|
assertKilled(application);
|
|
|
verifyApplicationFinished(RMAppState.KILLED);
|
|
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@@ -650,6 +773,7 @@ public class TestRMAppTransitions {
|
|
|
assertFailed(application, rejectedText);
|
|
|
assertAppFinalStateSaved(application);
|
|
|
verifyApplicationFinished(RMAppState.FAILED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@@ -684,6 +808,7 @@ public class TestRMAppTransitions {
|
|
|
assertFailed(application, rejectedText);
|
|
|
assertAppFinalStateSaved(application);
|
|
|
verifyApplicationFinished(RMAppState.FAILED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -706,6 +831,7 @@ public class TestRMAppTransitions {
|
|
|
assertAppFinalStateSaved(application);
|
|
|
verifyApplicationFinished(RMAppState.KILLED);
|
|
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -769,8 +895,9 @@ public class TestRMAppTransitions {
|
|
|
assertAppFinalStateSaved(application);
|
|
|
verifyApplicationFinished(RMAppState.KILLED);
|
|
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test
|
|
|
public void testAppAcceptedAttemptKilled() throws IOException,
|
|
|
InterruptedException {
|
|
@@ -816,6 +943,7 @@ public class TestRMAppTransitions {
|
|
|
assertKilled(application);
|
|
|
verifyApplicationFinished(RMAppState.KILLED);
|
|
|
verifyAppRemovedSchedulerEvent(RMAppState.KILLED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -873,6 +1001,7 @@ public class TestRMAppTransitions {
|
|
|
assertFailed(application, ".*Failing the application.*");
|
|
|
assertAppFinalStateSaved(application);
|
|
|
verifyApplicationFinished(RMAppState.FAILED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -914,6 +1043,7 @@ public class TestRMAppTransitions {
|
|
|
assertFinalAppStatus(FinalApplicationStatus.FAILED, application);
|
|
|
Assert.assertTrue("Finished app missing diagnostics", application
|
|
|
.getDiagnostics().indexOf(diagMsg) != -1);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -933,6 +1063,7 @@ public class TestRMAppTransitions {
|
|
|
Assert.assertEquals("application diagnostics is not correct",
|
|
|
"", diag.toString());
|
|
|
verifyApplicationFinished(RMAppState.FINISHED);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@@ -962,6 +1093,7 @@ public class TestRMAppTransitions {
|
|
|
|
|
|
assertTimesAtFinish(application);
|
|
|
assertAppState(RMAppState.FAILED, application);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 30000)
|
|
@@ -1016,6 +1148,7 @@ public class TestRMAppTransitions {
|
|
|
|
|
|
assertTimesAtFinish(application);
|
|
|
assertAppState(RMAppState.KILLED, application);
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 30000)
|
|
@@ -1061,11 +1194,12 @@ public class TestRMAppTransitions {
|
|
|
RMAppState finalState = appState.getState();
|
|
|
Assert.assertEquals("Application is not in finalState.", finalState,
|
|
|
application.getState());
|
|
|
+ verifyRMAppFieldsForFinalTransitions(application);
|
|
|
}
|
|
|
|
|
|
public void createRMStateForApplications(
|
|
|
Map<ApplicationId, ApplicationStateData> applicationState,
|
|
|
- RMAppState rmAppState) {
|
|
|
+ RMAppState rmAppState) throws IOException {
|
|
|
RMApp app = createNewTestApp(null);
|
|
|
ApplicationStateData appState =
|
|
|
ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(),
|
|
@@ -1075,7 +1209,7 @@ public class TestRMAppTransitions {
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testGetAppReport() {
|
|
|
+ public void testGetAppReport() throws IOException {
|
|
|
RMApp app = createNewTestApp(null);
|
|
|
assertAppState(RMAppState.NEW, app);
|
|
|
ApplicationReport report = app.createAndGetApplicationReport(null, true);
|
|
@@ -1109,4 +1243,41 @@ public class TestRMAppTransitions {
|
|
|
Assert.assertEquals(finalState, appRemovedEvent.getFinalState());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void verifyRMAppFieldsForNonFinalTransitions(RMApp application)
|
|
|
+ throws IOException {
|
|
|
+ assertEquals(Arrays.asList("/bin/sleep 5"),
|
|
|
+ application.getApplicationSubmissionContext().
|
|
|
+ getAMContainerSpec().getCommands());
|
|
|
+ assertEquals(getLocalResources(),
|
|
|
+ application.getApplicationSubmissionContext().
|
|
|
+ getAMContainerSpec().getLocalResources());
|
|
|
+ if(UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ assertEquals(getTokens(),
|
|
|
+ application.getApplicationSubmissionContext().
|
|
|
+ getAMContainerSpec().getTokens());
|
|
|
+ assertEquals(getTokensConf(),
|
|
|
+ application.getApplicationSubmissionContext().
|
|
|
+ getAMContainerSpec().getTokensConf());
|
|
|
+ }
|
|
|
+ assertEquals(getEnvironment(),
|
|
|
+ application.getApplicationSubmissionContext().
|
|
|
+ getAMContainerSpec().getEnvironment());
|
|
|
+ assertEquals(getContainerRetryContext(),
|
|
|
+ application.getApplicationSubmissionContext().
|
|
|
+ getAMContainerSpec().getContainerRetryContext());
|
|
|
+ assertEquals(getServiceData(),
|
|
|
+ application.getApplicationSubmissionContext().
|
|
|
+ getAMContainerSpec().getServiceData());
|
|
|
+ assertEquals(getLogAggregationContext(),
|
|
|
+ application.getApplicationSubmissionContext().
|
|
|
+ getLogAggregationContext());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void verifyRMAppFieldsForFinalTransitions(RMApp application) {
|
|
|
+ assertEquals(null, application.getApplicationSubmissionContext().
|
|
|
+ getAMContainerSpec());
|
|
|
+ assertEquals(null, application.getApplicationSubmissionContext().
|
|
|
+ getLogAggregationContext());
|
|
|
+ }
|
|
|
}
|