|
@@ -52,145 +52,16 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
|
* @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.
|
|
|
*/
|
|
|
@Deprecated
|
|
|
- public static class LineReader {
|
|
|
- private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
|
|
|
- private int bufferSize = DEFAULT_BUFFER_SIZE;
|
|
|
- private InputStream in;
|
|
|
- private byte[] buffer;
|
|
|
- // the number of bytes of real data in the buffer
|
|
|
- private int bufferLength = 0;
|
|
|
- // the current position in the buffer
|
|
|
- private int bufferPosn = 0;
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a line reader that reads from the given stream using the
|
|
|
- * given buffer-size.
|
|
|
- * @param in
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
+ public static class LineReader extends org.apache.hadoop.util.LineReader {
|
|
|
+ LineReader(InputStream in) {
|
|
|
+ super(in);
|
|
|
+ }
|
|
|
LineReader(InputStream in, int bufferSize) {
|
|
|
- this.in = in;
|
|
|
- this.bufferSize = bufferSize;
|
|
|
- this.buffer = new byte[this.bufferSize];
|
|
|
+ super(in, bufferSize);
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a line reader that reads from the given stream using the
|
|
|
- * <code>io.file.buffer.size</code> specified in the given
|
|
|
- * <code>Configuration</code>.
|
|
|
- * @param in input stream
|
|
|
- * @param conf configuration
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
public LineReader(InputStream in, Configuration conf) throws IOException {
|
|
|
- this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Fill the buffer with more data.
|
|
|
- * @return was there more data?
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- boolean backfill() throws IOException {
|
|
|
- bufferPosn = 0;
|
|
|
- bufferLength = in.read(buffer);
|
|
|
- return bufferLength > 0;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Close the underlying stream.
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public void close() throws IOException {
|
|
|
- in.close();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Read from the InputStream into the given Text.
|
|
|
- * @param str the object to store the given line
|
|
|
- * @param maxLineLength the maximum number of bytes to store into str.
|
|
|
- * @param maxBytesToConsume the maximum number of bytes to consume in this call.
|
|
|
- * @return the number of bytes read including the newline
|
|
|
- * @throws IOException if the underlying stream throws
|
|
|
- */
|
|
|
- public int readLine(Text str, int maxLineLength,
|
|
|
- int maxBytesToConsume) throws IOException {
|
|
|
- str.clear();
|
|
|
- boolean hadFinalNewline = false;
|
|
|
- boolean hadFinalReturn = false;
|
|
|
- boolean hitEndOfFile = false;
|
|
|
- int startPosn = bufferPosn;
|
|
|
- long bytesConsumed = 0;
|
|
|
- outerLoop: while (true) {
|
|
|
- if (bufferPosn >= bufferLength) {
|
|
|
- if (!backfill()) {
|
|
|
- hitEndOfFile = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- startPosn = bufferPosn;
|
|
|
- for(; bufferPosn < bufferLength; ++bufferPosn) {
|
|
|
- switch (buffer[bufferPosn]) {
|
|
|
- case '\n':
|
|
|
- hadFinalNewline = true;
|
|
|
- bufferPosn += 1;
|
|
|
- break outerLoop;
|
|
|
- case '\r':
|
|
|
- if (hadFinalReturn) {
|
|
|
- // leave this \r in the stream, so we'll get it next time
|
|
|
- break outerLoop;
|
|
|
- }
|
|
|
- hadFinalReturn = true;
|
|
|
- break;
|
|
|
- default:
|
|
|
- if (hadFinalReturn) {
|
|
|
- break outerLoop;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- bytesConsumed += bufferPosn - startPosn;
|
|
|
- int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
|
|
|
- length = (int)Math.min(length, maxLineLength - str.getLength());
|
|
|
- if (length >= 0) {
|
|
|
- str.append(buffer, startPosn, length);
|
|
|
- }
|
|
|
- if (bytesConsumed >= maxBytesToConsume) {
|
|
|
- return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
|
|
|
- }
|
|
|
- }
|
|
|
- int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
|
|
|
- if (!hitEndOfFile) {
|
|
|
- bytesConsumed += bufferPosn - startPosn;
|
|
|
- int length = bufferPosn - startPosn - newlineLength;
|
|
|
- length = (int)Math.min(length, maxLineLength - str.getLength());
|
|
|
- if (length > 0) {
|
|
|
- str.append(buffer, startPosn, length);
|
|
|
- }
|
|
|
- }
|
|
|
- return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
|
|
|
+ super(in, conf);
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Read from the InputStream into the given Text.
|
|
|
- * @param str the object to store the given line
|
|
|
- * @param maxLineLength the maximum number of bytes to store into str.
|
|
|
- * @return the number of bytes read including the newline
|
|
|
- * @throws IOException if the underlying stream throws
|
|
|
- */
|
|
|
- public int readLine(Text str, int maxLineLength) throws IOException {
|
|
|
- return readLine(str, maxLineLength, Integer.MAX_VALUE);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Read from the InputStream into the given Text.
|
|
|
- * @param str the object to store the given line
|
|
|
- * @return the number of bytes read including the newline
|
|
|
- * @throws IOException if the underlying stream throws
|
|
|
- */
|
|
|
- public int readLine(Text str) throws IOException {
|
|
|
- return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
|
|
|
- }
|
|
|
-
|
|
|
}
|
|
|
|
|
|
public LineRecordReader(Configuration job,
|
|
@@ -228,7 +99,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
|
|
|
public LineRecordReader(InputStream in, long offset, long endOffset,
|
|
|
int maxLineLength) {
|
|
|
this.maxLineLength = maxLineLength;
|
|
|
- this.in = new LineReader(in, LineReader.DEFAULT_BUFFER_SIZE);
|
|
|
+ this.in = new LineReader(in);
|
|
|
this.start = offset;
|
|
|
this.pos = offset;
|
|
|
this.end = endOffset;
|