|
@@ -26,6 +26,14 @@ import org.apache.hadoop.io.Text;
|
|
|
|
|
|
/**
|
|
|
* A class that provides a line reader from an input stream.
|
|
|
+ * Depending on the constructor used, lines will either be terminated by:
|
|
|
+ * <ul>
|
|
|
+ * <li>one of the following: '\n' (LF) , '\r' (CR),
|
|
|
+ * or '\r\n' (CR+LF).</li>
|
|
|
+ * <li><em>or</em>, a custom byte sequence delimiter</li>
|
|
|
+ * </ul>
|
|
|
+ * In both cases, EOF also terminates an otherwise unterminated
|
|
|
+ * line.
|
|
|
*/
|
|
|
public class LineReader {
|
|
|
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
|
|
@@ -40,6 +48,9 @@ public class LineReader {
|
|
|
private static final byte CR = '\r';
|
|
|
private static final byte LF = '\n';
|
|
|
|
|
|
+ // The line delimiter
|
|
|
+ private final byte[] recordDelimiterBytes;
|
|
|
+
|
|
|
/**
|
|
|
* Create a line reader that reads from the given stream using the
|
|
|
* default buffer-size (64k).
|
|
@@ -61,6 +72,7 @@ public class LineReader {
|
|
|
this.in = in;
|
|
|
this.bufferSize = bufferSize;
|
|
|
this.buffer = new byte[this.bufferSize];
|
|
|
+ this.recordDelimiterBytes = null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -75,6 +87,56 @@ public class LineReader {
|
|
|
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Create a line reader that reads from the given stream using the
|
|
|
+ * default buffer-size, and using a custom delimiter of array of
|
|
|
+ * bytes.
|
|
|
+ * @param in The input stream
|
|
|
+ * @param recordDelimiterBytes The delimiter
|
|
|
+ */
|
|
|
+ public LineReader(InputStream in, byte[] recordDelimiterBytes) {
|
|
|
+ this.in = in;
|
|
|
+ this.bufferSize = DEFAULT_BUFFER_SIZE;
|
|
|
+ this.buffer = new byte[this.bufferSize];
|
|
|
+ this.recordDelimiterBytes = recordDelimiterBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a line reader that reads from the given stream using the
|
|
|
+ * given buffer-size, and using a custom delimiter of array of
|
|
|
+ * bytes.
|
|
|
+ * @param in The input stream
|
|
|
+ * @param bufferSize Size of the read buffer
|
|
|
+ * @param recordDelimiterBytes The delimiter
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public LineReader(InputStream in, int bufferSize,
|
|
|
+ byte[] recordDelimiterBytes) {
|
|
|
+ this.in = in;
|
|
|
+ this.bufferSize = bufferSize;
|
|
|
+ this.buffer = new byte[this.bufferSize];
|
|
|
+ this.recordDelimiterBytes = recordDelimiterBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a line reader that reads from the given stream using the
|
|
|
+ * <code>io.file.buffer.size</code> specified in the given
|
|
|
+ * <code>Configuration</code>, and using a custom delimiter of array of
|
|
|
+ * bytes.
|
|
|
+ * @param in input stream
|
|
|
+ * @param conf configuration
|
|
|
+ * @param recordDelimiterBytes The delimiter
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public LineReader(InputStream in, Configuration conf,
|
|
|
+ byte[] recordDelimiterBytes) throws IOException {
|
|
|
+ this.in = in;
|
|
|
+ this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
|
|
|
+ this.buffer = new byte[this.bufferSize];
|
|
|
+ this.recordDelimiterBytes = recordDelimiterBytes;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Close the underlying stream.
|
|
|
* @throws IOException
|
|
@@ -84,10 +146,7 @@ public class LineReader {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Read one line from the InputStream into the given Text. A line
|
|
|
- * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
|
|
|
- * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
|
|
|
- * line.
|
|
|
+ * Read one line from the InputStream into the given Text.
|
|
|
*
|
|
|
* @param str the object to store the given line (without newline)
|
|
|
* @param maxLineLength the maximum number of bytes to store into str;
|
|
@@ -104,6 +163,18 @@ public class LineReader {
|
|
|
*/
|
|
|
public int readLine(Text str, int maxLineLength,
|
|
|
int maxBytesToConsume) throws IOException {
|
|
|
+ if (this.recordDelimiterBytes != null) {
|
|
|
+ return readCustomLine(str, maxLineLength, maxBytesToConsume);
|
|
|
+ } else {
|
|
|
+ return readDefaultLine(str, maxLineLength, maxBytesToConsume);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Read a line terminated by one of CR, LF, or CRLF.
|
|
|
+ */
|
|
|
+ private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
|
|
|
+ throws IOException {
|
|
|
/* We're reading data from in, but the head of the stream may be
|
|
|
* already buffered in buffer, so we have several cases:
|
|
|
* 1. No newline characters are in the buffer, so we need to copy
|
|
@@ -166,6 +237,52 @@ public class LineReader {
|
|
|
return (int)bytesConsumed;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Read a line terminated by a custom delimiter.
|
|
|
+ */
|
|
|
+ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
|
|
|
+ throws IOException {
|
|
|
+ str.clear();
|
|
|
+ int txtLength = 0; // tracks str.getLength(), as an optimization
|
|
|
+ long bytesConsumed = 0;
|
|
|
+ int delPosn = 0;
|
|
|
+ do {
|
|
|
+ int startPosn = bufferPosn; // starting from where we left off the last
|
|
|
+ // time
|
|
|
+ if (bufferPosn >= bufferLength) {
|
|
|
+ startPosn = bufferPosn = 0;
|
|
|
+ bufferLength = in.read(buffer);
|
|
|
+ if (bufferLength <= 0)
|
|
|
+ break; // EOF
|
|
|
+ }
|
|
|
+ for (; bufferPosn < bufferLength; ++bufferPosn) {
|
|
|
+ if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
|
|
|
+ delPosn++;
|
|
|
+ if (delPosn >= recordDelimiterBytes.length) {
|
|
|
+ bufferPosn++;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ delPosn = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ int readLength = bufferPosn - startPosn;
|
|
|
+ bytesConsumed += readLength;
|
|
|
+ int appendLength = readLength - delPosn;
|
|
|
+ if (appendLength > maxLineLength - txtLength) {
|
|
|
+ appendLength = maxLineLength - txtLength;
|
|
|
+ }
|
|
|
+ if (appendLength > 0) {
|
|
|
+ str.append(buffer, startPosn, appendLength);
|
|
|
+ txtLength += appendLength;
|
|
|
+ }
|
|
|
+ } while (delPosn < recordDelimiterBytes.length
|
|
|
+ && bytesConsumed < maxBytesToConsume);
|
|
|
+ if (bytesConsumed > (long) Integer.MAX_VALUE)
|
|
|
+ throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
|
|
|
+ return (int) bytesConsumed;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Read from the InputStream into the given Text.
|
|
|
* @param str the object to store the given line
|