|
@@ -18,8 +18,6 @@
|
|
|
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
-import java.io.BufferedInputStream;
|
|
|
-import java.io.ByteArrayOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.io.OutputStream;
|
|
@@ -38,30 +36,126 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
|
|
*/
|
|
|
public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
|
private CompressionCodecFactory compressionCodecs = null;
|
|
|
- private long start;
|
|
|
+ private long start;
|
|
|
private long pos;
|
|
|
private long end;
|
|
|
- private BufferedInputStream in;
|
|
|
- private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
|
|
|
+ private LineReader in;
|
|
|
+
|
|
|
/**
|
|
|
- * Provide a bridge to get the bytes from the ByteArrayOutputStream
|
|
|
- * without creating a new byte array.
|
|
|
+ * A class that provides a line reader from an input stream.
|
|
|
*/
|
|
|
- private static class TextStuffer extends OutputStream {
|
|
|
- public Text target;
|
|
|
- public void write(int b) {
|
|
|
- throw new UnsupportedOperationException("write(byte) not supported");
|
|
|
+ public static class LineReader {
|
|
|
+ private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
|
|
|
+ private int bufferSize = DEFAULT_BUFFER_SIZE;
|
|
|
+ private InputStream in;
|
|
|
+ private byte[] buffer;
|
|
|
+ // the number of bytes of real data in the buffer
|
|
|
+ private int bufferLength = 0;
|
|
|
+ // the current position in the buffer
|
|
|
+ private int bufferPosn = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a line reader that reads from the given stream using the
|
|
|
+ * given buffer-size.
|
|
|
+ * @param in
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ LineReader(InputStream in, int bufferSize) {
|
|
|
+ this.in = in;
|
|
|
+ this.bufferSize = bufferSize;
|
|
|
+ this.buffer = new byte[this.bufferSize];
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a line reader that reads from the given stream using the
|
|
|
+ * <code>io.file.buffer.size</code> specified in the given
|
|
|
+ * <code>Configuration</code>.
|
|
|
+ * @param in input stream
|
|
|
+ * @param conf configuration
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ LineReader(InputStream in, Configuration conf) throws IOException {
|
|
|
+ this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Fill the buffer with more data.
|
|
|
+ * @return was there more data?
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ boolean backfill() throws IOException {
|
|
|
+ bufferPosn = 0;
|
|
|
+ bufferLength = in.read(buffer);
|
|
|
+ return bufferLength > 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Close the underlying stream.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void close() throws IOException {
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Read from the InputStream into the given Text.
|
|
|
+ * @param str the object to store the given line
|
|
|
+ * @return the number of bytes read including the newline
|
|
|
+ * @throws IOException if the underlying stream throws
|
|
|
+ */
|
|
|
+ public int readLine(Text str) throws IOException {
|
|
|
+ str.clear();
|
|
|
+ boolean hadFinalNewline = false;
|
|
|
+ boolean hadFinalReturn = false;
|
|
|
+ boolean hitEndOfFile = false;
|
|
|
+ int startPosn = bufferPosn;
|
|
|
+ outerLoop: while (true) {
|
|
|
+ if (bufferPosn >= bufferLength) {
|
|
|
+ if (!backfill()) {
|
|
|
+ hitEndOfFile = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ startPosn = bufferPosn;
|
|
|
+ for(; bufferPosn < bufferLength; ++bufferPosn) {
|
|
|
+ switch (buffer[bufferPosn]) {
|
|
|
+ case '\n':
|
|
|
+ hadFinalNewline = true;
|
|
|
+ bufferPosn += 1;
|
|
|
+ break outerLoop;
|
|
|
+ case '\r':
|
|
|
+ if (hadFinalReturn) {
|
|
|
+ // leave this \n in the stream, so we'll get it next time
|
|
|
+ break outerLoop;
|
|
|
+ }
|
|
|
+ hadFinalReturn = true;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ if (hadFinalReturn) {
|
|
|
+ break outerLoop;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
|
|
|
+ if (length >= 0) {
|
|
|
+ str.append(buffer, startPosn, length);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
|
|
|
+ if (!hitEndOfFile) {
|
|
|
+ int length = bufferPosn - startPosn - newlineLength;
|
|
|
+ if (length > 0) {
|
|
|
+ str.append(buffer, startPosn, length);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return str.getLength() + newlineLength;
|
|
|
}
|
|
|
- public void write(byte[] data, int offset, int len) throws IOException {
|
|
|
- target.set(data, offset, len);
|
|
|
- }
|
|
|
}
|
|
|
- private TextStuffer bridge = new TextStuffer();
|
|
|
|
|
|
- public LineRecordReader(Configuration job, FileSplit split)
|
|
|
- throws IOException {
|
|
|
- long start = split.getStart();
|
|
|
- long end = start + split.getLength();
|
|
|
+ public LineRecordReader(Configuration job,
|
|
|
+ FileSplit split) throws IOException {
|
|
|
+ start = split.getStart();
|
|
|
+ end = start + split.getLength();
|
|
|
final Path file = split.getPath();
|
|
|
compressionCodecs = new CompressionCodecFactory(job);
|
|
|
final CompressionCodec codec = compressionCodecs.getCodec(file);
|
|
@@ -69,33 +163,38 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
|
// open the file and seek to the start of the split
|
|
|
FileSystem fs = file.getFileSystem(job);
|
|
|
FSDataInputStream fileIn = fs.open(split.getPath());
|
|
|
- InputStream in = fileIn;
|
|
|
boolean skipFirstLine = false;
|
|
|
if (codec != null) {
|
|
|
- in = codec.createInputStream(fileIn);
|
|
|
+ in = new LineReader(codec.createInputStream(fileIn), job);
|
|
|
end = Long.MAX_VALUE;
|
|
|
- } else if (start != 0) {
|
|
|
- skipFirstLine = true; // wait till BufferedInputStream to skip
|
|
|
- --start;
|
|
|
- fileIn.seek(start);
|
|
|
+ } else {
|
|
|
+ if (start != 0) {
|
|
|
+ skipFirstLine = true;
|
|
|
+ --start;
|
|
|
+ fileIn.seek(start);
|
|
|
+ }
|
|
|
+ in = new LineReader(fileIn, job);
|
|
|
}
|
|
|
-
|
|
|
- this.in = new BufferedInputStream(in);
|
|
|
if (skipFirstLine) { // skip first line and re-establish "start".
|
|
|
- start += LineRecordReader.readLine(this.in, null);
|
|
|
+ start += in.readLine(new Text());
|
|
|
}
|
|
|
- this.start = start;
|
|
|
this.pos = start;
|
|
|
- this.end = end;
|
|
|
}
|
|
|
|
|
|
- public LineRecordReader(InputStream in, long offset, long endOffset)
|
|
|
+ public LineRecordReader(InputStream in, long offset, long endOffset) {
|
|
|
+ this.in = new LineReader(in, LineReader.DEFAULT_BUFFER_SIZE);
|
|
|
+ this.start = offset;
|
|
|
+ this.pos = offset;
|
|
|
+ this.end = endOffset;
|
|
|
+ }
|
|
|
+
|
|
|
+ public LineRecordReader(InputStream in, long offset, long endOffset,
|
|
|
+ Configuration job)
|
|
|
throws IOException{
|
|
|
- this.in = new BufferedInputStream(in);
|
|
|
+ this.in = new LineReader(in, job);
|
|
|
this.start = offset;
|
|
|
this.pos = offset;
|
|
|
this.end = endOffset;
|
|
|
- // readLine(in, null);
|
|
|
}
|
|
|
|
|
|
public LongWritable createKey() {
|
|
@@ -113,21 +212,17 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
|
return false;
|
|
|
|
|
|
key.set(pos); // key is position
|
|
|
- buffer.reset();
|
|
|
- long bytesRead = readLine();
|
|
|
- if (bytesRead == 0) {
|
|
|
- return false;
|
|
|
+ int newSize = in.readLine(value);
|
|
|
+ if (newSize > 0) {
|
|
|
+ pos += newSize;
|
|
|
+ return true;
|
|
|
}
|
|
|
- pos += bytesRead;
|
|
|
- bridge.target = value;
|
|
|
- buffer.writeTo(bridge);
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- protected long readLine() throws IOException {
|
|
|
- return LineRecordReader.readLine(in, buffer);
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @deprecated
|
|
|
+ */
|
|
|
public static long readLine(InputStream in,
|
|
|
OutputStream out) throws IOException {
|
|
|
long bytes = 0;
|
|
@@ -177,7 +272,9 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
|
return pos;
|
|
|
}
|
|
|
|
|
|
- public synchronized void close() throws IOException {
|
|
|
- in.close();
|
|
|
+ public synchronized void close() throws IOException {
|
|
|
+ if (in != null) {
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
}
|
|
|
}
|