|
@@ -17,7 +17,7 @@
|
|
|
package org.apache.hadoop.streaming;
|
|
|
|
|
|
import java.io.*;
|
|
|
-import java.nio.channels.*;
|
|
|
+import java.nio.charset.CharacterCodingException;
|
|
|
import java.io.IOException;
|
|
|
import java.util.Date;
|
|
|
import java.util.Map;
|
|
@@ -30,17 +30,12 @@ import java.util.regex.*;
|
|
|
import org.apache.commons.logging.*;
|
|
|
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
-import org.apache.hadoop.mapred.Mapper;
|
|
|
-import org.apache.hadoop.mapred.Reducer;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
|
import org.apache.hadoop.mapred.OutputCollector;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
-import org.apache.hadoop.io.UTF8;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
-import org.apache.hadoop.io.LongWritable;
|
|
|
-import org.apache.hadoop.io.WritableComparator;
|
|
|
-import org.apache.hadoop.io.WritableComparable;
|
|
|
-import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -170,6 +165,9 @@ public abstract class PipeMapRed {
|
|
|
if(debug_) {
|
|
|
System.out.println("PipeMapRed: stream.debug=true");
|
|
|
}
|
|
|
+
|
|
|
+ joinDelay_ = job.getLong("stream.joindelay.milli", 0);
|
|
|
+
|
|
|
job_ = job;
|
|
|
|
|
|
// Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
|
|
@@ -228,7 +226,6 @@ public abstract class PipeMapRed {
|
|
|
clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
|
|
|
clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
|
|
|
clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
|
|
|
- doneLock_ = new Object();
|
|
|
startTime_ = System.currentTimeMillis();
|
|
|
|
|
|
} catch(Exception e) {
|
|
@@ -265,7 +262,7 @@ public abstract class PipeMapRed {
|
|
|
if(log_ != null) {
|
|
|
log_.println(s);
|
|
|
} else {
|
|
|
- System.err.println(s); // or LOG.info()
|
|
|
+ LOG.info(s); // or LOG.info()
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -341,28 +338,49 @@ public abstract class PipeMapRed {
|
|
|
|
|
|
void startOutputThreads(OutputCollector output, Reporter reporter)
|
|
|
{
|
|
|
- outputDone_ = false;
|
|
|
- errorDone_ = false;
|
|
|
outThread_ = new MROutputThread(output, reporter);
|
|
|
outThread_.start();
|
|
|
errThread_ = new MRErrorThread(reporter);
|
|
|
errThread_.start();
|
|
|
}
|
|
|
+
|
|
|
+ void waitOutputThreads() {
|
|
|
+ try {
|
|
|
+ sim.waitFor();
|
|
|
+ if(outThread_ != null) {
|
|
|
+ outThread_.join(joinDelay_);
|
|
|
+ }
|
|
|
+ if(errThread_ != null) {
|
|
|
+ errThread_.join(joinDelay_);
|
|
|
+ }
|
|
|
+ } catch(InterruptedException e) {
|
|
|
+ //ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- void splitKeyVal(String line, UTF8 key, UTF8 val)
|
|
|
+ /**
|
|
|
+ * Split a line into key and value. Assume the delimitor is a tab.
|
|
|
+ * @param line: a byte array of line containing UTF-8 bytes
|
|
|
+ * @param key: key of a record
|
|
|
+ * @param val: value of a record
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ void splitKeyVal(byte [] line, Text key, Text val) throws IOException
|
|
|
{
|
|
|
- int pos;
|
|
|
- if(keyCols_ == ALL_COLS) {
|
|
|
- pos = -1;
|
|
|
- } else {
|
|
|
- pos = line.indexOf('\t');
|
|
|
+ int pos=-1;
|
|
|
+ if(keyCols_ != ALL_COLS) {
|
|
|
+ pos = UTF8ByteArrayUtils.findTab(line);
|
|
|
}
|
|
|
- if(pos == -1) {
|
|
|
- key.set(line);
|
|
|
- val.set("");
|
|
|
- } else {
|
|
|
- key.set(line.substring(0, pos));
|
|
|
- val.set(line.substring(pos+1));
|
|
|
+ try {
|
|
|
+ if(pos == -1) {
|
|
|
+ key.set(line);
|
|
|
+ val.set("");
|
|
|
+ } else {
|
|
|
+ UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos);
|
|
|
+ }
|
|
|
+ } catch (CharacterCodingException e) {
|
|
|
+ LOG.warn(e);
|
|
|
+ StringUtils.stringifyException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -375,41 +393,33 @@ public abstract class PipeMapRed {
|
|
|
this.reporter = reporter;
|
|
|
}
|
|
|
public void run() {
|
|
|
- try {
|
|
|
- try {
|
|
|
- UTF8 EMPTY = new UTF8("");
|
|
|
- UTF8 key = new UTF8();
|
|
|
- UTF8 val = new UTF8();
|
|
|
- // 3/4 Tool to Hadoop
|
|
|
- while((answer = clientIn_.readLine()) != null) {
|
|
|
+ try {
|
|
|
+ Text key = new Text();
|
|
|
+ Text val = new Text();
|
|
|
+ // 3/4 Tool to Hadoop
|
|
|
+ while((answer=UTF8ByteArrayUtils.readLine(clientIn_))!= null) {
|
|
|
// 4/4 Hadoop out
|
|
|
if(optSideEffect_) {
|
|
|
- sideEffectOut_.write(answer.getBytes());
|
|
|
- sideEffectOut_.write('\n');
|
|
|
+ sideEffectOut_.write(answer);
|
|
|
+ sideEffectOut_.write('\n');
|
|
|
} else {
|
|
|
- splitKeyVal(answer, key, val);
|
|
|
- output.collect(key, val);
|
|
|
- numRecWritten_++;
|
|
|
- if(numRecWritten_ % 100 == 0) {
|
|
|
+ splitKeyVal(answer, key, val);
|
|
|
+ output.collect(key, val);
|
|
|
+ }
|
|
|
+ numRecWritten_++;
|
|
|
+ if(numRecWritten_ % 100 == 0) {
|
|
|
logprintln(numRecRead_+"/"+numRecWritten_);
|
|
|
logflush();
|
|
|
- }
|
|
|
}
|
|
|
- }
|
|
|
- } catch(IOException io) {
|
|
|
- io.printStackTrace(log_);
|
|
|
}
|
|
|
- logprintln("MROutputThread done");
|
|
|
- } finally {
|
|
|
- outputDone_ = true;
|
|
|
- synchronized(doneLock_) {
|
|
|
- doneLock_.notifyAll();
|
|
|
- }
|
|
|
- }
|
|
|
+ } catch(IOException io) {
|
|
|
+ io.printStackTrace(log_);
|
|
|
+ }
|
|
|
+ logprintln("MROutputThread done");
|
|
|
}
|
|
|
OutputCollector output;
|
|
|
Reporter reporter;
|
|
|
- String answer;
|
|
|
+ byte [] answer;
|
|
|
}
|
|
|
|
|
|
class MRErrorThread extends Thread
|
|
@@ -421,26 +431,21 @@ public abstract class PipeMapRed {
|
|
|
}
|
|
|
public void run()
|
|
|
{
|
|
|
- String line;
|
|
|
+ byte [] line;
|
|
|
try {
|
|
|
long num = 0;
|
|
|
- int bucket = 100;
|
|
|
- while((line=clientErr_.readLine()) != null) {
|
|
|
+ while((line=UTF8ByteArrayUtils.readLine(clientErr_)) != null) {
|
|
|
num++;
|
|
|
- logprintln(line);
|
|
|
+ String lineStr = new String(line, "UTF-8");
|
|
|
+ logprintln(lineStr);
|
|
|
if(num < 10) {
|
|
|
- String hline = "MRErr: " + line;
|
|
|
+ String hline = "MRErr: " + lineStr;
|
|
|
System.err.println(hline);
|
|
|
reporter.setStatus(hline);
|
|
|
}
|
|
|
}
|
|
|
} catch(IOException io) {
|
|
|
io.printStackTrace(log_);
|
|
|
- } finally {
|
|
|
- errorDone_ = true;
|
|
|
- synchronized(doneLock_) {
|
|
|
- doneLock_.notifyAll();
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
Reporter reporter;
|
|
@@ -448,42 +453,31 @@ public abstract class PipeMapRed {
|
|
|
|
|
|
public void mapRedFinished()
|
|
|
{
|
|
|
- logprintln("mapRedFinished");
|
|
|
- try {
|
|
|
- if(!doPipe_) return;
|
|
|
- try {
|
|
|
- if(optSideEffect_) {
|
|
|
- logprintln("closing " + sideEffectPath_);
|
|
|
- sideEffectOut_.close();
|
|
|
- logprintln("closed " + sideEffectPath_);
|
|
|
- }
|
|
|
- } catch(IOException io) {
|
|
|
- io.printStackTrace();
|
|
|
- }
|
|
|
- try {
|
|
|
- if(clientOut_ != null) {
|
|
|
- clientOut_.close();
|
|
|
- }
|
|
|
- } catch(IOException io) {
|
|
|
- }
|
|
|
- if(outThread_ == null) {
|
|
|
- // no input records: threads were never spawned
|
|
|
- } else {
|
|
|
+ logprintln("mapRedFinished");
|
|
|
+ if(!doPipe_) return;
|
|
|
+
|
|
|
try {
|
|
|
- while(!outputDone_ || !errorDone_) {
|
|
|
- synchronized(doneLock_) {
|
|
|
- doneLock_.wait();
|
|
|
+ try {
|
|
|
+ if(clientOut_ != null) {
|
|
|
+ clientOut_.close();
|
|
|
+ }
|
|
|
+ } catch(IOException io) {
|
|
|
}
|
|
|
- }
|
|
|
- } catch(InterruptedException ie) {
|
|
|
- ie.printStackTrace();
|
|
|
+ waitOutputThreads();
|
|
|
+ try {
|
|
|
+ if(optSideEffect_) {
|
|
|
+ logprintln("closing " + sideEffectPath_);
|
|
|
+ sideEffectOut_.close();
|
|
|
+ logprintln("closed " + sideEffectPath_);
|
|
|
+ }
|
|
|
+ } catch(IOException io) {
|
|
|
+ io.printStackTrace();
|
|
|
+ }
|
|
|
+ sim.destroy();
|
|
|
+ } catch(RuntimeException e) {
|
|
|
+ e.printStackTrace(log_);
|
|
|
+ throw e;
|
|
|
}
|
|
|
- }
|
|
|
- sim.destroy();
|
|
|
- } catch(RuntimeException e) {
|
|
|
- e.printStackTrace(log_);
|
|
|
- throw e;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
void maybeLogRecord()
|
|
@@ -543,7 +537,30 @@ public abstract class PipeMapRed {
|
|
|
return msg;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+ /**
|
|
|
+ * Write a writable value to the output stream using UTF-8 encoding
|
|
|
+ * @param value output value
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ void write(Writable value) throws IOException {
|
|
|
+ byte[] bval;
|
|
|
+ int valSize;
|
|
|
+ if(value instanceof BytesWritable) {
|
|
|
+ BytesWritable val = (BytesWritable)value;
|
|
|
+ bval = val.get();
|
|
|
+ valSize = val.getSize();
|
|
|
+ } else if(value instanceof Text){
|
|
|
+ Text val = (Text)value;
|
|
|
+ bval = val.getBytes();
|
|
|
+ valSize = val.getLength();
|
|
|
+ } else {
|
|
|
+ String sval = value.toString();
|
|
|
+ bval = sval.getBytes("UTF-8");
|
|
|
+ valSize = bval.length;
|
|
|
+ }
|
|
|
+ clientOut_.write(bval, 0, valSize);
|
|
|
+ }
|
|
|
+
|
|
|
long startTime_;
|
|
|
long numRecRead_ = 0;
|
|
|
long numRecWritten_ = 0;
|
|
@@ -555,6 +572,7 @@ public abstract class PipeMapRed {
|
|
|
int keyCols_;
|
|
|
final static int ALL_COLS = Integer.MAX_VALUE;
|
|
|
|
|
|
+ long joinDelay_;
|
|
|
JobConf job_;
|
|
|
|
|
|
// generic MapRed parameters passed on by hadoopStreaming
|
|
@@ -565,16 +583,13 @@ public abstract class PipeMapRed {
|
|
|
boolean debug_;
|
|
|
|
|
|
Process sim;
|
|
|
- Object doneLock_;
|
|
|
MROutputThread outThread_;
|
|
|
+ String jobLog_;
|
|
|
MRErrorThread errThread_;
|
|
|
- boolean outputDone_;
|
|
|
- boolean errorDone_;
|
|
|
DataOutputStream clientOut_;
|
|
|
DataInputStream clientErr_;
|
|
|
DataInputStream clientIn_;
|
|
|
|
|
|
- String jobLog_;
|
|
|
// set in PipeMapper/PipeReducer subclasses
|
|
|
String mapredKey_;
|
|
|
int numExceptions_;
|