@@ -19,59 +19,65 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.nio.charset.MalformedInputException;
-import java.util.Arrays;
-import java.util.zip.GZIPInputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.LineRecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Similar to org.apache.hadoop.mapred.TextRecordReader,
  * but delimits key and value with a TAB.
  * @author Michel Tourn
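+ * (For example, an input line "k1\tv1" is returned as key "k1" and value
+ * "v1"; a line containing no TAB becomes the key, with an empty value.)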
 */
-public class StreamLineRecordReader extends StreamBaseRecordReader {
+public class StreamLineRecordReader extends LineRecordReader {
+
+  private String splitName;
+  private Reporter reporter;
+  private FileSplit split;
+  private int numRec = 0;
+  private int nextStatusRec = 1;
+  private int statusMaxRecordChars;
+  protected static final Log LOG = LogFactory.getLog(StreamLineRecordReader.class);
+  // The base class keys records by LongWritable byte offset; keep its own
+  // key/value objects here for internal use only.
+  private WritableComparable dummyKey = super.createKey();
+  private Text innerValue = (Text) super.createValue();
 
-  public StreamLineRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+  public StreamLineRecordReader(FSDataInputStream in, FileSplit split,
+                                Reporter reporter,
                                 JobConf job, FileSystem fs) throws IOException {
-    super(in, split, reporter, job, fs);
-    gzipped_ = StreamInputFormat.isGzippedInput(job);
-    if (gzipped_) {
-      din_ = new BufferedInputStream( (new GZIPInputStream(in_) ) );
-    } else {
-      din_ = in_;
-    }
+    super(createStream(in, job), split.getStart(),
+          split.getStart() + split.getLength());
+    this.split = split;
+    this.reporter = reporter;
+    // Initialize fields that this patch declares but never set: splitName for
+    // status messages, and an assumed 200-char cap for record previews.
+    this.splitName = split.getFile().getName();
+    this.statusMaxRecordChars = 200;
   }
-
-  public void seekNextRecordBoundary() throws IOException {
-    if (gzipped_) {
-      // no skipping: use din_ as-is
-      // assumes splitter created only one split per file
-      return;
-    } else {
-      int bytesSkipped = 0;
-      if (start_ != 0) {
-        in_.seek(start_ - 1);
-        // scan to the next newline in the file
-        while (in_.getPos() < end_) {
-          char c = (char) in_.read();
-          bytesSkipped++;
-          if (c == '\r' || c == '\n') {
-            break;
-          }
-        }
-      }
-
-      //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
-    }
+
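+  // Chooses the stream the base LineRecordReader will read from: the raw
+  // split for plain text, or a GzipCodec-decompressed stream when the job
+  // flags the input as gzipped. Seeking to the next record boundary, formerly
+  // done in seekNextRecordBoundary(), is now handled by the base class.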
+  private static InputStream createStream(FSDataInputStream in, JobConf job)
+      throws IOException {
+    InputStream finalStream = in;
+    boolean gzipped = StreamInputFormat.isGzippedInput(job);
+    if (gzipped) {
+      GzipCodec codec = new GzipCodec();
+      codec.setConf(job);
+      finalStream = codec.createInputStream(in);
+    }
+    return finalStream;
+  }
+
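+  // Streaming records are tab-delimited text: expose Text for both key and
+  // value instead of the base class's LongWritable offset key.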
+  public WritableComparable createKey() {
+    return new Text();
+  }
+
+  public Writable createValue() {
+    return new Text();
   }
 
   public synchronized boolean next(Writable key, Writable value) throws IOException {
@@ -86,14 +92,12 @@ public class StreamLineRecordReader extends StreamBaseRecordReader {
 
     Text tKey = (Text) key;
     Text tValue = (Text) value;
-    byte[] line;
-
-    if ( !gzipped_ ) {
-      long pos = in_.getPos();
-      if (pos >= end_) return false;
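+    // Delegate line reading to the base LineRecordReader: it fills innerValue
+    // with the next line and produces an offset key, which is discarded.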
+    byte[] line = null;
+    if (super.next(dummyKey, innerValue)) {
+      // Text.getBytes() can return a backing array longer than the logical
+      // record, so copy out only the valid bytes.
+      line = new byte[innerValue.getLength()];
+      System.arraycopy(innerValue.getBytes(), 0, line, 0, line.length);
+    } else {
+      return false;
     }
-
-    line = UTF8ByteArrayUtils.readLine((InputStream) din_);
     if (line == null) return false;
     int tab = UTF8ByteArrayUtils.findTab(line);
     if (tab == -1) {
@@ -105,7 +109,35 @@ public class StreamLineRecordReader extends StreamBaseRecordReader {
     numRecStats(line, 0, line.length);
     return true;
   }
+
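+  // Counts records and, every 100 records, logs a status line and reports it
+  // through the Reporter so the framework sees progress on long-running splits.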
+  private void numRecStats(byte[] record, int start, int len) throws IOException {
+    numRec++;
+    if (numRec == nextStatusRec) {
+      String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars), "UTF-8");
+      nextStatusRec += 100; //*= 10;
+      String status = getStatus(recordStr);
+      LOG.info(status);
+      reporter.setStatus(status);
+    }
+  }
 
-  boolean gzipped_;
-  InputStream din_; // GZIP or plain
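+  // Builds the status string: host, record number, stream position, the split
+  // as name:start+length, and a truncated copy of the record being processed.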
+  private String getStatus(CharSequence record) {
+    long pos = -1;
+    try {
+      pos = getPos();
+    } catch (IOException io) {
+      // the position is informational only; ignore failures here
+    }
+    String recStr;
+    if (record.length() > statusMaxRecordChars) {
+      recStr = record.subSequence(0, statusMaxRecordChars) + "...";
+    } else {
+      recStr = record.toString();
+    }
+    String unqualSplit = split.getFile().getName() + ":" + split.getStart() + "+"
+        + split.getLength();
+    String status = "HSTR " + StreamUtil.HOST + " " + numRec + ". pos=" + pos + " " + unqualSplit
+        + " Processing record=" + recStr;
+    status += " " + splitName;
+    return status;
+  }
 }