|
@@ -77,10 +77,25 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
public LineReader(InputStream in, Configuration conf) throws IOException {
|
|
public LineReader(InputStream in, Configuration conf) throws IOException {
|
|
super(in, conf);
|
|
super(in, conf);
|
|
}
|
|
}
|
|
|
|
+ LineReader(InputStream in, byte[] recordDelimiter) {
|
|
|
|
+ super(in, recordDelimiter);
|
|
|
|
+ }
|
|
|
|
+ LineReader(InputStream in, int bufferSize, byte[] recordDelimiter) {
|
|
|
|
+ super(in, bufferSize, recordDelimiter);
|
|
|
|
+ }
|
|
|
|
+ public LineReader(InputStream in, Configuration conf,
|
|
|
|
+ byte[] recordDelimiter) throws IOException {
|
|
|
|
+ super(in, conf, recordDelimiter);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public LineRecordReader(Configuration job,
|
|
public LineRecordReader(Configuration job,
|
|
FileSplit split) throws IOException {
|
|
FileSplit split) throws IOException {
|
|
|
|
+ this(job, split, null);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public LineRecordReader(Configuration job, FileSplit split,
|
|
|
|
+ byte[] recordDelimiter) throws IOException {
|
|
this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
|
|
this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
|
|
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
|
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
|
start = split.getStart();
|
|
start = split.getStart();
|
|
@@ -99,17 +114,17 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
((SplittableCompressionCodec)codec).createInputStream(
|
|
((SplittableCompressionCodec)codec).createInputStream(
|
|
fileIn, decompressor, start, end,
|
|
fileIn, decompressor, start, end,
|
|
SplittableCompressionCodec.READ_MODE.BYBLOCK);
|
|
SplittableCompressionCodec.READ_MODE.BYBLOCK);
|
|
- in = new LineReader(cIn, job);
|
|
|
|
|
|
+ in = new LineReader(cIn, job, recordDelimiter);
|
|
start = cIn.getAdjustedStart();
|
|
start = cIn.getAdjustedStart();
|
|
end = cIn.getAdjustedEnd();
|
|
end = cIn.getAdjustedEnd();
|
|
filePosition = cIn; // take pos from compressed stream
|
|
filePosition = cIn; // take pos from compressed stream
|
|
} else {
|
|
} else {
|
|
- in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
|
|
|
|
|
|
+ in = new LineReader(codec.createInputStream(fileIn, decompressor), job, recordDelimiter);
|
|
filePosition = fileIn;
|
|
filePosition = fileIn;
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
fileIn.seek(start);
|
|
fileIn.seek(start);
|
|
- in = new LineReader(fileIn, job);
|
|
|
|
|
|
+ in = new LineReader(fileIn, job, recordDelimiter);
|
|
filePosition = fileIn;
|
|
filePosition = fileIn;
|
|
}
|
|
}
|
|
// If this is not the first split, we always throw away first record
|
|
// If this is not the first split, we always throw away first record
|
|
@@ -120,29 +135,40 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
}
|
|
}
|
|
this.pos = start;
|
|
this.pos = start;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
public LineRecordReader(InputStream in, long offset, long endOffset,
|
|
public LineRecordReader(InputStream in, long offset, long endOffset,
|
|
int maxLineLength) {
|
|
int maxLineLength) {
|
|
|
|
+ this(in, offset, endOffset, maxLineLength, null);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public LineRecordReader(InputStream in, long offset, long endOffset,
|
|
|
|
+ int maxLineLength, byte[] recordDelimiter) {
|
|
this.maxLineLength = maxLineLength;
|
|
this.maxLineLength = maxLineLength;
|
|
- this.in = new LineReader(in);
|
|
|
|
|
|
+ this.in = new LineReader(in, recordDelimiter);
|
|
this.start = offset;
|
|
this.start = offset;
|
|
this.pos = offset;
|
|
this.pos = offset;
|
|
this.end = endOffset;
|
|
this.end = endOffset;
|
|
filePosition = null;
|
|
filePosition = null;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public LineRecordReader(InputStream in, long offset, long endOffset,
|
|
|
|
+ Configuration job)
|
|
|
|
+ throws IOException{
|
|
|
|
+ this(in, offset, endOffset, job, null);
|
|
|
|
+ }
|
|
|
|
+
|
|
public LineRecordReader(InputStream in, long offset, long endOffset,
|
|
public LineRecordReader(InputStream in, long offset, long endOffset,
|
|
- Configuration job)
|
|
|
|
|
|
+ Configuration job, byte[] recordDelimiter)
|
|
throws IOException{
|
|
throws IOException{
|
|
this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
|
|
this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
|
|
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
|
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
|
- this.in = new LineReader(in, job);
|
|
|
|
|
|
+ this.in = new LineReader(in, job, recordDelimiter);
|
|
this.start = offset;
|
|
this.start = offset;
|
|
this.pos = offset;
|
|
this.pos = offset;
|
|
this.end = endOffset;
|
|
this.end = endOffset;
|
|
filePosition = null;
|
|
filePosition = null;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
public LongWritable createKey() {
|
|
public LongWritable createKey() {
|
|
return new LongWritable();
|
|
return new LongWritable();
|
|
}
|
|
}
|
|
@@ -171,7 +197,6 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
return retVal;
|
|
return retVal;
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
/** Read a line. */
|
|
/** Read a line. */
|
|
public synchronized boolean next(LongWritable key, Text value)
|
|
public synchronized boolean next(LongWritable key, Text value)
|
|
throws IOException {
|
|
throws IOException {
|