|
@@ -20,16 +20,10 @@ package org.apache.hadoop.mapred;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
-import org.apache.hadoop.io.BytesWritable;
|
|
|
-import org.apache.hadoop.io.Writable;
|
|
|
-import org.apache.hadoop.io.WritableComparable;
|
|
|
-import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
|
|
|
-import org.apache.hadoop.mapred.ThreadedMapBenchmark.RandomInputFormat;
|
|
|
-import org.apache.hadoop.mapred.lib.IdentityReducer;
|
|
|
+import org.apache.hadoop.mapred.UtilsForTests;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
import java.io.*;
|
|
|
-import java.util.Iterator;
|
|
|
|
|
|
/**
|
|
|
* TestJobTrackerRestart checks if the jobtracker can restart. JobTracker
|
|
@@ -37,26 +31,14 @@ import java.util.Iterator;
|
|
|
* recover previosuly submitted jobs.
|
|
|
*/
|
|
|
public class TestJobTrackerRestart extends TestCase {
|
|
|
- final static Object waitLock = new Object();
|
|
|
- final Path testDir = new Path("/jt-restart-testing");
|
|
|
+ final Path testDir =
|
|
|
+ new Path(System.getProperty("test.build.data","/tmp"),
|
|
|
+ "jt-restart-testing");
|
|
|
final Path inDir = new Path(testDir, "input");
|
|
|
final Path shareDir = new Path(testDir, "share");
|
|
|
final Path outputDir = new Path(testDir, "output");
|
|
|
private static int numJobsSubmitted = 0;
|
|
|
|
|
|
- /**
|
|
|
- * Gets job status from the jobtracker given the jobclient and the job id
|
|
|
- */
|
|
|
- static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
|
|
|
- JobStatus[] statuses = jc.getAllJobs();
|
|
|
- for (JobStatus jobStatus : statuses) {
|
|
|
- if (jobStatus.getJobID().equals(id)) {
|
|
|
- return jobStatus;
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Return the job conf configured with the priorities and mappers as passed.
|
|
|
* @param conf The default conf
|
|
@@ -70,7 +52,7 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
* @return a array of jobconfs configured as needed
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- static JobConf[] getJobs(JobConf conf, JobPriority[] priorities,
|
|
|
+ private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities,
|
|
|
int[] numMaps, int[] numReds,
|
|
|
Path outputDir, Path inDir,
|
|
|
String mapSignalFile, String reduceSignalFile)
|
|
@@ -79,81 +61,18 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
for (int i = 0; i < jobs.length; ++i) {
|
|
|
jobs[i] = new JobConf(conf);
|
|
|
Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
|
|
|
- configureWaitingJobConf(jobs[i], inDir, newOutputDir,
|
|
|
- numMaps[i], numReds[i], "jt-restart-test-job",
|
|
|
- mapSignalFile, reduceSignalFile);
|
|
|
+ UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir,
|
|
|
+ numMaps[i], numReds[i], "jt-restart-test-job", mapSignalFile,
|
|
|
+ reduceSignalFile);
|
|
|
jobs[i].setJobPriority(priorities[i]);
|
|
|
}
|
|
|
return jobs;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * A utility that waits for specified amount of time
|
|
|
- */
|
|
|
- static void waitFor(long duration) {
|
|
|
- try {
|
|
|
- synchronized (waitLock) {
|
|
|
- waitLock.wait(duration);
|
|
|
- }
|
|
|
- } catch (InterruptedException ie) {}
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Wait for the jobtracker to be RUNNING.
|
|
|
- */
|
|
|
- static void waitForJobTracker(JobClient jobClient) {
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- ClusterStatus status = jobClient.getClusterStatus();
|
|
|
- while (status.getJobTrackerState() != JobTracker.State.RUNNING) {
|
|
|
- waitFor(100);
|
|
|
- status = jobClient.getClusterStatus();
|
|
|
- }
|
|
|
- break; // means that the jt is ready
|
|
|
- } catch (IOException ioe) {}
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Signal the maps/reduces to start.
|
|
|
- */
|
|
|
- static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
|
|
|
- boolean isMap, String mapSignalFile,
|
|
|
- String reduceSignalFile)
|
|
|
- throws IOException {
|
|
|
- // signal the maps to complete
|
|
|
- TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), fileSys.getConf(),
|
|
|
- isMap
|
|
|
- ? new Path(mapSignalFile)
|
|
|
- : new Path(reduceSignalFile),
|
|
|
- (short)1);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Waits until all the jobs at the jobtracker complete.
|
|
|
- */
|
|
|
- static void waitTillDone(JobClient jobClient) throws IOException {
|
|
|
- // Wait for the last job to complete
|
|
|
- while (true) {
|
|
|
- boolean shouldWait = false;
|
|
|
- for (JobStatus jobStatuses : jobClient.getAllJobs()) {
|
|
|
- if (jobStatuses.getRunState() == JobStatus.RUNNING) {
|
|
|
- shouldWait = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (shouldWait) {
|
|
|
- waitFor(1000);
|
|
|
- } else {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Clean up the signals.
|
|
|
*/
|
|
|
- static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
|
|
|
+ private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
|
|
|
// Delete the map signal file
|
|
|
fileSys.delete(new Path(getMapSignalFile(dir)), false);
|
|
|
// Delete the reduce signal file
|
|
@@ -237,30 +156,30 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
}
|
|
|
|
|
|
// Make sure that the master job is 50% completed
|
|
|
- while (getJobStatus(jobClient, id).mapProgress() < 0.5f) {
|
|
|
- waitFor(100);
|
|
|
+ while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
|
|
|
+ UtilsForTests.waitFor(100);
|
|
|
}
|
|
|
|
|
|
// Note the data that needs to be tested upon restart
|
|
|
- long jobStartTime = getJobStatus(jobClient, id).getStartTime();
|
|
|
+ long jobStartTime = UtilsForTests.getJobStatus(jobClient, id).getStartTime();
|
|
|
|
|
|
// Kill the jobtracker
|
|
|
mr.stopJobTracker();
|
|
|
|
|
|
// Signal the maps to complete
|
|
|
- signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
+ UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
|
|
|
+ getReduceSignalFile(shareDir));
|
|
|
|
|
|
// Signal the reducers to complete
|
|
|
- signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
+ UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
|
|
|
+ getReduceSignalFile(shareDir));
|
|
|
|
|
|
// Enable recovery on restart
|
|
|
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
|
|
|
true);
|
|
|
|
|
|
// Wait for a minute before submitting a job
|
|
|
- waitFor(60 * 1000);
|
|
|
+ UtilsForTests.waitFor(60 * 1000);
|
|
|
|
|
|
// Restart the jobtracker
|
|
|
mr.startJobTracker();
|
|
@@ -268,11 +187,11 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
// Check if the jobs are still running
|
|
|
|
|
|
// Wait for the JT to be ready
|
|
|
- waitForJobTracker(jobClient);
|
|
|
+ UtilsForTests.waitForJobTracker(jobClient);
|
|
|
|
|
|
// Check if the job recovered
|
|
|
assertEquals("Restart failed as previously submitted job was missing",
|
|
|
- true, getJobStatus(jobClient, id) != null);
|
|
|
+ true, UtilsForTests.getJobStatus(jobClient, id) != null);
|
|
|
|
|
|
// check if the job's priority got changed
|
|
|
assertEquals("Restart failed as job's priority did not match",
|
|
@@ -280,7 +199,7 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
|
|
|
|
|
|
|
|
|
- waitTillDone(jobClient);
|
|
|
+ UtilsForTests.waitTillDone(jobClient);
|
|
|
|
|
|
// Check if the jobs are in order .. the order is 1->3->2
|
|
|
JobStatus[] newStatuses = jobClient.getAllJobs();
|
|
@@ -317,7 +236,8 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
|
|
|
// Check if the start time was recovered
|
|
|
assertTrue("Previously submitted job's start time has changed",
|
|
|
- getJobStatus(jobClient, id).getStartTime() == jobStartTime);
|
|
|
+ UtilsForTests.getJobStatus(jobClient, id).getStartTime()
|
|
|
+ == jobStartTime);
|
|
|
|
|
|
// Test history files
|
|
|
testJobHistoryFiles(id, jobs[masterJob]);
|
|
@@ -354,8 +274,8 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
JobID id = job.getID();
|
|
|
|
|
|
// make sure that the job is 50% completed
|
|
|
- while (getJobStatus(jobClient, id).mapProgress() < 0.5f) {
|
|
|
- waitFor(100);
|
|
|
+ while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
|
|
|
+ UtilsForTests.waitFor(100);
|
|
|
}
|
|
|
|
|
|
mr.stopJobTracker();
|
|
@@ -365,22 +285,22 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
false);
|
|
|
|
|
|
// Wait for a minute before submitting a job
|
|
|
- waitFor(60 * 1000);
|
|
|
+ UtilsForTests.waitFor(60 * 1000);
|
|
|
|
|
|
mr.startJobTracker();
|
|
|
|
|
|
// Signal the tasks
|
|
|
- signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
+ UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
|
|
|
+ getReduceSignalFile(shareDir));
|
|
|
|
|
|
// Wait for the JT to be ready
|
|
|
- waitForJobTracker(jobClient);
|
|
|
+ UtilsForTests.waitForJobTracker(jobClient);
|
|
|
|
|
|
- waitTillDone(jobClient);
|
|
|
+ UtilsForTests.waitTillDone(jobClient);
|
|
|
|
|
|
// The submitted job should not exist
|
|
|
assertTrue("Submitted job was detected with recovery disabled",
|
|
|
- getJobStatus(jobClient, id) == null);
|
|
|
+ UtilsForTests.getJobStatus(jobClient, id) == null);
|
|
|
}
|
|
|
|
|
|
/** Tests a job on jobtracker with restart-recovery turned on.
|
|
@@ -455,7 +375,7 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
|
|
|
// make sure that atleast on reducer is spawned
|
|
|
while (jobClient.getClusterStatus().getReduceTasks() == 0) {
|
|
|
- waitFor(100);
|
|
|
+ UtilsForTests.waitFor(100);
|
|
|
}
|
|
|
|
|
|
while(true) {
|
|
@@ -464,7 +384,7 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
|
|
|
.getMapTaskCompletionEvents();
|
|
|
if (trackerEvents.length < numMaps / 2) {
|
|
|
- waitFor(1000);
|
|
|
+ UtilsForTests.waitFor(1000);
|
|
|
} else {
|
|
|
break;
|
|
|
}
|
|
@@ -483,22 +403,22 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
true);
|
|
|
|
|
|
// Wait for a minute before submitting a job
|
|
|
- waitFor(60 * 1000);
|
|
|
+ UtilsForTests.waitFor(60 * 1000);
|
|
|
|
|
|
mr.startJobTracker();
|
|
|
|
|
|
// Signal the map tasks
|
|
|
- signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
+ UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
|
|
|
+ getReduceSignalFile(shareDir));
|
|
|
|
|
|
// Wait for the JT to be ready
|
|
|
- waitForJobTracker(jobClient);
|
|
|
+ UtilsForTests.waitForJobTracker(jobClient);
|
|
|
|
|
|
int numToMatch = mr.getNumEventsRecovered() / 2;
|
|
|
|
|
|
// make sure that the maps are completed
|
|
|
- while (getJobStatus(jobClient, id).mapProgress() < 1.0f) {
|
|
|
- waitFor(100);
|
|
|
+ while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
|
|
|
+ UtilsForTests.waitFor(100);
|
|
|
}
|
|
|
|
|
|
// Get the new jobtrackers events
|
|
@@ -514,7 +434,7 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
mr.getMapTaskCompletionEventsUpdates(0, id, 2 * numMaps)
|
|
|
.getMapTaskCompletionEvents();
|
|
|
if (trackerEvents.length < jtEvents.length) {
|
|
|
- waitFor(1000);
|
|
|
+ UtilsForTests.waitFor(1000);
|
|
|
} else {
|
|
|
break;
|
|
|
}
|
|
@@ -528,10 +448,10 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
testTaskReports(prevSetupReports, afterSetupReports, 1);
|
|
|
|
|
|
// Signal the reduce tasks
|
|
|
- signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
|
|
|
- getReduceSignalFile(shareDir));
|
|
|
+ UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
|
|
|
+ getReduceSignalFile(shareDir));
|
|
|
|
|
|
- waitTillDone(jobClient);
|
|
|
+ UtilsForTests.waitTillDone(jobClient);
|
|
|
|
|
|
testTaskCompletionEvents(jtEvents, trackerEvents, true, 2 * numMaps);
|
|
|
|
|
@@ -653,9 +573,8 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
}
|
|
|
|
|
|
// Write the input file
|
|
|
- TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf,
|
|
|
- new Path(inDir + "/file"),
|
|
|
- (short)1);
|
|
|
+ UtilsForTests.writeFile(dfs.getNameNode(), conf,
|
|
|
+ new Path(inDir + "/file"), (short)1);
|
|
|
|
|
|
dfs.startDataNodes(conf, 1, true, null, null, null, null);
|
|
|
dfs.waitActive();
|
|
@@ -695,140 +614,14 @@ public class TestJobTrackerRestart extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- static String getMapSignalFile(Path dir) {
|
|
|
- return dir.suffix("/jt-restart-map-signal").toString();
|
|
|
- }
|
|
|
-
|
|
|
- static String getReduceSignalFile(Path dir) {
|
|
|
- return dir.suffix("/jt-restart-reduce-signal").toString();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Map is a Mapper that just waits for a file to be created on the dfs. The
|
|
|
- * file creation is a signal to the mappers and hence acts as a waiting job.
|
|
|
- * Only the later half of the maps wait for the signal while the rest
|
|
|
- * complete immediately.
|
|
|
- */
|
|
|
-
|
|
|
- static class HalfWaitingMapper
|
|
|
- extends MapReduceBase
|
|
|
- implements Mapper<WritableComparable, Writable,
|
|
|
- WritableComparable, Writable> {
|
|
|
-
|
|
|
- FileSystem fs = null;
|
|
|
- Path signal;
|
|
|
- int id = 0;
|
|
|
- int totalMaps = 0;
|
|
|
-
|
|
|
- /** The waiting function. The map exits once it gets a signal. Here the
|
|
|
- * signal is the file existence.
|
|
|
- */
|
|
|
- public void map(WritableComparable key, Writable val,
|
|
|
- OutputCollector<WritableComparable, Writable> output,
|
|
|
- Reporter reporter)
|
|
|
- throws IOException {
|
|
|
- if (id > totalMaps / 2) {
|
|
|
- if (fs != null) {
|
|
|
- while (!fs.exists(signal)) {
|
|
|
- try {
|
|
|
- reporter.progress();
|
|
|
- synchronized (this) {
|
|
|
- this.wait(1000); // wait for 1 sec
|
|
|
- }
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- System.out.println("Interrupted while the map was waiting for "
|
|
|
- + " the signal.");
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- throw new IOException("Could not get the DFS!!");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void configure(JobConf conf) {
|
|
|
- try {
|
|
|
- String taskId = conf.get("mapred.task.id");
|
|
|
- id = Integer.parseInt(taskId.split("_")[4]);
|
|
|
- totalMaps = Integer.parseInt(conf.get("mapred.map.tasks"));
|
|
|
- fs = FileSystem.get(conf);
|
|
|
- signal = new Path(conf.get("test.mapred.map.waiting.target"));
|
|
|
- } catch (IOException ioe) {
|
|
|
- System.out.println("Got an exception while obtaining the filesystem");
|
|
|
- }
|
|
|
- }
|
|
|
+ private static String getMapSignalFile(Path dir) {
|
|
|
+ return (new Path(dir, "jt-restart-map-signal")).toString();
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Reduce that just waits for a file to be created on the dfs. The
|
|
|
- * file creation is a signal to the reduce.
|
|
|
- */
|
|
|
|
|
|
- static class WaitingReducer extends MapReduceBase
|
|
|
- implements Reducer<WritableComparable, Writable,
|
|
|
- WritableComparable, Writable> {
|
|
|
-
|
|
|
- FileSystem fs = null;
|
|
|
- Path signal;
|
|
|
-
|
|
|
- /** The waiting function. The reduce exits once it gets a signal. Here the
|
|
|
- * signal is the file existence.
|
|
|
- */
|
|
|
- public void reduce(WritableComparable key, Iterator<Writable> val,
|
|
|
- OutputCollector<WritableComparable, Writable> output,
|
|
|
- Reporter reporter)
|
|
|
- throws IOException {
|
|
|
- if (fs != null) {
|
|
|
- while (!fs.exists(signal)) {
|
|
|
- try {
|
|
|
- reporter.progress();
|
|
|
- synchronized (this) {
|
|
|
- this.wait(1000); // wait for 1 sec
|
|
|
- }
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- System.out.println("Interrupted while the map was waiting for the"
|
|
|
- + " signal.");
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- throw new IOException("Could not get the DFS!!");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void configure(JobConf conf) {
|
|
|
- try {
|
|
|
- fs = FileSystem.get(conf);
|
|
|
- signal = new Path(conf.get("test.mapred.reduce.waiting.target"));
|
|
|
- } catch (IOException ioe) {
|
|
|
- System.out.println("Got an exception while obtaining the filesystem");
|
|
|
- }
|
|
|
- }
|
|
|
+ private static String getReduceSignalFile(Path dir) {
|
|
|
+ return (new Path(dir, "jt-restart-reduce-signal")).toString();
|
|
|
}
|
|
|
|
|
|
- static void configureWaitingJobConf(JobConf jobConf, Path inDir,
|
|
|
- Path outputPath, int numMaps, int numRed,
|
|
|
- String jobName, String mapSignalFilename,
|
|
|
- String redSignalFilename)
|
|
|
- throws IOException {
|
|
|
- jobConf.setJobName(jobName);
|
|
|
- jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
|
|
|
- jobConf.setOutputFormat(SequenceFileOutputFormat.class);
|
|
|
- FileInputFormat.setInputPaths(jobConf, inDir);
|
|
|
- FileOutputFormat.setOutputPath(jobConf, outputPath);
|
|
|
- jobConf.setMapperClass(HalfWaitingMapper.class);
|
|
|
- jobConf.setReducerClass(IdentityReducer.class);
|
|
|
- jobConf.setOutputKeyClass(BytesWritable.class);
|
|
|
- jobConf.setOutputValueClass(BytesWritable.class);
|
|
|
- jobConf.setInputFormat(RandomInputFormat.class);
|
|
|
- jobConf.setNumMapTasks(numMaps);
|
|
|
- jobConf.setNumReduceTasks(numRed);
|
|
|
- jobConf.setJar("build/test/testjar/testjob.jar");
|
|
|
- jobConf.set("test.mapred.map.waiting.target", mapSignalFilename);
|
|
|
- jobConf.set("test.mapred.reduce.waiting.target", redSignalFilename);
|
|
|
- }
|
|
|
-
|
|
|
public static void main(String[] args) throws IOException {
|
|
|
new TestJobTrackerRestart().testJobTrackerRestart();
|
|
|
}
|