瀏覽代碼

HADOOP-1573. Support for 0 reducers in PIPES.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@578543 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 18 年之前
父節點
當前提交
de5b907d85
共有 3 個文件被更改,包括 30 次插入7 次删除
  1. 3 0
      CHANGES.txt
  2. 4 2
      src/c++/pipes/impl/HadoopPipes.cc
  3. 23 5
      src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

+ 3 - 0
CHANGES.txt

@@ -87,6 +87,9 @@ Trunk (unreleased changes)
 
 
   BUG FIXES
   BUG FIXES
 
 
+    HADOOP-1573. Support for 0 reducers in PIPES. 
+    (Owen O'Malley via devaraj)
+
     HADOOP-1500. Fix typographical errors in the DFS WebUI.
     HADOOP-1500. Fix typographical errors in the DFS WebUI.
     (Nigel Daley via dhruba)
     (Nigel Daley via dhruba)
 
 

+ 4 - 2
src/c++/pipes/impl/HadoopPipes.cc

@@ -628,9 +628,11 @@ namespace HadoopPipes {
         value = new string();
         value = new string();
       }
       }
       mapper = factory->createMapper(*this);
       mapper = factory->createMapper(*this);
-      reducer = factory->createCombiner(*this);
-      partitioner = factory->createPartitioner(*this);
       numReduces = _numReduces;
       numReduces = _numReduces;
+      if (numReduces != 0) { 
+        reducer = factory->createCombiner(*this);
+        partitioner = factory->createPartitioner(*this);
+      }
       if (reducer != NULL) {
       if (reducer != NULL) {
         int64_t spillSize = 100;
         int64_t spillSize = 100;
         if (jobConf->hasKey("io.sort.mb")) {
         if (jobConf->hasKey("io.sort.mb")) {

+ 23 - 5
src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

@@ -61,11 +61,15 @@ public class TestPipes extends TestCase {
       mr = new MiniMRCluster(numSlaves, fs.getName(), 1);
       mr = new MiniMRCluster(numSlaves, fs.getName(), 1);
       writeInputFile(fs, inputPath);
       writeInputFile(fs, inputPath);
       runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
       runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
-                 inputPath, outputPath, twoSplitOutput);
+                 inputPath, outputPath, 3, 2, twoSplitOutput);
+      FileUtil.fullyDelete(fs, outputPath);
+      assertFalse("output not cleaned up", fs.exists(outputPath));
+      runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-simple"), 
+                 inputPath, outputPath, 3, 0, noSortOutput);
       FileUtil.fullyDelete(fs, outputPath);
       FileUtil.fullyDelete(fs, outputPath);
       assertFalse("output not cleaned up", fs.exists(outputPath));
       assertFalse("output not cleaned up", fs.exists(outputPath));
       runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-part"),
       runProgram(mr, fs, new Path(cppExamples, "bin/wordcount-part"),
-                 inputPath, outputPath, fixedPartitionOutput);
+                 inputPath, outputPath, 3, 2, fixedPartitionOutput);
       runNonPipedProgram(mr, fs, new Path(cppExamples, "bin/wordcount-nopipe"));
       runNonPipedProgram(mr, fs, new Path(cppExamples, "bin/wordcount-nopipe"));
       mr.waitUntilIdle();
       mr.waitUntilIdle();
     } finally {
     } finally {
@@ -84,6 +88,20 @@ public class TestPipes extends TestCase {
     "into\t1\nis\t1\nreading,\t1\nshe\t1\nsister\t2\nsitting\t1\ntired\t1\n" +
     "into\t1\nis\t1\nreading,\t1\nshe\t1\nsister\t2\nsitting\t1\ntired\t1\n" +
     "twice\t1\nvery\t1\nwhat\t1\n"
     "twice\t1\nvery\t1\nwhat\t1\n"
   };
   };
+
+  final static String[] noSortOutput = new String[] {
+    "it,\t1\n`and\t1\nwhat\t1\nis\t1\nthe\t1\nuse\t1\nof\t1\na\t1\n" +
+    "book,'\t1\nthought\t1\nAlice\t1\n`without\t1\npictures\t1\nor\t1\n"+
+    "conversation?'\t1\n",
+
+    "Alice\t1\nwas\t1\nbeginning\t1\nto\t1\nget\t1\nvery\t1\ntired\t1\n"+
+    "of\t1\nsitting\t1\nby\t1\nher\t1\nsister\t1\non\t1\nthe\t1\nbank,\t1\n"+
+    "and\t1\nof\t1\nhaving\t1\nnothing\t1\nto\t1\ndo:\t1\nonce\t1\n", 
+
+    "or\t1\ntwice\t1\nshe\t1\nhad\t1\npeeped\t1\ninto\t1\nthe\t1\nbook\t1\n"+
+    "her\t1\nsister\t1\nwas\t1\nreading,\t1\nbut\t1\nit\t1\nhad\t1\nno\t1\n"+
+    "pictures\t1\nor\t1\nconversations\t1\nin\t1\n"
+  };
   
   
   final static String[] fixedPartitionOutput = new String[] {
   final static String[] fixedPartitionOutput = new String[] {
     "Alice\t2\n`and\t1\n`without\t1\na\t1\nand\t1\nbank,\t1\nbeginning\t1\n" +
     "Alice\t2\n`and\t1\n`without\t1\na\t1\nand\t1\nbank,\t1\nbeginning\t1\n" +
@@ -110,14 +128,14 @@ public class TestPipes extends TestCase {
 
 
   private void runProgram(MiniMRCluster mr, FileSystem fs, 
   private void runProgram(MiniMRCluster mr, FileSystem fs, 
                           Path program, Path inputPath, Path outputPath,
                           Path program, Path inputPath, Path outputPath,
-                          String[] expectedResults
+                          int numMaps, int numReduces, String[] expectedResults
                          ) throws IOException {
                          ) throws IOException {
     Path wordExec = new Path("/testing/bin/application");
     Path wordExec = new Path("/testing/bin/application");
     FileUtil.fullyDelete(fs, wordExec.getParent());
     FileUtil.fullyDelete(fs, wordExec.getParent());
     fs.copyFromLocalFile(program, wordExec);                                         
     fs.copyFromLocalFile(program, wordExec);                                         
     JobConf job = mr.createJobConf();
     JobConf job = mr.createJobConf();
-    job.setNumMapTasks(3);
-    job.setNumReduceTasks(expectedResults.length);
+    job.setNumMapTasks(numMaps);
+    job.setNumReduceTasks(numReduces);
     Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
     Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
     Submitter.setIsJavaRecordReader(job, true);
     Submitter.setIsJavaRecordReader(job, true);
     Submitter.setIsJavaRecordWriter(job, true);
     Submitter.setIsJavaRecordWriter(job, true);