|
@@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyBoolean;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
+import static org.mockito.Mockito.times;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
@@ -47,6 +48,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
@@ -89,28 +91,94 @@ import org.junit.Test;
|
|
|
handler.handle(new JobFinishEvent(jobid));
|
|
|
verify(fs).delete(stagingJobPath, true);
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testDeletionofStagingOnKill() throws IOException {
|
|
|
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
|
|
|
+ conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
|
|
|
+ fs = mock(FileSystem.class);
|
|
|
+ when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
|
|
|
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
|
|
|
+ ApplicationAttemptId.class);
|
|
|
+ attemptId.setAttemptId(0);
|
|
|
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
|
|
|
+ appId.setClusterTimestamp(System.currentTimeMillis());
|
|
|
+ appId.setId(0);
|
|
|
+ attemptId.setApplicationId(appId);
|
|
|
+ JobId jobid = recordFactory.newRecordInstance(JobId.class);
|
|
|
+ jobid.setAppId(appId);
|
|
|
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
|
|
|
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
|
|
|
+ appMaster.init(conf);
|
|
|
+ //simulate the process being killed
|
|
|
+ MRAppMaster.MRAppMasterShutdownHook hook =
|
|
|
+ new MRAppMaster.MRAppMasterShutdownHook(appMaster);
|
|
|
+ hook.run();
|
|
|
+ verify(fs, times(0)).delete(stagingJobPath, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testDeletionofStagingOnKillLastTry() throws IOException {
|
|
|
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
|
|
|
+ conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
|
|
|
+ fs = mock(FileSystem.class);
|
|
|
+ when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
|
|
|
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
|
|
|
+ ApplicationAttemptId.class);
|
|
|
+ attemptId.setAttemptId(1);
|
|
|
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
|
|
|
+ appId.setClusterTimestamp(System.currentTimeMillis());
|
|
|
+ appId.setId(0);
|
|
|
+ attemptId.setApplicationId(appId);
|
|
|
+ JobId jobid = recordFactory.newRecordInstance(JobId.class);
|
|
|
+ jobid.setAppId(appId);
|
|
|
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
|
|
|
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
|
|
|
+ appMaster.init(conf);
|
|
|
+ //simulate the process being killed
|
|
|
+ MRAppMaster.MRAppMasterShutdownHook hook =
|
|
|
+ new MRAppMaster.MRAppMasterShutdownHook(appMaster);
|
|
|
+ hook.run();
|
|
|
+ verify(fs).delete(stagingJobPath, true);
|
|
|
+ }
|
|
|
|
|
|
private class TestMRApp extends MRAppMaster {
|
|
|
+ ContainerAllocator allocator;
|
|
|
|
|
|
- public TestMRApp(ApplicationAttemptId applicationAttemptId) {
|
|
|
- super(applicationAttemptId, BuilderUtils.newContainerId(
|
|
|
- applicationAttemptId, 1), "testhost", 2222, 3333, System
|
|
|
- .currentTimeMillis());
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected FileSystem getFileSystem(Configuration conf) {
|
|
|
- return fs;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- protected void sysexit() {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Configuration getConfig() {
|
|
|
- return conf;
|
|
|
- }
|
|
|
+ public TestMRApp(ApplicationAttemptId applicationAttemptId,
|
|
|
+ ContainerAllocator allocator) {
|
|
|
+ super(applicationAttemptId, BuilderUtils.newContainerId(
|
|
|
+ applicationAttemptId, 1), "testhost", 2222, 3333, System
|
|
|
+ .currentTimeMillis());
|
|
|
+ this.allocator = allocator;
|
|
|
+ }
|
|
|
+
|
|
|
+ public TestMRApp(ApplicationAttemptId applicationAttemptId) {
|
|
|
+ this(applicationAttemptId, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected FileSystem getFileSystem(Configuration conf) {
|
|
|
+ return fs;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ContainerAllocator createContainerAllocator(
|
|
|
+ final ClientService clientService, final AppContext context) {
|
|
|
+ if(allocator == null) {
|
|
|
+ return super.createContainerAllocator(clientService, context);
|
|
|
+ }
|
|
|
+ return allocator;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void sysexit() {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Configuration getConfig() {
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private final class MRAppTestCleanup extends MRApp {
|