|
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.*;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.util.logging.*;
|
|
|
+import java.util.Vector;
|
|
|
|
|
|
/** Base class that runs a task in a separate process. Tasks are run in a
|
|
|
* separate process in order to isolate the map/reduce system code from bugs in
|
|
@@ -90,20 +91,56 @@ abstract class TaskRunner extends Thread {
|
|
|
classPath.append(workDir);
|
|
|
}
|
|
|
|
|
|
+ // Build exec child jmv args.
|
|
|
+ Vector vargs = new Vector(8);
|
|
|
File jvm = // use same jvm as parent
|
|
|
new File(new File(System.getProperty("java.home"), "bin"), "java");
|
|
|
-
|
|
|
- // run java
|
|
|
- runChild(new String[] {
|
|
|
- jvm.toString(),
|
|
|
- //"-Xrunhprof:cpu=samples,file="+t.getTaskId()+".prof",
|
|
|
- "-Xmx"+job.get("mapred.child.heap.size", "200m"),
|
|
|
- "-cp", classPath.toString(),
|
|
|
- TaskTracker.Child.class.getName(), // main is Child
|
|
|
- tracker.taskReportPort+"", // pass umbilical port
|
|
|
- t.getTaskId() // pass task identifier
|
|
|
- }, workDir);
|
|
|
|
|
|
+ vargs.add(jvm.toString());
|
|
|
+
|
|
|
+ // Add child java ops. Also, mapred.child.heap.size has been superceded
|
|
|
+ // by // mapred.child.java.opts. Manage case where both are present
|
|
|
+ // letting the mapred.child.heap.size win over any setting of heap size in
|
|
|
+ // mapred.child.java.opts (Emit a warning that heap.size is deprecated).
|
|
|
+ //
|
|
|
+ // The following symbols if present in mapred.child.java.opts value are
|
|
|
+ // replaced:
|
|
|
+ // + @taskid@ is interpolated with value of TaskID.
|
|
|
+ // + Replaces @port@ with mapred.task.tracker.report.port + 1.
|
|
|
+ // Other occurrences of @ will not be altered.
|
|
|
+ //
|
|
|
+ // Example with multiple arguments and substitutions, showing
|
|
|
+ // jvm GC logging, and start of a passwordless JVM JMX agent so can
|
|
|
+ // connect with jconsole and the likes to watch child memory, threads
|
|
|
+ // and get thread dumps.
|
|
|
+ //
|
|
|
+ // <name>mapred.child.optional.jvm.args</name>
|
|
|
+ // <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
|
|
|
+ // -Dcom.sun.management.jmxremote.authenticate=false \
|
|
|
+ // -Dcom.sun.management.jmxremote.ssl=false \
|
|
|
+ // -Dcom.sun.management.jmxremote.port=@port@
|
|
|
+ // </value>
|
|
|
+ //
|
|
|
+ String javaOpts = handleDeprecatedHeapSize(
|
|
|
+ job.get("mapred.child.java.opts", "-Xmx200m"),
|
|
|
+ job.get("mapred.child.heap.size"));
|
|
|
+ javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
|
|
|
+ int port = job.getInt("mapred.task.tracker.report.port", 50050) + 1;
|
|
|
+ javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
|
|
|
+ String [] javaOptsSplit = javaOpts.split(" ");
|
|
|
+ for (int i = 0; i < javaOptsSplit.length; i++) {
|
|
|
+ vargs.add(javaOptsSplit[i]);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Add classpath.
|
|
|
+ vargs.add("-classpath");
|
|
|
+ vargs.add(classPath.toString());
|
|
|
+ // Add main class and its arguments
|
|
|
+ vargs.add(TaskTracker.Child.class.getName()); // main of Child
|
|
|
+ vargs.add(tracker.taskReportPort + ""); // pass umbilical port
|
|
|
+ vargs.add(t.getTaskId()); // pass task identifier
|
|
|
+ // Run java
|
|
|
+ runChild((String[])vargs.toArray(new String[0]), workDir);
|
|
|
} catch (FSError e) {
|
|
|
LOG.log(Level.SEVERE, "FSError", e);
|
|
|
try {
|
|
@@ -125,6 +162,65 @@ abstract class TaskRunner extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Handle deprecated mapred.child.heap.size.
|
|
|
+ * If present, interpolate into mapred.child.java.opts value with
|
|
|
+ * warning.
|
|
|
+ * @param javaOpts Value of mapred.child.java.opts property.
|
|
|
+ * @param heapSize Value of mapred.child.heap.size property.
|
|
|
+ * @return A <code>javaOpts</code> with <code>heapSize</code>
|
|
|
+ * interpolated if present.
|
|
|
+ */
|
|
|
+ private String handleDeprecatedHeapSize(String javaOpts,
|
|
|
+ final String heapSize) {
|
|
|
+ if (heapSize == null || heapSize.length() <= 0) {
|
|
|
+ return javaOpts;
|
|
|
+ }
|
|
|
+ final String MX = "-Xmx";
|
|
|
+ int index = javaOpts.indexOf(MX);
|
|
|
+ if (index < 0) {
|
|
|
+ javaOpts = javaOpts + " " + MX + heapSize;
|
|
|
+ } else {
|
|
|
+ int end = javaOpts.indexOf(" ", index + MX.length());
|
|
|
+ javaOpts = javaOpts.substring(0, index + MX.length()) +
|
|
|
+ heapSize + ((end < 0)? "": javaOpts.substring(end));
|
|
|
+ }
|
|
|
+ LOG.warning("mapred.child.heap.size is deprecated. Use " +
|
|
|
+ "mapred.child.heap.size instead. Meantime, interpolated " +
|
|
|
+ "child.heap.size into child.java.opt: " + javaOpts);
|
|
|
+ return javaOpts;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Replace <code>toFind</code> with <code>replacement</code>.
|
|
|
+ * When hadoop moves to JDK1.5, replace this method with
|
|
|
+ * String#replace (Of is commons-lang available, replace with
|
|
|
+ * StringUtils#replace).
|
|
|
+ * @param text String to do replacements in.
|
|
|
+ * @param toFind String to find.
|
|
|
+ * @param replacement String to replace <code>toFind</code> with.
|
|
|
+ * @return A String with all instances of <code>toFind</code>
|
|
|
+ * replaced by <code>replacement</code> (The original
|
|
|
+ * <code>text</code> is returned if <code>toFind</code> is not
|
|
|
+ * found in <code>text<code>).
|
|
|
+ */
|
|
|
+ private static String replaceAll(String text, final String toFind,
|
|
|
+ final String replacement) {
|
|
|
+ if (text == null || toFind == null || replacement == null) {
|
|
|
+ throw new IllegalArgumentException("Text " + text + " or toFind " +
|
|
|
+ toFind + " or replacement " + replacement + " are null.");
|
|
|
+ }
|
|
|
+ int offset = 0;
|
|
|
+ for (int index = text.indexOf(toFind); index >= 0;
|
|
|
+ index = text.indexOf(toFind, offset)) {
|
|
|
+ offset = index + toFind.length();
|
|
|
+ text = text.substring(0, index) + replacement +
|
|
|
+ text.substring(offset);
|
|
|
+
|
|
|
+ }
|
|
|
+ return text;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Run the child process
|
|
|
*/
|