|
@@ -32,8 +32,6 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
-import org.apache.hadoop.fs.permission.FsAction;
|
|
|
-import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.mapred.Counters;
|
|
|
import org.apache.hadoop.mapred.FileInputFormat;
|
|
|
import org.apache.hadoop.mapred.FileOutputFormat;
|
|
@@ -49,18 +47,7 @@ import org.apache.hadoop.util.ToolRunner;
|
|
|
public class TestPipes extends TestCase {
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(TestPipes.class.getName());
|
|
|
-
|
|
|
- private static Path cppExamples =
|
|
|
- new Path(System.getProperty("install.c++.examples"));
|
|
|
- static Path wordCountSimple =
|
|
|
- new Path(cppExamples, "bin/wordcount-simple");
|
|
|
- static Path wordCountPart =
|
|
|
- new Path(cppExamples, "bin/wordcount-part");
|
|
|
- static Path wordCountNoPipes =
|
|
|
- new Path(cppExamples,"bin/wordcount-nopipe");
|
|
|
-
|
|
|
- static Path nonPipedOutDir;
|
|
|
-
|
|
|
+
|
|
|
static void cleanup(FileSystem fs, Path p) throws IOException {
|
|
|
fs.delete(p, true);
|
|
|
assertFalse("output not cleaned up", fs.exists(p));
|
|
@@ -73,6 +60,7 @@ public class TestPipes extends TestCase {
|
|
|
}
|
|
|
MiniDFSCluster dfs = null;
|
|
|
MiniMRCluster mr = null;
|
|
|
+ Path cppExamples = new Path(System.getProperty("install.c++.examples"));
|
|
|
Path inputPath = new Path("/testing/in");
|
|
|
Path outputPath = new Path("/testing/out");
|
|
|
try {
|
|
@@ -81,15 +69,17 @@ public class TestPipes extends TestCase {
|
|
|
dfs = new MiniDFSCluster(conf, numSlaves, true, null);
|
|
|
mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
|
|
|
writeInputFile(dfs.getFileSystem(), inputPath);
|
|
|
- runProgram(mr, dfs, wordCountSimple,
|
|
|
- inputPath, outputPath, 3, 2, twoSplitOutput, null);
|
|
|
+ runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"),
|
|
|
+ inputPath, outputPath, 3, 2, twoSplitOutput);
|
|
|
cleanup(dfs.getFileSystem(), outputPath);
|
|
|
- runProgram(mr, dfs, wordCountSimple,
|
|
|
- inputPath, outputPath, 3, 0, noSortOutput, null);
|
|
|
+
|
|
|
+ runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"),
|
|
|
+ inputPath, outputPath, 3, 0, noSortOutput);
|
|
|
cleanup(dfs.getFileSystem(), outputPath);
|
|
|
- runProgram(mr, dfs, wordCountPart,
|
|
|
- inputPath, outputPath, 3, 2, fixedPartitionOutput, null);
|
|
|
- runNonPipedProgram(mr, dfs, wordCountNoPipes, null);
|
|
|
+
|
|
|
+ runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
|
|
|
+ inputPath, outputPath, 3, 2, fixedPartitionOutput);
|
|
|
+ runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
|
|
|
mr.waitUntilIdle();
|
|
|
} finally {
|
|
|
mr.shutdown();
|
|
@@ -97,7 +87,6 @@ public class TestPipes extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
final static String[] twoSplitOutput = new String[] {
|
|
|
"`and\t1\na\t1\nand\t1\nbeginning\t1\nbook\t1\nbut\t1\nby\t1\n" +
|
|
|
"conversation?'\t1\ndo:\t1\nhad\t2\nhaving\t1\nher\t2\nin\t1\nit\t1\n"+
|
|
@@ -135,7 +124,7 @@ public class TestPipes extends TestCase {
|
|
|
""
|
|
|
};
|
|
|
|
|
|
- static void writeInputFile(FileSystem fs, Path dir) throws IOException {
|
|
|
+ private void writeInputFile(FileSystem fs, Path dir) throws IOException {
|
|
|
DataOutputStream out = fs.create(new Path(dir, "part0"));
|
|
|
out.writeBytes("Alice was beginning to get very tired of sitting by her\n");
|
|
|
out.writeBytes("sister on the bank, and of having nothing to do: once\n");
|
|
@@ -146,18 +135,12 @@ public class TestPipes extends TestCase {
|
|
|
out.close();
|
|
|
}
|
|
|
|
|
|
- static void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
|
|
|
+ private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
|
|
|
Path program, Path inputPath, Path outputPath,
|
|
|
- int numMaps, int numReduces, String[] expectedResults,
|
|
|
- JobConf conf
|
|
|
+ int numMaps, int numReduces, String[] expectedResults
|
|
|
) throws IOException {
|
|
|
Path wordExec = new Path("/testing/bin/application");
|
|
|
- JobConf job = null;
|
|
|
- if(conf == null) {
|
|
|
- job = mr.createJobConf();
|
|
|
- }else {
|
|
|
- job = new JobConf(conf);
|
|
|
- }
|
|
|
+ JobConf job = mr.createJobConf();
|
|
|
job.setNumMapTasks(numMaps);
|
|
|
job.setNumReduceTasks(numReduces);
|
|
|
{
|
|
@@ -216,21 +199,15 @@ public class TestPipes extends TestCase {
|
|
|
* @param program the program to run
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- static void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
|
|
|
- Path program, JobConf conf) throws IOException {
|
|
|
- JobConf job;
|
|
|
- if(conf == null) {
|
|
|
- job = mr.createJobConf();
|
|
|
- }else {
|
|
|
- job = new JobConf(conf);
|
|
|
- }
|
|
|
-
|
|
|
+ private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
|
|
|
+ Path program) throws IOException {
|
|
|
+ JobConf job = mr.createJobConf();
|
|
|
job.setInputFormat(WordCountInputFormat.class);
|
|
|
FileSystem local = FileSystem.getLocal(job);
|
|
|
Path testDir = new Path("file:" + System.getProperty("test.build.data"),
|
|
|
"pipes");
|
|
|
Path inDir = new Path(testDir, "input");
|
|
|
- nonPipedOutDir = new Path(testDir, "output");
|
|
|
+ Path outDir = new Path(testDir, "output");
|
|
|
Path wordExec = new Path("/testing/bin/application");
|
|
|
Path jobXml = new Path(testDir, "job.xml");
|
|
|
{
|
|
@@ -251,21 +228,21 @@ public class TestPipes extends TestCase {
|
|
|
out.writeBytes("hall silly cats drink java\n");
|
|
|
out.writeBytes("all dogs bow wow\n");
|
|
|
out.writeBytes("hello drink java\n");
|
|
|
- local.delete(nonPipedOutDir, true);
|
|
|
- local.mkdirs(nonPipedOutDir, new FsPermission(FsAction.ALL, FsAction.ALL,
|
|
|
- FsAction.ALL));
|
|
|
out.close();
|
|
|
+ local.delete(outDir, true);
|
|
|
+ local.mkdirs(outDir);
|
|
|
out = local.create(jobXml);
|
|
|
job.writeXml(out);
|
|
|
out.close();
|
|
|
- System.err.println("About to run: Submitter -conf " + jobXml + " -input "
|
|
|
- + inDir + " -output " + nonPipedOutDir + " -program "
|
|
|
- + dfs.getFileSystem().makeQualified(wordExec));
|
|
|
+ System.err.println("About to run: Submitter -conf " + jobXml +
|
|
|
+ " -input " + inDir + " -output " + outDir +
|
|
|
+ " -program " +
|
|
|
+ dfs.getFileSystem().makeQualified(wordExec));
|
|
|
try {
|
|
|
int ret = ToolRunner.run(new Submitter(),
|
|
|
new String[]{"-conf", jobXml.toString(),
|
|
|
"-input", inDir.toString(),
|
|
|
- "-output", nonPipedOutDir.toString(),
|
|
|
+ "-output", outDir.toString(),
|
|
|
"-program",
|
|
|
dfs.getFileSystem().makeQualified(wordExec).toString(),
|
|
|
"-reduces", "2"});
|