瀏覽代碼

HADOOP-2086. This patch adds the ability to add dependencies to a job (run via JobControl) after construction. Contributed by Adrian Woodhead.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@591389 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 18 年之前
父節點
當前提交
7a0b4a8550
共有 2 個文件被更改,包括 36 次插入5 次删除
  1. 3 0
      CHANGES.txt
  2. 33 5
      src/java/org/apache/hadoop/mapred/jobcontrol/Job.java

+ 3 - 0
CHANGES.txt

@@ -38,6 +38,9 @@ Trunk (unreleased changes)
     HADOOP-1912. Datanode has two new commands COPY and REPLACE. These are
     HADOOP-1912. Datanode has two new commands COPY and REPLACE. These are
     needed for supporting data rebalance.  (Hairong Kuang via dhruba)
     needed for supporting data rebalance.  (Hairong Kuang via dhruba)
 
 
+    HADOOP-2086. This patch adds the ability to add dependencies to a job
+    (run via JobControl) after construction.  (Adrian Woodhead via ddas)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack
     HADOOP-1898.  Release the lock protecting the last time of the last stack

+ 33 - 5
src/java/org/apache/hadoop/mapred/jobcontrol/Job.java

@@ -77,6 +77,16 @@ public class Job {
     this.message = "just initialized";
     this.message = "just initialized";
     this.jc = new JobClient(jobConf);
     this.jc = new JobClient(jobConf);
   }
   }
+  
+  /**
+   * Construct a job.
+   * 
+   * @param jobConf mapred job configuration representing a job to be executed.
+   * @throws IOException
+   */
+  public Job(JobConf jobConf) throws IOException {
+    this(jobConf, null);
+  }
 	
 	
   public String toString() {
   public String toString() {
     StringBuffer sb = new StringBuffer();
     StringBuffer sb = new StringBuffer();
@@ -86,7 +96,7 @@ public class Job {
     sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
     sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
     sb.append("job message:\t").append(this.message).append("\n");
     sb.append("job message:\t").append(this.message).append("\n");
 		
 		
-    if (this.dependingJobs == null) {
+    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
       sb.append("job has no depending job:\t").append("\n");
       sb.append("job has no depending job:\t").append("\n");
     } else {
     } else {
       sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
       sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
@@ -162,7 +172,7 @@ public class Job {
   /**
   /**
    * @return the state of this job
    * @return the state of this job
    */
    */
-  public int getState() {
+  public synchronized int getState() {
     return this.state;
     return this.state;
   }
   }
 	
 	
@@ -170,7 +180,7 @@ public class Job {
    * Set the state for this job.
    * Set the state for this job.
    * @param state the new state for this job.
    * @param state the new state for this job.
    */
    */
-  public void setState(int state) {
+  protected synchronized void setState(int state) {
     this.state = state;
     this.state = state;
   }
   }
 	
 	
@@ -195,6 +205,24 @@ public class Job {
   public ArrayList getDependingJobs() {
   public ArrayList getDependingJobs() {
     return this.dependingJobs;
     return this.dependingJobs;
   }
   }
+  
+  /**
+   * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
+   * is waiting to run, not during or afterwards.
+   * 
+   * @param dependingJob Job that this Job depends on.
+   * @return <tt>true</tt> if the Job was added.
+   */
+  public synchronized boolean addDependingJob(Job dependingJob) {
+    if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
+      if (this.dependingJobs == null) {
+        this.dependingJobs = new ArrayList();
+      }
+      return this.dependingJobs.add(dependingJob);
+    } else {
+      return false;
+    }
+  }
 	
 	
   /**
   /**
    * @return true if this job is in a complete state
    * @return true if this job is in a complete state
@@ -260,7 +288,7 @@ public class Job {
    * Check and update the state of this job. The state changes  
    * Check and update the state of this job. The state changes  
    * depending on its current state and the states of the depending jobs.
    * depending on its current state and the states of the depending jobs.
    */
    */
-  public int checkState() {
+   synchronized int checkState() {
     if (this.state == Job.RUNNING) {
     if (this.state == Job.RUNNING) {
       checkRunningState();
       checkRunningState();
     }
     }
@@ -299,7 +327,7 @@ public class Job {
    * Submit this job to mapred. The state becomes RUNNING if submission 
    * Submit this job to mapred. The state becomes RUNNING if submission 
    * is successful, FAILED otherwise.  
    * is successful, FAILED otherwise.  
    */
    */
-  public void submit() {
+  protected synchronized void submit() {
     try {
     try {
       if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
       if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
         FileSystem fs = FileSystem.get(theJobConf);
         FileSystem fs = FileSystem.get(theJobConf);