HADOOP-249. Reuse JVMs across Map-Reduce Tasks. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@696957 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 16 years ago
parent
revision 55391ff58a
23 changed files with 1422 additions and 199 deletions
  1. CHANGES.txt (+5 -0)
  2. conf/hadoop-default.xml (+8 -0)
  3. src/core/org/apache/hadoop/filecache/DistributedCache.java (+41 -4)
  4. src/mapred/org/apache/hadoop/mapred/Child.java (+181 -0)
  5. src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (+2 -1)
  6. src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (+2 -1)
  7. src/mapred/org/apache/hadoop/mapred/JVMId.java (+147 -0)
  8. src/mapred/org/apache/hadoop/mapred/JobConf.java (+17 -0)
  9. src/mapred/org/apache/hadoop/mapred/JobTracker.java (+6 -2)
  10. src/mapred/org/apache/hadoop/mapred/JvmManager.java (+372 -0)
  11. src/mapred/org/apache/hadoop/mapred/JvmTask.java (+63 -0)
  12. src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (+2 -1)
  13. src/mapred/org/apache/hadoop/mapred/Task.java (+8 -3)
  14. src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (+9 -3)
  15. src/mapred/org/apache/hadoop/mapred/TaskLog.java (+137 -4)
  16. src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java (+4 -0)
  17. src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (+18 -4)
  18. src/mapred/org/apache/hadoop/mapred/TaskRunner.java (+113 -52)
  19. src/mapred/org/apache/hadoop/mapred/TaskStatus.java (+2 -2)
  20. src/mapred/org/apache/hadoop/mapred/TaskTracker.java (+267 -113)
  21. src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (+2 -2)
  22. src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (+11 -3)
  23. src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (+5 -4)

+ 5 - 0
CHANGES.txt

@@ -180,6 +180,11 @@ Trunk (unreleased changes)
 
     HADOOP-4176. Implement getFileChecksum(Path) in HftpFileSystem. (szetszwo)
 
+    HADOOP-249. Reuse JVMs across Map-Reduce Tasks. 
+    Configuration changes to hadoop-default.xml:
+      add mapred.job.reuse.jvm.num.tasks
+    (Devaraj Das via acmurthy) 
+
   IMPROVEMENTS
 
     HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).

+ 8 - 0
conf/hadoop-default.xml

@@ -956,6 +956,14 @@ creations/deletions), or "all".</description>
                may be executed in parallel.</description>
 </property>
 
+<property>
+  <name>mapred.job.reuse.jvm.num.tasks</name>
+  <value>1</value>
+  <description>How many tasks to run per jvm. If set to -1, there is
+  no limit. 
+  </description>
+</property>
+
 <property>
   <name>mapred.min.split.size</name>
   <value>0</value>

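For reference, a minimal driver-side sketch of enabling JVM reuse through this property (the setInt call is standard Configuration API; the driver context is illustrative, not part of the patch):

    JobConf conf = new JobConf();
    // -1 means no limit: a child JVM keeps accepting tasks of its job.
    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
    // The default of 1 preserves the old one-JVM-per-task behavior.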
+ 41 - 4
src/core/org/apache/hadoop/filecache/DistributedCache.java

@@ -151,6 +151,42 @@ public class DistributedCache {
                                    Path baseDir, FileStatus fileStatus,
                                    boolean isArchive, long confFileStamp,
                                    Path currentWorkDir) 
+  throws IOException {
+    return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
+        confFileStamp, currentWorkDir, true);
+  }
+  /**
+   * Get the locally cached file or archive; it could either be 
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   * 
+   * @param cache the cache to be localized, this should be specified as 
+   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no scheme 
+   * or hostname:port is provided the file is assumed to be in the filesystem
+   * being used in the Configuration
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache Dir where you want to localize the files/archives
+   * @param fileStatus The file status on the dfs.
+   * @param isArchive if the cache is an archive or a file. In case it is an
+   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+   *  be unzipped/unjarred/untarred automatically 
+   *  and the directory where the archive is unzipped/unjarred/untarred is
+   *  returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
+   * file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
+   * @param honorSymLinkConf if this is false, then the symlinks are not
+   * created even if conf says so (this is required for an optimization in task
+   * launches)
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally 
+   * @throws IOException
+   */
+  public static Path getLocalCache(URI cache, Configuration conf, 
+      Path baseDir, FileStatus fileStatus,
+      boolean isArchive, long confFileStamp,
+      Path currentWorkDir, boolean honorSymLinkConf) 
   throws IOException {
     String cacheId = makeRelative(cache, conf);
     CacheStatus lcacheStatus;
@@ -162,10 +198,10 @@ public class DistributedCache {
         lcacheStatus = new CacheStatus(new Path(baseDir, new Path(cacheId)));
         cachedArchives.put(cacheId, lcacheStatus);
       }
-      
+
       synchronized (lcacheStatus) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-                                      fileStatus, isArchive, currentWorkDir);
+            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
         lcacheStatus.refcount++;
       }
     }
@@ -180,6 +216,7 @@ public class DistributedCache {
     }
     return localizedPath;
   }
+
   
   /**
    * Get the locally cached file or archive; it could either be 
@@ -292,9 +329,9 @@ public class DistributedCache {
                                     CacheStatus cacheStatus,
                                     FileStatus fileStatus,
                                     boolean isArchive, 
-                                    Path currentWorkDir) 
+                                    Path currentWorkDir, boolean honorSymLinkConf) 
   throws IOException {
-    boolean doSymlink = getSymlink(conf);
+    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
     if(cache.getFragment() == null) {
     	doSymlink = false;
     }
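A hedged sketch of the new getLocalCache overload with honorSymLinkConf set to false; the namenode address, paths and cache URI below are illustrative (assumes the usual imports from org.apache.hadoop.fs and java.net.URI):

    URI cache = URI.create("hdfs://namenode:8020/apps/dict.zip#dict");
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(cache, conf);
    FileStatus status = fs.getFileStatus(new Path(cache.getPath()));
    Path localized = DistributedCache.getLocalCache(
        cache, conf,
        new Path("/tmp/mapred/local/cache"),   // baseDir, illustrative
        status,
        true,                                  // isArchive: the .zip gets unpacked
        status.getModificationTime(),          // confFileStamp: detects changed files
        new Path("."),                         // currentWorkDir for symlinks
        false);                                // honorSymLinkConf=false: no symlinks now

TaskRunner below passes false here and defers symlink creation to the task JVM, which rebuilds the links per task in setupWorkDir.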

+ 181 - 0
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.log4j.LogManager;
+
+/** 
+ * The main() for child processes. 
+ */
+
+public class Child {
+
+  public static final Log LOG =
+    LogFactory.getLog(TaskTracker.class);
+
+  static volatile TaskAttemptID taskid;
+
+  public static void main(String[] args) throws Throwable {
+    LOG.debug("Child starting");
+
+    JobConf defaultConf = new JobConf();
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    InetSocketAddress address = new InetSocketAddress(host, port);
+    final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+    taskid = firstTaskid;
+    int jvmIdInt = Integer.parseInt(args[3]);
+    JVMId jvmId = new JVMId(taskid.getJobID(),taskid.isMap(),jvmIdInt);
+    final int MAX_SLEEP_COUNT = 600; //max idle time of 5 minutes
+    int sleepCount = 0;
+    TaskUmbilicalProtocol umbilical =
+      (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+          TaskUmbilicalProtocol.versionID,
+          address,
+          defaultConf);
+    int numTasksToExecute = -1; //-1 signifies "no limit"
+    int numTasksExecuted = 0;
+    Thread t = new Thread() {
+      public void run() {
+        //every so often wake up and syncLogs so that we can track
+        //logs of the currently running task
+        while (true) {
+          try {
+            Thread.sleep(5000);
+            TaskLog.syncLogs(firstTaskid, taskid);
+          } catch (InterruptedException ie) {
+          } catch (IOException iee) {
+            LOG.error("Error in syncLogs: " + iee);
+            System.exit(-1);
+          }
+        }
+      }
+    };
+    t.setName("Thread for syncLogs");
+    t.setDaemon(true);
+    t.start();
+    //for the memory management, a PID file is written once per JVM.
+    //We simply symlink the file on a per task
+    //basis later (see below). Long term, we should change the Memory
+    //manager to use JVMId instead of TaskAttemptId
+    Path srcPidPath = null;
+    Path dstPidPath = null;
+    try {
+      while (true) {
+        JvmTask myTask = umbilical.getTask(jvmId, firstTaskid);
+        if (myTask.shouldDie()) {
+          break;
+        } else {
+          if (myTask.getTask() == null) {
+            if (sleepCount == MAX_SLEEP_COUNT) {
+              System.exit(0);
+            }
+            sleepCount++;
+            Thread.sleep(500);
+            continue;
+          }
+          sleepCount = 0; //got a task. reset the sleepCount
+        }
+        Task task = myTask.getTask();
+        taskid = task.getTaskID();
+        
+        //create the index file so that the log files 
+        //are viewable immediately
+        TaskLog.syncLogs(firstTaskid, taskid);
+        JobConf job = new JobConf(task.getJobFile());
+        if (job.getBoolean("task.memory.mgmt.enabled", false)) {
+          if (srcPidPath == null) {
+            srcPidPath = TaskMemoryManagerThread.getPidFilePath(firstTaskid,
+                                                              job);
+          }
+          //since the JVM is running multiple tasks potentially, we need
+          //to do symlink stuff only for the subsequent tasks
+          if (!taskid.equals(firstTaskid)) {
+            dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
+            FileUtil.symLink(srcPidPath.toUri().getPath(), 
+                dstPidPath.toUri().getPath());
+          }
+        }
+        //setupWorkDir actually sets up the symlinks for the distributed
+        //cache. After a task exits we wipe the workdir clean, and hence
+        //the symlinks have to be rebuilt.
+        TaskRunner.setupWorkDir(job);
+
+        numTasksToExecute = job.getNumTasksToExecutePerJvm();
+        assert(numTasksToExecute != 0);
+        TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
+
+        task.setConf(job);
+
+        defaultConf.addResource(new Path(task.getJobFile()));
+
+        // Initiate Java VM metrics
+        JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
+        // use job-specified working directory
+        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+        try {
+          task.run(job, umbilical);             // run the task
+        } finally {
+          TaskLog.syncLogs(firstTaskid, taskid);
+          if (!taskid.equals(firstTaskid) && 
+              job.getBoolean("task.memory.mgmt.enabled", false)) {
+            new File(dstPidPath.toUri().getPath()).delete();
+          }
+        }
+        if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
+          break;
+        }
+      }
+    } catch (FSError e) {
+      LOG.fatal("FSError from child", e);
+      umbilical.fsError(taskid, e.getMessage());
+    } catch (Throwable throwable) {
+      LOG.warn("Error running child", throwable);
+      // Report back any failures, for diagnostic purposes
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      throwable.printStackTrace(new PrintStream(baos));
+      umbilical.reportDiagnosticInfo(taskid, baos.toString());
+    } finally {
+      RPC.stopProxy(umbilical);
+      MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+      metricsContext.close();
+      // Shutting down log4j of the child-vm... 
+      // This assumes that on return from Task.run() 
+      // there is no more logging done.
+      LogManager.shutdown();
+    }
+  }
+}

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -47,8 +47,9 @@ interface InterTrackerProtocol extends VersionedProtocol {
    * Version 15: Changed format of Task and TaskStatus for HADOOP-153
    * Version 16: adds ResourceStatus to TaskTrackerStatus for HADOOP-3759
    * Version 17: Changed format of Task and TaskStatus for HADOOP-3150
+   * Version 18: Changed status message due to changes in TaskStatus
    */
-  public static final long versionID = 17L;
+  public static final long versionID = 18L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/IsolationRunner.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JvmTask;
 
 public class IsolationRunner {
   private static final Log LOG = 
@@ -58,7 +59,7 @@ public class IsolationRunner {
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }
 
-    public Task getTask(TaskAttemptID taskid) throws IOException {
+    public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) throws IOException {
       return null;
     }
 

+ 147 - 0
src/mapred/org/apache/hadoop/mapred/JVMId.java

@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+public class JVMId extends ID {
+  boolean isMap;
+  JobID jobId;
+  private static final String JVM = "jvm";
+  private static char UNDERSCORE = '_';  
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+  
+  public JVMId(JobID jobId, boolean isMap, int id) {
+    super(id);
+    this.isMap = isMap;
+    this.jobId = jobId;
+  }
+  
+  public JVMId (String jtIdentifier, int jobId, boolean isMap, int id) {
+    this(new JobID(jtIdentifier, jobId), isMap, id);
+  }
+    
+  private JVMId() { }
+  
+  public boolean isMapJVM() {
+    return isMap;
+  }
+  public JobID getJobId() {
+    return jobId;
+  }
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if(o.getClass().equals(JVMId.class)) {
+      JVMId that = (JVMId)o;
+      return this.id==that.id
+        && this.isMap == that.isMap
+        && this.jobId.equals(that.jobId);
+    }
+    else return false;
+  }
+
+  /**Compare JVMIds by first jobIds, then by JVM numbers. Reduces are 
+   * defined as greater than maps.*/
+  @Override
+  public int compareTo(ID o) {
+    JVMId that = (JVMId)o;
+    int jobComp = this.jobId.compareTo(that.jobId);
+    if(jobComp == 0) {
+      if(this.isMap == that.isMap) {
+        return this.id - that.id;
+      }
+      else return this.isMap ? -1 : 1;
+    }
+    else return jobComp;
+  }
+  
+  @Override
+  public String toString() { 
+    StringBuilder builder = new StringBuilder();
+    return builder.append(JVM).append(UNDERSCORE)
+      .append(toStringWOPrefix()).toString();
+  }
+
+  StringBuilder toStringWOPrefix() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(jobId.toStringWOPrefix())
+      .append(isMap ? "_m_" : "_r_");
+    return builder.append(idFormat.format(id));
+  }
+  
+  @Override
+  public int hashCode() {
+    return toStringWOPrefix().toString().hashCode();
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.jobId = JobID.read(in);
+    this.isMap = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jobId.write(out);
+    out.writeBoolean(isMap);
+  }
+  
+  public static JVMId read(DataInput in) throws IOException {
+    JVMId jvmId = new JVMId();
+    jvmId.readFields(in);
+    return jvmId;
+  }
+  
+  /** Construct a JVMId object from given string 
+   * @return constructed JVMId object or null if the given String is null
+   * @throws IllegalArgumentException if the given string is malformed
+   */
+  public static JVMId forName(String str) 
+    throws IllegalArgumentException {
+    if(str == null)
+      return null;
+    try {
+      String[] parts = str.split("_");
+      if(parts.length == 5) {
+        if(parts[0].equals(JVM)) {
+          boolean isMap = false;
+          if(parts[3].equals("m")) isMap = true;
+          else if(parts[3].equals("r")) isMap = false;
+          else throw new Exception();
+          return new JVMId(parts[1], Integer.parseInt(parts[2]),
+              isMap, Integer.parseInt(parts[4]));
+        }
+      }
+    }catch (Exception ex) {//fall below
+    }
+    throw new IllegalArgumentException("JVMId string : " + str 
+        + " is not properly formed");
+  }
+
+}
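The string form round-trips through forName; a small illustrative example (the jobtracker identifier and the exact zero-padding are assumptions based on the formatting code above):

    JVMId id = new JVMId("200809181200", 1, true, 42);
    String s = id.toString();          // e.g. "jvm_200809181200_0001_m_000042"
    JVMId parsed = JVMId.forName(s);   // splits on '_' into the expected 5 parts
    assert id.equals(parsed);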

+ 17 - 0
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -325,6 +325,23 @@ public class JobConf extends Configuration {
     }
   }
   
+  /**
+   * Sets the number of tasks that a spawned task JVM should run
+   * before it exits
+   * @param numTasks the number of tasks to execute; defaults to 1;
+   * -1 signifies no limit
+   */
+  public void setNumTasksToExecutePerJvm(int numTasks) {
+    setInt("mapred.job.reuse.jvm.num.tasks", numTasks);
+  }
+  
+  /**
+   * Get the number of tasks that a spawned JVM should execute
+   */
+  public int getNumTasksToExecutePerJvm() {
+    return getInt("mapred.job.reuse.jvm.num.tasks", 1);
+  }
+  
   /**
    * Get the {@link InputFormat} implementation for the map-reduce job,
   * defaults to {@link TextInputFormat} if not specified explicitly.
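A short usage sketch of the new accessors (values are illustrative):

    JobConf conf = new JobConf();
    conf.setNumTasksToExecutePerJvm(10);        // each child JVM runs up to 10 tasks
    int n = conf.getNumTasksToExecutePerJvm();  // 10; unset jobs default to 1 (no reuse)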

+ 6 - 2
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1415,7 +1415,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     for (TaskInProgress tip : job.getMapTasks()) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
-            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+            taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
         }
@@ -1424,7 +1426,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     for (TaskInProgress tip : job.getReduceTasks()) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
-            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+            taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
         }

+ 372 - 0
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+class JvmManager {
+
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.mapred.JvmManager");
+
+  JvmManagerForType mapJvmManager;
+
+  JvmManagerForType reduceJvmManager;
+  
+  TaskTracker tracker;
+
+  public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
+      File stdout,File stderr,long logSize, File workDir, 
+      Map<String,String> env, String pidFile, JobConf conf) {
+    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,pidFile,conf);
+  }
+  
+  public JvmManager(TaskTracker tracker) {
+    mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
+        true, tracker);
+    reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
+        false, tracker);
+    this.tracker = tracker;
+  }
+  
+  public void stop() {
+    mapJvmManager.stop();
+    reduceJvmManager.stop();
+  }
+
+  public boolean isJvmKnown(JVMId jvmId) {
+    if (jvmId.isMapJVM()) {
+      return mapJvmManager.isJvmknown(jvmId);
+    } else {
+      return reduceJvmManager.isJvmknown(jvmId);
+    }
+  }
+
+  public void launchJvm(JobID jobId, boolean isMap, JvmEnv env) {
+    if (isMap) {
+      mapJvmManager.reapJvm(env, jobId, tracker);
+    } else {
+      reduceJvmManager.reapJvm(env, jobId, tracker);
+    }
+  }
+
+  public void setRunningTaskForJvm(JVMId jvmId, TaskRunner t) {
+    if (jvmId.isMapJVM()) {
+      mapJvmManager.setRunningTaskForJvm(jvmId, t);
+    } else {
+      reduceJvmManager.setRunningTaskForJvm(jvmId, t);
+    }
+  }
+
+  public void taskFinished(TaskRunner tr) {
+    if (tr.getTask().isMapTask()) {
+      mapJvmManager.taskFinished(tr);
+    } else {
+      reduceJvmManager.taskFinished(tr);
+    }
+  }
+
+  public void taskKilled(TaskRunner tr) {
+    if (tr.getTask().isMapTask()) {
+      mapJvmManager.taskKilled(tr);
+    } else {
+      reduceJvmManager.taskKilled(tr);
+    }
+  }
+
+  public void killJvm(JVMId jvmId) {
+    if (jvmId.isMap) {
+      mapJvmManager.killJvm(jvmId);
+    } else {
+      reduceJvmManager.killJvm(jvmId);
+    }
+  }  
+
+  private static class JvmManagerForType {
+    //Mapping from the JVM IDs to running Tasks
+    Map <JVMId,TaskRunner> jvmToRunningTask = 
+      new HashMap<JVMId, TaskRunner>();
+    //Mapping from the tasks to JVM IDs
+    Map <TaskRunner,JVMId> runningTaskToJvm = 
+      new HashMap<TaskRunner, JVMId>();
+    //Mapping from the JVM IDs to Reduce JVM processes
+    Map <JVMId, JvmRunner> jvmIdToRunner = 
+      new HashMap<JVMId, JvmRunner>();
+    int maxJvms;
+    boolean isMap;
+    
+    Random rand = new Random(System.currentTimeMillis());
+    TaskTracker tracker;
+
+    public JvmManagerForType(int maxJvms, boolean isMap, TaskTracker tracker) {
+      this.maxJvms = maxJvms;
+      this.isMap = isMap;
+      this.tracker = tracker;
+    }
+
+    synchronized public void setRunningTaskForJvm(JVMId jvmId, 
+        TaskRunner t) {
+      if (t == null) { 
+        //signifies the JVM asked for a task and it 
+        //was not given anything.
+        jvmIdToRunner.get(jvmId).setBusy(false);
+        return;
+      }
+      jvmToRunningTask.put(jvmId, t);
+      runningTaskToJvm.put(t,jvmId);
+      jvmIdToRunner.get(jvmId).setBusy(true);
+    }
+    
+    synchronized public boolean isJvmknown(JVMId jvmId) {
+      return jvmIdToRunner.containsKey(jvmId);
+    }
+
+    synchronized public void taskFinished(TaskRunner tr) {
+      JVMId jvmId = runningTaskToJvm.remove(tr);
+      if (jvmId != null) {
+        jvmToRunningTask.remove(jvmId);
+        JvmRunner jvmRunner;
+        if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
+          jvmRunner.taskRan();
+        }
+      }
+    }
+
+    synchronized public void taskKilled(TaskRunner tr) {
+      JVMId jvmId = runningTaskToJvm.remove(tr);
+      if (jvmId != null) {
+        jvmToRunningTask.remove(jvmId);
+        killJvm(jvmId);
+      }
+    }
+
+    synchronized public void killJvm(JVMId jvmId) {
+      JvmRunner jvmRunner;
+      if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
+        jvmRunner.kill();
+      }
+    }
+    
+    synchronized public void stop() {
+      for (JvmRunner jvm : jvmIdToRunner.values()) {
+        jvm.kill();
+      }
+    }
+    
+    synchronized private void removeJvm(JVMId jvmId) {
+      jvmIdToRunner.remove(jvmId);
+    }
+    private synchronized void reapJvm( 
+        JvmEnv env,
+        JobID jobId, TaskTracker tracker) {
+      boolean spawnNewJvm = false;
+      //Check whether there is a free slot to start a new JVM,
+      //or kill an idle JVM and launch a new one
+      int numJvmsSpawned = jvmIdToRunner.size();
+
+      if (numJvmsSpawned >= maxJvms) {
+        //go through the list of JVMs for all jobs.
+        //for each JVM see whether it is currently running something and
+        //if not, then kill the JVM
+        Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = 
+          jvmIdToRunner.entrySet().iterator();
+        
+        while (jvmIter.hasNext()) {
+          JvmRunner jvmRunner = jvmIter.next().getValue();
+          JobID jId = jvmRunner.jvmId.getJobId();
+          //Cases when a JVM is killed: 
+          // (1) the JVM under consideration belongs to the same job 
+          //     (passed in the argument). In this case, kill only when
+          //     the JVM ran all the tasks it was scheduled to run (in terms
+          //     of count).
+          // (2) the JVM under consideration belongs to a different job and is
+          //     currently not busy
+          //             
+          if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
+              (!jId.equals(jobId) && !jvmRunner.isBusy())) {
+            jvmIter.remove();
+            jvmRunner.kill();
+            spawnNewJvm = true;
+            break;
+          }
+        }
+      } else {
+        spawnNewJvm = true;
+      }
+
+      if (spawnNewJvm) {
+        spawnNewJvm(jobId, env, tracker);
+      } else {
+        LOG.info("No new JVM spawned for jobId: " + jobId);
+      }
+    }
+
+    private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker) {
+      JvmRunner jvmRunner = new JvmRunner(env,jobId);
+      jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
+      //spawn the JVM in a new thread. Note that there will be very little
+      //extra overhead of launching the new thread for a new JVM since
+      //most of the cost is involved in launching the process. Moreover,
+      //since we are going to be using the JVM for running many tasks,
+      //the thread launch cost becomes trivial when amortized over all
+      //tasks. Doing it this way also keeps code simple.
+      jvmRunner.setDaemon(true);
+      jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
+      if (tracker.isTaskMemoryManagerEnabled()) {
+        tracker.getTaskMemoryManager().addTask(
+            TaskAttemptID.forName(env.conf.get("mapred.task.id")),
+            tracker.getMemoryForTask(env.conf));
+      }
+      LOG.info(jvmRunner.getName());
+      jvmRunner.start();
+    }
+    synchronized private void updateOnJvmExit(JVMId jvmId, 
+        int exitCode, boolean killed) {
+      removeJvm(jvmId);
+      TaskRunner t = jvmToRunningTask.remove(jvmId);
+
+      if (t != null) {
+        runningTaskToJvm.remove(t);
+        if (!killed && exitCode != 0) {
+          t.setExitCode(exitCode);
+        }
+        t.signalDone();
+      }
+    }
+
+    private class JvmRunner extends Thread {
+      JvmEnv env;
+      volatile boolean killed = false;
+      volatile int numTasksRan;
+      final int numTasksToRun;
+      JVMId jvmId;
+      volatile boolean busy = true;
+      private ShellCommandExecutor shexec; // shell terminal for running the task
+      public JvmRunner(JvmEnv env, JobID jobId) {
+        this.env = env;
+        this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
+        this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
+        LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
+      }
+      public void run() {
+        runChild(env);
+      }
+
+      public void runChild(JvmEnv env) {
+        try {
+          env.vargs.add(Integer.toString(jvmId.getId()));
+          List<String> wrappedCommand = 
+            TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
+                env.logSize, env.pidFile);
+          shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
+              env.workDir, env.env);
+          shexec.execute();
+        } catch (IOException ioe) {
+          // do nothing
+          // error and output are appropriately redirected
+        } finally { // handle the exit code
+          if (shexec == null) {
+            return;
+          }
+          int exitCode = shexec.getExitCode();
+          updateOnJvmExit(jvmId, exitCode, killed);
+          LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
+              numTasksRan);
+          try {
+            //the task jvm cleans up the common workdir for every 
+            //task at the beginning of each task in the task JVM.
+            //For the last task, we do it here.
+            if (env.conf.getNumTasksToExecutePerJvm() != 1) {
+              FileUtil.fullyDelete(env.workDir);
+            }
+          } catch (IOException ie){}
+          if (tracker.isTaskMemoryManagerEnabled()) {
+          // Remove the associated pid-file, if any
+            tracker.getTaskMemoryManager().
+               removePidFile(TaskAttemptID.forName(
+                   env.conf.get("mapred.task.id")));
+          }
+        }
+      }
+
+      public void kill() {
+        if (shexec != null) {
+          Process process = shexec.getProcess();
+          if (process != null) {
+            process.destroy();
+          }
+        }
+        removeJvm(jvmId);
+      }
+      
+      public void taskRan() {
+        busy = false;
+        numTasksRan++;
+      }
+      
+      public boolean ranAll() {
+        return(numTasksRan == numTasksToRun);
+      }
+      public void setBusy(boolean busy) {
+        this.busy = busy;
+      }
+      public boolean isBusy() {
+        return busy;
+      }
+    }
+  }  
+  static class JvmEnv { //Helper class
+    List<String> vargs;
+    List<String> setup;
+    File stdout;
+    File stderr;
+    File workDir;
+    String pidFile;
+    long logSize;
+    JobConf conf;
+    Map<String, String> env;
+
+    public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
+        File stderr, long logSize, File workDir, Map<String,String> env,
+        String pidFile, JobConf conf) {
+      this.setup = setup;
+      this.vargs = vargs;
+      this.stdout = stdout;
+      this.stderr = stderr;
+      this.workDir = workDir;
+      this.env = env;
+      this.pidFile = pidFile;
+      this.logSize = logSize;
+      this.conf = conf;
+    }
+  }
+}

+ 63 - 0
src/mapred/org/apache/hadoop/mapred/JvmTask.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+class JvmTask implements Writable {
+  Task t;
+  boolean shouldDie;
+  public JvmTask(Task t, boolean shouldDie) {
+    this.t = t;
+    this.shouldDie = shouldDie;
+  }
+  public JvmTask() {}
+  public Task getTask() {
+    return t;
+  }
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    if (t != null) {
+      out.writeBoolean(true);
+      out.writeBoolean(t.isMapTask());
+      t.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    boolean taskComing = in.readBoolean();
+    if (taskComing) {
+      boolean isMap = in.readBoolean();
+      if (isMap) {
+        t = new MapTask();
+      } else {
+        t = new ReduceTask();
+      }
+      t.readFields(in);
+    }
+  }
+}

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobTrackerMetricsInst;
+import org.apache.hadoop.mapred.JvmTask;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 class LocalJobRunner implements JobSubmissionProtocol {
@@ -206,7 +207,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
     // TaskUmbilicalProtocol methods
 
-    public Task getTask(TaskAttemptID taskid) { return null; }
+    public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) { return null; }
 
     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {

+ 8 - 3
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -108,6 +108,7 @@ abstract class Task implements Writable, Configurable {
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
   protected boolean cleanupJob = false;
+  private Thread pingProgressThread;
   
   //skip ranges based on failed ranges from previous attempts
   private SortedRanges skipRanges = new SortedRanges();
@@ -344,7 +345,7 @@ abstract class Task implements Writable, Configurable {
    * let the parent know that it's alive. It also pings the parent to see if it's alive. 
    */
   protected void startCommunicationThread(final TaskUmbilicalProtocol umbilical) {
-    Thread thread = new Thread(new Runnable() {
+    pingProgressThread = new Thread(new Runnable() {
         public void run() {
           final int MAX_RETRIES = 3;
           int remainingRetries = MAX_RETRIES;
@@ -407,8 +408,8 @@ abstract class Task implements Writable, Configurable {
           }
         }
       }, "Comm thread for "+taskId);
-    thread.setDaemon(true);
-    thread.start();
+    pingProgressThread.setDaemon(true);
+    pingProgressThread.start();
     LOG.debug(getTaskID() + " Progress/ping thread started");
   }
 
@@ -596,6 +597,10 @@ abstract class Task implements Writable, Configurable {
       commit(umbilical, outputCommitter);
     }
     taskDone.set(true);
+    pingProgressThread.interrupt();
+    try {
+      pingProgressThread.join();
+    } catch (InterruptedException ie) {}
     sendLastUpdate(umbilical);
     //signal the tasktracker that we are done
     sendDone(umbilical);

+ 9 - 3
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -485,7 +485,9 @@ class TaskInProgress {
       // and is addressed better at the TaskTracker to ensure this.
       // @see {@link TaskTracker.transmitHeartbeat()}
       if ((newState != TaskStatus.State.RUNNING && 
-           newState != TaskStatus.State.COMMIT_PENDING ) && 
+           newState != TaskStatus.State.COMMIT_PENDING && 
+           newState != TaskStatus.State.INITIALIZED &&
+           newState != TaskStatus.State.UNASSIGNED) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
                  "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
@@ -496,7 +498,9 @@ class TaskInProgress {
      // We have seen out of order status messages moving tasks from complete
       // to running. This is a spot fix, but it should be addressed more
       // globally.
-      if (newState == TaskStatus.State.RUNNING &&
+      if ((newState == TaskStatus.State.RUNNING || 
+          newState == TaskStatus.State.UNASSIGNED ||
+          newState == TaskStatus.State.INITIALIZED) &&
           (oldState == TaskStatus.State.FAILED || 
            oldState == TaskStatus.State.KILLED || 
            oldState == TaskStatus.State.SUCCEEDED ||
@@ -708,7 +712,9 @@ class TaskInProgress {
   boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
     TaskStatus st = taskStatuses.get(taskId);
     if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
-        || st.getRunState() == TaskStatus.State.COMMIT_PENDING)
+        || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+        st.getRunState() == TaskStatus.State.INITIALIZED ||
+        st.getRunState() == TaskStatus.State.UNASSIGNED)
         && tasksToKill.put(taskId, shouldFail) == null ) {
       String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
                       + " task '" + taskId + "' by user";

+ 137 - 4
src/mapred/org/apache/hadoop/mapred/TaskLog.java

@@ -18,17 +18,25 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.log4j.Appender;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  * A simple logger to handle the task-specific user logs.
@@ -50,8 +58,130 @@ public class TaskLog {
   }
 
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+    return new File(getBaseDir(taskid.toString()), filter.toString());
   }
+  public static File getRealTaskLogFileLocation(TaskAttemptID taskid, 
+      LogName filter) {
+    LogFileDetail l;
+    try {
+      l = getTaskLogFileDetail(taskid, filter);
+    } catch (IOException ie) {
+      LOG.error("getTaskLogFileDetail threw an exception " + ie);
+      return null;
+    }
+    return new File(getBaseDir(l.location), filter.toString());
+  }
+  private static class LogFileDetail {
+    final static String LOCATION = "LOG_DIR:";
+    String location;
+    long start;
+    long length;
+  }
+  
+  private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
+      LogName filter) throws IOException {
+    File indexFile = new File(getBaseDir(taskid.toString()), "log.index");
+    BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
+    //the format of the index file is
+    //LOG_DIR: <the dir where the task logs are really stored>
+    //stdout:<start-offset in the stdout file> <length>
+    //stderr:<start-offset in the stderr file> <length>
+    //syslog:<start-offset in the syslog file> <length>
+    LogFileDetail l = new LogFileDetail();
+    String str = fis.readLine();
+    if (str == null) { //the file doesn't have anything
+      throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
+    }
+    l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
+        LogFileDetail.LOCATION.length());
+    //special cases are the debugout and profile.out files. They are guaranteed
+    //to be associated with each task attempt since jvm reuse is disabled
+    //when profiling/debugging is enabled
+    if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
+      l.length = new File(getBaseDir(l.location), filter.toString()).length();
+      l.start = 0;
+      fis.close();
+      return l;
+    }
+    str = fis.readLine();
+    while (str != null) {
+      //look for the exact line containing the logname
+      if (str.contains(filter.toString())) {
+        str = str.substring(filter.toString().length()+1);
+        String[] startAndLen = str.split(" ");
+        l.start = Long.parseLong(startAndLen[0]);
+        l.length = Long.parseLong(startAndLen[1]);
+        break;
+      }
+      str = fis.readLine();
+    }
+    fis.close();
+    return l;
+  }
+  
+  public static File getIndexFile(String taskid) {
+    return new File(getBaseDir(taskid), "log.index");
+  }
+  private static File getBaseDir(String taskid) {
+    return new File(LOG_DIR, taskid);
+  }
+  private static long prevOutLength;
+  private static long prevErrLength;
+  private static long prevLogLength;
+  
+  private static void writeToIndexFile(TaskAttemptID firstTaskid) 
+  throws IOException {
+    File indexFile = getIndexFile(currentTaskid.toString());
+    BufferedOutputStream bos = 
+      new BufferedOutputStream(new FileOutputStream(indexFile,false));
+    DataOutputStream dos = new DataOutputStream(bos);
+    //the format of the index file is
+    //LOG_DIR: <the dir where the task logs are really stored>
+    //STDOUT: <start-offset in the stdout file> <length>
+    //STDERR: <start-offset in the stderr file> <length>
+    //SYSLOG: <start-offset in the syslog file> <length>    
+    dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"\n"+
+        LogName.STDOUT.toString()+":");
+    dos.writeBytes(Long.toString(prevOutLength)+" ");
+    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
+        .length() - prevOutLength)+"\n"+LogName.STDERR+":");
+    dos.writeBytes(Long.toString(prevErrLength)+" ");
+    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
+        .length() - prevErrLength)+"\n"+LogName.SYSLOG.toString()+":");
+    dos.writeBytes(Long.toString(prevLogLength)+" ");
+    dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
+        .length() - prevLogLength)+"\n");
+    dos.close();
+  }
+  private static void resetPrevLengths(TaskAttemptID firstTaskid) {
+    prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
+    prevErrLength = getTaskLogFile(firstTaskid, LogName.STDERR).length();
+    prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
+  }
+  private volatile static TaskAttemptID currentTaskid = null;
+  @SuppressWarnings("unchecked")
+  public synchronized static void syncLogs(TaskAttemptID firstTaskid, TaskAttemptID taskid) 
+  throws IOException {
+    System.out.flush();
+    System.err.flush();
+    Enumeration<Logger> allLoggers = LogManager.getCurrentLoggers();
+    while (allLoggers.hasMoreElements()) {
+      Logger l = allLoggers.nextElement();
+      Enumeration<Appender> allAppenders = l.getAllAppenders();
+      while (allAppenders.hasMoreElements()) {
+        Appender a = allAppenders.nextElement();
+        if (a instanceof TaskLogAppender) {
+          ((TaskLogAppender)a).flush();
+        }
+      }
+    }
+    if (currentTaskid != taskid) {
+      currentTaskid = taskid;
+      resetPrevLengths(firstTaskid);
+    }
+    writeToIndexFile(firstTaskid);
+  }
+  
   
   /**
    * The filter for userlogs.
@@ -133,9 +263,9 @@ public class TaskLog {
     public Reader(TaskAttemptID taskid, LogName kind, 
                   long start, long end) throws IOException {
       // find the right log file
-      File filename = getTaskLogFile(taskid, kind);
+      LogFileDetail fileDetail = getTaskLogFileDetail(taskid, kind);
       // calculate the start and stop
-      long size = filename.length();
+      long size = fileDetail.length;
       if (start < 0) {
         start += size + 1;
       }
@@ -144,8 +274,11 @@ public class TaskLog {
       }
       start = Math.max(0, Math.min(start, size));
       end = Math.max(0, Math.min(end, size));
+      start += fileDetail.start;
+      end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(filename);
+      file = new FileInputStream(new File(getBaseDir(fileDetail.location), 
+          kind.toString()));
       // skip upto start
       long pos = 0;
       while (pos < start) {
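Putting the index format together: a log.index file written by writeToIndexFile might look like the sketch below. The attempt id and offsets are illustrative, and the lowercase log names assume LogName.toString() matches the parser comments above; every entry points into the first task's physical log files.

    LOG_DIR:attempt_200809181200_0001_m_000000_0
    stdout:0 512
    stderr:0 0
    syslog:512 2048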

+ 4 - 0
src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java

@@ -61,6 +61,10 @@ public class TaskLogAppender extends FileAppender {
       }
     }
   }
+  
+  public void flush() {
+    qw.flush();
+  }
 
   @Override
   public synchronized void close() {

+ 18 - 4
src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java

@@ -27,6 +27,7 @@ import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker;
@@ -235,30 +236,43 @@ class TaskMemoryManagerThread extends Thread {
    * @return the pid of the task process.
    */
   private String getPid(TaskAttemptID tipID) {
-    Path pidFileName = getPidFilePath(tipID);
+    Path pidFileName = getPidFilePath(tipID, taskTracker.getJobConf());
     if (pidFileName == null) {
       return null;
     }
     return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
   }
 
-  private LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+  private static LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator("mapred.local.dir");
 
   /**
    * Get the pidFile path of a Task
    * @param tipID
    * @return pidFile's Path
    */
-  Path getPidFilePath(TaskAttemptID tipID) {
+  public static Path getPidFilePath(TaskAttemptID tipID, JobConf conf) {
     Path pidFileName = null;
     try {
+      //this actually need not use a localdirAllocator since the PID
+      //files are really small..
       pidFileName = lDirAlloc.getLocalPathToRead(
           (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
-          taskTracker.getJobConf());
+          conf);
     } catch (IOException i) {
       // PID file is not there
       LOG.debug("Failed to get pidFile name for " + tipID);
     }
     return pidFileName;
   }
+  public void removePidFile(TaskAttemptID tid) {
+    if (taskTracker.isTaskMemoryManagerEnabled()) {
+      Path pidFilePath = getPidFilePath(tid, taskTracker.getJobConf());
+      if (pidFilePath != null) {
+        try {
+          FileSystem.getLocal(taskTracker.getJobConf()).delete(pidFilePath, false);
+        } catch(IOException ie) {}
+      }
+    }
+  }
 }

+ 113 - 52
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
@@ -43,11 +42,16 @@ abstract class TaskRunner extends Thread {
     LogFactory.getLog(TaskRunner.class);
 
   volatile boolean killed = false;
-  private ShellCommandExecutor shexec; // shell terminal for running the task
   private Task t;
+  private Object lock = new Object();
+  private volatile boolean done = false;
+  private int exitCode = -1;
+  private boolean exitCodeSet = false;
+  
   private TaskTracker tracker;
 
   protected JobConf conf;
+  JvmManager jvmManager;
 
   /** 
    * for cleaning up old map outputs
@@ -60,6 +64,7 @@ abstract class TaskRunner extends Thread {
     this.conf = conf;
     this.mapOutputFile = new MapOutputFile(t.getJobID());
     this.mapOutputFile.setConf(conf);
+    this.jvmManager = tracker.getJvmManagerInstance();
   }
 
   public Task getTask() { return t; }
@@ -78,7 +83,7 @@ abstract class TaskRunner extends Thread {
    */
   public void close() throws IOException {}
 
-  private String stringifyPathArray(Path[] p){
+  private static String stringifyPathArray(Path[] p){
     if (p == null){
       return null;
     }
@@ -143,7 +148,8 @@ abstract class TaskRunner extends Thread {
                                                   true, Long.parseLong(
                                                         archivesTimestamps[i]),
                                                   new Path(workDir.
-                                                        getAbsolutePath()));
+                                                        getAbsolutePath()), 
+                                                  false);
             
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
@@ -171,7 +177,8 @@ abstract class TaskRunner extends Thread {
                                                   false, Long.parseLong(
                                                            fileTimestamps[i]),
                                                   new Path(workDir.
-                                                        getAbsolutePath()));
+                                                        getAbsolutePath()), 
+                                                  false);
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
@@ -185,17 +192,7 @@ abstract class TaskRunner extends Thread {
           out.close();
         }
       }
-    
-      // create symlinks for all the files in job cache dir in current
-      // workingdir for streaming
-      try{
-        DistributedCache.createAllSymlink(conf, jobCacheDir, 
-                                          workDir);
-      } catch(IOException ie){
-        // Do not exit even if symlinks have not been created.
-        LOG.warn(StringUtils.stringifyException(ie));
-      }
-      
+          
       if (!prepare()) {
         return;
       }
@@ -366,7 +363,7 @@ abstract class TaskRunner extends Thread {
       }
 
       // Add main class and its arguments 
-      vargs.add(TaskTracker.Child.class.getName());  // main of Child
+      vargs.add(Child.class.getName());  // main of Child
       // pass umbilical address
       InetSocketAddress address = tracker.getTaskTrackerReportAddress();
       vargs.add(address.getAddress().getHostAddress()); 
@@ -395,9 +392,7 @@ abstract class TaskRunner extends Thread {
       File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
       stdout.getParentFile().mkdirs();
       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
-      List<String> wrappedCommand = 
-        TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize, pidFile);
-      LOG.debug("child jvm command : " + wrappedCommand.toString());
+
       Map<String, String> env = new HashMap<String, String>();
       StringBuffer ldLibraryPath = new StringBuffer();
       ldLibraryPath.append(workDir.toString());
@@ -408,9 +403,26 @@ abstract class TaskRunner extends Thread {
         ldLibraryPath.append(oldLdLibraryPath);
       }
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-      // Run the task as child of the parent TaskTracker process
-      runChild(wrappedCommand, workDir, env, taskid);
-
+      tracker.taskInitialized(t.getTaskID());
+      LOG.info("Task ID: " + t.getTaskID() +" initialized");
+      jvmManager.launchJvm(t.getJobID(), t.isMapTask(), 
+          jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
+              workDir, env, pidFile, conf));
+      synchronized (lock) {
+        while (!done) {
+          lock.wait();
+        }
+      }
+      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
+      if (exitCodeSet) {
+        if (!killed && exitCode != 0) {
+          if (exitCode == 65) {
+            tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
+          }
+          throw new IOException("Task process exit with nonzero status of " +
+              exitCode + ".");
+        }
+      }
     } catch (FSError e) {
       LOG.fatal("FSError", e);
       try {
@@ -445,31 +457,76 @@ abstract class TaskRunner extends Thread {
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
       tracker.reportTaskFinished(t.getTaskID(), false);
+      if (t.isMapTask()) {
+        tracker.addFreeMapSlot();
+      } else {
+        tracker.addFreeReduceSlot();
+      }
     }
   }
-
-  /**
-   * Run the child process
-   */
-  private void runChild(List<String> args, File dir,
-                        Map<String, String> env,
-                        TaskAttemptID taskid) throws IOException {
-
-    shexec = new ShellCommandExecutor(args.toArray(new String[0]), dir, env);
-    try {
-      shexec.execute();
-    } catch (IOException ioe) {
-      // do nothing
-      // error and output are appropriately redirected
-    } finally { // handle the exit code
-      int exit_code = shexec.getExitCode();
-      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
-      if (!killed && exit_code != 0) {
-        if (exit_code == 65) {
-          tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
+  
+  //Mostly for setting up the symlinks. Note that when we set up the distributed
+  //cache, we didn't create the symlinks. This is done on a per task basis
+  //by the currently executing task.
+  public static void setupWorkDir(JobConf conf) throws IOException {
+    File workDir = new File(".").getAbsoluteFile();
+    FileUtil.fullyDelete(workDir);
+    if (DistributedCache.getSymlink(conf)) {
+      URI[] archives = DistributedCache.getCacheArchives(conf);
+      URI[] files = DistributedCache.getCacheFiles(conf);
+      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+      Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+      if (archives != null) {
+        for (int i = 0; i < archives.length; i++) {
+          String link = archives[i].getFragment();
+          if (link != null) {
+            link = workDir.toString() + Path.SEPARATOR + link;
+            File flink = new File(link);
+            if (!flink.exists()) {
+              FileUtil.symLink(localArchives[i].toString(), link);
+            }
+          }
         }
-        throw new IOException("Task process exit with nonzero status of " +
-                              exit_code + ": "+ shexec);
+      }
+      if (files != null) {
+        for (int i = 0; i < files.length; i++) {
+          String link = files[i].getFragment();
+          if (link != null) {
+            link = workDir.toString() + Path.SEPARATOR + link;
+            File flink = new File(link);
+            if (!flink.exists()) {
+              FileUtil.symLink(localFiles[i].toString(), link);
+            }
+          }
+        }
+      }
+    }
+    File jobCacheDir = null;
+    if (conf.getJar() != null) {
+      jobCacheDir = new File(
+          new Path(conf.getJar()).getParent().toString());
+    }
+
+    // create symlinks for all the files in the job cache dir in the
+    // current working dir, for streaming
+    try{
+      DistributedCache.createAllSymlink(conf, jobCacheDir,
+          workDir);
+    } catch(IOException ie){
+      // Do not exit even if symlinks have not been created.
+      LOG.warn(StringUtils.stringifyException(ie));
+    }
+    // add java.io.tmpdir given by mapred.child.tmp
+    String tmp = conf.get("mapred.child.tmp", "./tmp");
+    Path tmpDir = new Path(tmp);
+
+    // if temp directory path is not absolute
+    // prepend it with workDir.
+    if (!tmpDir.isAbsolute()) {
+      tmpDir = new Path(workDir.toString(), tmp);
+      FileSystem localFs = FileSystem.getLocal(conf);
+      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
+        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
       }
     }
   }
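
A note on setupWorkDir() above: the symlink name is whatever URI fragment the
user attached to the cache entry. A minimal, self-contained illustration of
that convention (the file name and namenode address are hypothetical):

    // Illustration only: the "#dict" fragment names the symlink the task sees.
    import java.net.URI;

    public class CacheLinkDemo {
      public static void main(String[] args) throws Exception {
        URI cache = new URI("hdfs://namenode:8020/data/terms.txt#dict");
        System.out.println(cache.getFragment());   // prints "dict"
        // setupWorkDir() creates <workDir>/dict -> <localized cache copy>,
        // so task code can simply open new java.io.File("dict").
      }
    }
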
@@ -478,13 +535,17 @@ abstract class TaskRunner extends Thread {
    * Kill the child process
    */
   public void kill() {
-    if (shexec != null) {
-      Process process = shexec.getProcess();
-      if (process != null) {
-        process.destroy();
-      }
-    }
     killed = true;
+    jvmManager.taskKilled(this);
+  }
+  public void signalDone() {
+    synchronized (lock) {
+      done = true;
+      lock.notify();
+    }
+  }
+  public void setExitCode(int exitCode) {
+    this.exitCodeSet = true;
+    this.exitCode = exitCode;
   }
-
 }
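
With runChild() gone, the runner no longer blocks on a child Process; it parks
on a monitor until the JvmManager reports the task's exit via signalDone() and
setExitCode(). Reduced to a standalone sketch (the class name is invented):

    // Sketch of the done/exitCode handshake between runner and JvmManager.
    class DoneLatch {
      private final Object lock = new Object();
      private boolean done = false;
      private int exitCode;

      void await() throws InterruptedException {
        synchronized (lock) {
          while (!done) {      // loop guards against spurious wakeups
            lock.wait();
          }
        }
      }

      void signalDone(int code) {
        synchronized (lock) {
          exitCode = code;
          done = true;
          lock.notify();       // a single runner thread ever waits here
        }
      }
    }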

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/TaskStatus.java

@@ -41,11 +41,11 @@ abstract class TaskStatus implements Writable, Cloneable {
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING}
+                            COMMIT_PENDING, INITIALIZED}
     
   private TaskAttemptID taskid;
   private float progress;
-  private State runState;
+  private volatile State runState;
   private String diagnosticInfo;
   private String stateString;
   private String taskTracker;
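
runState turns volatile because the launcher thread, the JvmManager callbacks
and the heartbeat loop now all read it, not always under the same lock; the
new INITIALIZED state marks a localized task that no JVM has picked up yet. A
standalone sketch of the one-way transition guard (mirroring taskInitialized()
in the TaskTracker hunks below):

    // Only UNASSIGNED may advance to INITIALIZED; later states are never demoted.
    class StatusSketch {
      enum State { UNASSIGNED, INITIALIZED, RUNNING, SUCCEEDED, FAILED, KILLED }

      private volatile State runState = State.UNASSIGNED;

      synchronized void taskInitialized() {
        if (runState == State.UNASSIGNED) {
          runState = State.INITIALIZED;
        }
      }
    }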

+ 267 - 113
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -18,6 +18,9 @@
  package org.apache.hadoop.mapred;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -32,11 +35,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.LinkedHashMap;
 import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +67,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -184,6 +190,8 @@ public class TaskTracker
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
+  volatile JvmManager jvmManager;
+  
   private TaskMemoryManagerThread taskMemoryManager;
   private boolean taskMemoryManagerEnabled = false;
   private long maxVirtualMemoryForTasks 
@@ -389,7 +397,7 @@ public class TaskTracker
 
     // Clear out state tables
     this.tasks.clear();
-    this.runningTasks = new TreeMap<TaskAttemptID, TaskInProgress>();
+    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
     this.runningJobs = new TreeMap<JobID, RunningJob>();
     this.mapTotal = 0;
     this.reduceTotal = 0;
@@ -422,6 +430,8 @@ public class TaskTracker
     InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
     String bindAddress = socAddr.getHostName();
     int tmpPort = socAddr.getPort();
+    
+    this.jvmManager = new JvmManager(this);
 
     // RPC initialization
     int max = maxCurrentMapTasks > maxCurrentReduceTasks ? 
@@ -472,6 +482,10 @@ public class TaskTracker
       taskMemoryManager.setDaemon(true);
       taskMemoryManager.start();
     }
+    mapLauncher = new TaskLauncher(maxCurrentMapTasks);
+    reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
+    mapLauncher.start();
+    reduceLauncher.start();
     this.running = true;
   }
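
The JvmManager created above owns every child JVM; per job and task type it
decides whether an idle JVM can be handed the next task or a fresh one must be
spawned. JvmManager.java itself (372 new lines) is not excerpted on this page,
so the following is only a rough sketch of that decision, with all names
invented:

    // Rough sketch, not the real JvmManager: reuse an idle JVM of the same
    // job while it is under its task budget, otherwise spawn a new one.
    import java.util.ArrayList;
    import java.util.List;

    class JvmPoolSketch {
      static class Jvm { String jobId; int tasksRun; boolean busy; }

      private final List<Jvm> jvms = new ArrayList<Jvm>();
      private final int tasksPerJvm;   // mapred.job.reuse.jvm.num.tasks

      JvmPoolSketch(int tasksPerJvm) { this.tasksPerJvm = tasksPerJvm; }

      synchronized Jvm jvmForTask(String jobId) {
        for (Jvm j : jvms) {
          boolean underBudget = tasksPerJvm == -1 || j.tasksRun < tasksPerJvm;
          if (!j.busy && j.jobId.equals(jobId) && underBudget) {
            j.busy = true;
            j.tasksRun++;
            return j;                  // reuse
          }
        }
        return null;                   // caller must launch a fresh JVM
      }
    }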
   
@@ -675,7 +689,6 @@ public class TaskTracker
   private void localizeJob(TaskInProgress tip) throws IOException {
     Path localJarFile = null;
     Task t = tip.getTask();
-    
     JobID jobId = t.getJobID();
     Path jobFile = new Path(t.getJobFile());
     // Get sizes of JobFile and JarFile
@@ -840,6 +853,14 @@ public class TaskTracker
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
     
+    //stop the launchers
+    mapLauncher.cleanTaskQueue();
+    reduceLauncher.cleanTaskQueue();
+    this.mapLauncher.interrupt();
+    this.reduceLauncher.interrupt();
+    
+    jvmManager.stop();
+    
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
   }
@@ -1037,7 +1058,7 @@ public class TaskTracker
         if (actions != null){ 
           for(TaskTrackerAction action: actions) {
             if (action instanceof LaunchTaskAction) {
-              startNewTask((LaunchTaskAction) action);
+              addToTaskQueue((LaunchTaskAction)action);
             } else if (action instanceof CommitTaskAction) {
               CommitTaskAction commitAction = (CommitTaskAction)action;
               if (!commitResponses.contains(commitAction.getTaskID())) {
@@ -1130,8 +1151,8 @@ public class TaskTracker
     boolean askForNewTask;
     long localMinSpaceStart;
     synchronized (this) {
-      askForNewTask = (mapTotal < maxCurrentMapTasks || 
-                       reduceTotal < maxCurrentReduceTasks) &&
+      askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || 
+                       status.countReduceTasks() < maxCurrentReduceTasks) &&
                       acceptNewTasks; 
       localMinSpaceStart = minSpaceStart;
     }
@@ -1161,6 +1182,8 @@ public class TaskTracker
     synchronized (this) {
       for (TaskStatus taskStatus : status.getTaskReports()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+            taskStatus.getRunState() != TaskStatus.State.INITIALIZED &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
@@ -1192,7 +1215,7 @@ public class TaskTracker
   /**
    * Return the maximum amount of memory available for all tasks on 
    * this tracker
-   * @return maximum amount of virtual memory in kilobytes
+   * @return maximum amount of virtual memory
    */
   long getMaxVirtualMemoryForTasks() {
     return maxVirtualMemoryForTasks;
@@ -1208,7 +1231,7 @@ public class TaskTracker
    * and the total amount of maximum virtual memory that can be
    * used by all currently running tasks.
    * 
-   * @return amount of free virtual memory in kilobytes that can be assured for
+   * @return amount of free virtual memory that can be assured for
    * new tasks
    */
   private synchronized long findFreeVirtualMemory() {
@@ -1224,9 +1247,9 @@ public class TaskTracker
       // still occupied and hence memory of the task should be
       // accounted in used memory.
       if ((tip.getRunState() == TaskStatus.State.RUNNING)
-            || (tip.getRunState() == TaskStatus.State.UNASSIGNED)
+            || (tip.getRunState() == TaskStatus.State.INITIALIZED)
             || (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        maxMemoryUsed += getMemoryForTask(tip);
+        maxMemoryUsed += getMemoryForTask(tip.getJobConf());
       }
     }
   
@@ -1239,11 +1262,11 @@ public class TaskTracker
    * If the TIP's job has a configured value for the max memory that is
    * returned. Else, the default memory that would be assigned for the
    * task is returned.
-   * @param tip The TaskInProgress
+   * @param conf
    * @return the memory allocated for the TIP.
    */
-  private long getMemoryForTask(TaskInProgress tip) {
-    long memForTask = tip.getJobConf().getMaxVirtualMemoryForTask();
+  public long getMemoryForTask(JobConf conf) {
+    long memForTask = conf.getMaxVirtualMemoryForTask();
     if (memForTask == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
       memForTask = this.getDefaultMemoryPerTask();
     }
@@ -1278,6 +1301,7 @@ public class TaskTracker
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
       if (tip.getRunState() == TaskStatus.State.RUNNING ||
+          tip.getRunState() == TaskStatus.State.INITIALIZED ||
           tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
@@ -1497,15 +1521,101 @@ public class TaskTracker
       return -1;
     }
   }
+
+  private TaskLauncher mapLauncher;
+  private TaskLauncher reduceLauncher;
+      
+  public JvmManager getJvmManagerInstance() {
+    return jvmManager;
+  }
+
+  public void addFreeMapSlot() {
+    mapLauncher.addFreeSlot();
+  }
   
-  /**
-   * Start a new task.
-   * All exceptions are handled locally, so that we don't mess up the
-   * task tracker.
-   */
-  private void startNewTask(LaunchTaskAction action) {
+  public void addFreeReduceSlot() {
+    reduceLauncher.addFreeSlot();
+  }
+  
+  private void addToTaskQueue(LaunchTaskAction action) {
+    if (action.getTask().isMapTask()) {
+      mapLauncher.addToTaskQueue(action);
+    } else {
+      reduceLauncher.addToTaskQueue(action);
+    }
+  }
+  
+  private class TaskLauncher extends Thread {
+    private IntWritable numFreeSlots;
+    private final int maxSlots;
+    private List<TaskInProgress> tasksToLaunch;
+
+    public TaskLauncher(int numSlots) {
+      this.maxSlots = numSlots;
+      this.numFreeSlots = new IntWritable(numSlots);
+      this.tasksToLaunch = new LinkedList<TaskInProgress>();
+      setDaemon(true);
+      setName("TaskLauncher for task");
+    }
+
+    public void addToTaskQueue(LaunchTaskAction action) {
+      synchronized (tasksToLaunch) {
+        TaskInProgress tip = registerTask(action);
+        tasksToLaunch.add(tip);
+        tasksToLaunch.notifyAll();
+      }
+    }
+    
+    public void cleanTaskQueue() {
+      tasksToLaunch.clear();
+    }
+    
+    public void addFreeSlot() {
+      synchronized (numFreeSlots) {
+        numFreeSlots.set(numFreeSlots.get() + 1);
+        assert (numFreeSlots.get() <= maxSlots);
+        LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
+        numFreeSlots.notifyAll();
+      }
+    }
+    
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          TaskInProgress tip;
+          synchronized (tasksToLaunch) {
+            while (tasksToLaunch.isEmpty()) {
+              tasksToLaunch.wait();
+            }
+            //get the TIP
+            tip = tasksToLaunch.remove(0);
+            LOG.info("Trying to launch : " + tip.getTask().getTaskID());
+          }
+          //wait for a slot to run
+          synchronized (numFreeSlots) {
+            while (numFreeSlots.get() == 0) {
+              numFreeSlots.wait();
+            }
+            LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
+                " and trying to launch "+tip.getTask().getTaskID());
+            numFreeSlots.set(numFreeSlots.get() - 1);
+            assert (numFreeSlots.get() >= 0);
+          }
+          
+          //got a free slot. launch the task
+          startNewTask(tip);
+        } catch (InterruptedException e) { 
+          return; // ALL DONE
+        } catch (Throwable th) {
+          LOG.error("TaskLauncher error " + 
+              StringUtils.stringifyException(th));
+        }
+      }
+    }
+  }
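
A note on numFreeSlots above: an IntWritable is used instead of a plain int so
that one object can serve as both the mutable counter and the monitor that
launcher threads block on. The same idiom with an ordinary holder, as a
standalone sketch:

    // Equivalent of the numFreeSlots idiom: any mutable holder works as the lock.
    class Slots {
      private final int[] free;               // one-element holder doubles as monitor

      Slots(int n) { free = new int[] { n }; }

      void acquire() throws InterruptedException {
        synchronized (free) {
          while (free[0] == 0) {
            free.wait();                       // block until a slot is released
          }
          free[0]--;
        }
      }

      void release() {
        synchronized (free) {
          free[0]++;
          free.notifyAll();                    // wake any waiting launcher
        }
      }
    }
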
+  private TaskInProgress registerTask(LaunchTaskAction action) {
     Task t = action.getTask();
-    LOG.info("LaunchTaskAction: " + t.getTaskID());
+    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
     TaskInProgress tip = new TaskInProgress(t, this.fConf);
     synchronized (this) {
       tasks.put(t.getTaskID(), tip);
@@ -1517,10 +1627,19 @@ public class TaskTracker
         reduceTotal++;
       }
     }
+    return tip;
+  }
+  /**
+   * Start a new task.
+   * All exceptions are handled locally, so that we don't mess up the
+   * task tracker.
+   */
+  private void startNewTask(TaskInProgress tip) {
     try {
       localizeJob(tip);
       if (isTaskMemoryManagerEnabled()) {
-        taskMemoryManager.addTask(t.getTaskID(), getMemoryForTask(tip));
+        taskMemoryManager.addTask(tip.getTask().getTaskID(), 
+            getMemoryForTask(tip.getJobConf()));
       }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
@@ -1691,13 +1810,11 @@ public class TaskTracker
         }
         localJobConf.set("hadoop.net.static.resolutions", str.toString());
       }
-      OutputStream out = localFs.create(localTaskFile);
-      try {
-        localJobConf.write(out);
-      } finally {
-        out.close();
+      if (task.isMapTask()) {
+        debugCommand = localJobConf.getMapDebugScript();
+      } else {
+        debugCommand = localJobConf.getReduceDebugScript();
       }
-      task.setConf(localJobConf);
       String keepPattern = localJobConf.getKeepTaskFilesPattern();
       if (keepPattern != null) {
         alwaysKeepTaskFiles = 
@@ -1705,11 +1822,21 @@ public class TaskTracker
       } else {
         alwaysKeepTaskFiles = false;
       }
-      if (task.isMapTask()) {
-        debugCommand = localJobConf.getMapDebugScript();
-      } else {
-        debugCommand = localJobConf.getReduceDebugScript();
+      if (debugCommand != null || localJobConf.getProfileEnabled() ||
+          alwaysKeepTaskFiles) {
+        //disable jvm reuse
+        localJobConf.setNumTasksToExecutePerJvm(1);
+      }
+      if (isTaskMemoryManagerEnabled()) {
+        localJobConf.setBoolean("task.memory.mgmt.enabled", true);
       }
+      OutputStream out = localFs.create(localTaskFile);
+      try {
+        localJobConf.write(out);
+      } finally {
+        out.close();
+      }
+      task.setConf(localJobConf);
     }
         
     /**
@@ -1717,6 +1844,10 @@ public class TaskTracker
     public Task getTask() {
       return task;
     }
+    
+    public TaskRunner getTaskRunner() {
+      return runner;
+    }
 
     public synchronized void setJobConf(JobConf lconf){
       this.localJobConf = lconf;
@@ -1745,10 +1876,8 @@ public class TaskTracker
      */
     public synchronized void launchTask() throws IOException {
       localizeTask(task);
-      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
       this.runner = task.createRunner(TaskTracker.this);
       this.runner.start();
-      this.taskStatus.setStartTime(System.currentTimeMillis());
     }
 
     /**
@@ -1816,7 +1945,8 @@ public class TaskTracker
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
-      
+      jvmManager.taskFinished(runner);
+      runner.signalDone();
       LOG.info("Task " + task.getTaskID() + " is done.");
       LOG.info("reported output size for " + task.getTaskID() +  "  was " + taskStatus.getOutputSize());
 
@@ -1857,13 +1987,16 @@ public class TaskTracker
               String jobConf = task.getJobFile();
               try {
                 // get task's stdout file 
-                taskStdout = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                taskStdout = FileUtil.makeShellPath(
+                    TaskLog.getRealTaskLogFileLocation
                                   (task.getTaskID(), TaskLog.LogName.STDOUT));
                 // get task's stderr file 
-                taskStderr = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                taskStderr = FileUtil.makeShellPath(
+                    TaskLog.getRealTaskLogFileLocation
                                   (task.getTaskID(), TaskLog.LogName.STDERR));
                 // get task's syslog file 
-                taskSyslog = FileUtil.makeShellPath(TaskLog.getTaskLogFile
+                taskSyslog = FileUtil.makeShellPath(
+                    TaskLog.getRealTaskLogFileLocation
                                   (task.getTaskID(), TaskLog.LogName.SYSLOG));
               } catch(IOException e){
                 LOG.warn("Exception finding task's stdout/err/syslog files");
@@ -1882,8 +2015,8 @@ public class TaskTracker
                           StringUtils.stringifyException(e));
               }
               // Build the command  
-              File stdout = TaskLog.getTaskLogFile(task.getTaskID(),
-                                                   TaskLog.LogName.DEBUGOUT);
+              File stdout = TaskLog.getRealTaskLogFileLocation(
+                                   task.getTaskID(), TaskLog.LogName.DEBUGOUT);
               // add pipes program as argument if it exists.
               String program ="";
               String executable = Submitter.getExecutable(localJobConf);
@@ -1951,6 +2084,13 @@ public class TaskTracker
       }
 
     }
+    
+    synchronized void taskInitialized() {
+      if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+        //one-way state change to INITIALIZED
+        this.taskStatus.setRunState(TaskStatus.State.INITIALIZED);
+      }
+    }
   
 
     /**
@@ -2042,6 +2182,8 @@ public class TaskTracker
       // Kill the task if it is still running
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
+            getRunState() == TaskStatus.State.UNASSIGNED ||
+            getRunState() == TaskStatus.State.INITIALIZED ||
             getRunState() == TaskStatus.State.COMMIT_PENDING) {
           kill(wasFailure);
         }
@@ -2057,12 +2199,12 @@ public class TaskTracker
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
+          taskStatus.getRunState() == TaskStatus.State.INITIALIZED ||
           taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
-        runner.kill();
         taskStatus.setRunState((wasFailure) ? 
                                   TaskStatus.State.FAILED : 
                                   TaskStatus.State.KILLED);
@@ -2074,6 +2216,16 @@ public class TaskTracker
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
+      if (runner != null) {
+        runner.kill();
+        runner.signalDone();
+      } else {
+        if (task.isMapTask()) {
+          addFreeMapSlot();
+        } else {
+          addFreeReduceSlot();
+        }
+      }
     }
 
     /**
@@ -2112,13 +2264,6 @@ public class TaskTracker
       TaskAttemptID taskId = task.getTaskID();
       LOG.debug("Cleaning up " + taskId);
 
-      // Remove the associated pid-file, if any
-      if (TaskTracker.this.isTaskMemoryManagerEnabled()) {
-        Path pidFilePath = taskMemoryManager.getPidFilePath(taskId);
-        if (pidFilePath != null) {
-          directoryCleanupThread.addToQueue(pidFilePath);
-        }
-      }
 
       synchronized (TaskTracker.this) {
         if (needCleanup) {
@@ -2138,13 +2283,28 @@ public class TaskTracker
                            + task.getJobID() + Path.SEPARATOR + taskId;
           if (needCleanup) {
             if (runner != null) {
+              //cleans up the output directory of the task (where map outputs 
+              //and reduce inputs get stored)
               runner.close();
             }
-            directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
-                                                            taskDir));
+            //We don't delete the workdir
+            //since some other task (running in the same JVM) 
+            //might be using the dir. The JVM running the tasks cleans
+            //the workdir per task in the task process itself.
+            if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+                  taskDir));
+            } else {
+              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+                taskDir+"/job.xml"));
+            }
           } else {
-            directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
-                           taskDir + Path.SEPARATOR + MRConstants.WORKDIR));
+            if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+                  taskDir+"/work"));
+            }  
           }
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: " + 
@@ -2170,16 +2330,54 @@ public class TaskTracker
   // ///////////////////////////////////////////////////////////////
   // TaskUmbilicalProtocol
   /////////////////////////////////////////////////////////////////
+
   /**
    * Called upon startup by the child process, to fetch Task data.
    */
-  public synchronized Task getTask(TaskAttemptID taskid) throws IOException {
-    TaskInProgress tip = tasks.get(taskid);
-    if (tip != null) {
-      return tip.getTask();
-    } else {
-      return null;
+  public synchronized JvmTask getTask(JVMId jvmId, TaskAttemptID firstTaskId) 
+  throws IOException {
+    LOG.debug("JVM with ID : " + jvmId + " asked for a task");
+    if (!jvmManager.isJvmKnown(jvmId)) {
+      LOG.info("Killing unknown JVM " + jvmId);
+      return new JvmTask(null, true);
+    }
+    RunningJob rjob = runningJobs.get(jvmId.getJobId());
+    if (rjob == null) { //kill the JVM since the job is dead
+      jvmManager.killJvm(jvmId);
+      return new JvmTask(null, true);
+    }
+    TaskInProgress t = runningTasks.get(firstTaskId);
+    //if we can give the JVM the task it is asking for, well and good;
+    //if not, we give it some other task from the same job (note that some
+    //other JVM might have run this task while this JVM was init'ing)
+    if (t == null || t.getStatus().getRunState() != 
+                     TaskStatus.State.INITIALIZED) {
+      boolean isMap = jvmId.isMapJVM();
+      synchronized (rjob) {
+        for (TaskInProgress tip : runningTasks.values()) {
+          synchronized (tip) {
+            if (tip.getTask().getJobID().equals(jvmId.getJobId()) &&
+                tip.getRunState() == TaskStatus.State.INITIALIZED
+                && ((isMap && tip.getTask().isMapTask()) ||
+                    (!isMap && !tip.getTask().isMapTask()))) {
+              t = tip;
+            }
+          }
+        }
+      }
     }
+    //now the task could be null or we could have got a task that already
+    //ran earlier (the firstTaskId case)
+    if (t == null || t.getRunState() != TaskStatus.State.INITIALIZED) {
+      jvmManager.setRunningTaskForJvm(jvmId, null);  
+      return new JvmTask(null, false);
+    }
+    t.getStatus().setRunState(TaskStatus.State.RUNNING);
+    t.getStatus().setStartTime(System.currentTimeMillis());
+    jvmManager.setRunningTaskForJvm(jvmId,t.getTaskRunner());
+    LOG.info("JVM with ID: " + jvmId + " given task: " + 
+        t.getTask().getTaskID().toString());
+    return new JvmTask(t.getTask(), false);
   }
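
The other half of this handshake lives in the new Child.java (181 added lines,
not excerpted on this page): the child keeps asking the tracker for work until
told to die or until its per-JVM task budget runs out. A hedged sketch of that
loop; JvmTask's shouldDie()/getTask() accessor names are assumptions here:

    // Sketch of the child-side fetch loop over the umbilical.
    static void runTasks(TaskUmbilicalProtocol umbilical, JVMId jvmId,
                         TaskAttemptID firstTaskId, int budget) throws Exception {
      int tasksRun = 0;
      while (budget == -1 || tasksRun < budget) {
        JvmTask jvmTask = umbilical.getTask(jvmId, firstTaskId);
        if (jvmTask.shouldDie()) {
          break;                        // tracker killed or forgot this JVM
        }
        Task task = jvmTask.getTask();
        if (task == null) {
          Thread.sleep(500);            // nothing INITIALIZED yet; back off
          continue;
        }
        JobConf job = new JobConf(task.getJobFile());
        task.run(job, umbilical);       // progress flows back over the same proxy
        tasksRun++;
      }
    }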
 
   /**
@@ -2334,6 +2532,13 @@ public class TaskTracker
       taskMemoryManager.removeTask(taskid);
     }
   }
+  
+  synchronized void taskInitialized(TaskAttemptID taskid) {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      tip.taskInitialized();
+    }
+  }
 
   /**
    * A completed map task's output has been lost.
@@ -2355,7 +2560,7 @@ public class TaskTracker
     private JobID jobid; 
     private Path jobFile;
     // keep this for later use
-    Set<TaskInProgress> tasks;
+    volatile Set<TaskInProgress> tasks;
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
@@ -2384,61 +2589,6 @@ public class TaskTracker
     }
   }
 
-  /** 
-   * The main() for child processes. 
-   */
-  public static class Child {
-    
-    public static void main(String[] args) throws Throwable {
-      //LogFactory.showTime(false);
-      LOG.debug("Child starting");
-
-      JobConf defaultConf = new JobConf();
-      String host = args[0];
-      int port = Integer.parseInt(args[1]);
-      InetSocketAddress address = new InetSocketAddress(host, port);
-      TaskAttemptID taskid = TaskAttemptID.forName(args[2]);
-      TaskUmbilicalProtocol umbilical =
-        (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
-                                            TaskUmbilicalProtocol.versionID,
-                                            address,
-                                            defaultConf);
-            
-      Task task = umbilical.getTask(taskid);
-      JobConf job = new JobConf(task.getJobFile());
-      TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
-      task.setConf(job);
-          
-      defaultConf.addResource(new Path(task.getJobFile()));
-      
-      // Initiate Java VM metrics
-      JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
-
-      try {
-        // use job-specified working directory
-        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
-        task.run(job, umbilical);             // run the task
-      } catch (FSError e) {
-        LOG.fatal("FSError from child", e);
-        umbilical.fsError(taskid, e.getMessage());
-      } catch (Throwable throwable) {
-        LOG.warn("Error running child", throwable);
-        // Report back any failures, for diagnostic purposes
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        throwable.printStackTrace(new PrintStream(baos));
-        umbilical.reportDiagnosticInfo(taskid, baos.toString());
-      } finally {
-        RPC.stopProxy(umbilical);
-        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-        metricsContext.close();
-        // Shutting down log4j of the child-vm... 
-        // This assumes that on return from Task.run() 
-        // there is no more logging done.
-        LogManager.shutdown();
-      }
-    }
-  }
-
   /**
    * Get the name for this task tracker.
    * @return the string like "tracker_mymachine:50010"
@@ -2804,9 +2954,13 @@ public class TaskTracker
    * Is the TaskMemoryManager Enabled on this system?
    * @return true if enabled, false otherwise.
    */
-  boolean isTaskMemoryManagerEnabled() {
+  public boolean isTaskMemoryManagerEnabled() {
     return taskMemoryManagerEnabled;
   }
+  
+  public TaskMemoryManagerThread getTaskMemoryManager() {
+    return taskMemoryManager;
+  }
 
   private void setTaskMemoryManagerEnabledFlag() {
     if (!ProcfsBasedProcessTree.isAvailable()) {

+ 2 - 2
src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -208,7 +208,7 @@ class TaskTrackerStatus implements Writable {
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
            (state == TaskStatus.State.UNASSIGNED) ||
-           (state == TaskStatus.State.COMMIT_PENDING))) {
+           (state == TaskStatus.State.INITIALIZED))) {
         mapCount++;
       }
     }
@@ -226,7 +226,7 @@ class TaskTrackerStatus implements Writable {
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
            (state == TaskStatus.State.UNASSIGNED) ||
-           (state == TaskStatus.State.COMMIT_PENDING))) {
+           (state == TaskStatus.State.INITIALIZED))) {
         reduceCount++;
       }
     }
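
The substitution above changes what "occupies a slot" means: a task already
handed off for launch (INITIALIZED) now pins its slot, while a task merely
waiting to commit (COMMIT_PENDING) no longer does. The revised predicate,
pulled out as a sketch:

    // Does this task still occupy a map/reduce slot on the tracker?
    static boolean occupiesSlot(TaskStatus ts) {
      TaskStatus.State s = ts.getRunState();
      return s == TaskStatus.State.RUNNING
          || s == TaskStatus.State.UNASSIGNED
          || s == TaskStatus.State.INITIALIZED;
    }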

+ 11 - 3
src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapred.JvmTask;
 
 /** Protocol that task child process uses to contact its parent process.  The
 * parent is a daemon which polls the central master for a new map or
@@ -49,12 +50,19 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
    * Version 12 getMapCompletionEvents() now also indicates if the events are 
    *            stale or not. Hence the return type is a class that 
    *            encapsulates the events and whether to reset events index.
+   * Version 13 changed the getTask method signature for HADOOP-249
    * */
 
-  public static final long versionID = 11L;
+  public static final long versionID = 13L;
   
-  /** Called when a child task process starts, to get its task.*/
-  Task getTask(TaskAttemptID taskid) throws IOException;
+  /**
+   * Called when a child task process starts, to get its task.
+   * @param jvmId the ID of this JVM w.r.t the tasktracker that launched it
+   * @param taskid the first taskid that the JVM runs
+   * @return Task object
+   * @throws IOException 
+   */
+  JvmTask getTask(JVMId jvmId, TaskAttemptID taskid) throws IOException;
 
   /**
    * Report child's progress to parent.
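
Bumping versionID to 13 makes an old child and a new tracker (or vice versa)
refuse each other at connect time rather than fail mid-task. The child obtains
its proxy the same way the removed Child.main() did; a minimal sketch:

    // The version check happens inside getProxy: a mismatch fails fast.
    static TaskUmbilicalProtocol connect(String host, int port)
        throws java.io.IOException {
      return (TaskUmbilicalProtocol) RPC.getProxy(
          TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,
          new java.net.InetSocketAddress(host, port), new JobConf());
    }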

+ 5 - 4
src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

@@ -72,14 +72,12 @@ public class TestMiniMRDFSSort extends TestCase {
     // Run Sort-Validator
     assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
   }
-
+  Configuration conf = new Configuration();
   public void testMapReduceSort() throws Exception {
     MiniDFSCluster dfs = null;
     MiniMRCluster mr = null;
     FileSystem fileSys = null;
     try {
-      Configuration conf = new Configuration();
-
       // set io.sort.mb and fsinmemory.size.mb to lower value in test
       conf.setInt("io.sort.mb", 5);
       conf.setInt("fs.inmemory.size.mb", 20);
@@ -103,5 +101,8 @@ public class TestMiniMRDFSSort extends TestCase {
       }
     }
   }
-  
+  public void testMapReduceSortWithJvmReuse() throws Exception {
+    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
+    testMapReduceSort();
+  }
 }
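
Outside the tests, the same knob is set per job before submission, either
through the new JobConf helper or the raw property; a hypothetical driver
snippet:

    import org.apache.hadoop.mapred.JobConf;

    public class ReuseKnobDemo {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        job.setNumTasksToExecutePerJvm(-1);   // -1: no per-JVM task limit
        // equivalent: job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
        System.out.println(job.getNumTasksToExecutePerJvm());
      }
    }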