HADOOP-5938. Change org.apache.hadoop.mapred.jobcontrol to use new mapreduce api. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@784663 13f79535-47bb-0310-9956-ffa450edef68
Sharad Agarwal, 16 years ago
Commit dcc852b1c1
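
For context, a minimal sketch (not part of the commit) of what this change means for client code: a dependency-free job submitted through the new org.apache.hadoop.mapreduce.lib.jobcontrol classes introduced below. The group name, job name, and polling interval are illustrative assumptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class JobControlSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf);                  // new-API job to be controlled
    job.setJobName("example");
    ControlledJob cjob = new ControlledJob(job, null); // no depending jobs
    JobControl control = new JobControl("example-group");
    control.addJob(cjob);
    new Thread(control).start();              // JobControl is a Runnable
    while (!control.allFinished()) {
      Thread.sleep(1000);                     // poll until all jobs complete
    }
    control.stop();
  }
}
```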

+ 3 - 0
CHANGES.txt

@@ -441,6 +441,9 @@ Trunk (unreleased changes)
     HADOOP-5961. DataNode process understand generic hadoop command line
     options (like -Ddfs.property=value). (Raghu Angadi)
 
+    HADOOP-5938. Change org.apache.hadoop.mapred.jobcontrol to use new
+    api. (Amareshwari Sriramadasu via sharad)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

+ 56 - 303
src/mapred/org/apache/hadoop/mapred/jobcontrol/Job.java

@@ -22,167 +22,62 @@ package org.apache.hadoop.mapred.jobcontrol;
 import java.io.IOException;
 import java.util.ArrayList;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 
-/** This class encapsulates a MapReduce job and its dependency. It monitors 
- *  the states of the depending jobs and updates the state of this job.
- *  A job starts in the WAITING state. If it does not have any depending jobs, or
- *  all of the depending jobs are in SUCCESS state, then the job state will become
- *  READY. If any depending jobs fail, the job will fail too. 
- *  When in READY state, the job can be submitted to Hadoop for execution, with
- *  the state changing into RUNNING state. From RUNNING state, the job can get into 
- *  SUCCESS or FAILED state, depending the status of the job execution.
- *  
+/** 
+ * @deprecated Use {@link ControlledJob} instead.  
  */
+@Deprecated
+public class Job extends ControlledJob {
+  static final Log LOG = LogFactory.getLog(Job.class);
 
-public class Job {
-
-  // A job will be in one of the following states
   final public static int SUCCESS = 0;
   final public static int WAITING = 1;
   final public static int RUNNING = 2;
   final public static int READY = 3;
   final public static int FAILED = 4;
   final public static int DEPENDENT_FAILED = 5;
-	
-	
-  private JobConf theJobConf;
-  private int state;
-  private String jobID; 		// assigned and used by JobControl class
-  private JobID mapredJobID; // the job ID assigned by map/reduce
-  private String jobName;		// external name, assigned/used by client app
-  private String message;		// some info for human consumption, 
-  // e.g. the reason why the job failed
-  private ArrayList<Job> dependingJobs;	// the jobs the current job depends on
-	
-  private JobClient jc = null;		// the map reduce job client
-	
+
   /** 
    * Construct a job.
    * @param jobConf a mapred job configuration representing a job to be executed.
    * @param dependingJobs an array of jobs the current job depends on
    */
-  public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
-    this.theJobConf = jobConf;
-    this.dependingJobs = dependingJobs;
-    this.state = Job.WAITING;
-    this.jobID = "unassigned";
-    this.mapredJobID = null; //not yet assigned 
-    this.jobName = "unassigned";
-    this.message = "just initialized";
-    this.jc = new JobClient(jobConf);
-  }
-  
-  /**
-   * Construct a job.
-   * 
-   * @param jobConf mapred job configuration representing a job to be executed.
-   * @throws IOException
-   */
-  public Job(JobConf jobConf) throws IOException {
-    this(jobConf, null);
-  }
-	
-  @Override
-  public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("job name:\t").append(this.jobName).append("\n");
-    sb.append("job id:\t").append(this.jobID).append("\n");
-    sb.append("job state:\t").append(this.state).append("\n");
-    sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" 
-        : this.mapredJobID).append("\n");
-    sb.append("job message:\t").append(this.message).append("\n");
-		
-    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-      sb.append("job has no depending job:\t").append("\n");
-    } else {
-      sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
-      for (int i = 0; i < this.dependingJobs.size(); i++) {
-        sb.append("\t depending job ").append(i).append(":\t");
-        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
-      }
-    }
-    return sb.toString();
-  }
-	
-  /**
-   * @return the job name of this job
-   */
-  public String getJobName() {
-    return this.jobName;
-  }
-	
-  /**
-   * Set the job name for  this job.
-   * @param jobName the job name
-   */
-  public void setJobName(String jobName) {
-    this.jobName = jobName;
-  }
-	
-  /**
-   * @return the job ID of this job assigned by JobControl
-   */
-  public String getJobID() {
-    return this.jobID;
-  }
-	
-  /**
-   * Set the job ID for  this job.
-   * @param id the job ID
-   */
-  public void setJobID(String id) {
-    this.jobID = id;
-  }
-	
-  /**
-   * @return the mapred ID of this job
-   * @deprecated use {@link #getAssignedJobID()} instead
-   */
-  @Deprecated
-  public String getMapredJobID() {
-    return this.mapredJobID.toString();
+  public Job(JobConf jobConf, ArrayList<?> dependingJobs) throws IOException {
+    super(new org.apache.hadoop.mapreduce.Job(jobConf), 
+          (ArrayList<ControlledJob>) dependingJobs);
   }
-	
-  /**
-   * Set the mapred ID for this job.
-   * @param mapredJobID the mapred job ID for this job.
-   * @deprecated use {@link #setAssignedJobID(JobID)} instead
-   */
-  @Deprecated
-  public void setMapredJobID(String mapredJobID) {
-    this.mapredJobID = JobID.forName(mapredJobID);
+
+  public Job(JobConf conf) throws IOException {
+    super(conf);
   }
-	
+
   /**
    * @return the mapred ID of this job as assigned by the 
    * mapred framework.
    */
   public JobID getAssignedJobID() {
-    return this.mapredJobID;
+    return (JobID)super.getMapredJobID();
   }
-  
+
   /**
-   * Set the mapred ID for this job as assigned by the 
-   * mapred framework.
-   * @param mapredJobID the mapred job ID for this job.
+   * @deprecated setAssignedJobID should not be called.
+   * JOBID is set by the framework.
    */
   public void setAssignedJobID(JobID mapredJobID) {
-    this.mapredJobID = mapredJobID;
+    // do nothing
   }
-  
+
   /**
    * @return the mapred job conf of this job
    */
   public synchronized JobConf getJobConf() {
-    return this.theJobConf;
+    return new JobConf(super.getJob().getConfiguration());
   }
 
 
@@ -191,197 +86,55 @@ public class Job {
    * @param jobConf the mapred job conf for this job.
    */
   public synchronized void setJobConf(JobConf jobConf) {
-    this.theJobConf = jobConf;
+    try {
+      super.setJob(new org.apache.hadoop.mapreduce.Job(jobConf));
+    } catch (IOException ioe) { 
+      LOG.info("Exception " + ioe);
+    }
   }
 
   /**
    * @return the state of this job
    */
   public synchronized int getState() {
-    return this.state;
-  }
-	
-  /**
-   * Set the state for this job.
-   * @param state the new state for this job.
-   */
-  protected synchronized void setState(int state) {
-    this.state = state;
-  }
-	
-  /**
-   * @return the message of this job
-   */
-  public synchronized String getMessage() {
-    return this.message;
-  }
-
-  /**
-   * Set the message for this job.
-   * @param message the message for this job.
-   */
-  public synchronized void setMessage(String message) {
-    this.message = message;
-  }
-
-
-  /**
-   * @return the job client of this job
-   */
-  public JobClient getJobClient(){
-          return this.jc;
-  }
-
-  /**
-   * @return the depending jobs of this job
-   */
-  public ArrayList<Job> getDependingJobs() {
-    return this.dependingJobs;
-  }
-  
-  /**
-   * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
-   * is waiting to run, not during or afterwards.
-   * 
-   * @param dependingJob Job that this Job depends on.
-   * @return <tt>true</tt> if the Job was added.
-   */
-  public synchronized boolean addDependingJob(Job dependingJob) {
-    if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
-      if (this.dependingJobs == null) {
-        this.dependingJobs = new ArrayList<Job>();
-      }
-      return this.dependingJobs.add(dependingJob);
-    } else {
-      return false;
-    }
-  }
-	
-  /**
-   * @return true if this job is in a complete state
-   */
-  public synchronized boolean isCompleted() {
-    return this.state == Job.FAILED || 
-      this.state == Job.DEPENDENT_FAILED ||
-      this.state == Job.SUCCESS;
-  }
-	
-  /**
-   * @return true if this job is in READY state
-   */
-  public synchronized boolean isReady() {
-    return this.state == Job.READY;
-  }
-	
-  /**
-   * Check the state of this running job. The state may 
-   * remain the same, become SUCCESS or FAILED.
-   */
-  private void checkRunningState() {
-    RunningJob running = null;
-    try {
-      running = jc.getJob(this.mapredJobID);
-      if (running.isComplete()) {
-        if (running.isSuccessful()) {
-          this.state = Job.SUCCESS;
-        } else {
-          this.state = Job.FAILED;
-          this.message = "Job failed!";
-          try {
-            running.killJob();
-          } catch (IOException e1) {
-
-          }
-          try {
-            this.jc.close();
-          } catch (IOException e2) {
-
-          }
-        }
-      }
-
-    } catch (IOException ioe) {
-      this.state = Job.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-      try {
-        if (running != null)
-          running.killJob();
-      } catch (IOException e1) {
-
-      }
-      try {
-        this.jc.close();
-      } catch (IOException e1) {
-
-      }
+    State state = super.getJobState();
+    if (state == State.SUCCESS) {
+      return SUCCESS;
+    } 
+    if (state == State.WAITING) {
+      return WAITING;
     }
-  }
-	
-  /**
-   * Check and update the state of this job. The state changes  
-   * depending on its current state and the states of the depending jobs.
-   */
-   synchronized int checkState() {
-    if (this.state == Job.RUNNING) {
-      checkRunningState();
+    if (state == State.RUNNING) {
+      return RUNNING;
     }
-    if (this.state != Job.WAITING) {
-      return this.state;
+    if (state == State.READY) {
+      return READY;
     }
-    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-      this.state = Job.READY;
-      return this.state;
+    if (state == State.FAILED ) {
+      return FAILED;
     }
-    Job pred = null;
-    int n = this.dependingJobs.size();
-    for (int i = 0; i < n; i++) {
-      pred = this.dependingJobs.get(i);
-      int s = pred.checkState();
-      if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
-        break; // a pred is still not completed, continue in WAITING
-        // state
-      }
-      if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
-        this.state = Job.DEPENDENT_FAILED;
-        this.message = "depending job " + i + " with jobID "
-          + pred.getJobID() + " failed. " + pred.getMessage();
-        break;
-      }
-      // pred must be in success state
-      if (i == n - 1) {
-        this.state = Job.READY;
-      }
+    if (state == State.DEPENDENT_FAILED ) {
+      return DEPENDENT_FAILED;
     }
-
-    return this.state;
+    return -1;
   }
-	
+  
   /**
-   * Submit this job to mapred. The state becomes RUNNING if submission 
-   * is successful, FAILED otherwise.  
+   * @return the job client of this job
    */
-  protected synchronized void submit() {
+  public JobClient getJobClient() {
     try {
-      if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
-        FileSystem fs = FileSystem.get(theJobConf);
-        Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
-        for (int i = 0; i < inputPaths.length; i++) {
-          if (!fs.exists(inputPaths[i])) {
-            try {
-              fs.mkdirs(inputPaths[i]);
-            } catch (IOException e) {
-
-            }
-          }
-        }
-      }
-      RunningJob running = jc.submitJob(theJobConf);
-      this.mapredJobID = running.getID();
-      this.state = Job.RUNNING;
+      return new JobClient(super.getJob().getConfiguration());
     } catch (IOException ioe) {
-      this.state = Job.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
+      return null;
     }
   }
-	
+
+  /**
+   * @return the depending jobs of this job
+   */
+  public ArrayList<Job> getDependingJobs() {
+    return JobControl.castToJobList(super.getDependentJobs());
+  }
+
 }
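
The wrapper above keeps old client code source-compatible. A minimal sketch under that assumption (the bare JobConf is a placeholder):

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;

public class LegacyJobSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    Job legacy = new Job(conf);   // now internally wraps a mapreduce.Job
    // getState() translates ControlledJob.State back to the old constants.
    if (legacy.getState() == Job.WAITING) {
      System.out.println("freshly constructed jobs start in WAITING");
    }
  }
}
```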

+ 37 - 219
src/mapred/org/apache/hadoop/mapred/jobcontrol/JobControl.java

@@ -20,279 +20,97 @@ package org.apache.hadoop.mapred.jobcontrol;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Hashtable;
-import java.util.Map;
 
-/** This class encapsulates a set of MapReduce jobs and its dependency. It tracks 
- *  the states of the jobs by placing them into different tables according to their 
- *  states. 
- *  
- *  This class provides APIs for the client app to add a job to the group and to get 
- *  the jobs in the group in different states. When a 
- *  job is added, an ID unique to the group is assigned to the job. 
- *  
- *  This class has a thread that submits jobs when they become ready, monitors the
- *  states of the running jobs, and updates the states of jobs based on the state changes 
- *  of their depending jobs states. The class provides APIs for suspending/resuming
- *  the thread,and for stopping the thread.
- *  
- */
-public class JobControl implements Runnable{
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+
+/**
+ *@deprecated Use 
+ *{@link org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl} instead
+ **/
+@Deprecated
+public class JobControl extends 
+    org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl {
 
-  // The thread can be in one of the following state
-  private static final int RUNNING = 0;
-  private static final int SUSPENDED = 1;
-  private static final int STOPPED = 2;
-  private static final int STOPPING = 3;
-  private static final int READY = 4;
-	
-  private int runnerState;			// the thread state
-	
-  private Map<String, Job> waitingJobs;
-  private Map<String, Job> readyJobs;
-  private Map<String, Job> runningJobs;
-  private Map<String, Job> successfulJobs;
-  private Map<String, Job> failedJobs;
-	
-  private long nextJobID;
-  private String groupName;
-	
   /** 
    * Construct a job control for a group of jobs.
    * @param groupName a name identifying this group
    */
   public JobControl(String groupName) {
-    this.waitingJobs = new Hashtable<String, Job>();
-    this.readyJobs = new Hashtable<String, Job>();
-    this.runningJobs = new Hashtable<String, Job>();
-    this.successfulJobs = new Hashtable<String, Job>();
-    this.failedJobs = new Hashtable<String, Job>();
-    this.nextJobID = -1;
-    this.groupName = groupName;
-    this.runnerState = JobControl.READY;
+    super(groupName);
   }
-	
-  private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
-    ArrayList<Job> retv = new ArrayList<Job>();
-    synchronized (jobs) {
-      for (Job job : jobs.values()) {
-        retv.add(job);
-      }
+  
+  static ArrayList<Job> castToJobList(ArrayList<ControlledJob> cjobs) {
+    ArrayList<Job> ret = new ArrayList<Job>();
+    for (ControlledJob job : cjobs) {
+      ret.add((Job)job);
     }
-    return retv;
+    return ret;
   }
-	
+  
   /**
    * @return the jobs in the waiting state
    */
   public ArrayList<Job> getWaitingJobs() {
-    return JobControl.toArrayList(this.waitingJobs);
+    return castToJobList(super.getWaitingJobList());
   }
 	
   /**
    * @return the jobs in the running state
    */
   public ArrayList<Job> getRunningJobs() {
-    return JobControl.toArrayList(this.runningJobs);
+    return castToJobList(super.getRunningJobList());
   }
 	
   /**
    * @return the jobs in the ready state
    */
   public ArrayList<Job> getReadyJobs() {
-    return JobControl.toArrayList(this.readyJobs);
+    return castToJobList(super.getReadyJobsList());
   }
 	
   /**
    * @return the jobs in the success state
    */
   public ArrayList<Job> getSuccessfulJobs() {
-    return JobControl.toArrayList(this.successfulJobs);
+    return castToJobList(super.getSuccessfulJobList());
   }
 	
   public ArrayList<Job> getFailedJobs() {
-    return JobControl.toArrayList(this.failedJobs);
-  }
-	
-  private String getNextJobID() {
-    nextJobID += 1;
-    return this.groupName + this.nextJobID;
-  }
-	
-  private static void addToQueue(Job aJob, Map<String, Job> queue) {
-    synchronized(queue) {
-      queue.put(aJob.getJobID(), aJob);
-    }		
-  }
-	
-  private void addToQueue(Job aJob) {
-    Map<String, Job> queue = getQueue(aJob.getState());
-    addToQueue(aJob, queue);	
-  }
-	
-  private Map<String, Job> getQueue(int state) {
-    Map<String, Job> retv = null;
-    if (state == Job.WAITING) {
-      retv = this.waitingJobs;
-    } else if (state == Job.READY) {
-      retv = this.readyJobs;
-    } else if (state == Job.RUNNING) {
-      retv = this.runningJobs;
-    } else if (state == Job.SUCCESS) {
-      retv = this.successfulJobs;
-    } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
-      retv = this.failedJobs;
-    } 
-    return retv;
+    return castToJobList(super.getFailedJobList());
   }
 
-  /**
-   * Add a new job.
-   * @param aJob the new job
-   */
-  synchronized public String addJob(Job aJob) {
-    String id = this.getNextJobID();
-    aJob.setJobID(id);
-    aJob.setState(Job.WAITING);
-    this.addToQueue(aJob);
-    return id;	
-  }
-	
   /**
    * Add a collection of jobs
    * 
    * @param jobs
    */
-  public void addJobs(Collection<Job> jobs) {
+  public void addJobs(Collection <Job> jobs) {
     for (Job job : jobs) {
       addJob(job);
     }
   }
-	
+
   /**
    * @return the thread state
    */
   public int getState() {
-    return this.runnerState;
-  }
-	
-  /**
-   * set the thread state to STOPPING so that the 
-   * thread will stop when it wakes up.
-   */
-  public void stop() {
-    this.runnerState = JobControl.STOPPING;
-  }
-	
-  /**
-   * suspend the running thread
-   */
-  public void suspend () {
-    if (this.runnerState == JobControl.RUNNING) {
-      this.runnerState = JobControl.SUSPENDED;
+    ThreadState state = super.getThreadState();
+    if (state == ThreadState.RUNNING) {
+      return 0;
+    } 
+    if (state == ThreadState.SUSPENDED) {
+      return 1;
     }
-  }
-	
-  /**
-   * resume the suspended thread
-   */
-  public void resume () {
-    if (this.runnerState == JobControl.SUSPENDED) {
-      this.runnerState = JobControl.RUNNING;
+    if (state == ThreadState.STOPPED) {
+      return 2;
     }
-  }
-	
-  synchronized private void checkRunningJobs() {
-		
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.runningJobs;
-    this.runningJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      int state = nextJob.checkState();
-      /*
-        if (state != Job.RUNNING) {
-        System.out.println("The state of the running job " +
-        nextJob.getJobName() + " has changed to: " + nextJob.getState());
-        }
-      */
-      this.addToQueue(nextJob);
-    }
-  }
-	
-  synchronized private void checkWaitingJobs() {
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.waitingJobs;
-    this.waitingJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      int state = nextJob.checkState();
-      /*
-        if (state != Job.WAITING) {
-        System.out.println("The state of the waiting job " +
-        nextJob.getJobName() + " has changed to: " + nextJob.getState());
-        }
-      */
-      this.addToQueue(nextJob);
+    if (state == ThreadState.STOPPING) {
+      return 3;
     }
-  }
-	
-  synchronized private void startReadyJobs() {
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.readyJobs;
-    this.readyJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
-      nextJob.submit();
-      //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
-      this.addToQueue(nextJob);
-    }	
-  }
-	
-  synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 &&
-      this.readyJobs.size() == 0 &&
-      this.runningJobs.size() == 0;
-  }
-	
-  /**
-   *  The main loop for the thread.
-   *  The loop does the following:
-   *  	Check the states of the running jobs
-   *  	Update the states of waiting jobs
-   *  	Submit the jobs in ready state
-   */
-  public void run() {
-    this.runnerState = JobControl.RUNNING;
-    while (true) {
-      while (this.runnerState == JobControl.SUSPENDED) {
-        try {
-          Thread.sleep(5000);
-        }
-        catch (Exception e) {
-					
-        }
-      }
-      checkRunningJobs();	
-      checkWaitingJobs();		
-      startReadyJobs();		
-      if (this.runnerState != JobControl.RUNNING && 
-          this.runnerState != JobControl.SUSPENDED) {
-        break;
-      }
-      try {
-        Thread.sleep(5000);
-      }
-      catch (Exception e) {
-				
-      }
-      if (this.runnerState != JobControl.RUNNING && 
-          this.runnerState != JobControl.SUSPENDED) {
-        break;
-      }
+    if (state == ThreadState.READY ) {
+      return 4;
     }
-    this.runnerState = JobControl.STOPPED;
+    return -1;
   }
 
 }
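
A short sketch of the compatibility contract above, assuming a controller with no jobs; the sleep intervals are arbitrary:

```java
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public class LegacyControlSketch {
  public static void main(String[] args) throws Exception {
    JobControl control = new JobControl("legacy-group");
    new Thread(control).start();
    Thread.sleep(100);                 // let the thread reach RUNNING
    control.stop();                    // requests STOPPING
    // getState() maps the new ThreadState enum to the old int codes:
    // 0=RUNNING, 1=SUSPENDED, 2=STOPPED, 3=STOPPING, 4=READY.
    while (control.getState() != 2) {  // wait for STOPPED
      Thread.sleep(500);
    }
  }
}
```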

+ 326 - 0
src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java

@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+/** 
+ *  This class encapsulates a MapReduce job and its dependency. It monitors 
+ *  the states of the depending jobs and updates the state of this job.
+ *  A job starts in the WAITING state. If it does not have any depending jobs,
+ *  or all of the depending jobs are in SUCCESS state, then the job state 
+ *  will become READY. If any depending jobs fail, the job will fail too. 
+ *  When in READY state, the job can be submitted to Hadoop for execution, with
+ *  the state changing into RUNNING state. From RUNNING state, the job 
+ *  can get into SUCCESS or FAILED state, depending on 
+ *  the status of the job execution.
+ */
+
+public class ControlledJob {
+
+  // A job will be in one of the following states
+  public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
+                            DEPENDENT_FAILED}; 
+	
+  private State state;
+  private String controlID;     // assigned and used by JobControl class
+  private Job job;               // mapreduce job to be executed.
+  // some info for human consumption, e.g. the reason why the job failed
+  private String message;
+  // the jobs the current job depends on
+  private ArrayList<ControlledJob> dependingJobs;
+	
+  /** 
+   * Construct a job.
+   * @param job a mapreduce job to be executed.
+   * @param dependingJobs an array of jobs the current job depends on
+   */
+  public ControlledJob(Job job, ArrayList<ControlledJob> dependingJobs) 
+      throws IOException {
+    this.job = job;
+    this.dependingJobs = dependingJobs;
+    this.state = State.WAITING;
+    this.controlID = "unassigned";
+    this.message = "just initialized";
+  }
+  
+  /**
+   * Construct a job.
+   * 
+   * @param conf mapred job configuration representing a job to be executed.
+   * @throws IOException
+   */
+  public ControlledJob(Configuration conf) throws IOException {
+    this(new Job(conf), null);
+  }
+	
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
+    sb.append("job id:\t").append(this.controlID).append("\n");
+    sb.append("job state:\t").append(this.state).append("\n");
+    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
+    sb.append("job message:\t").append(this.message).append("\n");
+		
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      sb.append("job has no depending job:\t").append("\n");
+    } else {
+      sb.append("job has ").append(this.dependingJobs.size()).
+         append(" depending jobs:\n");
+      for (int i = 0; i < this.dependingJobs.size(); i++) {
+        sb.append("\t depending job ").append(i).append(":\t");
+        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+	
+  /**
+   * @return the job name of this job
+   */
+  public String getJobName() {
+    return job.getJobName();
+  }
+	
+  /**
+   * Set the job name for this job.
+   * @param jobName the job name
+   */
+  public void setJobName(String jobName) {
+    job.setJobName(jobName);
+  }
+	
+  /**
+   * @return the job ID of this job assigned by JobControl
+   */
+  public String getJobID() {
+    return this.controlID;
+  }
+	
+  /**
+   * Set the job ID for this job.
+   * @param id the job ID
+   */
+  public void setJobID(String id) {
+    this.controlID = id;
+  }
+	
+  /**
+   * @return the mapred ID of this job as assigned by the 
+   * mapred framework.
+   */
+  public JobID getMapredJobID() {
+    return this.job.getJobID();
+  }
+  
+  /**
+   * @return the mapreduce job 
+   */
+  public synchronized Job getJob() {
+    return this.job;
+  }
+
+  /**
+   * Set the mapreduce job
+   * @param job the mapreduce job for this job.
+   */
+  public synchronized void setJob(Job job) {
+    this.job = job;
+  }
+
+  /**
+   * @return the state of this job
+   */
+  public synchronized State getJobState() {
+    return this.state;
+  }
+	
+  /**
+   * Set the state for this job.
+   * @param state the new state for this job.
+   */
+  protected synchronized void setJobState(State state) {
+    this.state = state;
+  }
+	
+  /**
+   * @return the message of this job
+   */
+  public synchronized String getMessage() {
+    return this.message;
+  }
+
+  /**
+   * Set the message for this job.
+   * @param message the message for this job.
+   */
+  public synchronized void setMessage(String message) {
+    this.message = message;
+  }
+
+  /**
+   * @return the depending jobs of this job
+   */
+  public ArrayList<ControlledJob> getDependentJobs() {
+    return this.dependingJobs;
+  }
+  
+  /**
+   * Add a job to this jobs' dependency list. 
+   * Dependent jobs can only be added while a Job 
+   * is waiting to run, not during or afterwards.
+   * 
+   * @param dependingJob Job that this Job depends on.
+   * @return <tt>true</tt> if the Job was added.
+   */
+  public synchronized boolean addDependingJob(ControlledJob dependingJob) {
+    if (this.state == State.WAITING) { //only allowed to add jobs when waiting
+      if (this.dependingJobs == null) {
+        this.dependingJobs = new ArrayList<ControlledJob>();
+      }
+      return this.dependingJobs.add(dependingJob);
+    } else {
+      return false;
+    }
+  }
+	
+  /**
+   * @return true if this job is in a complete state
+   */
+  public synchronized boolean isCompleted() {
+    return this.state == State.FAILED || 
+      this.state == State.DEPENDENT_FAILED ||
+      this.state == State.SUCCESS;
+  }
+	
+  /**
+   * @return true if this job is in READY state
+   */
+  public synchronized boolean isReady() {
+    return this.state == State.READY;
+  }
+
+  public void killJob() throws IOException {
+    job.killJob();
+  }
+  
+  /**
+   * Check the state of this running job. The state may 
+   * remain the same, become SUCCESS or FAILED.
+   */
+  private void checkRunningState() {
+    try {
+      if (job.isComplete()) {
+        if (job.isSuccessful()) {
+          this.state = State.SUCCESS;
+        } else {
+          this.state = State.FAILED;
+          this.message = "Job failed!";
+        }
+      }
+    } catch (IOException ioe) {
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+      try {
+        if (job != null) {
+          job.killJob();
+        }
+      } catch (IOException e) {}
+    }
+  }
+	
+  /**
+   * Check and update the state of this job. The state changes  
+   * depending on its current state and the states of the depending jobs.
+   */
+   synchronized State checkState() {
+    if (this.state == State.RUNNING) {
+      checkRunningState();
+    }
+    if (this.state != State.WAITING) {
+      return this.state;
+    }
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
+      this.state = State.READY;
+      return this.state;
+    }
+    ControlledJob pred = null;
+    int n = this.dependingJobs.size();
+    for (int i = 0; i < n; i++) {
+      pred = this.dependingJobs.get(i);
+      State s = pred.checkState();
+      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
+        break; // a pred is still not completed, continue in WAITING
+        // state
+      }
+      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
+        this.state = State.DEPENDENT_FAILED;
+        this.message = "depending job " + i + " with jobID "
+          + pred.getJobID() + " failed. " + pred.getMessage();
+        break;
+      }
+      // pred must be in success state
+      if (i == n - 1) {
+        this.state = State.READY;
+      }
+    }
+
+    return this.state;
+  }
+	
+  /**
+   * Submit this job to mapred. The state becomes RUNNING if submission 
+   * is successful, FAILED otherwise.  
+   */
+  protected synchronized void submit() {
+    try {
+      Configuration conf = job.getConfiguration();
+      if (conf.getBoolean("create.empty.dir.if.nonexist", false)) {
+        FileSystem fs = FileSystem.get(conf);
+        Path inputPaths[] = FileInputFormat.getInputPaths(job);
+        for (int i = 0; i < inputPaths.length; i++) {
+          if (!fs.exists(inputPaths[i])) {
+            try {
+              fs.mkdirs(inputPaths[i]);
+            } catch (IOException e) {
+
+            }
+          }
+        }
+      }
+      job.submit();
+      this.state = State.RUNNING;
+    } catch (Exception ioe) {
+      this.state = State.FAILED;
+      this.message = StringUtils.stringifyException(ioe);
+    }
+  }
+	
+}
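
To illustrate the WAITING-only rule enforced by addDependingJob above, a minimal sketch; both jobs are bare placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

public class DependencySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    ControlledJob first  = new ControlledJob(new Job(conf), null);
    ControlledJob second = new ControlledJob(new Job(conf), null);
    boolean added = second.addDependingJob(first); // second is WAITING: true
    System.out.println("dependency added: " + added);
    // second stays WAITING (then becomes READY) once first reaches SUCCESS;
    // if first FAILS, checkState() moves second to DEPENDENT_FAILED.
  }
}
```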

+ 288 - 0
src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java

@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
+
+/** 
+ *  This class encapsulates a set of MapReduce jobs and its dependency.
+ *   
+ *  It tracks the states of the jobs by placing them into different tables
+ *  according to their states. 
+ *  
+ *  This class provides APIs for the client app to add a job to the group 
+ *  and to get the jobs in the group in different states. When a job is 
+ *  added, an ID unique to the group is assigned to the job. 
+ *  
+ *  This class has a thread that submits jobs when they become ready, 
+ *  monitors the states of the running jobs, and updates the states of jobs
+ *  based on the state changes of their depending jobs states. The class 
+ *  provides APIs for suspending/resuming the thread, and 
+ *  for stopping the thread.
+ *  
+ */
+public class JobControl implements Runnable {
+
+  // The thread can be in one of the following state
+  public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
+	
+  private ThreadState runnerState;			// the thread state
+	
+  private Map<String, ControlledJob> waitingJobs;
+  private Map<String, ControlledJob> readyJobs;
+  private Map<String, ControlledJob> runningJobs;
+  private Map<String, ControlledJob> successfulJobs;
+  private Map<String, ControlledJob> failedJobs;
+	
+  private long nextJobID;
+  private String groupName;
+	
+  /** 
+   * Construct a job control for a group of jobs.
+   * @param groupName a name identifying this group
+   */
+  public JobControl(String groupName) {
+    this.waitingJobs = new Hashtable<String, ControlledJob>();
+    this.readyJobs = new Hashtable<String, ControlledJob>();
+    this.runningJobs = new Hashtable<String, ControlledJob>();
+    this.successfulJobs = new Hashtable<String, ControlledJob>();
+    this.failedJobs = new Hashtable<String, ControlledJob>();
+    this.nextJobID = -1;
+    this.groupName = groupName;
+    this.runnerState = ThreadState.READY;
+  }
+	
+  private static ArrayList<ControlledJob> toArrayList(
+                   Map<String, ControlledJob> jobs) {
+    ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
+    synchronized (jobs) {
+      for (ControlledJob job : jobs.values()) {
+        retv.add(job);
+      }
+    }
+    return retv;
+  }
+	
+  /**
+   * @return the jobs in the waiting state
+   */
+  public ArrayList<ControlledJob> getWaitingJobList() {
+    return toArrayList(this.waitingJobs);
+  }
+	
+  /**
+   * @return the jobs in the running state
+   */
+  public ArrayList<ControlledJob> getRunningJobList() {
+    return toArrayList(this.runningJobs);
+  }
+	
+  /**
+   * @return the jobs in the ready state
+   */
+  public ArrayList<ControlledJob> getReadyJobsList() {
+    return toArrayList(this.readyJobs);
+  }
+	
+  /**
+   * @return the jobs in the success state
+   */
+  public ArrayList<ControlledJob> getSuccessfulJobList() {
+    return toArrayList(this.successfulJobs);
+  }
+	
+  public ArrayList<ControlledJob> getFailedJobList() {
+    return toArrayList(this.failedJobs);
+  }
+	
+  private String getNextJobID() {
+    nextJobID += 1;
+    return this.groupName + this.nextJobID;
+  }
+	
+  private static void addToQueue(ControlledJob aJob, 
+                                 Map<String, ControlledJob> queue) {
+    synchronized(queue) {
+      queue.put(aJob.getJobID(), aJob);
+    }		
+  }
+	
+  private void addToQueue(ControlledJob aJob) {
+    Map<String, ControlledJob> queue = getQueue(aJob.getJobState());
+    addToQueue(aJob, queue);	
+  }
+	
+  private Map<String, ControlledJob> getQueue(State state) {
+    Map<String, ControlledJob> retv = null;
+    if (state == State.WAITING) {
+      retv = this.waitingJobs;
+    } else if (state == State.READY) {
+      retv = this.readyJobs;
+    } else if (state == State.RUNNING) {
+      retv = this.runningJobs;
+    } else if (state == State.SUCCESS) {
+      retv = this.successfulJobs;
+    } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
+      retv = this.failedJobs;
+    } 
+    return retv;
+  }
+
+  /**
+   * Add a new job.
+   * @param aJob the new job
+   */
+  synchronized public String addJob(ControlledJob aJob) {
+    String id = this.getNextJobID();
+    aJob.setJobID(id);
+    aJob.setJobState(State.WAITING);
+    this.addToQueue(aJob);
+    return id;	
+  }
+	
+  /**
+   * Add a collection of jobs
+   * 
+   * @param jobs
+   */
+  public void addJobCollection(Collection<ControlledJob> jobs) {
+    for (ControlledJob job : jobs) {
+      addJob(job);
+    }
+  }
+	
+  /**
+   * @return the thread state
+   */
+  public ThreadState getThreadState() {
+    return this.runnerState;
+  }
+	
+  /**
+   * set the thread state to STOPPING so that the 
+   * thread will stop when it wakes up.
+   */
+  public void stop() {
+    this.runnerState = ThreadState.STOPPING;
+  }
+	
+  /**
+   * suspend the running thread
+   */
+  public void suspend () {
+    if (this.runnerState == ThreadState.RUNNING) {
+      this.runnerState = ThreadState.SUSPENDED;
+    }
+  }
+	
+  /**
+   * resume the suspended thread
+   */
+  public void resume () {
+    if (this.runnerState == ThreadState.SUSPENDED) {
+      this.runnerState = ThreadState.RUNNING;
+    }
+  }
+	
+  synchronized private void checkRunningJobs() {
+		
+    Map<String, ControlledJob> oldJobs = null;
+    oldJobs = this.runningJobs;
+    this.runningJobs = new Hashtable<String, ControlledJob>();
+		
+    for (ControlledJob nextJob : oldJobs.values()) {
+      nextJob.checkState();
+      this.addToQueue(nextJob);
+    }
+  }
+	
+  synchronized private void checkWaitingJobs() {
+    Map<String, ControlledJob> oldJobs = null;
+    oldJobs = this.waitingJobs;
+    this.waitingJobs = new Hashtable<String, ControlledJob>();
+		
+    for (ControlledJob nextJob : oldJobs.values()) {
+      nextJob.checkState();
+      this.addToQueue(nextJob);
+    }
+  }
+	
+  synchronized private void startReadyJobs() {
+    Map<String, ControlledJob> oldJobs = null;
+    oldJobs = this.readyJobs;
+    this.readyJobs = new Hashtable<String, ControlledJob>();
+		
+    for (ControlledJob nextJob : oldJobs.values()) {
+      //Submitting Job to Hadoop
+      nextJob.submit();
+      this.addToQueue(nextJob);
+    }	
+  }
+	
+  synchronized public boolean allFinished() {
+    return this.waitingJobs.size() == 0 &&
+      this.readyJobs.size() == 0 &&
+      this.runningJobs.size() == 0;
+  }
+	
+  /**
+   *  The main loop for the thread.
+   *  The loop does the following:
+   *  	Check the states of the running jobs
+   *  	Update the states of waiting jobs
+   *  	Submit the jobs in ready state
+   */
+  public void run() {
+    this.runnerState = ThreadState.RUNNING;
+    while (true) {
+      while (this.runnerState == ThreadState.SUSPENDED) {
+        try {
+          Thread.sleep(5000);
+        }
+        catch (Exception e) {
+					
+        }
+      }
+      checkRunningJobs();	
+      checkWaitingJobs();		
+      startReadyJobs();		
+      if (this.runnerState != ThreadState.RUNNING && 
+          this.runnerState != ThreadState.SUSPENDED) {
+        break;
+      }
+      try {
+        Thread.sleep(5000);
+      }
+      catch (Exception e) {
+				
+      }
+      if (this.runnerState != ThreadState.RUNNING && 
+          this.runnerState != ThreadState.SUSPENDED) {
+        break;
+      }
+    }
+    this.runnerState = ThreadState.STOPPED;
+  }
+
+}
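
A sketch of the thread lifecycle this class exposes; the brief sleep is only there to sidestep the startup race between start() and suspend():

```java
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.ThreadState;

public class ThreadStateSketch {
  public static void main(String[] args) throws Exception {
    JobControl control = new JobControl("group");
    Thread runner = new Thread(control);
    runner.start();
    Thread.sleep(100);                 // let run() mark the thread RUNNING
    control.suspend();                 // only takes effect while RUNNING
    if (control.getThreadState() == ThreadState.SUSPENDED) {
      control.resume();
    }
    control.stop();                    // STOPPING; loop exits, marks STOPPED
    runner.join();
  }
}
```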

+ 25 - 0
src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/package.html

@@ -0,0 +1,25 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+<p>Utilities for managing dependent jobs.</p>
+
+</body>
+</html>

+ 231 - 0
src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Utility methods used in various Job Control unit tests.
+ */
+public class MapReduceTestUtil {
+
+  static private Random rand = new Random();
+
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+  /**
+   * Cleans the data from the passed Path in the passed FileSystem.
+   * 
+   * @param fs FileSystem to delete data from.
+   * @param dirPath Path to be deleted.
+   * @throws IOException If an error occurs cleaning the data.
+   */
+  public static void cleanData(FileSystem fs, Path dirPath) 
+      throws IOException {
+    fs.delete(dirPath, true);
+  }
+
+  /**
+   * Generates a string of random digits.
+   * 
+   * @return A random string.
+   */
+  public static String generateRandomWord() {
+    return idFormat.format(rand.nextLong());
+  }
+
+  /**
+   * Generates a line of random text.
+   * 
+   * @return A line of random text.
+   */
+  public static String generateRandomLine() {
+    long r = rand.nextLong() % 7;
+    long n = r + 20;
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < n; i++) {
+      sb.append(generateRandomWord()).append(" ");
+    }
+    sb.append("\n");
+    return sb.toString();
+  }
+
+  /**
+   * Generates random data consisting of 10000 lines.
+   * 
+   * @param fs FileSystem to create data in.
+   * @param dirPath Path to create the data in.
+   * @throws IOException If an error occurs creating the data.
+   */
+  public static void generateData(FileSystem fs, Path dirPath) 
+      throws IOException {
+    FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
+    for (int i = 0; i < 10000; i++) {
+      String line = generateRandomLine();
+      out.write(line.getBytes("UTF-8"));
+    }
+    out.close();
+  }
+
+  /**
+   * Creates a simple copy job.
+   * 
+   * @param conf Configuration object
+   * @param outdir Output directory.
+   * @param indirs Comma separated input directories.
+   * @return Job initialized for a data copy job.
+   * @throws Exception If an error occurs creating job configuration.
+   */
+  public static Job createCopyJob(Configuration conf, Path outdir, 
+      Path... indirs) throws Exception {
+    conf.setInt("mapred.map.tasks", 3);
+    Job theJob = new Job(conf);
+    theJob.setJobName("DataMoveJob");
+
+    FileInputFormat.setInputPaths(theJob, indirs);
+    theJob.setMapperClass(DataCopyMapper.class);
+    FileOutputFormat.setOutputPath(theJob, outdir);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    theJob.setReducerClass(DataCopyReducer.class);
+    theJob.setNumReduceTasks(1);
+    return theJob;
+  }
+
+  /**
+   * Creates a simple fail job.
+   * 
+   * @param conf Configuration object
+   * @param outdir Output directory.
+   * @param indirs Comma separated input directories.
+   * @return Job initialized for a simple fail job.
+   * @throws Exception If an error occurs creating job configuration.
+   */
+  public static Job createFailJob(Configuration conf, Path outdir, 
+      Path... indirs) throws Exception {
+
+    conf.setInt("mapred.map.max.attempts", 2);
+    Job theJob = new Job(conf);
+    theJob.setJobName("Fail-Job");
+
+    FileInputFormat.setInputPaths(theJob, indirs);
+    theJob.setMapperClass(FailMapper.class);
+    theJob.setReducerClass(Reducer.class);
+    theJob.setNumReduceTasks(0);
+    FileOutputFormat.setOutputPath(theJob, outdir);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    return theJob;
+  }
+
+  /**
+   * Creates a simple fail job.
+   * 
+   * @param conf Configuration object
+   * @param outdir Output directory.
+   * @param indirs Comma separated input directories.
+   * @return Job initialized for a simple kill job.
+   * @throws Exception If an error occurs creating job configuration.
+   */
+  public static Job createKillJob(Configuration conf, Path outdir, 
+      Path... indirs) throws Exception {
+
+    Job theJob = new Job(conf);
+    theJob.setJobName("Kill-Job");
+
+    FileInputFormat.setInputPaths(theJob, indirs);
+    theJob.setMapperClass(KillMapper.class);
+    theJob.setReducerClass(Reducer.class);
+    theJob.setNumReduceTasks(0);
+    FileOutputFormat.setOutputPath(theJob, outdir);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    return theJob;
+  }
+
+  /**
+   * Simple Mapper and Reducer implementation which copies data it reads in.
+   */
+  public static class DataCopyMapper extends 
+      Mapper<LongWritable, Text, Text, Text> {
+    public void map(LongWritable key, Text value, Context context) 
+        throws IOException, InterruptedException {
+      context.write(new Text(key.toString()), value);
+    }
+  }
+
+  public static class DataCopyReducer extends Reducer<Text, Text, Text, Text> {
+    public void reduce(Text key, Iterator<Text> values, Context context)
+    throws IOException, InterruptedException {
+      Text dumbKey = new Text("");
+      while (values.hasNext()) {
+        Text data = (Text) values.next();
+        context.write(dumbKey, data);
+      }
+    }
+  }
+
+  // Mapper that fails
+  public static class FailMapper extends 
+    Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
+
+    public void map(WritableComparable<?> key, Writable value, Context context)
+        throws IOException {
+      throw new RuntimeException("failing map");
+    }
+  }
+
+  // Mapper that sleeps for a long time.
+  // Used for running a job that will be killed
+  public static class KillMapper extends 
+    Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
+
+    public void map(WritableComparable<?> key, Writable value, Context context)
+        throws IOException {
+      try {
+        Thread.sleep(1000000);
+      } catch (InterruptedException e) {
+        // Do nothing
+      }
+    }
+  }
+}
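
A sketch of how the tests below consume these helpers; the /tmp paths are illustrative placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

public class TestUtilSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path in  = new Path("/tmp/jobcontrol/in");
    Path out = new Path("/tmp/jobcontrol/out");
    MapReduceTestUtil.cleanData(fs, in);
    MapReduceTestUtil.generateData(fs, in);   // writes 10000 random lines
    Job copy = MapReduceTestUtil.createCopyJob(conf, out, in);
    ControlledJob cjob = new ControlledJob(copy, null);
    System.out.println(cjob.getJobName());    // "DataMoveJob"
  }
}
```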

+ 189 - 0
src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java

@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.jobcontrol;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+
+/**
+ * This class performs unit test for Job/JobControl classes.
+ *  
+ */
+public class TestMapReduceJobControl extends HadoopTestCase {
+
+  static Path rootDataDir = new Path(
+    System.getProperty("test.build.data", "."), "TestData");
+  static Path indir = new Path(rootDataDir, "indir");
+  static Path outdir_1 = new Path(rootDataDir, "outdir_1");
+  static Path outdir_2 = new Path(rootDataDir, "outdir_2");
+  static Path outdir_3 = new Path(rootDataDir, "outdir_3");
+  static Path outdir_4 = new Path(rootDataDir, "outdir_4");
+  static ControlledJob cjob1 = null;
+  static ControlledJob cjob2 = null;
+  static ControlledJob cjob3 = null;
+  static ControlledJob cjob4 = null;
+
+  public TestMapReduceJobControl() throws IOException {
+    super(HadoopTestCase.LOCAL_MR , HadoopTestCase.LOCAL_FS, 2, 2);
+  }
+  
+  private void cleanupData(Configuration conf) throws Exception {
+    FileSystem fs = FileSystem.get(conf);
+    MapReduceTestUtil.cleanData(fs, indir);
+    MapReduceTestUtil.generateData(fs, indir);
+
+    MapReduceTestUtil.cleanData(fs, outdir_1);
+    MapReduceTestUtil.cleanData(fs, outdir_2);
+    MapReduceTestUtil.cleanData(fs, outdir_3);
+    MapReduceTestUtil.cleanData(fs, outdir_4);
+  }
+  
+  /**
+   * This is a main function for testing JobControl class.
+   * It requires 4 jobs: 
+   *      Job 1: passed as parameter. input:indir  output:outdir_1
+   *      Job 2: copy data from indir to outdir_2
+   *      Job 3: copy data from outdir_1 and outdir_2 to outdir_3
+   *      Job 4: copy data from outdir_3 to outdir_4
+   * The jobs 1 and 2 have no dependency. The job 3 depends on jobs 1 and 2.
+   * The job 4 depends on job 3.
+   * 
+   * Then it creates a JobControl object and add the 4 jobs to 
+   * the JobControl object.
+   * Finally, it creates a thread to run the JobControl object
+   */
+  private JobControl createDependencies(Configuration conf, Job job1) 
+      throws Exception {
+    ArrayList<ControlledJob> dependingJobs = null;
+    cjob1 = new ControlledJob(job1, dependingJobs);
+    Job job2 = MapReduceTestUtil.createCopyJob(conf, outdir_2, indir);
+    cjob2 = new ControlledJob(job2, dependingJobs);
+
+    Job job3 = MapReduceTestUtil.createCopyJob(conf, outdir_3, 
+	                                   outdir_1, outdir_2);
+    dependingJobs = new ArrayList<ControlledJob>();
+    dependingJobs.add(cjob1);
+    dependingJobs.add(cjob2);
+    cjob3 = new ControlledJob(job3, dependingJobs);
+
+    Job job4 = MapReduceTestUtil.createCopyJob(conf, outdir_4, outdir_3);
+    dependingJobs = new ArrayList<ControlledJob>();
+    dependingJobs.add(cjob3);
+    cjob4 = new ControlledJob(job4, dependingJobs);
+
+    JobControl theControl = new JobControl("Test");
+    theControl.addJob(cjob1);
+    theControl.addJob(cjob2);
+    theControl.addJob(cjob3);
+    theControl.addJob(cjob4);
+    Thread theController = new Thread(theControl);
+    theController.start();
+    return theControl;
+  }
+  
+  private void waitTillAllFinished(JobControl theControl) {
+    while (!theControl.allFinished()) {
+      try {
+        Thread.sleep(100);
+      } catch (Exception e) {}
+    }
+  }
+  
+  public void testJobControlWithFailJob() throws Exception {
+    Configuration conf = createJobConf();
+
+    cleanupData(conf);
+    
+    // create a Fail job
+    Job job1 = MapReduceTestUtil.createFailJob(conf, outdir_1, indir);
+    
+    // create job dependencies
+    JobControl theControl = createDependencies(conf, job1);
+    
+    // wait till all the jobs complete
+    waitTillAllFinished(theControl);
+    
+    assertTrue(cjob1.getJobState() == ControlledJob.State.FAILED);
+    assertTrue(cjob2.getJobState() == ControlledJob.State.SUCCESS);
+    assertTrue(cjob3.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
+    assertTrue(cjob4.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
+
+    theControl.stop();
+  }
+
+  public void testJobControlWithKillJob() throws Exception {
+    Configuration conf = createJobConf();
+    cleanupData(conf);
+    Job job1 = MapReduceTestUtil.createKillJob(conf, outdir_1, indir);
+    JobControl theControl = createDependencies(conf, job1);
+
+    while (cjob1.getJobState() != ControlledJob.State.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+    // verify that adding a depending job to a RUNNING job fails.
+    assertFalse(cjob1.addDependingJob(cjob2));
+
+    // suspend jobcontrol and resume it again
+    theControl.suspend();
+    assertTrue(
+      theControl.getThreadState() == JobControl.ThreadState.SUSPENDED);
+    theControl.resume();
+    
+    // kill the first job.
+    cjob1.killJob();
+
+    // wait till all the jobs complete
+    waitTillAllFinished(theControl);
+    
+    assertTrue(cjob1.getJobState() == ControlledJob.State.FAILED);
+    assertTrue(cjob2.getJobState() == ControlledJob.State.SUCCESS);
+    assertTrue(cjob3.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
+    assertTrue(cjob4.getJobState() == ControlledJob.State.DEPENDENT_FAILED);
+
+    theControl.stop();
+  }
+
+  public void testJobControl() throws Exception {
+    Configuration conf = createJobConf();
+
+    cleanupData(conf);
+    
+    Job job1 = MapReduceTestUtil.createCopyJob(conf, outdir_1, indir);
+    
+    JobControl theControl = createDependencies(conf, job1);
+    
+    // wait till all the jobs complete
+    waitTillAllFinished(theControl);
+    
+    assertEquals("Some jobs failed", 0, theControl.getFailedJobList().size());
+    
+    theControl.stop();
+  }
+}