|
@@ -52,7 +52,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
|
public static final String MAX_LINE_LENGTH =
|
|
|
"mapreduce.input.linerecordreader.line.maxlength";
|
|
|
|
|
|
- private CompressionCodecFactory compressionCodecs = null;
|
|
|
private long start;
|
|
|
private long pos;
|
|
|
private long end;
|
|
@@ -60,9 +59,9 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
|
private FSDataInputStream fileIn;
|
|
|
private Seekable filePosition;
|
|
|
private int maxLineLength;
|
|
|
- private LongWritable key = null;
|
|
|
- private Text value = null;
|
|
|
- private CompressionCodec codec;
|
|
|
+ private LongWritable key;
|
|
|
+ private Text value;
|
|
|
+ private boolean isCompressedInput;
|
|
|
private Decompressor decompressor;
|
|
|
private byte[] recordDelimiterBytes;
|
|
|
|
|
@@ -81,13 +80,14 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
|
start = split.getStart();
|
|
|
end = start + split.getLength();
|
|
|
final Path file = split.getPath();
|
|
|
- compressionCodecs = new CompressionCodecFactory(job);
|
|
|
- codec = compressionCodecs.getCodec(file);
|
|
|
|
|
|
// open the file and seek to the start of the split
|
|
|
final FileSystem fs = file.getFileSystem(job);
|
|
|
fileIn = fs.open(file);
|
|
|
- if (isCompressedInput()) {
|
|
|
+
|
|
|
+ CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
|
|
|
+ if (null!=codec) {
|
|
|
+ isCompressedInput = true;
|
|
|
decompressor = CodecPool.getDecompressor(codec);
|
|
|
if (codec instanceof SplittableCompressionCodec) {
|
|
|
final SplitCompressionInputStream cIn =
|
|
@@ -132,19 +132,16 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
|
this.pos = start;
|
|
|
}
|
|
|
|
|
|
- private boolean isCompressedInput() {
|
|
|
- return (codec != null);
|
|
|
- }
|
|
|
|
|
|
private int maxBytesToConsume(long pos) {
|
|
|
- return isCompressedInput()
|
|
|
+ return isCompressedInput
|
|
|
? Integer.MAX_VALUE
|
|
|
: (int) Math.min(Integer.MAX_VALUE, end - pos);
|
|
|
}
|
|
|
|
|
|
private long getFilePosition() throws IOException {
|
|
|
long retVal;
|
|
|
- if (isCompressedInput() && null != filePosition) {
|
|
|
+ if (isCompressedInput && null != filePosition) {
|
|
|
retVal = filePosition.getPos();
|
|
|
} else {
|
|
|
retVal = pos;
|
|
@@ -166,9 +163,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
|
while (getFilePosition() <= end) {
|
|
|
newSize = in.readLine(value, maxLineLength,
|
|
|
Math.max(maxBytesToConsume(pos), maxLineLength));
|
|
|
- if (newSize == 0) {
|
|
|
- break;
|
|
|
- }
|
|
|
pos += newSize;
|
|
|
if (newSize < maxLineLength) {
|
|
|
break;
|