
HADOOP-1216. Change MapReduce so that, when numReduceTasks is zero, map outputs are written directly as final output. Contributed by Runping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@532878 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
Parent
Commit 18f43a4fba

+ 5 - 0
CHANGES.txt

@@ -277,6 +277,11 @@ Trunk (unreleased changes)
     do not exist no longer causes block reports to be re-sent every
     second.  (Dhruba Borthakur via cutting)
 
+83. HADOOP-1216.  Change MapReduce so that, when numReduceTasks is
+    zero, map outputs are written directly as final output, skipping
+    shuffle, sort and reduce.  Use this to implement reduce=NONE
+    option in contrib/streaming.  (Runping Qi via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

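With this change, a plain MapReduce driver gets map-only behaviour simply by setting the number of reduce tasks to zero: each map task then writes through the job's OutputFormat straight into the output directory, and no shuffle, sort or reduce runs. A minimal sketch against the mapred API of this period (the class name is illustrative, and the path-setting calls may differ slightly between releases):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;

// Sketch of a map-only job under HADOOP-1216: with zero reduce tasks,
// map output is written directly as the job's final output.
public class MapOnlyJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MapOnlyJob.class);
    conf.setJobName("map-only");
    conf.setMapperClass(IdentityMapper.class);
    conf.setNumReduceTasks(0);                // the new behaviour kicks in here
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    conf.setInputPath(new Path(args[0]));     // JobConf path setters, as in the 0.1x API
    conf.setOutputPath(new Path(args[1]));
    JobClient.runJob(conf);                   // no reducers are scheduled
  }
}
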
+ 12 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -241,7 +241,8 @@ public class StreamJob {
       
       additionalConfSpec_ = (String)cmdLine.getValue("-additionalconfspec"); 
       inputFormatSpec_ = (String)cmdLine.getValue("-inputformat"); 
-      outputFormatSpec_ = (String)cmdLine.getValue("-outputformat"); 
+      outputFormatSpec_ = (String)cmdLine.getValue("-outputformat");
+      numReduceTasksSpec_ = (String)cmdLine.getValue("-numReduceTasks"); 
       partitionerSpec_ = (String)cmdLine.getValue("-partitioner");
       inReaderSpec_ = (String)cmdLine.getValue("-inputreader"); 
       
@@ -397,6 +398,8 @@ public class StreamJob {
                                        "Optional.", "spec", 1, false);
     Option partitioner = createOption("partitioner", 
                                       "Optional.", "spec", 1, false);
+    Option numReduceTasks = createOption("numReduceTasks", 
+        "Optional.", "spec",1, false );
     Option inputreader = createOption("inputreader", 
                                       "Optional.", "spec", 1, false);
     Option cacheFile = createOption("cacheFile", 
@@ -425,6 +428,7 @@ public class StreamJob {
       withOption(inputformat).
       withOption(outputformat).
       withOption(partitioner).
+      withOption(numReduceTasks).
       withOption(inputreader).
       withOption(jobconf).
       withOption(cmdenv).
@@ -462,6 +466,7 @@ public class StreamJob {
       System.out.println("  -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat  Optional.");
       System.out.println("  -outputformat specfile  Optional.");
       System.out.println("  -partitioner specfile  Optional.");
+      System.out.println("  -numReduceTasks specfile  Optional.");
       System.out.println("  -inputreader <spec>  Optional.");
       System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
       System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
@@ -823,6 +828,11 @@ public class StreamJob {
       } 
     }
     
+    if (numReduceTasksSpec_!= null) {
+      int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
+      jobConf_.setNumReduceTasks(numReduceTasks);
+    }
+    
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
 
@@ -1113,6 +1123,7 @@ public class StreamJob {
   protected String inputFormatSpec_;
   protected String outputFormatSpec_;
   protected String partitionerSpec_;
+  protected String numReduceTasksSpec_;
   protected String additionalConfSpec_;
 
   protected boolean testMerge_;

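The new -numReduceTasks streaming option can be exercised the same way the test below does: build the argument list with -numReduceTasks set to 0 and run it through StreamJob (the equivalent of passing -numReduceTasks 0 to the streaming jar on the command line). A minimal sketch with placeholder paths and mapper command:

// Sketch: invoking hadoop-streaming programmatically with reduce=NONE.
// Input/output paths and the mapper command are placeholders.
String[] streamArgs = new String[] {
  "-input",  "/tmp/stream-in.txt",
  "-output", "/tmp/stream-out",
  "-mapper", "/bin/cat",
  "-numReduceTasks", "0"          // map output becomes the final output
};
StreamJob job = new StreamJob(streamArgs, false /* mayExit */);
job.go();
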
+ 116 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests hadoopStreaming in MapReduce local mode.
+ * It tests the case where number of reducers is set to 0.
+   In this case, the mappers are expected to write out outputs directly.
+   No reducer/combiner will be activated.
+ */
+public class TestStreamReduceNone extends TestCase
+{
+  protected File INPUT_FILE = new File("stream_reduce_none_input.txt");
+  protected File OUTPUT_DIR = new File("stream_reduce_none_out");
+  protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+  // map parses input lines and generates count entries for each word.
+  protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
+  protected String outputExpect = "roses\t\nare\t\nred\t\nviolets\t\nare\t\nblue\t\nbunnies\t\nare\t\npink\t\n";
+
+  private StreamJob job;
+
+  public TestStreamReduceNone() throws IOException
+  {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected void createInput() throws IOException
+  {
+    DataOutputStream out = new DataOutputStream(
+                                                new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.write(input.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
+      "-numReduceTasks", "0",
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+  
+  public void testCommandLine()
+  {
+    File outFile = null;
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
+      createInput();
+      boolean mayExit = false;
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      job = new StreamJob(genArgs(), mayExit);      
+      job.go();
+      outFile = new File(OUTPUT_DIR, "tip_m_map_0000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      System.err.println("outEx1=" + outputExpect);
+      System.err.println("  out1=" + output);
+      assertEquals(outputExpect, output);
+    } catch(Exception e) {
+      failTrace(e);
+    } finally {
+      outFile.delete();
+      File outFileCRC = new File(OUTPUT_DIR, ".tip_m_map_0000.crc").getAbsoluteFile();
+      INPUT_FILE.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+
+  private void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreamReduceNone().testCommandLine();
+  }
+
+}

+ 0 - 1
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java

@@ -50,7 +50,6 @@ public class TrApp
     expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
 
     expect("mapred_task_is_map", "true");
-    expect("mapred_reduce_tasks", "1");
     expectDefined("mapred_task_id");
 
     expectDefined("map_input_file");

+ 46 - 32
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Random;
@@ -104,11 +105,15 @@ class LocalJobRunner implements JobSubmissionProtocol {
         splits = job.getInputFormat().getSplits(job, 1);
         String jobId = profile.getJobId();
         
-        // run a map task for each split
-        job.setNumReduceTasks(1);                 // force a single reduce task
+        int numReduceTasks = job.getNumReduceTasks();
+        if (numReduceTasks > 1 || numReduceTasks < 0) {
+          // we only allow 0 or 1 reducer in local mode
+          numReduceTasks = 1;
+          job.setNumReduceTasks(1);
+        }
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
-          String mapId = "map_" + newId(); 
+          String mapId = "map_" + idFormat.format(i); 
           mapIds.add(mapId);
           buffer.reset();
           splits[i].write(buffer);
@@ -129,38 +134,38 @@ class LocalJobRunner implements JobSubmissionProtocol {
           map_tasks -= 1;
           updateCounters(map);
         }
-
-        // move map output to reduce input
-        String reduceId = "reduce_" + newId();
-        for (int i = 0; i < mapIds.size(); i++) {
-          String mapId = mapIds.get(i);
-          Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-          Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
-          if (!localFs.mkdirs(reduceIn.getParent())) {
-            throw new IOException("Mkdirs failed to create " + 
-                                  reduceIn.getParent().toString());
+        if (numReduceTasks > 0) {
+          // move map output to reduce input
+          String reduceId = "reduce_" + newId();
+          for (int i = 0; i < mapIds.size(); i++) {
+            String mapId = mapIds.get(i);
+            Path mapOut = this.mapoutputFile.getOutputFile(mapId);
+            Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
+            if (!localFs.mkdirs(reduceIn.getParent())) {
+              throw new IOException("Mkdirs failed to create "
+                  + reduceIn.getParent().toString());
+            }
+            if (!localFs.rename(mapOut, reduceIn))
+              throw new IOException("Couldn't rename " + mapOut);
+            this.mapoutputFile.removeAll(mapId);
           }
-          if (!localFs.rename(mapOut, reduceIn))
-            throw new IOException("Couldn't rename " + mapOut);
-          this.mapoutputFile.removeAll(mapId);
-        }
 
-        {
-          ReduceTask reduce = new ReduceTask(jobId, file, 
-                                             "tip_r_0001", reduceId, 0, mapIds.size());
-          JobConf localConf = new JobConf(job);
-          reduce.localizeConfiguration(localConf);
-          reduce.setConf(localConf);
-          reduce_tasks += 1;
-          myMetrics.launchReduce();
-          reduce.run(localConf, this);
-          reduce.saveTaskOutput();
-          myMetrics.completeReduce();
-          reduce_tasks -= 1;
-          updateCounters(reduce);
+          {
+            ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001",
+                reduceId, 0, mapIds.size());
+            JobConf localConf = new JobConf(job);
+            reduce.localizeConfiguration(localConf);
+            reduce.setConf(localConf);
+            reduce_tasks += 1;
+            myMetrics.launchReduce();
+            reduce.run(localConf, this);
+            reduce.saveTaskOutput();
+            myMetrics.completeReduce();
+            reduce_tasks -= 1;
+            updateCounters(reduce);
+          }
+          this.mapoutputFile.removeAll(reduceId);
         }
-        this.mapoutputFile.removeAll(reduceId);
-        
         this.status.setRunState(JobStatus.SUCCEEDED);
 
         JobEndNotifier.localRunnerNotification(job, status);
@@ -293,4 +298,13 @@ class LocalJobRunner implements JobSubmissionProtocol {
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
   
+  /**
+   * Used for formatting the id numbers
+   */
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+  
 }

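The idFormat introduced above gives the local runner stable, zero-padded map ids (map_0000, map_0001, ...), which is what lets the new test locate its single map output file by name. A small illustration of the same formatting, assuming nothing beyond java.text.NumberFormat:

import java.text.NumberFormat;

// Sketch of the id formatting used by LocalJobRunner above:
// at least four digits, no grouping separators.
NumberFormat idFormat = NumberFormat.getInstance();
idFormat.setMinimumIntegerDigits(4);
idFormat.setGroupingUsed(false);

String first = "map_" + idFormat.format(0);    // "map_0000"
String later = "map_" + idFormat.format(12);   // "map_0012"
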
+ 69 - 13
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -115,8 +115,14 @@ class MapTask extends Task {
 
     final Reporter reporter = getReporter(umbilical);
 
-    MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
-    
+    int numReduceTasks = conf.getNumReduceTasks();
+    LOG.info("numReduceTasks: " + numReduceTasks);
+    MapOutputCollector collector = null;
+    if (numReduceTasks > 0) {
+      collector = new MapOutputBuffer(umbilical, job, reporter);
+    } else { 
+      collector = new DirectMapOutputCollector(umbilical, job, reporter);
+    }
     // reinstantiate the split
     try {
       instantiatedSplit = (InputSplit) 
@@ -177,15 +183,8 @@ class MapTask extends Task {
 
     try {
       sortProgress.start();
-      runner.run(in, collector, reporter);      // run the map
-      //check whether the length of the key/value buffer is 0. If not, then
-      //we need to spill that to disk. Note that we reset the key/val buffer
-      //upon each spill (so a length > 0 means that we have not spilled yet)
-      if (((MapOutputBuffer)collector).keyValBuffer.getLength() > 0) {
-        ((MapOutputBuffer)collector).sortAndSpillToDisk();
-      }
-      //merge the partitions from the spilled files and create one output
-      collector.mergeParts();
+      runner.run(in, collector, reporter);      
+      collector.flush();
     } finally {
       //close
       in.close();                               // close input
@@ -222,8 +221,55 @@ class MapTask extends Task {
     sortProgress.setDaemon(true);
     return sortProgress;
   }
+  
+  interface MapOutputCollector extends OutputCollector {
+
+    public void close() throws IOException;
+    
+    public void flush() throws IOException;
+        
+  }
+
+  class DirectMapOutputCollector implements MapOutputCollector {
+
+    private RecordWriter out = null;
+
+    private Reporter reporter = null;
+
+    private JobConf job;
+
+    private TaskUmbilicalProtocol umbilical;
+
+    public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
+        JobConf job, Reporter reporter) throws IOException {
+      this.umbilical = umbilical;
+      this.job = job;
+      this.reporter = reporter;
+      String finalName = getTipId();
+      FileSystem fs = FileSystem.get(this.job);
+
+      out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
+    }
 
-  class MapOutputBuffer implements OutputCollector {
+    public void close() throws IOException {
+      if (this.out != null) {
+        out.close(this.reporter);
+      }
+
+    }
+
+    public void flush() throws IOException {
+      // TODO Auto-generated method stub
+      
+    }
+
+    public void collect(WritableComparable key, Writable value) throws IOException {
+      this.out.write(key, value);
+    }
+    
+  }
+  
+  class MapOutputBuffer implements MapOutputCollector {
 
     private final int partitions;
     private Partitioner partitioner;
@@ -432,7 +478,7 @@ class MapTask extends Task {
       }
     }
     
-    public void mergeParts() throws IOException {
+    private void mergeParts() throws IOException {
       Path finalOutputFile = mapOutputFile.getOutputFile(getTaskId());
       Path finalIndexFile = mapOutputFile.getOutputIndexFile(getTaskId());
       
@@ -539,5 +585,15 @@ class MapTask extends Task {
         return super.next();
       }
     }
+
+    public void flush() throws IOException {
+      //check whether the length of the key/value buffer is 0. If not, then
+      //we need to spill that to disk. Note that we reset the key/val buffer
+      //upon each spill (so a length > 0 means that we have not spilled yet)
+      if (keyValBuffer.getLength() > 0) {
+        sortAndSpillToDisk();
+      }
+      mergeParts();
+    }
   }
 }
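
The MapTask refactoring above boils down to a small strategy interface: the map runner only ever sees a collector, and MapTask picks a sorting buffer when there are reducers or a direct writer when there are none. A self-contained, simplified sketch of that selection (the class names here are illustrative, not the MapTask internals):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of the collector selection introduced in MapTask.
public class CollectorSelectionSketch {

  interface MapOutputCollector {
    void collect(String key, String value) throws IOException;
    void flush() throws IOException;   // finish buffering; a no-op for direct output
    void close() throws IOException;
  }

  // Stands in for MapOutputBuffer: buffers records, then "spills and merges" on flush.
  static class BufferingCollector implements MapOutputCollector {
    private final List<String> buffer = new ArrayList<String>();
    public void collect(String key, String value) { buffer.add(key + "\t" + value); }
    public void flush() { System.out.println("sort/spill/merge of " + buffer.size() + " records"); }
    public void close() { buffer.clear(); }
  }

  // Stands in for DirectMapOutputCollector: every record goes straight to final output.
  static class DirectCollector implements MapOutputCollector {
    public void collect(String key, String value) { System.out.println(key + "\t" + value); }
    public void flush() { /* nothing buffered */ }
    public void close() { }
  }

  public static void main(String[] args) throws IOException {
    int numReduceTasks = 0;   // as set by JobConf.setNumReduceTasks(0)
    MapOutputCollector collector =
        numReduceTasks > 0 ? new BufferingCollector() : new DirectCollector();
    collector.collect("roses", "are.red");
    collector.flush();
    collector.close();
  }
}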