1
0
فهرست منبع

HADOOP-1029. Fix streaming's input format to correctly seek to the start of splits. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@511039 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 سال پیش
والد
کامیت
659b22cff5

+ 3 - 0
CHANGES.txt

@@ -119,6 +119,9 @@ Trunk (unreleased changes)
 35. HADOOP-248.  Optimize location of map outputs to not use random
 35. HADOOP-248.  Optimize location of map outputs to not use random
     probes.  (Devaraj Das via cutting)
     probes.  (Devaraj Das via cutting)
 
 
+36. HADOOP-1029.  Fix streaming's input format to correctly seek to
+    the start of splits.  (Arun C Murthy via cutting)
+
 
 
 Release 0.11.2 - 2007-02-16
 Release 0.11.2 - 2007-02-16
 
 

+ 17 - 6
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java

@@ -72,16 +72,27 @@ public class StreamInputFormat extends TextInputFormat {
                                       JobConf job,
                                       JobConf job,
                                       Reporter reporter) throws IOException {
                                       Reporter reporter) throws IOException {
     FileSplit split = (FileSplit) genericSplit;
     FileSplit split = (FileSplit) genericSplit;
-    FileSystem fs = split.getPath().getFileSystem(job);
     LOG.info("getRecordReader start.....split=" + split);
     LOG.info("getRecordReader start.....split=" + split);
     reporter.setStatus(split.toString());
     reporter.setStatus(split.toString());
 
 
-    final long start = split.getStart();
-    final long end = start + split.getLength();
-
-    FSDataInputStream in = fs.open(split.getPath());
+    long start = split.getStart();
+    long length  = split.getLength();
     
     
-    // will open the file and seek to the start of the split
+    // Open the file and seek to the start of the split
+    FileSystem fs = split.getPath().getFileSystem(job);
+    FSDataInputStream in = fs.open(split.getPath());
+    if (isGzippedInput(job)) {
+      length = Long.MAX_VALUE;
+    } else if (start != 0) {
+      in.seek(start-1);
+      LineRecordReader.readLine(in, null);
+      long oldStart = start;
+      start = in.getPos();
+      length -= (start - oldStart); 
+    }
+    // Ugly hack! 
+    split = new FileSplit(split.getPath(), start, length, job);
+
     // Factory dispatch based on available params..
     // Factory dispatch based on available params..
     Class readerClass;
     Class readerClass;
     String c = job.get("stream.recordreader.class");
     String c = job.get("stream.recordreader.class");

+ 9 - 7
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java

@@ -55,7 +55,7 @@ public class StreamLineRecordReader extends LineRecordReader {
       Reporter reporter,
       Reporter reporter,
       JobConf job, FileSystem fs) throws IOException {
       JobConf job, FileSystem fs) throws IOException {
     super(createStream(in, job), split.getStart(), 
     super(createStream(in, job), split.getStart(), 
-        split.getStart() + split.getLength());
+            (split.getStart() + split.getLength()));
     this.split = split ; 
     this.split = split ; 
     this.reporter = reporter ; 
     this.reporter = reporter ; 
   }
   }
@@ -92,21 +92,23 @@ public class StreamLineRecordReader extends LineRecordReader {
 
 
     Text tKey = (Text) key;
     Text tKey = (Text) key;
     Text tValue = (Text) value;
     Text tValue = (Text) value;
-    byte[] line = null ; 
+    byte[] line = null ;
+    int lineLen = -1;
     if( super.next(dummyKey, innerValue) ){
     if( super.next(dummyKey, innerValue) ){
-      line = innerValue.getBytes(); 
+      line = innerValue.getBytes();
+      lineLen = innerValue.getLength();
     }else{
     }else{
       return false;
       return false;
     }
     }
     if (line == null) return false;
     if (line == null) return false;
-    int tab = UTF8ByteArrayUtils.findTab(line);
+    int tab = UTF8ByteArrayUtils.findTab(line, 0, lineLen);
     if (tab == -1) {
     if (tab == -1) {
-      tKey.set(line);
+      tKey.set(line, 0, lineLen);
       tValue.set("");
       tValue.set("");
     } else {
     } else {
-      UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
+      UTF8ByteArrayUtils.splitKeyVal(line, 0, lineLen, tKey, tValue, tab);
     }
     }
-    numRecStats(line, 0, line.length);
+    numRecStats(line, 0, lineLen);
     return true;
     return true;
   }
   }
   
   

+ 42 - 13
src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java

@@ -33,40 +33,69 @@ public class UTF8ByteArrayUtils {
     /**
     /**
      * Find the first occured tab in a UTF-8 encoded string
      * Find the first occured tab in a UTF-8 encoded string
      * @param utf a byte array containing a UTF-8 encoded string
      * @param utf a byte array containing a UTF-8 encoded string
+     * @param start starting offset
+     * @param length no. of bytes
      * @return position that first tab occures otherwise -1
      * @return position that first tab occures otherwise -1
      */
      */
-    public static int findTab(byte [] utf) {
-        for(int i=0; i<utf.length; i++) {
+    public static int findTab(byte [] utf, int start, int length) {
+        for(int i=start; i<(start+length); i++) {
             if(utf[i]==(byte)'\t') {
             if(utf[i]==(byte)'\t') {
                 return i;
                 return i;
             }
             }
-          }
-          return -1;      
+        }
+        return -1;      
     }
     }
     
     
+    /**
+     * Find the first occured tab in a UTF-8 encoded string
+     * @param utf a byte array containing a UTF-8 encoded string
+     * @return position that first tab occures otherwise -1
+     */
+    public static int findTab(byte [] utf) {
+      return findTab(utf, 0, utf.length);
+    }
+
     /**
     /**
      * split a UTF-8 byte array into key and value 
      * split a UTF-8 byte array into key and value 
      * assuming that the delimilator is at splitpos. 
      * assuming that the delimilator is at splitpos. 
      * @param utf utf-8 encoded string
      * @param utf utf-8 encoded string
+     * @param start starting offset
+     * @param length no. of bytes
      * @param key contains key upon the method is returned
      * @param key contains key upon the method is returned
      * @param val contains value upon the method is returned
      * @param val contains value upon the method is returned
      * @param splitPos the split pos
      * @param splitPos the split pos
      * @throws IOException
      * @throws IOException
      */
      */
-    public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
-    throws IOException {
-        if(splitPos<0 || splitPos >= utf.length)
-            throw new IllegalArgumentException(
-                    "splitPos must be in the range [0, "+splitPos+"]: " +splitPos);
-        byte [] keyBytes = new byte[splitPos];
-        System.arraycopy(utf, 0, keyBytes, 0, splitPos);
-        int valLen = utf.length-splitPos-1;
+    public static void splitKeyVal(byte[] utf, int start, int length, 
+            Text key, Text val, int splitPos) throws IOException {
+        if(splitPos<start || splitPos >= (start+length))
+            throw new IllegalArgumentException( "splitPos must be in the range " +
+                "[" + start + ", " + (start+length) + "]: " + splitPos);
+        int keyLen = (splitPos-start);
+        byte [] keyBytes = new byte[keyLen];
+        System.arraycopy(utf, start, keyBytes, 0, keyLen);
+        int valLen = (start+length)-splitPos-1;
         byte [] valBytes = new byte[valLen];
         byte [] valBytes = new byte[valLen];
-        System.arraycopy(utf,splitPos+1, valBytes, 0, valLen );
+        System.arraycopy(utf, splitPos+1, valBytes, 0, valLen);
         key.set(keyBytes);
         key.set(keyBytes);
         val.set(valBytes);
         val.set(valBytes);
     }
     }
     
     
+
+    /**
+     * split a UTF-8 byte array into key and value 
+     * assuming that the delimilator is at splitpos. 
+     * @param utf utf-8 encoded string
+     * @param key contains key upon the method is returned
+     * @param val contains value upon the method is returned
+     * @param splitPos the split pos
+     * @throws IOException
+     */
+    public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
+    throws IOException {
+        splitKeyVal(utf, 0, utf.length, key, val, splitPos);
+    }
+    
     /**
     /**
      * Read a utf8 encoded line from a data input stream. 
      * Read a utf8 encoded line from a data input stream. 
      * @param in data input stream
      * @param in data input stream