|
@@ -29,6 +29,8 @@ import java.io.PipedInputStream;
|
|
import java.io.PipedOutputStream;
|
|
import java.io.PipedOutputStream;
|
|
import java.io.PrintStream;
|
|
import java.io.PrintStream;
|
|
|
|
|
|
|
|
+import junit.framework.Assert;
|
|
|
|
+
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -60,6 +62,22 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
|
|
return job;
|
|
return job;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private Job runJobInBackGround(Configuration conf) throws Exception {
|
|
|
|
+ String input = "hello1\nhello2\nhello3\n";
|
|
|
|
+
|
|
|
|
+ Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
|
|
|
|
+ 1, 1, input);
|
|
|
|
+ job.setJobName("mr");
|
|
|
|
+ job.setPriority(JobPriority.NORMAL);
|
|
|
|
+ job.submit();
|
|
|
|
+ int i = 0;
|
|
|
|
+ while (i++ < 200 && job.getJobID() == null) {
|
|
|
|
+ LOG.info("waiting for jobId...");
|
|
|
|
+ Thread.sleep(100);
|
|
|
|
+ }
|
|
|
|
+ return job;
|
|
|
|
+ }
|
|
|
|
+
|
|
public static int runTool(Configuration conf, Tool tool, String[] args,
|
|
public static int runTool(Configuration conf, Tool tool, String[] args,
|
|
OutputStream out) throws Exception {
|
|
OutputStream out) throws Exception {
|
|
PrintStream oldOut = System.out;
|
|
PrintStream oldOut = System.out;
|
|
@@ -108,8 +126,10 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
|
|
Job job = runJob(conf);
|
|
Job job = runJob(conf);
|
|
|
|
|
|
String jobId = job.getJobID().toString();
|
|
String jobId = job.getJobID().toString();
|
|
- // test jobs list
|
|
|
|
- testJobList(jobId, conf);
|
|
|
|
|
|
+ // test all jobs list
|
|
|
|
+ testAllJobList(jobId, conf);
|
|
|
|
+ // test only submitted jobs list
|
|
|
|
+ testSubmittedJobList(conf);
|
|
// test job counter
|
|
// test job counter
|
|
testGetCounter(jobId, conf);
|
|
testGetCounter(jobId, conf);
|
|
// status
|
|
// status
|
|
@@ -131,37 +151,37 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
|
|
// submit job from file
|
|
// submit job from file
|
|
testSubmit(conf);
|
|
testSubmit(conf);
|
|
// kill a task
|
|
// kill a task
|
|
- testKillTask(job, conf);
|
|
|
|
|
|
+ testKillTask(conf);
|
|
// fail a task
|
|
// fail a task
|
|
- testfailTask(job, conf);
|
|
|
|
|
|
+ testfailTask(conf);
|
|
// kill job
|
|
// kill job
|
|
- testKillJob(jobId, conf);
|
|
|
|
|
|
+ testKillJob(conf);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* test fail task
|
|
* test fail task
|
|
*/
|
|
*/
|
|
- private void testfailTask(Job job, Configuration conf) throws Exception {
|
|
|
|
|
|
+ private void testfailTask(Configuration conf) throws Exception {
|
|
|
|
+ Job job = runJobInBackGround(conf);
|
|
CLI jc = createJobClient();
|
|
CLI jc = createJobClient();
|
|
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
|
|
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
|
|
TaskAttemptID taid = new TaskAttemptID(tid, 1);
|
|
TaskAttemptID taid = new TaskAttemptID(tid, 1);
|
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
- // TaskAttemptId is not set
|
|
|
|
|
|
+ // TaskAttemptId is not set
|
|
int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
|
|
int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
|
|
assertEquals("Exit code", -1, exitCode);
|
|
assertEquals("Exit code", -1, exitCode);
|
|
|
|
|
|
- try {
|
|
|
|
- runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
|
|
|
|
- fail(" this task should field");
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- // task completed !
|
|
|
|
- assertTrue(e.getMessage().contains("_0001_m_000000_1"));
|
|
|
|
- }
|
|
|
|
|
|
+ runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
|
|
|
|
+ String answer = new String(out.toByteArray(), "UTF-8");
|
|
|
|
+ Assert
|
|
|
|
+ .assertTrue(answer.contains("Killed task " + taid + " by failing it"));
|
|
}
|
|
}
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* test a kill task
|
|
* test a kill task
|
|
*/
|
|
*/
|
|
- private void testKillTask(Job job, Configuration conf) throws Exception {
|
|
|
|
|
|
+ private void testKillTask(Configuration conf) throws Exception {
|
|
|
|
+ Job job = runJobInBackGround(conf);
|
|
CLI jc = createJobClient();
|
|
CLI jc = createJobClient();
|
|
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
|
|
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
|
|
TaskAttemptID taid = new TaskAttemptID(tid, 1);
|
|
TaskAttemptID taid = new TaskAttemptID(tid, 1);
|
|
@@ -170,20 +190,17 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
|
|
int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
|
|
int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
|
|
assertEquals("Exit code", -1, exitCode);
|
|
assertEquals("Exit code", -1, exitCode);
|
|
|
|
|
|
- try {
|
|
|
|
- runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
|
|
|
|
- fail(" this task should be killed");
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- System.out.println(e);
|
|
|
|
- // task completed
|
|
|
|
- assertTrue(e.getMessage().contains("_0001_m_000000_1"));
|
|
|
|
- }
|
|
|
|
|
|
+ runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
|
|
|
|
+ String answer = new String(out.toByteArray(), "UTF-8");
|
|
|
|
+ Assert.assertTrue(answer.contains("Killed task " + taid));
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* test a kill job
|
|
* test a kill job
|
|
*/
|
|
*/
|
|
- private void testKillJob(String jobId, Configuration conf) throws Exception {
|
|
|
|
|
|
+ private void testKillJob(Configuration conf) throws Exception {
|
|
|
|
+ Job job = runJobInBackGround(conf);
|
|
|
|
+ String jobId = job.getJobID().toString();
|
|
CLI jc = createJobClient();
|
|
CLI jc = createJobClient();
|
|
|
|
|
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
@@ -434,7 +451,8 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
|
|
/**
|
|
/**
|
|
* print a job list
|
|
* print a job list
|
|
*/
|
|
*/
|
|
- protected void testJobList(String jobId, Configuration conf) throws Exception {
|
|
|
|
|
|
+ protected void testAllJobList(String jobId, Configuration conf)
|
|
|
|
+ throws Exception {
|
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
// bad options
|
|
// bad options
|
|
|
|
|
|
@@ -457,23 +475,31 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
|
|
}
|
|
}
|
|
assertEquals(1, counter);
|
|
assertEquals(1, counter);
|
|
out.reset();
|
|
out.reset();
|
|
- // only submitted
|
|
|
|
- exitCode = runTool(conf, createJobClient(), new String[] { "-list" }, out);
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void testSubmittedJobList(Configuration conf) throws Exception {
|
|
|
|
+ Job job = runJobInBackGround(conf);
|
|
|
|
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
|
|
+ String line;
|
|
|
|
+ int counter = 0;
|
|
|
|
+ // only submitted
|
|
|
|
+ int exitCode =
|
|
|
|
+ runTool(conf, createJobClient(), new String[] { "-list" }, out);
|
|
assertEquals("Exit code", 0, exitCode);
|
|
assertEquals("Exit code", 0, exitCode);
|
|
- br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
|
|
|
|
- out.toByteArray())));
|
|
|
|
|
|
+ BufferedReader br =
|
|
|
|
+ new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
|
|
|
|
+ out.toByteArray())));
|
|
counter = 0;
|
|
counter = 0;
|
|
while ((line = br.readLine()) != null) {
|
|
while ((line = br.readLine()) != null) {
|
|
LOG.info("line = " + line);
|
|
LOG.info("line = " + line);
|
|
- if (line.contains(jobId)) {
|
|
|
|
|
|
+ if (line.contains(job.getJobID().toString())) {
|
|
counter++;
|
|
counter++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// all jobs submitted! no current
|
|
// all jobs submitted! no current
|
|
assertEquals(1, counter);
|
|
assertEquals(1, counter);
|
|
-
|
|
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
protected void verifyJobPriority(String jobId, String priority,
|
|
protected void verifyJobPriority(String jobId, String priority,
|
|
Configuration conf, CLI jc) throws Exception {
|
|
Configuration conf, CLI jc) throws Exception {
|
|
PipedInputStream pis = new PipedInputStream();
|
|
PipedInputStream pis = new PipedInputStream();
|