|
@@ -21,6 +21,8 @@ import java.io.File;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.FileOutputStream;
|
|
import java.io.FileOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.util.LinkedHashMap;
|
|
|
|
+import java.util.TreeMap;
|
|
import java.util.jar.JarOutputStream;
|
|
import java.util.jar.JarOutputStream;
|
|
import java.util.zip.ZipEntry;
|
|
import java.util.zip.ZipEntry;
|
|
|
|
|
|
@@ -36,6 +38,7 @@ import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
|
|
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
|
|
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
|
|
import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
|
|
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
|
|
import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
|
|
|
|
+import org.apache.hadoop.mapred.TaskTracker.RunningJob;
|
|
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
|
|
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
|
|
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
|
|
import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue;
|
|
import org.apache.hadoop.mapreduce.JobContext;
|
|
import org.apache.hadoop.mapreduce.JobContext;
|
|
@@ -75,6 +78,8 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
protected File[] attemptLogFiles;
|
|
protected File[] attemptLogFiles;
|
|
protected JobConf localizedTaskConf;
|
|
protected JobConf localizedTaskConf;
|
|
private TaskInProgress tip;
|
|
private TaskInProgress tip;
|
|
|
|
+ private JobConf jobConf;
|
|
|
|
+ private File jobConfFile;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Dummy method in this base class. Only derived classes will define this
|
|
* Dummy method in this base class. Only derived classes will define this
|
|
@@ -113,12 +118,14 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
trackerFConf.setStrings("mapred.local.dir", localDirs);
|
|
trackerFConf.setStrings("mapred.local.dir", localDirs);
|
|
|
|
|
|
// Create the job configuration file. Same as trackerConf in this test.
|
|
// Create the job configuration file. Same as trackerConf in this test.
|
|
- JobConf jobConf = new JobConf(trackerFConf);
|
|
|
|
|
|
+ jobConf = new JobConf(trackerFConf);
|
|
// Set job view ACLs in conf sothat validation of contents of jobACLsFile
|
|
// Set job view ACLs in conf sothat validation of contents of jobACLsFile
|
|
// can be done against this value. Have both users and groups
|
|
// can be done against this value. Have both users and groups
|
|
String jobViewACLs = "user1,user2, group1,group2";
|
|
String jobViewACLs = "user1,user2, group1,group2";
|
|
jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
|
|
jobConf.set(JobContext.JOB_ACL_VIEW_JOB, jobViewACLs);
|
|
jobConf.setInt("mapred.userlog.retain.hours", 0);
|
|
jobConf.setInt("mapred.userlog.retain.hours", 0);
|
|
|
|
+ jobConf.setUser(getJobOwner().getShortUserName());
|
|
|
|
+
|
|
String jtIdentifier = "200907202331";
|
|
String jtIdentifier = "200907202331";
|
|
jobId = new JobID(jtIdentifier, 1);
|
|
jobId = new JobID(jtIdentifier, 1);
|
|
|
|
|
|
@@ -127,49 +134,69 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
uploadJobJar(jobConf);
|
|
uploadJobJar(jobConf);
|
|
|
|
|
|
// JobClient uploads the jobConf to the file system.
|
|
// JobClient uploads the jobConf to the file system.
|
|
- File jobConfFile = uploadJobConf(jobConf);
|
|
|
|
|
|
+ jobConfFile = uploadJobConf(jobConf);
|
|
|
|
|
|
// create jobTokens file
|
|
// create jobTokens file
|
|
uploadJobTokensFile();
|
|
uploadJobTokensFile();
|
|
|
|
+
|
|
|
|
+ taskTrackerUGI = UserGroupInformation.getCurrentUser();
|
|
|
|
+ startTracker();
|
|
|
|
+
|
|
|
|
+ // Set up the task to be localized
|
|
|
|
+ taskId =
|
|
|
|
+ new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
|
|
|
|
+ createTask();
|
|
|
|
+ // mimic register task
|
|
|
|
+ // create the tip
|
|
|
|
+ tip = tracker.new TaskInProgress(task, trackerFConf);
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ private void startTracker() throws IOException {
|
|
// Set up the TaskTracker
|
|
// Set up the TaskTracker
|
|
tracker = new TaskTracker();
|
|
tracker = new TaskTracker();
|
|
tracker.setConf(trackerFConf);
|
|
tracker.setConf(trackerFConf);
|
|
- tracker.setIndexCache(new IndexCache(trackerFConf));
|
|
|
|
tracker.setUserLogManager(new UtilsForTests.InLineUserLogManager(
|
|
tracker.setUserLogManager(new UtilsForTests.InLineUserLogManager(
|
|
trackerFConf));
|
|
trackerFConf));
|
|
|
|
+ initializeTracker();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void initializeTracker() throws IOException {
|
|
|
|
+ tracker.setIndexCache(new IndexCache(trackerFConf));
|
|
tracker.setTaskMemoryManagerEnabledFlag();
|
|
tracker.setTaskMemoryManagerEnabledFlag();
|
|
|
|
|
|
// for test case system FS is the local FS
|
|
// for test case system FS is the local FS
|
|
-
|
|
|
|
tracker.systemFS = FileSystem.getLocal(trackerFConf);
|
|
tracker.systemFS = FileSystem.getLocal(trackerFConf);
|
|
tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
|
|
tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath());
|
|
tracker.setLocalFileSystem(tracker.systemFS);
|
|
tracker.setLocalFileSystem(tracker.systemFS);
|
|
|
|
|
|
- taskTrackerUGI = UserGroupInformation.getCurrentUser();
|
|
|
|
-
|
|
|
|
- // Set up the task to be localized
|
|
|
|
- taskId =
|
|
|
|
- new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
|
|
|
|
- task =
|
|
|
|
- new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
|
|
|
|
- task.setConf(jobConf); // Set conf. Set user name in particular.
|
|
|
|
- task.setUser(UserGroupInformation.getCurrentUser().getUserName());
|
|
|
|
-
|
|
|
|
|
|
+ tracker.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
|
|
|
|
+ tracker.runningJobs = new TreeMap<JobID, RunningJob>();
|
|
|
|
+ trackerFConf.deleteLocalFiles(TaskTracker.SUBDIR);
|
|
|
|
|
|
- taskController = new DefaultTaskController();
|
|
|
|
|
|
+ // setup task controller
|
|
|
|
+ taskController = getTaskController();
|
|
taskController.setConf(trackerFConf);
|
|
taskController.setConf(trackerFConf);
|
|
taskController.setup();
|
|
taskController.setup();
|
|
-
|
|
|
|
tracker.setTaskController(taskController);
|
|
tracker.setTaskController(taskController);
|
|
tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
|
|
tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs,
|
|
taskController));
|
|
taskController));
|
|
|
|
+ }
|
|
|
|
|
|
- // mimic register task
|
|
|
|
- // create the tip
|
|
|
|
- tip = tracker.new TaskInProgress(task, trackerFConf);
|
|
|
|
|
|
+ protected TaskController getTaskController() {
|
|
|
|
+ return new DefaultTaskController();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void createTask()
|
|
|
|
+ throws IOException {
|
|
|
|
+ task = new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
|
|
|
|
+ task.setConf(jobConf); // Set conf. Set user name in particular.
|
|
|
|
+ task.setUser(jobConf.getUser());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ protected UserGroupInformation getJobOwner() throws IOException {
|
|
|
|
+ return UserGroupInformation.getCurrentUser();
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* @param jobConf
|
|
* @param jobConf
|
|
* @throws IOException
|
|
* @throws IOException
|
|
@@ -454,63 +481,7 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
}
|
|
}
|
|
TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
|
|
TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
|
|
localizedJobConf = rjob.getJobConf();
|
|
localizedJobConf = rjob.getJobConf();
|
|
-
|
|
|
|
- tip.setJobConf(localizedJobConf);
|
|
|
|
-
|
|
|
|
- // ////////// The central method being tested
|
|
|
|
- tip.localizeTask(task);
|
|
|
|
- // //////////
|
|
|
|
-
|
|
|
|
- // check the functionality of localizeTask
|
|
|
|
- for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
|
|
|
|
- File attemptDir =
|
|
|
|
- new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
|
|
|
|
- .toString(), taskId.toString()));
|
|
|
|
- assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
|
|
|
|
- + " is not created!!", attemptDir.exists());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- attemptWorkDir =
|
|
|
|
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
|
|
|
|
- task.getUser(), task.getJobID().toString(), task.getTaskID()
|
|
|
|
- .toString(), task.isTaskCleanupTask()), trackerFConf);
|
|
|
|
- assertTrue("atttempt work dir for " + taskId.toString()
|
|
|
|
- + " is not created in any of the configured dirs!!",
|
|
|
|
- attemptWorkDir != null);
|
|
|
|
-
|
|
|
|
- TaskRunner runner = task.createRunner(tracker, tip);
|
|
|
|
-
|
|
|
|
- // /////// Few more methods being tested
|
|
|
|
- runner.setupChildTaskConfiguration(lDirAlloc);
|
|
|
|
- TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
|
|
|
|
- localizedJobConf);
|
|
|
|
- attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
|
|
|
|
-
|
|
|
|
- // Make sure the task-conf file is created
|
|
|
|
- Path localTaskFile =
|
|
|
|
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
|
|
|
|
- .getUser(), task.getJobID().toString(), task.getTaskID()
|
|
|
|
- .toString(), task.isTaskCleanupTask()), trackerFConf);
|
|
|
|
- assertTrue("Task conf file " + localTaskFile.toString()
|
|
|
|
- + " is not created!!", new File(localTaskFile.toUri().getPath())
|
|
|
|
- .exists());
|
|
|
|
-
|
|
|
|
- // /////// One more method being tested. This happens in child space.
|
|
|
|
- localizedTaskConf = new JobConf(localTaskFile);
|
|
|
|
- TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
|
|
|
|
- // ///////
|
|
|
|
-
|
|
|
|
- // Initialize task via TaskController
|
|
|
|
- TaskControllerContext taskContext =
|
|
|
|
- new TaskController.TaskControllerContext();
|
|
|
|
- taskContext.env =
|
|
|
|
- new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
|
|
|
|
- .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
|
|
|
|
- taskContext.task = task;
|
|
|
|
- // /////////// The method being tested
|
|
|
|
- taskController.initializeTask(taskContext);
|
|
|
|
- // ///////////
|
|
|
|
-
|
|
|
|
|
|
+ initializeTask();
|
|
checkTaskLocalization();
|
|
checkTaskLocalization();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -521,13 +492,14 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
.getStrings("mapred.local.dir")) {
|
|
.getStrings("mapred.local.dir")) {
|
|
assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
|
|
assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
|
|
childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
|
|
childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
|
|
- .getUser(), jobId.toString(), taskId.toString(), false)));
|
|
|
|
|
|
+ .getUser(), jobId.toString(), taskId.toString(),
|
|
|
|
+ task.isTaskCleanupTask())));
|
|
}
|
|
}
|
|
|
|
|
|
// Make sure task task.getJobFile is changed and pointed correctly.
|
|
// Make sure task task.getJobFile is changed and pointed correctly.
|
|
assertTrue(task.getJobFile().endsWith(
|
|
assertTrue(task.getJobFile().endsWith(
|
|
TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
|
|
TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
|
|
- .toString(), false)));
|
|
|
|
|
|
+ .toString(), task.isTaskCleanupTask())));
|
|
|
|
|
|
// Make sure that the tmp directories are created
|
|
// Make sure that the tmp directories are created
|
|
assertTrue("tmp dir is not created in workDir "
|
|
assertTrue("tmp dir is not created in workDir "
|
|
@@ -684,36 +656,54 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
testTaskCleanup(false, true);// no needCleanup; jvmReuse
|
|
testTaskCleanup(false, true);// no needCleanup; jvmReuse
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Validates if task cleanup is done properly
|
|
|
|
- */
|
|
|
|
- private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
|
|
|
|
- throws Exception {
|
|
|
|
- // Localize job and localize task.
|
|
|
|
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
|
|
|
|
- TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
|
|
|
|
- localizedJobConf = rjob.getJobConf();
|
|
|
|
- if (jvmReuse) {
|
|
|
|
- localizedJobConf.setNumTasksToExecutePerJvm(2);
|
|
|
|
- }
|
|
|
|
|
|
+ private void initializeTask() throws IOException {
|
|
tip.setJobConf(localizedJobConf);
|
|
tip.setJobConf(localizedJobConf);
|
|
|
|
+
|
|
|
|
+ // ////////// The central method being tested
|
|
tip.localizeTask(task);
|
|
tip.localizeTask(task);
|
|
- Path workDir =
|
|
|
|
|
|
+ // //////////
|
|
|
|
+
|
|
|
|
+ // check the functionality of localizeTask
|
|
|
|
+ for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
|
|
|
|
+ File attemptDir =
|
|
|
|
+ new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
|
|
|
|
+ .toString(), taskId.toString(), task.isTaskCleanupTask()));
|
|
|
|
+ assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
|
|
|
|
+ + " is not created!!", attemptDir.exists());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ attemptWorkDir =
|
|
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
|
|
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
|
|
task.getUser(), task.getJobID().toString(), task.getTaskID()
|
|
task.getUser(), task.getJobID().toString(), task.getTaskID()
|
|
.toString(), task.isTaskCleanupTask()), trackerFConf);
|
|
.toString(), task.isTaskCleanupTask()), trackerFConf);
|
|
|
|
+ assertTrue("atttempt work dir for " + taskId.toString()
|
|
|
|
+ + " is not created in any of the configured dirs!!",
|
|
|
|
+ attemptWorkDir != null);
|
|
|
|
+
|
|
TaskRunner runner = task.createRunner(tracker, tip);
|
|
TaskRunner runner = task.createRunner(tracker, tip);
|
|
tip.setTaskRunner(runner);
|
|
tip.setTaskRunner(runner);
|
|
|
|
+
|
|
|
|
+ // /////// Few more methods being tested
|
|
runner.setupChildTaskConfiguration(lDirAlloc);
|
|
runner.setupChildTaskConfiguration(lDirAlloc);
|
|
- TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
|
|
|
|
|
|
+ TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
|
|
localizedJobConf);
|
|
localizedJobConf);
|
|
- runner.prepareLogFiles(task.getTaskID());
|
|
|
|
|
|
+ attemptLogFiles = runner.prepareLogFiles(task.getTaskID());
|
|
|
|
+
|
|
|
|
+ // Make sure the task-conf file is created
|
|
Path localTaskFile =
|
|
Path localTaskFile =
|
|
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
|
|
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
|
|
.getUser(), task.getJobID().toString(), task.getTaskID()
|
|
.getUser(), task.getJobID().toString(), task.getTaskID()
|
|
.toString(), task.isTaskCleanupTask()), trackerFConf);
|
|
.toString(), task.isTaskCleanupTask()), trackerFConf);
|
|
- JobConf localizedTaskConf = new JobConf(localTaskFile);
|
|
|
|
|
|
+ assertTrue("Task conf file " + localTaskFile.toString()
|
|
|
|
+ + " is not created!!", new File(localTaskFile.toUri().getPath())
|
|
|
|
+ .exists());
|
|
|
|
+
|
|
|
|
+ // /////// One more method being tested. This happens in child space.
|
|
|
|
+ localizedTaskConf = new JobConf(localTaskFile);
|
|
TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
|
|
TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
|
|
|
|
+ // ///////
|
|
|
|
+
|
|
|
|
+ // Initialize task via TaskController
|
|
TaskControllerContext taskContext =
|
|
TaskControllerContext taskContext =
|
|
new TaskController.TaskControllerContext();
|
|
new TaskController.TaskControllerContext();
|
|
taskContext.env =
|
|
taskContext.env =
|
|
@@ -722,6 +712,21 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
taskContext.task = task;
|
|
taskContext.task = task;
|
|
// /////////// The method being tested
|
|
// /////////// The method being tested
|
|
taskController.initializeTask(taskContext);
|
|
taskController.initializeTask(taskContext);
|
|
|
|
+ // ///////////
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Validates if task cleanup is done properly
|
|
|
|
+ */
|
|
|
|
+ private void testTaskCleanup(boolean needCleanup, boolean jvmReuse)
|
|
|
|
+ throws Exception {
|
|
|
|
+ // Localize job and localize task.
|
|
|
|
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
|
|
|
|
+ localizedJobConf = rjob.getJobConf();
|
|
|
|
+ if (jvmReuse) {
|
|
|
|
+ localizedJobConf.setNumTasksToExecutePerJvm(2);
|
|
|
|
+ }
|
|
|
|
+ initializeTask();
|
|
|
|
|
|
// TODO: Let the task run and create files.
|
|
// TODO: Let the task run and create files.
|
|
|
|
|
|
@@ -792,7 +797,6 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
}
|
|
}
|
|
|
|
|
|
// Initialize task dirs
|
|
// Initialize task dirs
|
|
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
|
|
|
|
tip.setJobConf(localizedJobConf);
|
|
tip.setJobConf(localizedJobConf);
|
|
tip.localizeTask(task);
|
|
tip.localizeTask(task);
|
|
|
|
|
|
@@ -835,4 +839,87 @@ public class TestTaskTrackerLocalization extends TestCase {
|
|
assertFalse("Job " + task.getJobID() + " work dir exists after cleanup",
|
|
assertFalse("Job " + task.getJobID() + " work dir exists after cleanup",
|
|
jWorkDirExists);
|
|
jWorkDirExists);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Tests TaskTracker restart after the localization.
|
|
|
|
+ *
|
|
|
|
+ * This tests the following steps:
|
|
|
|
+ *
|
|
|
|
+ * Localize Job, initialize a task.
|
|
|
|
+ * Then restart the Tracker.
|
|
|
|
+ * launch a cleanup attempt for the task.
|
|
|
|
+ *
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ public void testTrackerRestart() throws IOException, InterruptedException {
|
|
|
|
+ if (!canRun()) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Localize job and localize task.
|
|
|
|
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
|
|
|
|
+ localizedJobConf = rjob.getJobConf();
|
|
|
|
+ initializeTask();
|
|
|
|
+
|
|
|
|
+ // imitate tracker restart
|
|
|
|
+ startTracker();
|
|
|
|
+
|
|
|
|
+ // create a task cleanup attempt
|
|
|
|
+ createTask();
|
|
|
|
+ task.setTaskCleanupTask();
|
|
|
|
+ // register task
|
|
|
|
+ tip = tracker.new TaskInProgress(task, trackerFConf);
|
|
|
|
+
|
|
|
|
+ // localize the job again.
|
|
|
|
+ rjob = tracker.localizeJob(tip);
|
|
|
|
+ localizedJobConf = rjob.getJobConf();
|
|
|
|
+ checkJobLocalization();
|
|
|
|
+
|
|
|
|
+ // localize task cleanup attempt
|
|
|
|
+ initializeTask();
|
|
|
|
+ checkTaskLocalization();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Tests TaskTracker re-init after the localization.
|
|
|
|
+ *
|
|
|
|
+ * This tests the following steps:
|
|
|
|
+ *
|
|
|
|
+ * Localize Job, initialize a task.
|
|
|
|
+ * Then reinit the Tracker.
|
|
|
|
+ * launch a cleanup attempt for the task.
|
|
|
|
+ *
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ public void testTrackerReinit() throws IOException, InterruptedException {
|
|
|
|
+ if (!canRun()) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Localize job and localize task.
|
|
|
|
+ TaskTracker.RunningJob rjob = tracker.localizeJob(tip);
|
|
|
|
+ localizedJobConf = rjob.getJobConf();
|
|
|
|
+ initializeTask();
|
|
|
|
+
|
|
|
|
+ // imitate tracker reinit
|
|
|
|
+ initializeTracker();
|
|
|
|
+
|
|
|
|
+ // create a task cleanup attempt
|
|
|
|
+ createTask();
|
|
|
|
+ task.setTaskCleanupTask();
|
|
|
|
+ // register task
|
|
|
|
+ tip = tracker.new TaskInProgress(task, trackerFConf);
|
|
|
|
+
|
|
|
|
+ // localize the job again.
|
|
|
|
+ rjob = tracker.localizeJob(tip);
|
|
|
|
+ localizedJobConf = rjob.getJobConf();
|
|
|
|
+ checkJobLocalization();
|
|
|
|
+
|
|
|
|
+ // localize task cleanup attempt
|
|
|
|
+ initializeTask();
|
|
|
|
+ checkTaskLocalization();
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|