|
@@ -35,11 +35,16 @@ import org.apache.hadoop.mapred.Reporter;
|
|
|
import org.apache.hadoop.mapred.OutputCollector;
|
|
|
|
|
|
import org.apache.hadoop.io.UTF8;
|
|
|
+import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
import org.apache.hadoop.io.WritableComparator;
|
|
|
import org.apache.hadoop.io.WritableComparable;
|
|
|
-import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.LongWritable;
|
|
|
+import org.apache.hadoop.io.Writable;
|
|
|
+
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
|
|
|
/** Shared functionality for PipeMapper, PipeReducer.
|
|
|
* @author Michel Tourn
|
|
@@ -56,7 +61,13 @@ public abstract class PipeMapRed {
|
|
|
*/
|
|
|
abstract String getKeyColPropName();
|
|
|
|
|
|
-
|
|
|
+ /** Write output as side-effect files rather than as map outputs.
|
|
|
+ This is useful to do "Map" tasks rather than "MapReduce" tasks. */
|
|
|
+ boolean getUseSideEffect()
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* @returns how many TABS before the end of the key part
|
|
|
* usually: 1 or "ALL"
|
|
@@ -154,7 +165,10 @@ public abstract class PipeMapRed {
|
|
|
String argv = getPipeCommand(job);
|
|
|
keyCols_ = getKeyColsFromPipeCommand(argv);
|
|
|
|
|
|
- doPipe_ = (argv != null);
|
|
|
+ job_ = job;
|
|
|
+
|
|
|
+ // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
|
|
|
+ doPipe_ = (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
|
|
|
if(!doPipe_) return;
|
|
|
|
|
|
setStreamJobDetails(job);
|
|
@@ -169,29 +183,46 @@ public abstract class PipeMapRed {
|
|
|
new MustangFile(prog).setExecutable(true, true);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ if(job_.getInputValueClass().equals(BytesWritable.class)) {
|
|
|
+ // TODO expose as separate config:
|
|
|
+ // job or semistandard inputformat property
|
|
|
+ optUseKey_ = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ optSideEffect_ = getUseSideEffect();
|
|
|
+
|
|
|
+ if(optSideEffect_) {
|
|
|
+ String fileName = job_.get("mapred.task.id");
|
|
|
+ sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
|
|
|
+ FileSystem fs = FileSystem.get(job_);
|
|
|
+ sideEffectOut_ = fs.create(sideEffectPath_);
|
|
|
+ }
|
|
|
+
|
|
|
// argvSplit[0]:
|
|
|
// An absolute path should be a preexisting valid path on all TaskTrackers
|
|
|
// A relative path should match in the unjarred Job data
|
|
|
// In this case, force an absolute path to make sure exec finds it.
|
|
|
argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
|
|
|
- log_.println("PipeMapRed exec " + Arrays.asList(argvSplit));
|
|
|
-
|
|
|
+ logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
|
|
|
+ logprintln("sideEffectPath_=" + sideEffectPath_);
|
|
|
|
|
|
Environment childEnv = (Environment)StreamUtil.env().clone();
|
|
|
- addEnvironment(childEnv, job.get("stream.addenvironment"));
|
|
|
+ addEnvironment(childEnv, job_.get("stream.addenvironment"));
|
|
|
sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
|
|
|
|
|
|
/* // This way required jdk1.5
|
|
|
ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
|
|
|
Map<String, String> env = processBuilder.environment();
|
|
|
- addEnvironment(env, job.get("stream.addenvironment"));
|
|
|
+ addEnvironment(env, job_.get("stream.addenvironment"));
|
|
|
sim = processBuilder.start();
|
|
|
*/
|
|
|
|
|
|
clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
|
|
|
- clientIn_ = new BufferedReader(new InputStreamReader(sim.getInputStream()));
|
|
|
- clientErr_ = new DataInputStream(sim.getErrorStream());
|
|
|
+ clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
|
|
|
+ clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
|
|
|
doneLock_ = new Object();
|
|
|
+ startTime_ = System.currentTimeMillis();
|
|
|
|
|
|
} catch(Exception e) {
|
|
|
e.printStackTrace();
|
|
@@ -205,7 +236,7 @@ public abstract class PipeMapRed {
|
|
|
String s = job.get("stream.minRecWrittenToEnableSkip_");
|
|
|
if(s != null) {
|
|
|
minRecWrittenToEnableSkip_ = Long.parseLong(s);
|
|
|
- log_.println("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
|
|
|
+ logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -222,6 +253,22 @@ public abstract class PipeMapRed {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ void logprintln(String s)
|
|
|
+ {
|
|
|
+ if(log_ != null) {
|
|
|
+ log_.println(s);
|
|
|
+ } else {
|
|
|
+ System.err.println(s); // or LOG.info()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void logflush()
|
|
|
+ {
|
|
|
+ if(log_ != null) {
|
|
|
+ log_.flush();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
void addEnvironment(Properties env, String nameVals)
|
|
|
{
|
|
|
// encoding "a=b c=d" from StreamJob
|
|
@@ -230,9 +277,9 @@ public abstract class PipeMapRed {
|
|
|
for(int i=0; i<nv.length; i++) {
|
|
|
String[] pair = nv[i].split("=", 2);
|
|
|
if(pair.length != 2) {
|
|
|
- log_.println("Skip ev entry:" + nv[i]);
|
|
|
+ logprintln("Skip ev entry:" + nv[i]);
|
|
|
} else {
|
|
|
- log_.println("Add ev entry:" + nv[i]);
|
|
|
+ logprintln("Add ev entry:" + nv[i]);
|
|
|
env.put(pair[0], pair[1]);
|
|
|
}
|
|
|
}
|
|
@@ -293,18 +340,23 @@ public abstract class PipeMapRed {
|
|
|
// 3/4 Tool to Hadoop
|
|
|
while((answer = clientIn_.readLine()) != null) {
|
|
|
// 4/4 Hadoop out
|
|
|
- splitKeyVal(answer, key, val);
|
|
|
- output.collect(key, val);
|
|
|
- numRecWritten_++;
|
|
|
- if(numRecWritten_ % 100 == 0) {
|
|
|
- log_.println(numRecRead_+"/"+numRecWritten_);
|
|
|
- log_.flush();
|
|
|
+ if(optSideEffect_) {
|
|
|
+ sideEffectOut_.write(answer.getBytes());
|
|
|
+ sideEffectOut_.write('\n');
|
|
|
+ } else {
|
|
|
+ splitKeyVal(answer, key, val);
|
|
|
+ output.collect(key, val);
|
|
|
+ numRecWritten_++;
|
|
|
+ if(numRecWritten_ % 100 == 0) {
|
|
|
+ logprintln(numRecRead_+"/"+numRecWritten_);
|
|
|
+ logflush();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
} catch(IOException io) {
|
|
|
io.printStackTrace(log_);
|
|
|
}
|
|
|
- log_.println("MROutputThread done");
|
|
|
+ logprintln("MROutputThread done");
|
|
|
} finally {
|
|
|
outputDone_ = true;
|
|
|
synchronized(doneLock_) {
|
|
@@ -332,7 +384,7 @@ public abstract class PipeMapRed {
|
|
|
int bucket = 100;
|
|
|
while((line=clientErr_.readLine()) != null) {
|
|
|
num++;
|
|
|
- log_.println(line);
|
|
|
+ logprintln(line);
|
|
|
if(num < 10) {
|
|
|
String hline = "MRErr: " + line;
|
|
|
System.err.println(hline);
|
|
@@ -353,9 +405,18 @@ public abstract class PipeMapRed {
|
|
|
|
|
|
public void mapRedFinished()
|
|
|
{
|
|
|
- log_.println("mapRedFinished");
|
|
|
+ logprintln("mapRedFinished");
|
|
|
try {
|
|
|
if(!doPipe_) return;
|
|
|
+ try {
|
|
|
+ if(optSideEffect_) {
|
|
|
+ logprintln("closing " + sideEffectPath_);
|
|
|
+ sideEffectOut_.close();
|
|
|
+ logprintln("closed " + sideEffectPath_);
|
|
|
+ }
|
|
|
+ } catch(IOException io) {
|
|
|
+ io.printStackTrace();
|
|
|
+ }
|
|
|
try {
|
|
|
if(clientOut_ != null) {
|
|
|
clientOut_.close();
|
|
@@ -385,10 +446,12 @@ public abstract class PipeMapRed {
|
|
|
void maybeLogRecord()
|
|
|
{
|
|
|
if(numRecRead_ >= nextRecReadLog_) {
|
|
|
- log_.println(numRecInfo());
|
|
|
- log_.flush();
|
|
|
- nextRecReadLog_ *= 10;
|
|
|
- //nextRecReadLog_ += 1000;
|
|
|
+ String info = numRecInfo();
|
|
|
+ logprintln(info);
|
|
|
+ logflush();
|
|
|
+ System.err.println(info);
|
|
|
+ //nextRecReadLog_ *= 10;
|
|
|
+ nextRecReadLog_ += 100;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -417,7 +480,15 @@ public abstract class PipeMapRed {
|
|
|
|
|
|
String numRecInfo()
|
|
|
{
|
|
|
- return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_;
|
|
|
+ long elapsed = (System.currentTimeMillis() - startTime_)/1000;
|
|
|
+ long total = numRecRead_+numRecWritten_+numRecSkipped_;
|
|
|
+ return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_
|
|
|
+ + " in:" + safeDiv(numRecRead_, elapsed) + " [rec/s]"
|
|
|
+ + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]";
|
|
|
+ }
|
|
|
+ String safeDiv(long n, long d)
|
|
|
+ {
|
|
|
+ return (d==0) ? "NA" : ""+n/d + "=" + n + "/" + d;
|
|
|
}
|
|
|
String logFailure(Exception e)
|
|
|
{
|
|
@@ -425,15 +496,15 @@ public abstract class PipeMapRed {
|
|
|
PrintWriter pw = new PrintWriter(sw);
|
|
|
e.printStackTrace(pw);
|
|
|
String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
|
|
|
- log_.println(msg);
|
|
|
+ logprintln(msg);
|
|
|
return msg;
|
|
|
}
|
|
|
|
|
|
|
|
|
+ long startTime_;
|
|
|
long numRecRead_ = 0;
|
|
|
long numRecWritten_ = 0;
|
|
|
long numRecSkipped_ = 0;
|
|
|
-
|
|
|
long nextRecReadLog_ = 1;
|
|
|
|
|
|
long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
|
|
@@ -441,6 +512,8 @@ public abstract class PipeMapRed {
|
|
|
int keyCols_;
|
|
|
final static int ALL_COLS = Integer.MAX_VALUE;
|
|
|
|
|
|
+ JobConf job_;
|
|
|
+
|
|
|
// generic MapRed parameters passed on by hadoopStreaming
|
|
|
String taskid_;
|
|
|
int reportPortPlusOne_;
|
|
@@ -455,24 +528,31 @@ public abstract class PipeMapRed {
|
|
|
boolean errorDone_;
|
|
|
DataOutputStream clientOut_;
|
|
|
DataInputStream clientErr_;
|
|
|
- BufferedReader clientIn_;
|
|
|
+ DataInputStream clientIn_;
|
|
|
|
|
|
String jobLog_;
|
|
|
// set in PipeMapper/PipeReducer subclasses
|
|
|
String mapredKey_;
|
|
|
int numExceptions_;
|
|
|
|
|
|
+ boolean optUseKey_ = true;
|
|
|
+
|
|
|
+ boolean optSideEffect_;
|
|
|
+ Path sideEffectPath_;
|
|
|
+ FSDataOutputStream sideEffectOut_;
|
|
|
+
|
|
|
String LOGNAME;
|
|
|
PrintStream log_;
|
|
|
|
|
|
+ /* curr. going to stderr so that it is preserved
|
|
|
{ // instance initializer
|
|
|
try {
|
|
|
int id = (int)((System.currentTimeMillis()/2000) % 10);
|
|
|
String sid = id+ "." + StreamUtil.env().get("USER");
|
|
|
LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
|
|
|
log_ = new PrintStream(new FileOutputStream(LOGNAME));
|
|
|
- log_.println(new java.util.Date());
|
|
|
- log_.flush();
|
|
|
+ logprintln(new java.util.Date());
|
|
|
+ logflush();
|
|
|
} catch(IOException io) {
|
|
|
System.err.println("LOGNAME=" + LOGNAME);
|
|
|
io.printStackTrace();
|
|
@@ -482,5 +562,5 @@ public abstract class PipeMapRed {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ */
|
|
|
}
|