|
@@ -21,6 +21,7 @@ import java.nio.channels.*;
|
|
|
import java.io.IOException;
|
|
|
import java.util.Date;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Properties;
|
|
@@ -208,6 +209,7 @@ public abstract class PipeMapRed {
|
|
|
logprintln("sideEffectPath_=" + sideEffectPath_);
|
|
|
|
|
|
Environment childEnv = (Environment)StreamUtil.env().clone();
|
|
|
+ addJobConfToEnvironment(job_, childEnv);
|
|
|
addEnvironment(childEnv, job_.get("stream.addenvironment"));
|
|
|
sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
|
|
|
|
|
@@ -269,6 +271,37 @@ public abstract class PipeMapRed {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ void addJobConfToEnvironment(JobConf conf, Properties env)
|
|
|
+ {
|
|
|
+ logprintln("addJobConfToEnvironment: begin");
|
|
|
+ Iterator it = conf.entries();
|
|
|
+ while(it.hasNext()) {
|
|
|
+ Map.Entry en = (Map.Entry)it.next();
|
|
|
+ String name = (String)en.getKey();
|
|
|
+ String value = (String)en.getValue();
|
|
|
+ name = safeEnvVarName(name);
|
|
|
+ envPut(env, name, value);
|
|
|
+ }
|
|
|
+ logprintln("addJobConfToEnvironment: end");
|
|
|
+ }
|
|
|
+
|
|
|
+ String safeEnvVarName(String var)
|
|
|
+ {
|
|
|
+ StringBuffer safe = new StringBuffer();
|
|
|
+ int len = var.length();
|
|
|
+ for(int i=0; i<len; i++) {
|
|
|
+ char c = var.charAt(i);
|
|
|
+ char s;
|
|
|
+ if((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
|
|
|
+ s = c;
|
|
|
+ } else {
|
|
|
+ s = '_';
|
|
|
+ }
|
|
|
+ safe.append(s);
|
|
|
+ }
|
|
|
+ return safe.toString();
|
|
|
+ }
|
|
|
+
|
|
|
void addEnvironment(Properties env, String nameVals)
|
|
|
{
|
|
|
// encoding "a=b c=d" from StreamJob
|
|
@@ -279,12 +312,17 @@ public abstract class PipeMapRed {
|
|
|
if(pair.length != 2) {
|
|
|
logprintln("Skip ev entry:" + nv[i]);
|
|
|
} else {
|
|
|
- logprintln("Add ev entry:" + nv[i]);
|
|
|
- env.put(pair[0], pair[1]);
|
|
|
+ envPut(env, pair[0], pair[1]);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ void envPut(Properties env, String name, String value)
|
|
|
+ {
|
|
|
+ logprintln("Add ev entry:" + name + "=" + value);
|
|
|
+ env.put(name, value);
|
|
|
+ }
|
|
|
+
|
|
|
/** .. and if successful: delete the task log */
|
|
|
void appendLogToJobLog(String status)
|
|
|
{
|