|
@@ -16,8 +16,9 @@
|
|
|
|
|
|
package org.apache.hadoop.streaming;
|
|
|
|
|
|
-import java.io.DataInputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.PushbackInputStream;
|
|
|
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
|
|
@@ -70,7 +71,7 @@ public class UTF8ByteArrayUtils {
|
|
|
* @return a byte array containing the line
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static byte[] readLine(DataInputStream in) throws IOException {
|
|
|
+ public static byte[] readLine(InputStream in) throws IOException {
|
|
|
byte [] buf = new byte[128];
|
|
|
byte [] lineBuffer = buf;
|
|
|
int room = 128;
|
|
@@ -84,9 +85,23 @@ public class UTF8ByteArrayUtils {
|
|
|
}
|
|
|
|
|
|
char c = (char)b;
|
|
|
- if (c == '\r' || c == '\n')
|
|
|
+ if (c == '\n')
|
|
|
break;
|
|
|
|
|
|
+ if (c == '\r') {
|
|
|
+ in.mark(1);
|
|
|
+ int c2 = in.read();
|
|
|
+ if(c2 == -1) {
|
|
|
+ isEOF = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (c2 != '\n') {
|
|
|
+ // lone '\r' (not a CRLF pair): rewind to the mark so this byte is read again as part of the next line
|
|
|
+ in.reset();
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
if (--room < 0) {
|
|
|
buf = new byte[offset + 128];
|
|
|
room = buf.length - offset - 1;
|