
HADOOP-437. contrib/streaming: Add support for gzipped inputs. Contributed by Michel.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@437848 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
Parent
Commit
df4e5c21ca

+ 3 - 0
CHANGES.txt

@@ -86,6 +86,9 @@ Trunk (unreleased changes)
 21. HADOOP-486.  Add the job username to JobStatus instances returned
     by JobClient.  (Mahadev Konar via cutting)
 
+22. HADOOP-437.  contrib/streaming: Add support for gzipped inputs.
+    (Michel Tourn via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

+ 7 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -166,6 +166,10 @@ public abstract class PipeMapRed {
       String argv = getPipeCommand(job);
       keyCols_ = getKeyColsFromPipeCommand(argv);
 
+      debug_ = (job.get("stream.debug") != null);
+      if(debug_) {
+        System.out.println("PipeMapRed: stream.debug=true");
+      }
       job_ = job;
 
       // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
@@ -194,6 +198,7 @@ public abstract class PipeMapRed {
       optSideEffect_ = getUseSideEffect();
 
       if(optSideEffect_) {
+        // example with -cluster local, the side-effect file is named: outnone/map_bw5nzv
         String fileName = job_.get("mapred.task.id");
         sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
         FileSystem fs = FileSystem.get(job_);
@@ -360,7 +365,7 @@ public abstract class PipeMapRed {
       val.set(line.substring(pos+1));
     }
   }
-
+  
   class MROutputThread extends Thread
   {
     MROutputThread(OutputCollector output, Reporter reporter)
@@ -557,6 +562,7 @@ public abstract class PipeMapRed {
   int reportPortPlusOne_;
 
   boolean doPipe_;
+  boolean debug_;
 
   Process sim;
   Object doneLock_;
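
For reference, configure() above treats any non-null value of stream.debug as "on". A minimal sketch of enabling it programmatically (illustrative only; the patch itself just reads the property):

  import org.apache.hadoop.mapred.JobConf;

  // Hedged sketch: any non-null value of stream.debug enables the new
  // debug_ flag in PipeMapRed.configure(); "true" is used here only for
  // readability. From the streaming command line the equivalent would be
  // -jobconf stream.debug=true.
  JobConf job = new JobConf();
  job.set("stream.debug", "true");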

+ 4 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -72,7 +72,8 @@ public class PipeMapper extends PipeMapRed implements Mapper
     try {
       // 1/4 Hadoop in
       if(key instanceof BytesWritable) {
-        mapredKey_ = new String(((BytesWritable)key).get(), "UTF-8");
+        BytesWritable bKey = (BytesWritable)key;
+        mapredKey_ = new String(bKey.get(), 0, bKey.getSize(), "UTF-8");
       } else {
         mapredKey_ = key.toString();        
       }
@@ -84,7 +85,8 @@ public class PipeMapper extends PipeMapRed implements Mapper
       if(numExceptions_==0) {
         String sval;
         if(value instanceof BytesWritable) {
-          sval = new String(((BytesWritable)value).get(), "UTF-8");
+          BytesWritable bVal = (BytesWritable)value;
+          sval = new String(bVal.get(), 0, bVal.getSize(), "UTF-8");
         } else {
           sval = value.toString();
         }
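
Why the decode above is bounded: BytesWritable.get() returns the internal buffer, which may be longer than the logical payload, so decoding the whole array can append stray bytes to the key or value. A minimal sketch of the safe pattern (a fragment, assumed to sit in a method that declares throws IOException):

  import org.apache.hadoop.io.BytesWritable;

  // Hedged sketch: decode only the first getSize() bytes; get() may hand
  // back a padded backing array.
  BytesWritable bw = new BytesWritable("key1".getBytes("UTF-8"));
  String decoded = new String(bw.get(), 0, bw.getSize(), "UTF-8");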

+ 1 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java

@@ -45,7 +45,7 @@ public abstract class StreamBaseRecordReader implements RecordReader
   protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName());
   
   // custom JobConf properties for this class are prefixed with this namespace
-  final String CONF_NS = "stream.recordreader.";
+  final static String CONF_NS = "stream.recordreader.";
 
   public StreamBaseRecordReader(
     FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
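
CONF_NS is made static because the new StreamInputFormat.isGzippedInput() (next file) needs to read it from a static context, where an instance field would not be reachable. A generic illustration of the distinction (class names here are made up):

  // Hedged illustration: a static final constant can be referenced without
  // an instance, which is what a static helper in another class requires.
  class Names {
    static final String CONF_NS = "stream.recordreader.";
  }

  class Consumer {
    static String compressionKey() {
      return Names.CONF_NS + "compression"; // would not compile if CONF_NS were an instance field
    }
  }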

+ 31 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java

@@ -62,6 +62,36 @@ public class StreamInputFormat extends InputFormatBase
     return b;
   }
 
+  static boolean isGzippedInput(JobConf job)
+  {
+    String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
+    return "gzip".equals(val);
+  }
+
+  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits)
+    throws IOException {
+      
+    if(isGzippedInput(job)) {
+      return getFullFileSplits(fs, job);
+    } else {
+      return super.getSplits(fs, job, numSplits);
+    }   
+  }
+  
+  /** For the compressed-files case: override InputFormatBase to produce one split. */
+  FileSplit[] getFullFileSplits(FileSystem fs, JobConf job)
+    throws IOException
+  {
+    Path[] files = listPaths(fs, job);
+    int numSplits = files.length;
+    ArrayList splits = new ArrayList(numSplits);
+    for (int i = 0; i < files.length; i++) {
+      Path file = files[i];
+      long splitSize = fs.getLength(file);
+      splits.add(new FileSplit(file, 0, splitSize));
+    }
+    return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
+  }
 
   protected Path[] listPaths(FileSystem fs, JobConf job)
     throws IOException
@@ -170,4 +200,5 @@ public class StreamInputFormat extends InputFormatBase
     return reader;
   }
 
+  
 }
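
Summarizing the new split policy above: when stream.recordreader.compression=gzip is set, getSplits() stops cutting files into byte ranges and emits exactly one split per input file, because a gzip stream cannot be entered at an arbitrary offset. A hedged, Hadoop-free analogue of that policy (the Split class and directory walk are illustrative only):

  import java.io.File;
  import java.util.ArrayList;
  import java.util.List;

  // Hedged sketch: one "split" per file, always starting at offset 0 and
  // covering the full length, mirroring getFullFileSplits().
  class WholeFileSplitSketch {
    static class Split {
      final File file; final long start; final long length;
      Split(File file, long start, long length) {
        this.file = file; this.start = start; this.length = length;
      }
    }

    static List<Split> splitWholeFiles(File dir) {
      List<Split> splits = new ArrayList<Split>();
      File[] files = dir.listFiles();
      if (files == null) {
        return splits; // not a directory or I/O error
      }
      for (File f : files) {
        splits.add(new Split(f, 0, f.length())); // never mid-file for gzip
      }
      return splits;
    }
  }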

+ 6 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -22,6 +22,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.logging.*;
 
@@ -290,6 +291,11 @@ public class StreamJob
     System.out.println();
     System.out.println("To set the number of reduce tasks (num. of output files):");
     System.out.println("  -jobconf mapred.reduce.tasks=10");
+    System.out.println("To name the job (appears in the JobTrack Web UI):");
+    System.out.println("  -jobconf mapred.job.name='My Job' ");
+    System.out.println("To specify that line-oriented input is in gzip format:");
+    System.out.println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
+    System.out.println("   -jobconf stream.recordreader.compression=gzip ");
     System.out.println("To change the local temp directory:");
     System.out.println("  -jobconf dfs.data.dir=/tmp");
     System.out.println("Additional local temp directories with -cluster local:");

+ 50 - 20
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java

@@ -17,6 +17,7 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.util.zip.GZIPInputStream; 
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -41,35 +42,54 @@ public class StreamLineRecordReader extends StreamBaseRecordReader
     throws IOException
   {
     super(in, split, reporter, job, fs);
+    gzipped_ = StreamInputFormat.isGzippedInput(job);
+    if(gzipped_) {
+      din_ = new DataInputStream(new GZIPInputStream(in_));
+    } else {
+      din_ = in_;
+    }
   }
 
   public void seekNextRecordBoundary() throws IOException
   {
-    int bytesSkipped = 0;
-    if (start_ != 0) {
-      in_.seek(start_ - 1);
-      // scan to the next newline in the file
-      while (in_.getPos() < end_) {
-        char c = (char)in_.read();
-        bytesSkipped++;
-        if (c == '\r' || c == '\n') {
-          break;
+    if(gzipped_) {
+      // no skipping: use din_ as-is 
+      // assumes splitter created only one split per file
+      return;
+    } else {
+      int bytesSkipped = 0;
+      if (start_ != 0) {
+        in_.seek(start_ - 1);
+        // scan to the next newline in the file
+        while (in_.getPos() < end_) {
+          char c = (char)in_.read();
+          bytesSkipped++;
+          if (c == '\r' || c == '\n') {
+            break;
+          }
         }
       }
-    }
 
-    //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
+      //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
+    }
   }
 
   public synchronized boolean next(Writable key, Writable value)
     throws IOException {
-    long pos = in_.getPos();
-    if (pos >= end_)
-      return false;
+    if(gzipped_) {
+      // end of stream is detected below by readLine() returning null
+    } else {
+      long pos = in_.getPos();
+      if (pos >= end_)
+        return false;
+    }
 
-    //((LongWritable)key).set(pos);           // key is position
+    //((LongWritable)key).set(pos);      // key is position
     //((UTF8)value).set(readLine(in));   // value is line
-    String line = readLine(in_);
+    String line = readLine(din_);
+    if(line == null) {
+      return false; // readLine() hit end of stream (gzipped input)
+    }
 
     // key is line up to TAB, value is rest
     final boolean NOVAL = false;
@@ -92,22 +112,32 @@ public class StreamLineRecordReader extends StreamBaseRecordReader
 
 
   // from TextInputFormat
-  private static String readLine(FSDataInputStream in) throws IOException {
+  private static String readLine(InputStream in) throws IOException {
     StringBuffer buffer = new StringBuffer();
+    boolean over = true;   // stays true only if EOF is hit before any byte is read
     while (true) {
 
       int b = in.read();
       if (b == -1)
         break;
-
+      
+      over = false;
       char c = (char)b;              // bug: this assumes eight-bit characters.
       if (c == '\r' || c == '\n')    // TODO || c == '\t' here
         break;
 
       buffer.append(c);
     }
-
-    return buffer.toString();
+    
+    if(over) {
+      return null;
+    } else {
+      return buffer.toString();
+    }
+    
   }
 
+  boolean gzipped_;
+  GZIPInputStream zin_; // unused in this patch; din_ already wraps the GZIPInputStream
+  DataInputStream din_; // GZIP or plain
 }
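
For context on the record-reader change above, here is a hedged, self-contained sketch of the same reading pattern outside Hadoop: wrap the raw stream in a GZIPInputStream, read line by line, and treat a null line as end of stream, which is the convention the patched readLine() now follows. The file name and the TAB key/value split are illustrative.

  import java.io.BufferedReader;
  import java.io.FileInputStream;
  import java.io.IOException;
  import java.io.InputStreamReader;
  import java.util.zip.GZIPInputStream;

  public class GzipLineSketch {
    public static void main(String[] args) throws IOException {
      BufferedReader reader = new BufferedReader(new InputStreamReader(
          new GZIPInputStream(new FileInputStream("input.txt.gz")), "UTF-8"));
      String line;
      while ((line = reader.readLine()) != null) { // null signals end of stream
        int tab = line.indexOf('\t');
        String key = (tab < 0) ? line : line.substring(0, tab);
        String val = (tab < 0) ? "" : line.substring(tab + 1);
        System.out.println(key + " -> " + val);
      }
      reader.close();
    }
  }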

+ 7 - 7
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java

@@ -40,13 +40,13 @@ public class StreamSequenceRecordReader extends StreamBaseRecordReader
     numFailed_ = 0;
     // super.in_ ignored, using rin_ instead
   }
-  
-    
+
+
   public synchronized boolean next(Writable key, Writable value)
    throws IOException
-  {         
+  {
     boolean success;
-    do {    
+    do {
       if (!more_) return false;
       success = false;
       try {
@@ -61,7 +61,7 @@ public class StreamSequenceRecordReader extends StreamBaseRecordReader
       } catch(IOException io) {
         numFailed_++;
         if(numFailed_ < 100 || numFailed_ % 100 == 0) {
-          err_.println("StreamSequenceRecordReader: numFailed_/numRec_=" 
+          err_.println("StreamSequenceRecordReader: numFailed_/numRec_="
             + numFailed_+ "/" + numRec_);
         }
         io.printStackTrace(err_);
@@ -69,9 +69,9 @@ public class StreamSequenceRecordReader extends StreamBaseRecordReader
       }
     } while(!success);
     numRecStats("");
-    return more_;    
+    return more_;
   }
-  
+
 
   public void seekNextRecordBoundary() throws IOException
   {