|
@@ -81,13 +81,13 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
start = split.getStart();
|
|
start = split.getStart();
|
|
end = start + split.getLength();
|
|
end = start + split.getLength();
|
|
final Path file = split.getPath();
|
|
final Path file = split.getPath();
|
|
|
|
+ compressionCodecs = new CompressionCodecFactory(job);
|
|
|
|
+ codec = compressionCodecs.getCodec(file);
|
|
|
|
|
|
// open the file and seek to the start of the split
|
|
// open the file and seek to the start of the split
|
|
final FileSystem fs = file.getFileSystem(job);
|
|
final FileSystem fs = file.getFileSystem(job);
|
|
fileIn = fs.open(file);
|
|
fileIn = fs.open(file);
|
|
if (isCompressedInput()) {
|
|
if (isCompressedInput()) {
|
|
- compressionCodecs = new CompressionCodecFactory(job);
|
|
|
|
- codec = compressionCodecs.getCodec(file);
|
|
|
|
decompressor = CodecPool.getDecompressor(codec);
|
|
decompressor = CodecPool.getDecompressor(codec);
|
|
if (codec instanceof SplittableCompressionCodec) {
|
|
if (codec instanceof SplittableCompressionCodec) {
|
|
final SplitCompressionInputStream cIn =
|
|
final SplitCompressionInputStream cIn =
|
|
@@ -166,6 +166,9 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
while (getFilePosition() <= end) {
|
|
while (getFilePosition() <= end) {
|
|
newSize = in.readLine(value, maxLineLength,
|
|
newSize = in.readLine(value, maxLineLength,
|
|
Math.max(maxBytesToConsume(pos), maxLineLength));
|
|
Math.max(maxBytesToConsume(pos), maxLineLength));
|
|
|
|
+ if (newSize == 0) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
pos += newSize;
|
|
pos += newSize;
|
|
if (newSize < maxLineLength) {
|
|
if (newSize < maxLineLength) {
|
|
break;
|
|
break;
|
|
@@ -216,4 +219,4 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-}
|
|
|
|
|
|
+}
|