
HADOOP-1315. Clean up contrib/streaming, switching it to use more core classes and removing unused classes. Contributed by Runping.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@534970 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 18 years ago
parent commit: 2d1ab53ba0

+ 3 - 0
CHANGES.txt

@@ -316,6 +316,9 @@ Trunk (unreleased changes)
     that killed the heartbeat monitoring thread.
     (Dhruba Borthakur via cutting)
 
+94. HADOOP-1315.  Clean up contrib/streaming, switching it to use core
+    classes more and removing unused code.  (Runping Qi via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 0 - 272
src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java

@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.File;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-
-/** Parses a -input <spec> that determines the DFS paths that will 
- be accessed by a MergedInputFormat.<br>
- CompoundDirSpec.getPaths() is a 2-D ragged array of DFS paths.<br>
- One of the paths is the <b>primary</b> and can contain a globbing pattern
- to match multiple files.<br>
- The other paths are <b>secondary</b> and must indicate either a directory or a single file.
- During execution secondary files are computed to be the secondary path 
- plus the primary non-qualified filename.
- Example: <tt>
- -input "/filter/colsx NULL | +/batch1/colsx/* /batch1/colsy/"
- -input "/filter/colsx NULL | +/batch2/colsx/* /batch2/colsy/"
- </tt>
- Files and contents:<tt>
- /filter/colsx/part-00000:
- /batch1/colsx/part-00000:
- /batch1/colsy/part-00000:
- /batch2/colsx/part-00000:
- /batch2/colsy/part-00000:
- </tt>
- Mapper input:<tt>
- </tt>
- Side-effect outputs with Identity "mapper":<tt>
-
- </tt>
- @author Michel Tourn
- */
-class CompoundDirSpec {
-
-  // Keep the Usage messages and docs in sync!
-  public final static String MERGEGLOB_PREFIX = "||";
-  public final static String MERGE_SEP = "|";
-  public final static String COL_SEP = " ";
-  public final static String PRIMARY_PREFIX = "+";
-
-  CompoundDirSpec(String argSpec, boolean isInputSpec) {
-    argSpec_ = argSpec;
-    isInputSpec_ = isInputSpec;
-
-    direction_ = isInputSpec_ ? "input" : "output";
-    parse();
-  }
-
-  public void parse() throws IllegalStateException {
-    String[] mergerSpecs = argSpec_.split(StreamUtil.regexpEscape(MERGE_SEP));
-
-    int msup = mergerSpecs.length;
-    paths_ = new String[msup][];
-
-    if (msup == 0) {
-      throw new IllegalStateException("A -" + direction_ + " spec needs at list one path");
-    }
-    if (false == isInputSpec_) {
-      if (msup > 1) {
-        throw new IllegalStateException("A -output spec cannot use merged streams ('" + MERGE_SEP
-                                        + "' delimiter)");
-      }
-    }
-    for (int m = 0; m < msup; m++) {
-      String merged = mergerSpecs[m];
-      merged = merged.trim();
-      String[] colSpecs = merged.split(StreamUtil.regexpEscape(COL_SEP));
-      int csup = colSpecs.length;
-      if (csup == 0) {
-        throw new IllegalStateException("A -input spec needs at list one path spec per |<column>|");
-      }
-      paths_[m] = new String[csup];
-      for (int c = 0; c < csup; c++) {
-        String spec = colSpecs[c];
-        if (spec.startsWith(PRIMARY_PREFIX)) {
-          // for (!isInputSpec_) the tuple paths should be symmetric.
-          // but we still allow user to specify one in case setOutputDir makes a difference
-          if (prow_ != NA) {
-            throwBadNumPrimaryInputSpecs();
-          }
-          spec = spec.substring(PRIMARY_PREFIX.length());
-          prow_ = m;
-          pcol_ = c;
-        }
-        paths_[m][c] = spec;
-      }
-    }
-    if (prow_ == NA) {
-      if (!isInputSpec_) {
-        // pick an 'arbitrary' one -- the tuple paths should be symmetric.
-        prow_ = 0;
-        pcol_ = 0;
-      } else if (msup == 1 && paths_[0].length == 1) {
-        // pick the only one available. That's also bw-compatible syntax
-        prow_ = 0;
-        pcol_ = 0;
-      } else {
-        throwBadNumPrimaryInputSpecs();
-      }
-    }
-  }
-
-  void throwBadNumPrimaryInputSpecs() throws IllegalStateException {
-    String msg = "A compound -input spec needs exactly one primary path prefixed with "
-      + PRIMARY_PREFIX;
-    msg += ":\n";
-    msg += toTableString();
-    throw new IllegalStateException(msg);
-  }
-
-  // TBD need to decide early whether they are dirs or files or globs?
-  public void validatePaths(FileSystem fs) {
-    int rsup = paths_.length;
-    for (int r = 0; r < rsup; r++) {
-      int csup = paths_[r].length;
-      for (int c = 0; c < csup; c++) {
-        String path = paths_[r][c];
-      }
-    }
-  }
-
-  public int primaryRow() {
-    return prow_;
-  }
-
-  public int primaryCol() {
-    return pcol_;
-  }
-
-  public String primarySpec() {
-    return paths_[prow_][pcol_];
-  }
-  
-  /*
-    Example input spec in table form:
-    <1 +[/input/part-00] 
-    <2  [/input/part-01] 
-    <3  [/input/part-02] 
-    Example output spec in table form:
-    +[/my.output] 
-  */
-  public String toTableString() {
-    StringBuffer buf = new StringBuffer();
-    int maxWid = 0;
-    for (int pass = 1; pass <= 2; pass++) {
-      int rsup = paths_.length;
-      for (int r = 0; r < rsup; r++) {
-        int csup = paths_[r].length;
-        for (int c = 0; c < csup; c++) {
-          String cell = "[" + paths_[r][c] + "]";
-          if (r == prow_ && c == pcol_) {
-            cell = PRIMARY_PREFIX + cell;
-          } else {
-            cell = StreamUtil.rjustify(cell, cell.length() + PRIMARY_PREFIX.length());
-          }
-          if (isInputSpec_) {
-            // channels are for tagged input streams: r-based
-            if (rsup > 1) {
-              String channel = "<" + (r + 1);
-              cell = channel + " " + cell;
-            }
-          } else {
-            // channels are for columns (multiple files) c-based
-            if (csup > 1) {
-              String channel = ">" + (c + 1);
-              cell = channel + " " + cell;
-            }
-          }
-          if (pass == 2) {
-            cell = StreamUtil.ljustify(cell, maxWid);
-            buf.append(cell);
-            buf.append(" ");
-          } else {
-            if (cell.length() > maxWid) {
-              maxWid = cell.length();
-            }
-          }
-        }
-        if (pass == 2) {
-          buf.append("\n");
-        }
-      }
-    }
-    return buf.toString();
-  }
-
-  /** 
-      @see #primaryRow 
-      @see #primaryCol
-  */
-  public String[][] getPaths() {
-    return paths_;
-  }
-
-  // ==== Static helpers that depend on a JobConf. ====
-  
-  // Unlike CompoundDirSpec.parse() which is reexecuted at Task runtime,
-  // this is expanded once in advance and relies on client-side DFS access.
-  // Main reason is we need to choose a primary input file at submission time. 
-  public static String expandGlobInputSpec(String inputSpec, JobConf job)
-  {
-    inputSpec = inputSpec.trim();
-    if (!inputSpec.startsWith(MERGEGLOB_PREFIX)) {
-      return inputSpec;
-    }
-    inputSpec = inputSpec.substring(MERGEGLOB_PREFIX.length());
-    // TODO use upcoming DFSShell wildcarding code..
-    return inputSpec;
-  }
-  
-  // find the -input statement that contains the job's split
-  // TODO test with globbing / directory /single file
-  public static CompoundDirSpec findInputSpecForPrimary(String primary, JobConf job) {
-    int num = job.getInt("stream.numinputspecs", -1);
-    for (int s = 0; s < num; s++) {
-      String specStr = job.get("stream.inputspecs." + s);
-      CompoundDirSpec spec = new CompoundDirSpec(specStr, true);
-      if (pathsMatch(spec.primarySpec(), primary, job)) {
-        return spec;
-      }
-    }
-    return null;
-  }
-
-  // There can be only one output spec but this provides some server-side validation
-  public static CompoundDirSpec findOutputSpecForPrimary(String primary, JobConf job) {
-    String specStr = job.get("stream.outputspec");
-    CompoundDirSpec spec = new CompoundDirSpec(specStr, false);
-    if (pathsMatch(spec.primarySpec(), primary, job)) {
-      return spec;
-    }
-    return spec;
-  }
-
-  static boolean pathsMatch(String s1, String s2, JobConf job) {
-    boolean isLocalFS = job.get("fs.default.name", "").equals("local");
-    if (isLocalFS) {
-      s1 = StreamUtil.safeGetCanonicalPath(new File(s1));
-      s2 = StreamUtil.safeGetCanonicalPath(new File(s2));
-    }
-    return (s1.equals(s2));
-  }
-
-  final static int NA = -1;
-
-  String argSpec_;
-  boolean isInputSpec_;
-
-  String direction_;
-  String[][] paths_;
-  int prow_ = NA;
-  int pcol_ = NA;
-}

+ 0 - 344
src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java

@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.IOException;
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.PriorityQueue;
-
-/**
- Eventually will be fed TupleInputFormats. 
- For now will be fed primitive InputFormats.
- @author Michel Tourn
- */
-public class MergerInputFormat extends InputFormatBase {
-
-  public MergerInputFormat() {
-  }
-
-  void checkReady(FileSystem fs, JobConf job) {
-    if (ready_) {
-      // could complain if fs / job changes
-      return;
-    }
-    fs_ = fs;
-    job_ = job;
-    debug_ = (job.get("stream.debug") != null);
-
-    String someInputSpec = job_.get("stream.inputspecs.0");
-    CompoundDirSpec someSpec = new CompoundDirSpec(someInputSpec, true);
-    fmts_ = new ArrayList();
-    int n = someSpec.paths_.length;
-    inputTagged_ = job.getBoolean("stream.inputtagged", false);
-    //  0 is primary
-    //  Curr. secondaries are NOT used for getSplits(), only as RecordReader factory
-    for (int i = 0; i < n; i++) {
-      // this ignores -inputreader.. 
-      // That's why if hasSimpleInputSpecs_=true (n=1) then StreamJob will set
-      // the top-level format to StreamInputFormat rather than MergeInputFormat.
-      // So we only support custom -inputformat for n=1. 
-      // Probably OK for now since custom inputformats would be constrained (no \t and \n in payload) 
-      fmts_.add(new StreamInputFormat()); // will be TupleInputFormat
-    }
-    primary_ = (InputFormat) fmts_.get(0);
-    ready_ = true;
-  }
-
-  /** This implementation always returns true. */
-  public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
-    // must do this without JobConf...
-    boolean[] b = new boolean[inputDirs.length];
-    for (int i = 0; i < inputDirs.length; ++i) {
-      b[i] = true;
-    }
-    return b;
-  }
-
-  /** Delegate to the primary InputFormat. 
-      Force full-file splits since there's no index to sync secondaries.
-      (and if there was, this index may need to be created for the first time
-      full file at a time...   )
-  */
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return ((StreamInputFormat) primary_).getSplits(job, numSplits);
-  }
-
-  /**
-   */
-  public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    FileSystem fs = ((FileSplit) split).getPath().getFileSystem(job);
-    checkReady(fs, job);
-
-    reporter.setStatus(split.toString());
-
-    ArrayList readers = new ArrayList();
-    String primary = ((FileSplit) split).getPath().toString();
-    CompoundDirSpec spec = CompoundDirSpec.findInputSpecForPrimary(primary, job);
-    if (spec == null) {
-      throw new IOException("Did not find -input spec in JobConf for primary:" + primary);
-    }
-    for (int i = 0; i < fmts_.size(); i++) {
-      InputFormat f = (InputFormat) fmts_.get(i);
-      Path path = new Path(spec.getPaths()[i][0]);
-      FileSplit fsplit = makeFullFileSplit(path);
-      RecordReader r = f.getRecordReader(fsplit, job, reporter);
-      readers.add(r);
-    }
-
-    return new MergedRecordReader(readers);
-  }
-
-  private FileSplit makeFullFileSplit(Path path) throws IOException {
-    long len = fs_.getLength(path);
-    return new FileSplit(path, 0, len, job_);
-  }
-
-  /*
-    private FileSplit relatedSplit(FileSplit primarySplit, int i, CompoundDirSpec spec) throws IOException
-    {
-    if (i == 0) {
-    return primarySplit;
-    }
-
-    // TODO based on custom JobConf (or indirectly: InputFormat-s?)
-    String path = primarySplit.getFile().getAbsolutePath();
-    Path rpath = new Path(path + "." + i);
-
-    long rlength = fs_.getLength(rpath);
-    FileSplit related = new FileSplit(rpath, 0, rlength);
-    return related;    
-    }*/
-
-  class MergedRecordReader implements RecordReader {
-
-    MergedRecordReader(ArrayList/*<RecordReader>*/readers) throws IOException {
-      try {
-        readers_ = readers;
-        primaryReader_ = (RecordReader) readers.get(0);
-        q_ = new MergeQueue(readers.size(), debug_);
-        for (int i = 0; i < readers_.size(); i++) {
-          RecordReader reader = (RecordReader) readers.get(i);
-          WritableComparable k = (WritableComparable) job_.getInputKeyClass().newInstance();
-          Writable v = (Writable) job_.getInputValueClass().newInstance();
-          MergeRecordStream si = new MergeRecordStream(i, reader, k, v);
-          if (si.next()) {
-            q_.add(si);
-          }
-        }
-      } catch (Exception e) {
-        e.printStackTrace();
-        throw new IOException(e.toString());
-      }
-    }
-
-    // 1. implements RecordReader
-
-    public boolean next(Writable key, Writable value) throws IOException {
-      boolean more = (q_.size() > 0);
-      if (!more) return false;
-
-      MergeRecordStream ms = (MergeRecordStream) q_.top();
-      int keyTag = inputTagged_ ? (ms.index_ + 1) : NOTAG;
-      assignTaggedWritable(key, ms.k_, keyTag);
-      assignTaggedWritable(value, ms.v_, NOTAG);
-
-      if (ms.next()) { // has another entry
-        q_.adjustTop();
-      } else {
-        q_.pop(); // done with this file
-        if (ms.reader_ == primaryReader_) {
-          primaryClosed_ = true;
-          primaryLastPos_ = primaryReader_.getPos();
-        }
-        ms.reader_.close();
-      }
-      return true;
-    }
-
-    public long getPos() throws IOException {
-      if (primaryClosed_) {
-        return primaryLastPos_;
-      } else {
-        return primaryReader_.getPos();
-      }
-    }
-
-    public float getProgress() throws IOException {
-      if (primaryClosed_) {
-        return 1.0f;
-      } else {
-        return primaryReader_.getProgress();
-      }
-    }
-    
-    public void close() throws IOException {
-      IOException firstErr = null;
-
-      for (int i = 0; i < readers_.size(); i++) {
-        RecordReader r = (RecordReader) readers_.get(i);
-        try {
-          r.close();
-        } catch (IOException io) {
-          io.printStackTrace();
-          if (firstErr == null) {
-            firstErr = io;
-          }
-        }
-      }
-      if (firstErr != null) {
-        throw firstErr;
-      }
-    }
-
-    public WritableComparable createKey() {
-      return new Text();
-    }
-
-    public Writable createValue() {
-      return new Text();
-    }
-
-    // 2. utilities
-
-    final static int NOTAG = -1;
-
-    private void assignTaggedWritable(Writable dst, Writable src, int tag) {
-      try {
-        outBuf.reset();
-        if (tag != NOTAG) {
-          if (src instanceof UTF8) {
-            src = new UTF8(">" + tag + "\t" + src.toString()); // breaks anything?
-          } else if (src instanceof Text) {
-            src = new Text(">" + tag + "\t" + src.toString()); // breaks anything?
-          } else {
-            throw new UnsupportedOperationException("Cannot use with tags with key class "
-                                                    + src.getClass());
-          }
-        }
-        src.write(outBuf);
-        inBuf.reset(outBuf.getData(), outBuf.getLength());
-        dst.readFields(inBuf); // throws..
-      } catch (IOException io) {
-        // streams are backed by buffers, but buffers can run out
-        throw new IllegalStateException(io);
-      }
-    }
-
-    private DataInputBuffer inBuf = new DataInputBuffer();
-    private DataOutputBuffer outBuf = new DataOutputBuffer();
-
-    ArrayList/*<RecordReader>*/readers_;
-
-    RecordReader primaryReader_;
-    boolean primaryClosed_;
-    long primaryLastPos_;
-
-    MergeQueue q_;
-
-  }
-
-  boolean ready_;
-  FileSystem fs_;
-  JobConf job_;
-  boolean debug_;
-
-  // we need the JobConf: the other delegated InputFormat-s 
-  // will only be created in the delegator RecordReader
-  InputFormat primary_;
-  boolean inputTagged_;
-  ArrayList/*<InputFormat>*/fmts_;
-}
-
-class MergeQueue extends PriorityQueue // <MergeRecordStream>
-{
-
-  private boolean done;
-  private boolean debug;
-
-  public void add(MergeRecordStream reader) throws IOException {
-    super.put(reader);
-  }
-
-  public MergeQueue(int size, boolean debug) throws IOException {
-    initialize(size);
-    this.debug = debug;
-  }
-
-  protected boolean lessThan(Object a, Object b) {
-    MergeRecordStream ra = (MergeRecordStream) a;
-    MergeRecordStream rb = (MergeRecordStream) b;
-    int cp = ra.k_.compareTo(rb.k_);
-    if (debug) {
-      System.err.println("MergerInputFormat:lessThan " + ra.k_ + ", " + rb.k_ + " cp=" + cp);
-    }
-    if (cp == 0) {
-      return (ra.index_ < rb.index_);
-    } else {
-      return (cp < 0);
-    }
-  }
-
-  public void close() throws IOException {
-    IOException firstErr = null;
-    MergeRecordStream mr;
-    while ((mr = (MergeRecordStream) pop()) != null) {
-      try {
-        mr.reader_.close();
-      } catch (IOException io) {
-        io.printStackTrace();
-        if (firstErr == null) {
-          firstErr = io;
-        }
-      }
-    }
-    if (firstErr != null) {
-      throw firstErr;
-    }
-  }
-}
-
-class MergeRecordStream {
-
-  int index_;
-  RecordReader reader_;
-  WritableComparable k_;
-  Writable v_;
-
-  public MergeRecordStream(int index, RecordReader reader, WritableComparable k, Writable v)
-    throws IOException {
-    index_ = index;
-    reader_ = reader;
-    k_ = k;
-    v_ = v;
-  }
-
-  public boolean next() throws IOException {
-    boolean more = reader_.next(k_, v_);
-    return more;
-  }
-}

+ 0 - 174
src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java

@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * A multiplexed OutputFormat. The channel choice is encoded within the key.
- * If channels are fed at the same rate then the data can be read back in 
- * with a TupleInputFormat. (in a different Job)
- * @see TupleInputFormat 
- * @author Michel Tourn
- */
-public class MuxOutputFormat implements OutputFormat {
-
-  public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progr) throws IOException {
-    fs_ = fs;
-    job_ = job;
-
-    String primary = job.getOutputPath().toString();
-    CompoundDirSpec spec = CompoundDirSpec.findOutputSpecForPrimary(primary, job);
-    if (spec == null) {
-      throw new IOException("Did not find -output spec in JobConf for primary:" + primary);
-    }
-    String[] outPaths = spec.getPaths()[0];
-    int n = outPaths.length;
-    RecordWriter[] writers = new RecordWriter[n];
-    Path[] paths = new Path[n];
-    for (int i = 0; i < n; i++) {
-      OutputFormat f = new StreamOutputFormat(); // the only one supported
-      writers[i] = f.getRecordWriter(fs, job, name, progr);
-      paths[i] = new Path(outPaths[i], name); // same leaf name in different dir
-    }
-    return new MuxRecordWriter(writers, paths);
-  }
-
-  class MuxRecordWriter implements RecordWriter {
-
-    MuxRecordWriter(RecordWriter[] writers, Path[] paths) throws IOException {
-      writers_ = writers;
-      paths_ = paths;
-      numChannels_ = writers_.length;
-      out_ = new FSDataOutputStream[numChannels_];
-      for (int i = 0; i < out_.length; i++) {
-        System.err.println("MuxRecordWriter [" + i + "] create: " + paths[i]);
-        out_[i] = fs_.create(paths[i]);
-      }
-    }
-
-    final static int ONE_BASED = 1;
-    final static char CHANOUT = '>';
-    final static char CHANIN = '<';
-    final static String BADCHANOUT = "Invalid output channel spec: ";
-
-    int parseOutputChannel(String s, int max) throws IOException {
-      try {
-        if (s.charAt(s.length() - 1) != CHANOUT) {
-          throw new IOException(BADCHANOUT + s);
-        }
-        String s1 = s.substring(0, s.length() - 1);
-        int c = Integer.parseInt(s1);
-        if (c < 1 || c > max) {
-          String msg = "Output channel '" + s + "': must be an integer between 1 and " + max
-            + " followed by '" + CHANOUT + "' and TAB";
-          throw new IndexOutOfBoundsException(msg);
-        }
-        return c;
-      } catch (Exception e) {
-        throw new IOException(BADCHANOUT + s + " cause:" + e);
-      }
-    }
-
-    // TODO after Text patch, share code with StreamLineRecordReader.next()
-    void splitFirstTab(String input, UTF8 first, UTF8 second) {
-      int tab = input.indexOf('\t');
-      if (tab == -1) {
-        ((UTF8) first).set(input);
-        ((UTF8) second).set("");
-      } else {
-        ((UTF8) first).set(input.substring(0, tab));
-        ((UTF8) second).set(input);
-      }
-
-    }
-
-    void writeKeyTabVal(Writable key, Writable val, FSDataOutputStream out) throws IOException {
-      out.write(key.toString().getBytes("UTF-8"));
-      out.writeByte('\t');
-      out.write(val.toString().getBytes("UTF-8"));
-      out.writeByte('\n');
-    }
-
-    public void write(WritableComparable key, Writable value) throws IOException {
-      // convention: Application code must put a channel spec in first column
-      // iff there is more than one (output) channel
-      if (numChannels_ == 1) {
-        writeKeyTabVal(key, value, out_[0]);
-      } else {
-        // StreamInputFormat does not know about channels 
-        // Now reinterpret key as channel and split value as new key-value
-        // A more general mechanism would still require Reader classes to know about channels. 
-        // (and encode it as part of key or value)
-        int channel = parseOutputChannel(key.toString(), numChannels_);
-        FSDataOutputStream oi = out_[channel - ONE_BASED];
-        splitFirstTab(value.toString(), key2, val2);
-        writeKeyTabVal(key2, val2, oi);
-      }
-    }
-
-    public void close(Reporter reporter) throws IOException {
-      IOException firstErr = null;
-
-      for (int i = 0; i < writers_.length; i++) {
-        FSDataOutputStream oi = out_[i];
-        RecordWriter r = writers_[i];
-        try {
-          oi.close();
-          r.close(reporter);
-        } catch (IOException io) {
-          System.err.println("paths_[" + i + "]: " + paths_[i]);
-          io.printStackTrace();
-          if (firstErr == null) {
-            firstErr = io;
-          }
-        }
-      }
-      if (firstErr != null) {
-        throw firstErr;
-      }
-    }
-
-    UTF8 key2 = new UTF8();
-    UTF8 val2 = new UTF8();
-
-    RecordWriter[] writers_;
-    Path[] paths_;
-    int numChannels_;
-    FSDataOutputStream[] out_;
-  }
-
-  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
-    // allow existing data (for app-level restartability)
-  }
-
-  FileSystem fs_;
-  JobConf job_;
-}

+ 0 - 63
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java

@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.Iterator;
-import java.net.URLDecoder;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/** A generic Combiner bridge.<br>
- *  To use a Combiner specify -combiner myprogram in hadoopStreaming.
- *  It delegates operations to an external program via stdin and stdout.
- *  In one run of the external program, you can expect all records with
- *  the same key to appear together.
- *  You should not make assumptions about how many times the combiner is
- *  run on your data.
- *  Ideally the combiner and the reducer are the same program, the combiner
- *  partially aggregates the data zero or more times and the reducer
- *  applies the last aggregation pass.
- *  Do not use a Combiner if your reduce logic does not suport
- *  such a multipass aggregation.
- *  @author Michel Tourn
- */
-public class PipeCombiner extends PipeReducer {
-
-  String getPipeCommand(JobConf job) {
-    String str = job.get("stream.combine.streamprocessor");
-    if (str == null) {
-      System.err.println("X1003");
-      return str;
-    }
-    try {
-      return URLDecoder.decode(str, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      System.err.println("stream.combine.streamprocessor in jobconf not found");
-      return null;
-    }
-  }
-
-}

+ 7 - 216
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -19,8 +19,6 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.net.Socket;
-import java.net.URI;
 import java.nio.charset.CharacterCodingException;
 import java.io.IOException;
 import java.util.Date;
@@ -29,14 +27,11 @@ import java.util.Iterator;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Properties;
-import java.util.regex.*;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.PhasedFileSystem;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
@@ -45,7 +40,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 
 /** Shared functionality for PipeMapper, PipeReducer.
@@ -60,60 +54,12 @@ public abstract class PipeMapRed {
    */
   abstract String getPipeCommand(JobConf job);
 
-  /*
-   */
-  abstract String getKeyColPropName();
-
   abstract char getFieldSeparator();
   
   abstract int getNumOfKeyFields();
-  
-  /** Write output as side-effect files rather than as map outputs.
-      This is useful to do "Map" tasks rather than "MapReduce" tasks. */
-  boolean getUseSideEffect() {
-    return false;
-  }
 
   abstract boolean getDoPipe();
 
-  /**
-   * @returns how many TABS before the end of the key part
-   * usually: 1 or "ALL"
-   * used for tool output of both Map and Reduce
-   * configured via tool's argv: splitKeyVal=ALL or 1..
-   * although it is interpreted here, not by tool
-   */
-  int getKeyColsFromPipeCommand(String cmd) {
-    String key = getKeyColPropName();
-    Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
-    Matcher match = kcPat.matcher(cmd);
-    String kc;
-    if (!match.matches()) {
-      kc = null;
-    } else {
-      kc = match.group(1);
-    }
-
-    int cols;
-    if (kc == null) {
-      // default value is 1 and the Stream applications could instead
-      // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
-      cols = 1;
-    } else if (kc.equals("ALL")) {
-      cols = ALL_COLS;
-    } else {
-      try {
-        cols = Integer.parseInt(kc);
-      } catch (NumberFormatException nf) {
-        cols = Integer.MAX_VALUE;
-      }
-    }
-
-    System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
-
-    return cols;
-  }
-
   final static int OUTSIDE = 1;
   final static int SINGLEQ = 2;
   final static int DOUBLEQ = 3;
@@ -164,54 +110,15 @@ public abstract class PipeMapRed {
     return (String[]) argList.toArray(new String[0]);
   }
 
-  OutputStream getURIOutputStream(URI uri, boolean allowSocket) throws IOException {
-    final String SOCKET = "socket";
-    if (uri.getScheme().equals(SOCKET)) {
-      if (!allowSocket) {
-        throw new IOException(SOCKET + " not allowed on outputstream " + uri);
-      }
-      final Socket sock = new Socket(uri.getHost(), uri.getPort());
-      OutputStream out = new FilterOutputStream(sock.getOutputStream()) {
-          public void close() throws IOException {
-            sock.close();
-            super.close();
-          }
-        };
-      return out;
-    } else {
-      // a FSDataOutputStreamm, localFS or HDFS.
-      // localFS file may be set up as a FIFO.
-      return sideFs_.create(new Path(uri.getSchemeSpecificPart()));
-    }
-  }
-
-  String getSideEffectFileName() {
-    FileSplit split = StreamUtil.getCurrentSplit(job_);
-    return new String(split.getPath().getName() + "-" + split.getStart() + 
-                      "-" + split.getLength());
-  }
-
   public void configure(JobConf job) {
     try {
       String argv = getPipeCommand(job);
 
-      keyCols_ = getKeyColsFromPipeCommand(argv);
-
-      debug_ = (job.get("stream.debug") != null);
-      if (debug_) {
-        System.out.println("PipeMapRed: stream.debug=true");
-      }
-
       joinDelay_ = job.getLong("stream.joindelay.milli", 0);
 
       job_ = job;
       fs_ = FileSystem.get(job_);
-      if (job_.getBoolean("stream.sideoutput.localfs", false)) {
-        //sideFs_ = new LocalFileSystem(job_);
-        sideFs_ = FileSystem.getLocal(job_);
-      } else {
-        sideFs_ = fs_;
-      }
+
       String mapOutputFieldSeparator = job_.get("stream.map.output.field.separator", "\t");
       String reduceOutputFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t");
       this.mapOutputFieldSeparator = mapOutputFieldSeparator.charAt(0);
@@ -219,22 +126,12 @@ public abstract class PipeMapRed {
       this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1);
       this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
       
-      if (debug_) {
-        System.out.println("kind   :" + this.getClass());
-        System.out.println("split  :" + StreamUtil.getCurrentSplit(job_));
-        System.out.println("fs     :" + fs_.toString());
-        System.out.println("sideFs :" + sideFs_.toString());
-      }
 
       doPipe_ = getDoPipe();
       if (!doPipe_) return;
 
       setStreamJobDetails(job);
-      setStreamProperties();
-
-      if (debugFailEarly_) {
-        throw new RuntimeException("debugFailEarly_");
-      }
+      
       String[] argvSplit = splitArgs(argv);
       String prog = argvSplit[0];
       File currentDir = new File(".").getAbsoluteFile();
@@ -245,39 +142,6 @@ public abstract class PipeMapRed {
         FileUtil.chmod(new File(jobCacheDir, prog).toString(), "a+x");
       }
 
-      if (job_.getInputValueClass().equals(BytesWritable.class)) {
-        // TODO expose as separate config:
-        // job or semistandard inputformat property
-        optUseKey_ = false;
-      }
-
-      optSideEffect_ = getUseSideEffect();
-
-      if (optSideEffect_) {
-        // during work: use a completely unique filename to avoid HDFS namespace conflicts
-        // after work: rename to a filename that depends only on the workload (the FileSplit)
-        //   it's a friendly name and in case of reexecution it will clobber. 
-        // reexecution can be due to: other job, failed task and speculative task
-        // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then: 
-        // client has renamed outputPath and saved the argv's original output path as:
-        if (useSingleSideOutputURI_) {
-          finalOutputURI = new URI(sideOutputURI_);
-          sideEffectPathFinal_ = null; // in-place, no renaming to final
-        } else {
-          sideFs_ = new PhasedFileSystem(sideFs_, job);
-          String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath() 
-          String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale
-          sideEffectPathFinal_ = new Path(sideOutputPath, fileName);
-          finalOutputURI = new URI(sideEffectPathFinal_.toString()); // implicit dfs: 
-        }
-        // apply default scheme
-        if (finalOutputURI.getScheme() == null) {
-          finalOutputURI = new URI("file", finalOutputURI.getSchemeSpecificPart(), null);
-        }
-        boolean allowSocket = useSingleSideOutputURI_;
-        sideEffectOut_ = getURIOutputStream(finalOutputURI, allowSocket);
-      }
-
       // 
       // argvSplit[0]:
       // An absolute path should be a preexisting valid path on all TaskTrackers
@@ -295,8 +159,6 @@ public abstract class PipeMapRed {
         f = null;
       }
       logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
-      logprintln("sideEffectURI_=" + finalOutputURI);
-
       Environment childEnv = (Environment) StreamUtil.env().clone();
       addJobConfToEnvironment(job_, childEnv);
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
@@ -327,34 +189,6 @@ public abstract class PipeMapRed {
       logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
     }
     taskId_ = StreamUtil.getTaskInfo(job_);
-    debugFailEarly_ = isDebugFail("early");
-    debugFailDuring_ = isDebugFail("during");
-    debugFailLate_ = isDebugFail("late");
-
-    sideOutputURI_ = job_.get("stream.sideoutput.uri");
-    useSingleSideOutputURI_ = (sideOutputURI_ != null);
-  }
-
-  boolean isDebugFail(String kind) {
-    String execidlist = job_.get("stream.debugfail.reexec." + kind);
-    if (execidlist == null) {
-      return false;
-    }
-    String[] e = execidlist.split(",");
-    for (int i = 0; i < e.length; i++) {
-      int ei = Integer.parseInt(e[i]);
-      if (taskId_.execid == ei) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  void setStreamProperties() {
-    String s = System.getProperty("stream.port");
-    if (s != null) {
-      reportPortPlusOne_ = Integer.parseInt(s);
-    }
   }
 
   void logStackTrace(Exception e) {
@@ -442,7 +276,6 @@ public abstract class PipeMapRed {
     if (log_ != null) {
       StreamUtil.exec("/bin/rm " + LOGNAME, log_);
     }
-    // TODO socket-based aggregator (in JobTrackerInfoServer)
   }
 
   void startOutputThreads(OutputCollector output, Reporter reporter) {
@@ -474,14 +307,7 @@ public abstract class PipeMapRed {
    * @throws IOException
    */
   void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
-    int pos = -1;
-    if (keyCols_ != ALL_COLS) {
-      pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
-    }
-    LOG.info("FieldSeparator: " + this.getFieldSeparator());
-    LOG.info("NumOfKeyFields: " + this.getNumOfKeyFields());
-    LOG.info("Line: " + new String (line));
-    LOG.info("Pos: " + pos);
+    int pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
     try {
       if (pos == -1) {
         key.set(line);
@@ -508,15 +334,10 @@ public abstract class PipeMapRed {
         Text val = new Text();
         // 3/4 Tool to Hadoop
         while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) {
-          // 4/4 Hadoop out
-          if (optSideEffect_) {
-            sideEffectOut_.write(answer);
-            sideEffectOut_.write('\n');
-            sideEffectOut_.flush();
-          } else {
-            splitKeyVal(answer, key, val);
-            output.collect(key, val);
-          }
+          
+          splitKeyVal(answer, key, val);
+          output.collect(key, val);
+          
           numRecWritten_++;
           long now = System.currentTimeMillis();
           if (now-lastStdoutReport > reporterOutDelay_) {
@@ -584,18 +405,6 @@ public abstract class PipeMapRed {
       } catch (IOException io) {
       }
       waitOutputThreads();
-      try {
-        if (optSideEffect_) {
-          logprintln("closing " + finalOutputURI);
-          if (sideEffectOut_ != null) sideEffectOut_.close();
-          logprintln("closed  " + finalOutputURI);
-          if (!useSingleSideOutputURI_) {
-            ((PhasedFileSystem)sideFs_).commit(); 
-          }
-        }
-      } catch (IOException io) {
-        io.printStackTrace();
-      }
       if (sim != null) sim.destroy();
     } catch (RuntimeException e) {
       logStackTrace(e);
@@ -692,18 +501,11 @@ public abstract class PipeMapRed {
   
   long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
 
-  int keyCols_;
-  final static int ALL_COLS = Integer.MAX_VALUE;
-
   long reporterOutDelay_ = 10*1000L; 
   long reporterErrDelay_ = 10*1000L; 
   long joinDelay_;
   JobConf job_;
   FileSystem fs_;
-  FileSystem sideFs_;
-
-  // generic MapRed parameters passed on by hadoopStreaming
-  int reportPortPlusOne_;
 
   boolean doPipe_;
   boolean debug_;
@@ -724,17 +526,6 @@ public abstract class PipeMapRed {
   int numExceptions_;
   StreamUtil.TaskId taskId_;
 
-  boolean optUseKey_ = true;
-
-  private boolean optSideEffect_;
-  private URI finalOutputURI;
-  private Path sideEffectPathFinal_;
-
-  private boolean useSingleSideOutputURI_;
-  private String sideOutputURI_;
-
-  private OutputStream sideEffectOut_;
-
   protected volatile Throwable outerrThreadsThrowable;
 
   String LOGNAME;
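
The simplified splitKeyVal() in the hunk above always cuts a line coming back from the external command at the Nth occurrence of the configured field separator (stream.num.map.output.key.fields / stream.num.reduce.output.key.fields). A minimal, self-contained sketch of that splitting rule, using a hypothetical helper instead of Hadoop's UTF8ByteArrayUtils:

// Hypothetical illustration (not part of this patch) of the key/value split done by
// PipeMapRed.splitKeyVal(): find the Nth field separator and cut the line there.
public class KeyValSplitSketch {

  // Returns the index of the nth occurrence of sep in line, or -1 if there are fewer.
  static int findNthByte(byte[] line, byte sep, int n) {
    int seen = 0;
    for (int i = 0; i < line.length; i++) {
      if (line[i] == sep && ++seen == n) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    byte[] line = "k1\tk2\tthe value part".getBytes();
    int numOfKeyFields = 2;  // e.g. stream.num.map.output.key.fields=2
    int pos = findNthByte(line, (byte) '\t', numOfKeyFields);
    // pos == -1 means the whole line becomes the key and the value is empty.
    String key = (pos == -1) ? new String(line) : new String(line, 0, pos);
    String val = (pos == -1) ? "" : new String(line, pos + 1, line.length - pos - 1);
    System.out.println("key=[" + key + "]  val=[" + val + "]");
  }
}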

+ 11 - 9
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -25,8 +25,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
@@ -36,6 +38,8 @@ import org.apache.hadoop.io.Writable;
  */
 public class PipeMapper extends PipeMapRed implements Mapper {
 
+  private boolean ignoreKey = false;
+  
   String getPipeCommand(JobConf job) {
     String str = job.get("stream.map.streamprocessor");
     if (str == null) {
@@ -50,17 +54,15 @@ public class PipeMapper extends PipeMapRed implements Mapper {
     }
   }
 
-  String getKeyColPropName() {
-    return "mapKeyCols";
-  }
-
-  boolean getUseSideEffect() {
-    return StreamUtil.getUseMapSideEffect(job_);
-  }
-
   boolean getDoPipe() {
     return true;
   }
+  
+  public void configure(JobConf job) {
+    super.configure(job);
+    String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
+    this.ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+  }
 
   // Do NOT declare default constructor
   // (MapRed creates it reflectively)
@@ -86,7 +88,7 @@ public class PipeMapper extends PipeMapRed implements Mapper {
 
       // 2/4 Hadoop to Tool
       if (numExceptions_ == 0) {
-        if (optUseKey_) {
+        if (!this.ignoreKey) {
           write(key);
           clientOut_.write('\t');
         }
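
The new configure() override above sets ignoreKey when the job uses the default TextInputFormat, whose keys are only byte offsets into the input file; in that case the key is not written to the command's stdin. A minimal sketch (hypothetical class, not part of this patch) of what the flag changes about each record piped to the external mapper:

// Hypothetical illustration of the ignoreKey behaviour: with TextInputFormat the key
// is a LongWritable byte offset, so only the value is sent to the streaming command.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IgnoreKeySketch {

  static void writeRecord(DataOutputStream clientOut, String key, String value,
                          boolean ignoreKey) throws IOException {
    if (!ignoreKey) {          // mirrors the "if (!this.ignoreKey)" check above
      clientOut.writeBytes(key);
      clientOut.write('\t');
    }
    clientOut.writeBytes(value);
    clientOut.write('\n');
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    writeRecord(out, "1042" /* byte-offset key */, "a line of text input", true);
    out.flush();
    System.out.print(buf);  // the external mapper sees only: a line of text input
  }
}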

+ 0 - 4
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

@@ -57,10 +57,6 @@ public class PipeReducer extends PipeMapRed implements Reducer {
     return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
   }
 
-  String getKeyColPropName() {
-    return "reduceKeyCols";
-  }
-
   public void reduce(WritableComparable key, Iterator values, OutputCollector output,
                      Reporter reporter) throws IOException {
 

+ 40 - 147
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,7 +40,6 @@ import org.apache.commons.cli2.commandline.Parser;
 import org.apache.commons.cli2.option.PropertyOption;
 import org.apache.commons.cli2.resource.ResourceConstants;
 import org.apache.commons.cli2.util.HelpFormatter;
-import org.apache.commons.cli2.validation.FileValidator;
 import org.apache.commons.cli2.validation.InvalidArgumentException;
 import org.apache.commons.cli2.validation.Validator;
 import org.apache.commons.logging.*;
@@ -61,6 +59,7 @@ import org.apache.hadoop.mapred.KeyValueTextInputFormat;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
@@ -73,8 +72,7 @@ public class StreamJob {
 
   protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
   final static String REDUCE_NONE = "NONE";
-  private boolean reducerNone_;
-
+    
   /** -----------Streaming CLI Implementation  **/
   private DefaultOptionBuilder builder = 
     new DefaultOptionBuilder("-","-", false);
@@ -214,7 +212,6 @@ public class StreamJob {
       
       inputSpecs_.addAll(cmdLine.getValues("-input"));
       output_ = (String) cmdLine.getValue("-output"); 
-      mapsideoutURI_ = (String) cmdLine.getValue("-mapsideoutput");
       
       mapCmd_ = (String)cmdLine.getValue("-mapper"); 
       comCmd_ = (String)cmdLine.getValue("-combiner"); 
@@ -450,20 +447,17 @@ public class StreamJob {
       System.out.println("Options:");
       System.out.println("  -input    <path>     DFS input file(s) for the Map step");
       System.out.println("  -output   <path>     DFS output directory for the Reduce step");
-      System.out.println("  -mapper   <cmd>      The streaming command to run");
-      System.out.println("  -combiner <cmd>      The streaming command to run");
-      System.out.println("  -reducer  <cmd>      The streaming command to run");
+      System.out.println("  -mapper   <cmd|JavaClassName>      The streaming command to run");
+      System.out.println("  -combiner <JavaClassName> Combiner has to be a Java class");
+      System.out.println("  -reducer  <cmd|JavaClassName>      The streaming command to run");
       System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
-      //Only advertise the standard way: [--config dir] in our launcher 
-      //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
-      //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
       System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
       System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
       System.out.println("  -additionalconfspec specfile  Optional.");
-      System.out.println("  -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat  Optional.");
-      System.out.println("  -outputformat specfile  Optional.");
-      System.out.println("  -partitioner specfile  Optional.");
-      System.out.println("  -numReduceTasks specfile  Optional.");
+      System.out.println("  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.");
+      System.out.println("  -outputformat TextOutputFormat(default)|JavaClassName  Optional.");
+      System.out.println("  -partitioner JavaClassName  Optional.");
+      System.out.println("  -numReduceTasks <num>  Optional.");
       System.out.println("  -inputreader <spec>  Optional.");
       System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
       System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
@@ -478,10 +472,7 @@ public class StreamJob {
     System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
     System.out.println("Default Map input format: a line is a record in UTF-8");
     System.out.println("  the key part ends at first TAB, the rest of the line is the value");
-    System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
-    System.out
-      .println("  comma-separated name-values can be specified to configure the InputFormat");
-    System.out.println("  Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
+    System.out.println("Custom input format: -inputformat package.MyInputFormat ");
     System.out.println("Map output format, reduce input/output format:");
     System.out.println("  Format defined by what the mapper command outputs. Line-oriented");
     System.out.println();
@@ -489,34 +480,21 @@ public class StreamJob {
     System.out.println("  working directory when the mapper and reducer are run.");
     System.out.println("  The location of this working directory is unspecified.");
     System.out.println();
-    //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
-    //System.out.println("  Hadoop clusters. ");
-    //System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
-    //System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
-    //System.out.println();
+    System.out.println("To set the number of reduce tasks (num. of output files):");
+    System.out.println("  -jobconf mapred.reduce.tasks=10");
     System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
-    System.out.println("  Use -reducer " + REDUCE_NONE);
+    System.out.println("  Use -numReduceTasks 0");
     System.out
       .println("  A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
     System.out
       .println("  This speeds up processing, This also feels more like \"in-place\" processing");
     System.out.println("  because the input filename and the map input order are preserved");
-    System.out.println("To specify a single side-effect output file");
-    System.out.println("    -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]");//-output for side-effects will be soon deprecated
-    System.out.println("  If the jobtracker is local this is a local file");
-    System.out.println("  This currently requires -reducer NONE");
+    System.out.println("  This equivalent -reducer NONE");
     System.out.println();
-    System.out.println("To set the number of reduce tasks (num. of output files):");
-    System.out.println("  -jobconf mapred.reduce.tasks=10");
     System.out.println("To speed up the last reduces:");
     System.out.println("  -jobconf mapred.speculative.execution=true");
-    System.out.println("  Do not use this along -reducer " + REDUCE_NONE);
     System.out.println("To name the job (appears in the JobTracker Web UI):");
     System.out.println("  -jobconf mapred.job.name='My Job' ");
-    System.out.println("To specify that line-oriented input is in gzip format:");
-    System.out
-      .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
-    System.out.println("   -jobconf stream.recordreader.compression=gzip ");
     System.out.println("To change the local temp directory:");
     System.out.println("  -jobconf dfs.data.dir=/tmp/dfs");
     System.out.println("  -jobconf stream.tmpdir=/tmp/streaming");
@@ -681,8 +659,6 @@ public class StreamJob {
       config_.addFinalResource(new Path(pathName));
     }
 
-    testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge"));
-
     // general MapRed job properties
     jobConf_ = new JobConf(config_);
     
@@ -695,25 +671,32 @@ public class StreamJob {
     // (to resolve local vs. dfs drive letter differences) 
     // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
     for (int i = 0; i < inputSpecs_.size(); i++) {
-      addInputSpec((String) inputSpecs_.get(i), i);
+      jobConf_.addInputPath(new Path(((String) inputSpecs_.get(i))));
     }
-    jobConf_.setBoolean("stream.inputtagged", inputTagged_);
     jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
 
     String defaultPackage = this.getClass().getPackage().getName();
     Class c;
     Class fmt = null;
     if (inReaderSpec_ == null && inputFormatSpec_ == null) {
-      fmt = KeyValueTextInputFormat.class;
+      fmt = TextInputFormat.class;
     } else if (inputFormatSpec_ != null) {
-      if (inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName())
-          || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName())) {
-        fmt = KeyValueTextInputFormat.class;
-      } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class.getName())
-          || inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName())) {
-        fmt = SequenceFileInputFormat.class;
-      } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName())
-          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName())) {
+      if (inputFormatSpec_.equals(TextInputFormat.class.getName())
+          || inputFormatSpec_.equals(TextInputFormat.class.getCanonicalName())) {
+        fmt = TextInputFormat.class;
+      } else if (inputFormatSpec_.equals(KeyValueTextInputFormat.class
+          .getName())
+          || inputFormatSpec_.equals(KeyValueTextInputFormat.class
+              .getCanonicalName())) {
+      } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class
+          .getName())
+          || inputFormatSpec_
+              .equals(org.apache.hadoop.mapred.SequenceFileInputFormat.class
+                  .getCanonicalName())) {
+      } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
+          .getName())
+          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
+              .getCanonicalName())) {
         fmt = SequenceFileAsTextInputFormat.class;
       } else {
         c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
@@ -725,14 +708,7 @@ public class StreamJob {
       }
     } 
     if (fmt == null) {
-      if (testMerge_ && false == hasSimpleInputSpecs_) {
-        // this ignores -inputreader
-        fmt = MergerInputFormat.class;
-      } else {
-        // need to keep this case to support custom -inputreader
-        // and their parameters ,n=v,n=v
-        fmt = StreamInputFormat.class;
-      }
+      fmt = StreamInputFormat.class;
     }
 
     jobConf_.setInputFormat(fmt);
@@ -757,13 +733,10 @@ public class StreamJob {
       c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
       if (c != null) {
         jobConf_.setCombinerClass(c);
-      } else {
-        jobConf_.setCombinerClass(PipeCombiner.class);
-        jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(comCmd_, "UTF-8"));
-      }
+      } 
     }
 
-    reducerNone_ = false;
+    boolean reducerNone_ = false;
     if (redCmd_ != null) {
       reducerNone_ = redCmd_.equals(REDUCE_NONE);
       if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
@@ -801,9 +774,7 @@ public class StreamJob {
     }
     
     setUserJobConfProps(false);
-    // output setup is done late so we can customize for reducerNone_
-    //jobConf_.setOutputDir(new File(output_));
-    setOutputSpec();
+    jobConf_.setOutputPath(new Path(output_));
     fmt = null;
     if (outputFormatSpec_!= null) {
       c = StreamUtil.goodClassOrNull(outputFormatSpec_, defaultPackage);
@@ -812,11 +783,7 @@ public class StreamJob {
       } 
     }
     if (fmt == null) {
-      if (testMerge_) {
-        fmt = MuxOutputFormat.class;
-      } else {
-        fmt = TextOutputFormat.class;
-      }
+      fmt = TextOutputFormat.class;
     }
     jobConf_.setOutputFormat(fmt);
 
@@ -831,6 +798,9 @@ public class StreamJob {
       int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
       jobConf_.setNumReduceTasks(numReduceTasks);
     }
+    if (reducerNone_) {
+      jobConf_.setNumReduceTasks(0);
+    }
     
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
@@ -880,78 +850,6 @@ public class StreamJob {
     }
     msg("====");
   }
-  
-  /** InputSpec-s encode: a glob pattern x additional column files x additional joins */
-  protected void addInputSpec(String inSpec, int index) {
-    if (!testMerge_) {
-      jobConf_.addInputPath(new Path(inSpec));
-    } else {
-      CompoundDirSpec spec = new CompoundDirSpec(inSpec, true);
-      msg("Parsed -input:\n" + spec.toTableString());
-      if (index == 0) {
-        hasSimpleInputSpecs_ = (spec.paths_.length == 0);
-        msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_);
-      }
-      String primary = spec.primarySpec();
-      if (!seenPrimary_.add(primary)) {
-        // this won't detect glob overlaps and noncanonical path variations
-        fail("Primary used in multiple -input spec: " + primary);
-      }
-      jobConf_.addInputPath(new Path(primary));
-      // during Job execution, will reparse into a CompoundDirSpec 
-      jobConf_.set("stream.inputspecs." + index, inSpec);
-    }
-  }
-
-  /** uses output_ and mapsideoutURI_ */
-  protected void setOutputSpec() throws IOException {
-    CompoundDirSpec spec = new CompoundDirSpec(output_, false);
-    msg("Parsed -output:\n" + spec.toTableString());
-    String primary = spec.primarySpec();
-    String channel0;
-    // TODO simplify cases, encapsulate in a StreamJobConf
-    if (!reducerNone_) {
-      channel0 = primary;
-    } else {
-      if (mapsideoutURI_ != null) {
-        // user can override in case this is in a difft filesystem..
-        try {
-          URI uri = new URI(mapsideoutURI_);
-          if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs")
-            if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) {
-              fail("Must be absolute: " + mapsideoutURI_);
-            }
-          } else if (uri.getScheme().equals("socket")) {
-            // ok
-          } else {
-            fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_);
-          }
-        } catch (URISyntaxException e) {
-          throw (IOException) new IOException().initCause(e);
-        }
-      }
-      // an empty reduce output named "part-00002" will go here and not collide.
-      channel0 = primary + ".NONE";
-      // the side-effect of the first split of an input named "part-00002" 
-      // will go in this directory
-      jobConf_.set("stream.sideoutput.dir", primary);
-      // oops if user overrides low-level this isn't set yet :-(
-      boolean localjt = StreamUtil.isLocalJobTracker(jobConf_);
-      // just a guess user may prefer remote..
-      jobConf_.setBoolean("stream.sideoutput.localfs", localjt);
-    }
-    // a path in fs.name.default filesystem
-    System.out.println(channel0);
-    System.out.println(new Path(channel0));
-    jobConf_.setOutputPath(new Path(channel0));
-    // will reparse remotely
-    jobConf_.set("stream.outputspec", output_);
-    if (null != mapsideoutURI_) {
-      // a path in "jobtracker's filesystem"
-      // overrides sideoutput.dir
-      jobConf_.set("stream.sideoutput.uri", mapsideoutURI_);
-    }
-  }
 
   protected String getJobTrackerHostPort() {
     return jobConf_.get("mapred.job.tracker");
@@ -1099,15 +997,12 @@ public class StreamJob {
 
   // command-line arguments
   protected ArrayList inputSpecs_ = new ArrayList(); // <String>
-  protected boolean inputTagged_ = false;
   protected TreeSet seenPrimary_ = new TreeSet(); // <String>
   protected boolean hasSimpleInputSpecs_;
   protected ArrayList packageFiles_ = new ArrayList(); // <String>
   protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
-  //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
   protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>(); 
   protected String output_;
-  protected String mapsideoutURI_;
   protected String mapCmd_;
   protected String comCmd_;
   protected String redCmd_;
@@ -1125,8 +1020,6 @@ public class StreamJob {
   protected String numReduceTasksSpec_;
   protected String additionalConfSpec_;
 
-  protected boolean testMerge_;
-
   // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
   // encoding "a=b c=d"
   protected String addTaskEnvironment_;

+ 0 - 14
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java

@@ -453,20 +453,6 @@ public class StreamUtil {
     return res;
   }
 
-  static boolean getUseMapSideEffect(JobConf job) {
-    String reduce = job.get("stream.reduce.streamprocessor");
-    if (reduce == null) {
-      return false;
-    }
-    try {
-      reduce = URLDecoder.decode(reduce, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      System.err.println("stream.reduce.streamprocessor in jobconf not found");
-      return false;
-    }
-    return StreamJob.REDUCE_NONE.equals(reduce);
-  }
-
   public static void touch(File file) throws IOException {
     file = file.getAbsoluteFile();
     FileOutputStream out = new FileOutputStream(file);
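Note: the removed helper read back a URL-encoded command from the jobconf; the encode/decode round-trip it relied on looks like this (a sketch only; the property name is taken from the removed code):

    // UTF-8 URL round-trip used for stream.*.streamprocessor values.
    String cmd = "/usr/bin/uniq";
    job.set("stream.reduce.streamprocessor", URLEncoder.encode(cmd, "UTF-8"));
    String decoded = URLDecoder.decode(job.get("stream.reduce.streamprocessor"), "UTF-8");
    // decoded.equals(cmd); both calls may throw UnsupportedEncodingException
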

+ 0 - 92
src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java

@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-
-/**
- @author Michel Tourn
- */
-public class TupleInputFormat extends InputFormatBase {
-
-  public TupleInputFormat() {
-    fmts_ = new ArrayList();
-  }
-
-  public void setPrimary(InputFormat fmt) {
-    if (fmts_.size() == 0) {
-      fmts_.add(fmt);
-    } else {
-      fmts_.set(0, fmt);
-    }
-  }
-
-  public void addSecondary(InputFormat fmt) {
-    if (fmts_.size() == 0) {
-      throw new IllegalStateException("this.setPrimary() has not been called");
-    }
-    fmts_.add(fmt);
-  }
-
-  /**
-   */
-  public RecordReader getRecordReader(InputSplit split, JobConf job, 
-                                      Reporter reporter) throws IOException {
-
-    reporter.setStatus(split.toString());
-
-    return new MultiRecordReader();
-  }
-
-  class MultiRecordReader implements RecordReader {
-
-    MultiRecordReader() {
-    }
-
-    public boolean next(Writable key, Writable value) throws IOException {
-      return false;
-    }
-
-    public long getPos() throws IOException {
-      return 0;
-    }
-
-    public void close() throws IOException {
-    }
-
-    public WritableComparable createKey() {
-      return new UTF8();
-    }
-
-    public Writable createValue() {
-      return new UTF8();
-    }
-
-    public float getProgress() {
-      return 1.0f;
-    }
-  }
-
-  ArrayList/*<InputFormat>*/fmts_;
-}

+ 0 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java

@@ -19,10 +19,8 @@
 package org.apache.hadoop.streaming;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.PushbackInputStream;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.LineRecordReader;

+ 3 - 2
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java

@@ -38,10 +38,11 @@ public class TestStreaming extends TestCase
   protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
   // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
   protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
-  // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R.
+  // reduce behaves like /usr/bin/uniq, but also prepends lines with R.
+  // the command-line combiner no longer has any effect.
   protected String combine  = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"});
   protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
-  protected String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
+  protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
 
   private StreamJob job;
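Note: the new expectation follows mechanically from the comments above: split the input on '.', sort and de-duplicate, prefix each word with R and append the key/value tab. A self-contained check (sketch; not part of the test):

    // Recomputes outputExpect from input as described in the comments above.
    String[] words = input.replace('.', '\n').split("\n");
    java.util.SortedSet<String> uniq =
        new java.util.TreeSet<String>(java.util.Arrays.asList(words));
    StringBuffer expect = new StringBuffer();
    for (String w : uniq) {
      expect.append("R").append(w).append("\t\n");
    }
    // expect.toString().equals(outputExpect)
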
 

+ 1 - 1
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java

@@ -41,7 +41,7 @@ public class TrApp
     // test that some JobConf properties are exposed as expected     
     // Note the dots translated to underscore: 
     // property names have been escaped in PipeMapRed.safeEnvVarName()
-    expect("mapred_input_format_class", "org.apache.hadoop.mapred.KeyValueTextInputFormat");
+    expect("mapred_input_format_class", "org.apache.hadoop.mapred.TextInputFormat");
     expect("mapred_job_tracker", "local");
     //expect("mapred_local_dir", "build/test/mapred/local");
     expectDefined("mapred_local_dir");
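Note: the expectation relies on JobConf property names being exposed to the streaming process as environment variables, with unsafe characters mapped to underscores. A simplified sketch of that mapping (the real PipeMapRed.safeEnvVarName may differ in detail):

    // Simplified sketch: keep letters and digits, map everything else to '_'.
    static String safeEnvVarName(String name) {
      StringBuffer safe = new StringBuffer();
      for (int i = 0; i < name.length(); i++) {
        char c = name.charAt(i);
        safe.append(Character.isLetterOrDigit(c) ? c : '_');
      }
      return safe.toString();
    }
    // safeEnvVarName("mapred.input.format.class") -> "mapred_input_format_class"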