|
@@ -36,11 +36,11 @@ import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
|
import org.apache.hadoop.mapred.OutputCollector;
|
|
|
import org.apache.hadoop.mapred.TaskLog;
|
|
|
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
-import org.apache.hadoop.io.Writable;
|
|
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
|
|
@@ -354,14 +354,16 @@ public abstract class PipeMapRed {
|
|
|
* @param val: value of a record
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
|
|
|
- int pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
|
|
|
+ void splitKeyVal(byte[] line, int length, Text key, Text val)
|
|
|
+ throws IOException {
|
|
|
+ int pos = UTF8ByteArrayUtils.findNthByte(line, 0, length,
|
|
|
+ (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
|
|
|
try {
|
|
|
if (pos == -1) {
|
|
|
- key.set(line);
|
|
|
+ key.set(line, 0, length);
|
|
|
val.set("");
|
|
|
} else {
|
|
|
- UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos);
|
|
|
+ UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos);
|
|
|
}
|
|
|
} catch (CharacterCodingException e) {
|
|
|
LOG.warn(StringUtils.stringifyException(e));
|
|
@@ -377,15 +379,18 @@ public abstract class PipeMapRed {
|
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
+ LineReader lineReader = null;
|
|
|
try {
|
|
|
Text key = new Text();
|
|
|
Text val = new Text();
|
|
|
+ Text line = new Text();
|
|
|
+ lineReader = new LineReader((InputStream)clientIn_, job_);
|
|
|
// 3/4 Tool to Hadoop
|
|
|
- while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) {
|
|
|
-
|
|
|
- splitKeyVal(answer, key, val);
|
|
|
+ while (lineReader.readLine(line) > 0) {
|
|
|
+ answer = line.getBytes();
|
|
|
+ splitKeyVal(answer, line.getLength(), key, val);
|
|
|
output.collect(key, val);
|
|
|
-
|
|
|
+ line.clear();
|
|
|
numRecWritten_++;
|
|
|
long now = System.currentTimeMillis();
|
|
|
if (now-lastStdoutReport > reporterOutDelay_) {
|
|
@@ -396,6 +401,9 @@ public abstract class PipeMapRed {
|
|
|
logflush();
|
|
|
}
|
|
|
}
|
|
|
+ if (lineReader != null) {
|
|
|
+ lineReader.close();
|
|
|
+ }
|
|
|
if (clientIn_ != null) {
|
|
|
clientIn_.close();
|
|
|
clientIn_ = null;
|
|
@@ -405,6 +413,9 @@ public abstract class PipeMapRed {
|
|
|
outerrThreadsThrowable = th;
|
|
|
LOG.warn(StringUtils.stringifyException(th));
|
|
|
try {
|
|
|
+ if (lineReader != null) {
|
|
|
+ lineReader.close();
|
|
|
+ }
|
|
|
if (clientIn_ != null) {
|
|
|
clientIn_.close();
|
|
|
clientIn_ = null;
|
|
@@ -433,18 +444,23 @@ public abstract class PipeMapRed {
|
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
- byte[] line;
|
|
|
+ Text line = new Text();
|
|
|
+ LineReader lineReader = null;
|
|
|
try {
|
|
|
- while ((line = UTF8ByteArrayUtils.readLine((InputStream) clientErr_)) != null) {
|
|
|
- String lineStr = new String(line, "UTF-8");
|
|
|
- System.err.println(lineStr);
|
|
|
+ lineReader = new LineReader((InputStream)clientErr_, job_);
|
|
|
+ while (lineReader.readLine(line) > 0) {
|
|
|
+ System.err.println(line.toString());
|
|
|
long now = System.currentTimeMillis();
|
|
|
if (reporter != null && now-lastStderrReport > reporterErrDelay_) {
|
|
|
lastStderrReport = now;
|
|
|
reporter.progress();
|
|
|
}
|
|
|
+ line.clear();
|
|
|
}
|
|
|
- if (clientErr_ != null) {
|
|
|
+ if (lineReader != null) {
|
|
|
+ lineReader.close();
|
|
|
+ }
|
|
|
+ if (clientErr_ != null) {
|
|
|
clientErr_.close();
|
|
|
clientErr_ = null;
|
|
|
LOG.info("MRErrorThread done");
|
|
@@ -453,6 +469,9 @@ public abstract class PipeMapRed {
|
|
|
outerrThreadsThrowable = th;
|
|
|
LOG.warn(StringUtils.stringifyException(th));
|
|
|
try {
|
|
|
+ if (lineReader != null) {
|
|
|
+ lineReader.close();
|
|
|
+ }
|
|
|
if (clientErr_ != null) {
|
|
|
clientErr_.close();
|
|
|
clientErr_ = null;
|