|
@@ -43,7 +43,7 @@ import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
|
|
|
/**
|
|
|
- * This class tests reliability of the framework in the face of failures of
|
|
|
+ * This class tests reliability of the framework in the face of failures of
|
|
|
* both tasks and tasktrackers. Steps:
|
|
|
* 1) Get the cluster status
|
|
|
* 2) Get the number of slots in the cluster
|
|
@@ -59,12 +59,12 @@ import org.apache.hadoop.util.ToolRunner;
|
|
|
* ./bin/hadoop --config <config> jar
|
|
|
* build/hadoop-<version>-test.jar MRReliabilityTest -libjars
|
|
|
* build/hadoop-<version>-examples.jar [-scratchdir <dir>]"
|
|
|
- *
|
|
|
- * The scratchdir is optional and by default the current directory on the client
|
|
|
- * will be used as the scratch space. Note that password-less SSH must be set up
|
|
|
- * between the client machine from where the test is submitted, and the cluster
|
|
|
- * nodes where the test runs.
|
|
|
- *
|
|
|
+ *
|
|
|
+ * The scratchdir is optional and by default the current directory on
|
|
|
+ * the client will be used as the scratch space. Note that password-less
|
|
|
+ * SSH must be set up between the client machine from where the test is
|
|
|
+ * submitted, and the cluster nodes where the test runs.
|
|
|
+ *
|
|
|
* The test should be run on a <b>free</b> cluster where there is no other parallel
|
|
|
* job submission going on. Submission of other jobs while the test runs can cause
|
|
|
* the tests/jobs submitted to fail.
|
|
@@ -73,7 +73,7 @@ import org.apache.hadoop.util.ToolRunner;
|
|
|
public class ReliabilityTest extends Configured implements Tool {
|
|
|
|
|
|
private String dir;
|
|
|
- private static final Log LOG = LogFactory.getLog(ReliabilityTest.class);
|
|
|
+ private static final Log LOG = LogFactory.getLog(ReliabilityTest.class);
|
|
|
|
|
|
private void displayUsage() {
|
|
|
LOG.info("This must be run in only the distributed mode " +
|
|
@@ -88,13 +88,13 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
" any job submission while the tests are running can cause jobs/tests to fail");
|
|
|
System.exit(-1);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public int run(String[] args) throws Exception {
|
|
|
Configuration conf = getConf();
|
|
|
if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) {
|
|
|
displayUsage();
|
|
|
}
|
|
|
- String[] otherArgs =
|
|
|
+ String[] otherArgs =
|
|
|
new GenericOptionsParser(conf, args).getRemainingArgs();
|
|
|
if (otherArgs.length == 2) {
|
|
|
if (otherArgs[0].equals("-scratchdir")) {
|
|
@@ -108,7 +108,7 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
} else {
|
|
|
displayUsage();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
//to protect against the case of jobs failing even when multiple attempts
|
|
|
//fail, set some high values for the max attempts
|
|
|
conf.setInt(JobContext.MAP_MAX_ATTEMPTS, 10);
|
|
@@ -117,26 +117,26 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
runSortJobTests(new JobClient(new JobConf(conf)), conf);
|
|
|
return 0;
|
|
|
}
|
|
|
-
|
|
|
- private void runSleepJobTest(final JobClient jc, final Configuration conf)
|
|
|
+
|
|
|
+ private void runSleepJobTest(final JobClient jc, final Configuration conf)
|
|
|
throws Exception {
|
|
|
ClusterStatus c = jc.getClusterStatus();
|
|
|
int maxMaps = c.getMaxMapTasks() * 2;
|
|
|
int maxReduces = maxMaps;
|
|
|
int mapSleepTime = (int)c.getTTExpiryInterval();
|
|
|
int reduceSleepTime = mapSleepTime;
|
|
|
- String[] sleepJobArgs = new String[] {
|
|
|
- "-m", Integer.toString(maxMaps),
|
|
|
+ String[] sleepJobArgs = new String[] {
|
|
|
+ "-m", Integer.toString(maxMaps),
|
|
|
"-r", Integer.toString(maxReduces),
|
|
|
"-mt", Integer.toString(mapSleepTime),
|
|
|
"-rt", Integer.toString(reduceSleepTime)};
|
|
|
- runTest(jc, conf, "org.apache.hadoop.mapreduce.SleepJob", sleepJobArgs,
|
|
|
+ runTest(jc, conf, "org.apache.hadoop.mapreduce.SleepJob", sleepJobArgs,
|
|
|
new KillTaskThread(jc, 2, 0.2f, false, 2),
|
|
|
new KillTrackerThread(jc, 2, 0.4f, false, 1));
|
|
|
LOG.info("SleepJob done");
|
|
|
}
|
|
|
-
|
|
|
- private void runSortJobTests(final JobClient jc, final Configuration conf)
|
|
|
+
|
|
|
+ private void runSortJobTests(final JobClient jc, final Configuration conf)
|
|
|
throws Exception {
|
|
|
String inputPath = "my_reliability_test_input";
|
|
|
String outputPath = "my_reliability_test_output";
|
|
@@ -147,36 +147,36 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
runSortTest(jc, conf, inputPath, outputPath);
|
|
|
runSortValidatorTest(jc, conf, inputPath, outputPath);
|
|
|
}
|
|
|
-
|
|
|
- private void runRandomWriterTest(final JobClient jc,
|
|
|
- final Configuration conf, final String inputPath)
|
|
|
+
|
|
|
+ private void runRandomWriterTest(final JobClient jc,
|
|
|
+ final Configuration conf, final String inputPath)
|
|
|
throws Exception {
|
|
|
- runTest(jc, conf, "org.apache.hadoop.examples.RandomWriter",
|
|
|
- new String[]{inputPath},
|
|
|
+ runTest(jc, conf, "org.apache.hadoop.examples.RandomWriter",
|
|
|
+ new String[]{inputPath},
|
|
|
null, new KillTrackerThread(jc, 0, 0.4f, false, 1));
|
|
|
LOG.info("RandomWriter job done");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void runSortTest(final JobClient jc, final Configuration conf,
|
|
|
- final String inputPath, final String outputPath)
|
|
|
+ final String inputPath, final String outputPath)
|
|
|
throws Exception {
|
|
|
- runTest(jc, conf, "org.apache.hadoop.examples.Sort",
|
|
|
+ runTest(jc, conf, "org.apache.hadoop.examples.Sort",
|
|
|
new String[]{inputPath, outputPath},
|
|
|
new KillTaskThread(jc, 2, 0.2f, false, 2),
|
|
|
new KillTrackerThread(jc, 2, 0.8f, false, 1));
|
|
|
LOG.info("Sort job done");
|
|
|
}
|
|
|
-
|
|
|
- private void runSortValidatorTest(final JobClient jc,
|
|
|
+
|
|
|
+ private void runSortValidatorTest(final JobClient jc,
|
|
|
final Configuration conf, final String inputPath, final String outputPath)
|
|
|
throws Exception {
|
|
|
runTest(jc, conf, "org.apache.hadoop.mapred.SortValidator", new String[] {
|
|
|
"-sortInput", inputPath, "-sortOutput", outputPath},
|
|
|
new KillTaskThread(jc, 2, 0.2f, false, 1),
|
|
|
- new KillTrackerThread(jc, 2, 0.8f, false, 1));
|
|
|
- LOG.info("SortValidator job done");
|
|
|
+ new KillTrackerThread(jc, 2, 0.8f, false, 1));
|
|
|
+ LOG.info("SortValidator job done");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private String normalizeCommandPath(String command) {
|
|
|
final String hadoopHome;
|
|
|
if ((hadoopHome = System.getenv("HADOOP_HOME")) != null) {
|
|
@@ -184,7 +184,7 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
}
|
|
|
return command;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void checkJobExitStatus(int status, String jobName) {
|
|
|
if (status != 0) {
|
|
|
LOG.info(jobName + " job failed with status: " + status);
|
|
@@ -203,7 +203,7 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
public void run() {
|
|
|
try {
|
|
|
Class<?> jobClassObj = conf.getClassByName(jobClass);
|
|
|
- int status = ToolRunner.run(conf, (Tool)(jobClassObj.newInstance()),
|
|
|
+ int status = ToolRunner.run(conf, (Tool)(jobClassObj.newInstance()),
|
|
|
args);
|
|
|
checkJobExitStatus(status, jobClass);
|
|
|
} catch (Exception e) {
|
|
@@ -223,7 +223,8 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
JobID jobId = jobs[jobs.length - 1].getJobID();
|
|
|
RunningJob rJob = jc.getJob(jobId);
|
|
|
if(rJob.isComplete()) {
|
|
|
- LOG.error("The last job returned by the querying JobTracker is complete :" +
|
|
|
+ LOG.error("The last job returned by the querying "
|
|
|
+ +"JobTracker is complete :" +
|
|
|
rJob.getJobID() + " .Exiting the test");
|
|
|
System.exit(-1);
|
|
|
}
|
|
@@ -246,7 +247,7 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
}
|
|
|
t.join();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private class KillTrackerThread extends Thread {
|
|
|
private volatile boolean killed = false;
|
|
|
private JobClient jc;
|
|
@@ -255,14 +256,14 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
private float threshold = 0.2f;
|
|
|
private boolean onlyMapsProgress;
|
|
|
private int numIterations;
|
|
|
- final private String slavesFile = dir + "/_reliability_test_slaves_file_";
|
|
|
- final String shellCommand = normalizeCommandPath("bin/slaves.sh");
|
|
|
- final private String STOP_COMMAND = "ps uwwx | grep java | grep " +
|
|
|
- "org.apache.hadoop.mapred.TaskTracker"+ " |" +
|
|
|
- " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP";
|
|
|
- final private String RESUME_COMMAND = "ps uwwx | grep java | grep " +
|
|
|
- "org.apache.hadoop.mapred.TaskTracker"+ " |" +
|
|
|
- " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT";
|
|
|
+ final private String workersFile = dir + "/_reliability_test_workers_file_";
|
|
|
+ final private String shellCommand = normalizeCommandPath("bin/workers.sh");
|
|
|
+ final private String stopCommand = "ps uwwx | grep java | grep " +
|
|
|
+ "org.apache.hadoop.mapred.TaskTracker"+ " |" +
|
|
|
+ " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP";
|
|
|
+ final private String resumeCommand = "ps uwwx | grep java | grep " +
|
|
|
+ "org.apache.hadoop.mapred.TaskTracker"+ " |" +
|
|
|
+ " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT";
|
|
|
//Only one instance must be active at any point
|
|
|
public KillTrackerThread(JobClient jc, int threshaldMultiplier,
|
|
|
float threshold, boolean onlyMapsProgress, int numIterations) {
|
|
@@ -293,8 +294,8 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
LOG.info("Will STOP/RESUME tasktrackers based on " +
|
|
|
"Reduces' progress");
|
|
|
}
|
|
|
- LOG.info("Initial progress threshold: " + threshold +
|
|
|
- ". Threshold Multiplier: " + thresholdMultiplier +
|
|
|
+ LOG.info("Initial progress threshold: " + threshold +
|
|
|
+ ". Threshold Multiplier: " + thresholdMultiplier +
|
|
|
". Number of iterations: " + numIterations);
|
|
|
float thresholdVal = threshold;
|
|
|
int numIterationsDone = 0;
|
|
@@ -336,7 +337,7 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
|
|
|
int count = 0;
|
|
|
|
|
|
- FileOutputStream fos = new FileOutputStream(new File(slavesFile));
|
|
|
+ FileOutputStream fos = new FileOutputStream(new File(workersFile));
|
|
|
LOG.info(new Date() + " Stopping a few trackers");
|
|
|
|
|
|
for (String tracker : trackerNamesList) {
|
|
@@ -355,17 +356,17 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
private void startTaskTrackers() throws Exception {
|
|
|
LOG.info(new Date() + " Resuming the stopped trackers");
|
|
|
runOperationOnTT("resume");
|
|
|
- new File(slavesFile).delete();
|
|
|
+ new File(workersFile).delete();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void runOperationOnTT(String operation) throws IOException {
|
|
|
Map<String,String> hMap = new HashMap<String,String>();
|
|
|
- hMap.put("HADOOP_SLAVES", slavesFile);
|
|
|
+ hMap.put("HADOOP_WORKERS", workersFile);
|
|
|
StringTokenizer strToken;
|
|
|
if (operation.equals("suspend")) {
|
|
|
- strToken = new StringTokenizer(STOP_COMMAND, " ");
|
|
|
+ strToken = new StringTokenizer(stopCommand, " ");
|
|
|
} else {
|
|
|
- strToken = new StringTokenizer(RESUME_COMMAND, " ");
|
|
|
+ strToken = new StringTokenizer(resumeCommand, " ");
|
|
|
}
|
|
|
String commandArgs[] = new String[strToken.countTokens() + 1];
|
|
|
int i = 0;
|
|
@@ -382,14 +383,14 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
private String convertTrackerNameToHostName(String trackerName) {
|
|
|
// Convert the trackerName to it's host name
|
|
|
int indexOfColon = trackerName.indexOf(":");
|
|
|
- String trackerHostName = (indexOfColon == -1) ?
|
|
|
- trackerName :
|
|
|
+ String trackerHostName = (indexOfColon == -1) ?
|
|
|
+ trackerName :
|
|
|
trackerName.substring(0, indexOfColon);
|
|
|
return trackerHostName.substring("tracker_".length());
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private class KillTaskThread extends Thread {
|
|
|
|
|
|
private volatile boolean killed = false;
|
|
@@ -399,7 +400,7 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
private float threshold = 0.2f;
|
|
|
private boolean onlyMapsProgress;
|
|
|
private int numIterations;
|
|
|
- public KillTaskThread(JobClient jc, int thresholdMultiplier,
|
|
|
+ public KillTaskThread(JobClient jc, int thresholdMultiplier,
|
|
|
float threshold, boolean onlyMapsProgress, int numIterations) {
|
|
|
this.jc = jc;
|
|
|
this.thresholdMultiplier = thresholdMultiplier;
|
|
@@ -427,15 +428,15 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
} else {
|
|
|
LOG.info("Will kill tasks based on Reduces' progress");
|
|
|
}
|
|
|
- LOG.info("Initial progress threshold: " + threshold +
|
|
|
- ". Threshold Multiplier: " + thresholdMultiplier +
|
|
|
+ LOG.info("Initial progress threshold: " + threshold +
|
|
|
+ ". Threshold Multiplier: " + thresholdMultiplier +
|
|
|
". Number of iterations: " + numIterations);
|
|
|
float thresholdVal = threshold;
|
|
|
int numIterationsDone = 0;
|
|
|
while (!killed) {
|
|
|
try {
|
|
|
float progress;
|
|
|
- if (jc.getJob(rJob.getID()).isComplete() ||
|
|
|
+ if (jc.getJob(rJob.getID()).isComplete() ||
|
|
|
numIterationsDone == numIterations) {
|
|
|
break;
|
|
|
}
|
|
@@ -499,7 +500,7 @@ public class ReliabilityTest extends Configured implements Tool {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public static void main(String args[]) throws Exception {
|
|
|
int res = ToolRunner.run(new Configuration(), new ReliabilityTest(), args);
|
|
|
System.exit(res);
|