|
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
|
+import org.apache.hadoop.fs.permission.FsAction;
|
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.mapred.Counters;
|
|
import org.apache.hadoop.mapred.Counters;
|
|
import org.apache.hadoop.mapred.FileInputFormat;
|
|
import org.apache.hadoop.mapred.FileInputFormat;
|
|
import org.apache.hadoop.mapred.FileOutputFormat;
|
|
import org.apache.hadoop.mapred.FileOutputFormat;
|
|
@@ -47,7 +49,18 @@ import org.apache.hadoop.util.ToolRunner;
|
|
public class TestPipes extends TestCase {
|
|
public class TestPipes extends TestCase {
|
|
private static final Log LOG =
|
|
private static final Log LOG =
|
|
LogFactory.getLog(TestPipes.class.getName());
|
|
LogFactory.getLog(TestPipes.class.getName());
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private static Path cppExamples =
|
|
|
|
+ new Path(System.getProperty("install.c++.examples"));
|
|
|
|
+ static Path wordCountSimple =
|
|
|
|
+ new Path(cppExamples, "bin/wordcount-simple");
|
|
|
|
+ static Path wordCountPart =
|
|
|
|
+ new Path(cppExamples, "bin/wordcount-part");
|
|
|
|
+ static Path wordCountNoPipes =
|
|
|
|
+ new Path(cppExamples,"bin/wordcount-nopipe");
|
|
|
|
+
|
|
|
|
+ static Path nonPipedOutDir;
|
|
|
|
+
|
|
static void cleanup(FileSystem fs, Path p) throws IOException {
|
|
static void cleanup(FileSystem fs, Path p) throws IOException {
|
|
fs.delete(p, true);
|
|
fs.delete(p, true);
|
|
assertFalse("output not cleaned up", fs.exists(p));
|
|
assertFalse("output not cleaned up", fs.exists(p));
|
|
@@ -60,7 +73,6 @@ public class TestPipes extends TestCase {
|
|
}
|
|
}
|
|
MiniDFSCluster dfs = null;
|
|
MiniDFSCluster dfs = null;
|
|
MiniMRCluster mr = null;
|
|
MiniMRCluster mr = null;
|
|
- Path cppExamples = new Path(System.getProperty("install.c++.examples"));
|
|
|
|
Path inputPath = new Path("/testing/in");
|
|
Path inputPath = new Path("/testing/in");
|
|
Path outputPath = new Path("/testing/out");
|
|
Path outputPath = new Path("/testing/out");
|
|
try {
|
|
try {
|
|
@@ -69,17 +81,15 @@ public class TestPipes extends TestCase {
|
|
dfs = new MiniDFSCluster(conf, numSlaves, true, null);
|
|
dfs = new MiniDFSCluster(conf, numSlaves, true, null);
|
|
mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
|
|
mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
|
|
writeInputFile(dfs.getFileSystem(), inputPath);
|
|
writeInputFile(dfs.getFileSystem(), inputPath);
|
|
- runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"),
|
|
|
|
- inputPath, outputPath, 3, 2, twoSplitOutput);
|
|
|
|
|
|
+ runProgram(mr, dfs, wordCountSimple,
|
|
|
|
+ inputPath, outputPath, 3, 2, twoSplitOutput, null);
|
|
cleanup(dfs.getFileSystem(), outputPath);
|
|
cleanup(dfs.getFileSystem(), outputPath);
|
|
-
|
|
|
|
- runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"),
|
|
|
|
- inputPath, outputPath, 3, 0, noSortOutput);
|
|
|
|
|
|
+ runProgram(mr, dfs, wordCountSimple,
|
|
|
|
+ inputPath, outputPath, 3, 0, noSortOutput, null);
|
|
cleanup(dfs.getFileSystem(), outputPath);
|
|
cleanup(dfs.getFileSystem(), outputPath);
|
|
-
|
|
|
|
- runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
|
|
|
|
- inputPath, outputPath, 3, 2, fixedPartitionOutput);
|
|
|
|
- runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
|
|
|
|
|
|
+ runProgram(mr, dfs, wordCountPart,
|
|
|
|
+ inputPath, outputPath, 3, 2, fixedPartitionOutput, null);
|
|
|
|
+ runNonPipedProgram(mr, dfs, wordCountNoPipes, null);
|
|
mr.waitUntilIdle();
|
|
mr.waitUntilIdle();
|
|
} finally {
|
|
} finally {
|
|
mr.shutdown();
|
|
mr.shutdown();
|
|
@@ -87,6 +97,7 @@ public class TestPipes extends TestCase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
final static String[] twoSplitOutput = new String[] {
|
|
final static String[] twoSplitOutput = new String[] {
|
|
"`and\t1\na\t1\nand\t1\nbeginning\t1\nbook\t1\nbut\t1\nby\t1\n" +
|
|
"`and\t1\na\t1\nand\t1\nbeginning\t1\nbook\t1\nbut\t1\nby\t1\n" +
|
|
"conversation?'\t1\ndo:\t1\nhad\t2\nhaving\t1\nher\t2\nin\t1\nit\t1\n"+
|
|
"conversation?'\t1\ndo:\t1\nhad\t2\nhaving\t1\nher\t2\nin\t1\nit\t1\n"+
|
|
@@ -124,7 +135,7 @@ public class TestPipes extends TestCase {
|
|
""
|
|
""
|
|
};
|
|
};
|
|
|
|
|
|
- private void writeInputFile(FileSystem fs, Path dir) throws IOException {
|
|
|
|
|
|
+ static void writeInputFile(FileSystem fs, Path dir) throws IOException {
|
|
DataOutputStream out = fs.create(new Path(dir, "part0"));
|
|
DataOutputStream out = fs.create(new Path(dir, "part0"));
|
|
out.writeBytes("Alice was beginning to get very tired of sitting by her\n");
|
|
out.writeBytes("Alice was beginning to get very tired of sitting by her\n");
|
|
out.writeBytes("sister on the bank, and of having nothing to do: once\n");
|
|
out.writeBytes("sister on the bank, and of having nothing to do: once\n");
|
|
@@ -135,12 +146,18 @@ public class TestPipes extends TestCase {
|
|
out.close();
|
|
out.close();
|
|
}
|
|
}
|
|
|
|
|
|
- private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
|
|
|
|
|
|
+ static void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
|
|
Path program, Path inputPath, Path outputPath,
|
|
Path program, Path inputPath, Path outputPath,
|
|
- int numMaps, int numReduces, String[] expectedResults
|
|
|
|
|
|
+ int numMaps, int numReduces, String[] expectedResults,
|
|
|
|
+ JobConf conf
|
|
) throws IOException {
|
|
) throws IOException {
|
|
Path wordExec = new Path("/testing/bin/application");
|
|
Path wordExec = new Path("/testing/bin/application");
|
|
- JobConf job = mr.createJobConf();
|
|
|
|
|
|
+ JobConf job = null;
|
|
|
|
+ if(conf == null) {
|
|
|
|
+ job = mr.createJobConf();
|
|
|
|
+ }else {
|
|
|
|
+ job = new JobConf(conf);
|
|
|
|
+ }
|
|
job.setNumMapTasks(numMaps);
|
|
job.setNumMapTasks(numMaps);
|
|
job.setNumReduceTasks(numReduces);
|
|
job.setNumReduceTasks(numReduces);
|
|
{
|
|
{
|
|
@@ -199,15 +216,21 @@ public class TestPipes extends TestCase {
|
|
* @param program the program to run
|
|
* @param program the program to run
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
|
|
|
|
- Path program) throws IOException {
|
|
|
|
- JobConf job = mr.createJobConf();
|
|
|
|
|
|
+ static void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
|
|
|
|
+ Path program, JobConf conf) throws IOException {
|
|
|
|
+ JobConf job;
|
|
|
|
+ if(conf == null) {
|
|
|
|
+ job = mr.createJobConf();
|
|
|
|
+ }else {
|
|
|
|
+ job = new JobConf(conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
job.setInputFormat(WordCountInputFormat.class);
|
|
job.setInputFormat(WordCountInputFormat.class);
|
|
FileSystem local = FileSystem.getLocal(job);
|
|
FileSystem local = FileSystem.getLocal(job);
|
|
Path testDir = new Path("file:" + System.getProperty("test.build.data"),
|
|
Path testDir = new Path("file:" + System.getProperty("test.build.data"),
|
|
"pipes");
|
|
"pipes");
|
|
Path inDir = new Path(testDir, "input");
|
|
Path inDir = new Path(testDir, "input");
|
|
- Path outDir = new Path(testDir, "output");
|
|
|
|
|
|
+ nonPipedOutDir = new Path(testDir, "output");
|
|
Path wordExec = new Path("/testing/bin/application");
|
|
Path wordExec = new Path("/testing/bin/application");
|
|
Path jobXml = new Path(testDir, "job.xml");
|
|
Path jobXml = new Path(testDir, "job.xml");
|
|
{
|
|
{
|
|
@@ -228,21 +251,21 @@ public class TestPipes extends TestCase {
|
|
out.writeBytes("hall silly cats drink java\n");
|
|
out.writeBytes("hall silly cats drink java\n");
|
|
out.writeBytes("all dogs bow wow\n");
|
|
out.writeBytes("all dogs bow wow\n");
|
|
out.writeBytes("hello drink java\n");
|
|
out.writeBytes("hello drink java\n");
|
|
|
|
+ local.delete(nonPipedOutDir, true);
|
|
|
|
+ local.mkdirs(nonPipedOutDir, new FsPermission(FsAction.ALL, FsAction.ALL,
|
|
|
|
+ FsAction.ALL));
|
|
out.close();
|
|
out.close();
|
|
- local.delete(outDir, true);
|
|
|
|
- local.mkdirs(outDir);
|
|
|
|
out = local.create(jobXml);
|
|
out = local.create(jobXml);
|
|
job.writeXml(out);
|
|
job.writeXml(out);
|
|
out.close();
|
|
out.close();
|
|
- System.err.println("About to run: Submitter -conf " + jobXml +
|
|
|
|
- " -input " + inDir + " -output " + outDir +
|
|
|
|
- " -program " +
|
|
|
|
- dfs.getFileSystem().makeQualified(wordExec));
|
|
|
|
|
|
+ System.err.println("About to run: Submitter -conf " + jobXml + " -input "
|
|
|
|
+ + inDir + " -output " + nonPipedOutDir + " -program "
|
|
|
|
+ + dfs.getFileSystem().makeQualified(wordExec));
|
|
try {
|
|
try {
|
|
int ret = ToolRunner.run(new Submitter(),
|
|
int ret = ToolRunner.run(new Submitter(),
|
|
new String[]{"-conf", jobXml.toString(),
|
|
new String[]{"-conf", jobXml.toString(),
|
|
"-input", inDir.toString(),
|
|
"-input", inDir.toString(),
|
|
- "-output", outDir.toString(),
|
|
|
|
|
|
+ "-output", nonPipedOutDir.toString(),
|
|
"-program",
|
|
"-program",
|
|
dfs.getFileSystem().makeQualified(wordExec).toString(),
|
|
dfs.getFileSystem().makeQualified(wordExec).toString(),
|
|
"-reduces", "2"});
|
|
"-reduces", "2"});
|