Selaa lähdekoodia

MAPREDUCE-5233. Add methods that are changed or removed from JobControl.Job when compared to 1.x. This breaks 0.23.x users of one API in Job. Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1485491 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1485494 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 12 vuotta sitten
vanhempi
commit
28822c2f64

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -15,6 +15,10 @@ Release 2.0.5-beta - UNRELEASED
     1.x examples jar on top of YARN. This change breaks 0.23.x direct usages of
     ProgramDriver. (Zhijie Shen via vinodkv)
 
+    MAPREDUCE-5233. Add methods that are changed or removed from JobControl.Job
+    when compared to 1.x. This breaks 0.23.x users of one API in Job. (Mayank
+    Bansal via vinodkv)
+
   NEW FEATURES
 
   IMPROVEMENTS

+ 48 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/jobcontrol/Job.java

@@ -60,12 +60,11 @@ public class Job extends ControlledJob {
   }
 
   /**
-   * @return the mapred ID of this job as assigned by the 
-   * mapred framework.
+   * @return the mapred ID of this job as assigned by the mapred framework.
    */
   public JobID getAssignedJobID() {
-    org.apache.hadoop.mapreduce.JobID temp = super.getMapredJobID();
-    if(temp == null) {
+    org.apache.hadoop.mapreduce.JobID temp = super.getMapredJobId();
+    if (temp == null) {
       return null;
     }
     return JobID.downgrade(temp);
@@ -126,6 +125,30 @@ public class Job extends ControlledJob {
     return -1;
   }
   
+  /**
+   * This is a no-op function, Its a behavior change from 1.x We no more can
+   * change the state from job
+   * 
+   * @param state
+   *          the new state for this job.
+   */
+  @Deprecated
+  protected synchronized void setState(int state) {
+    // No-Op, we dont want to change the sate
+  }
+  
+  /**
+   * Add a job to this jobs' dependency list. 
+   * Dependent jobs can only be added while a Job 
+   * is waiting to run, not during or afterwards.
+   * 
+   * @param dependingJob Job that this Job depends on.
+   * @return <tt>true</tt> if the Job was added.
+   */
+  public synchronized boolean addDependingJob(Job dependingJob) {
+    return super.addDependingJob(dependingJob);
+  }
+  
   /**
    * @return the job client of this job
    */
@@ -144,4 +167,25 @@ public class Job extends ControlledJob {
     return JobControl.castToJobList(super.getDependentJobs());
   }
 
+  /**
+   * @return the mapred ID of this job as assigned by the mapred framework.
+   */
+  public synchronized String getMapredJobID() {
+    if (super.getMapredJobId() != null) {
+      return super.getMapredJobId().toString();
+    }
+    return null;
+  }
+
+  /**
+   * This is no-op method for backward compatibility. It's a behavior change
+   * from 1.x, we can not change job ids from job.
+   * 
+   * @param mapredJobID
+   *          the mapred job ID for this job.
+   */
+  @Deprecated
+  public synchronized void setMapredJobID(String mapredJobID) {
+    setAssignedJobID(JobID.forName(mapredJobID));
+  }
 }

+ 3 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java

@@ -138,12 +138,11 @@ public class ControlledJob {
   public void setJobID(String id) {
     this.controlID = id;
   }
-	
+
   /**
-   * @return the mapred ID of this job as assigned by the 
-   * mapred framework.
+   * @return the mapred ID of this job as assigned by the mapred framework.
    */
-  public JobID getMapredJobID() {
+  public synchronized JobID getMapredJobId() {
     return this.job.getJobID();
   }
   

+ 61 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/jobcontrol/TestJobControl.java

@@ -22,11 +22,14 @@ import static org.mockito.Mockito.*;
 
 import java.util.ArrayList;
 
+import junit.framework.Assert;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
+import org.junit.Test;
 
 /**
  * This class performs unit test for Job/JobControl classes.
@@ -191,10 +194,68 @@ public class TestJobControl extends junit.framework.TestCase {
     theControl.stop();
   }
 
+  @SuppressWarnings("deprecation")
+  @Test(timeout = 30000)
+  public void testJobState() throws Exception {
+    Job job_1 = getCopyJob();
+    JobControl jc = new JobControl("Test");
+    jc.addJob(job_1);
+    Assert.assertEquals(Job.WAITING, job_1.getState());
+    job_1.setState(Job.SUCCESS);
+    Assert.assertEquals(Job.WAITING, job_1.getState());
+
+    org.apache.hadoop.mapreduce.Job mockjob =
+        mock(org.apache.hadoop.mapreduce.Job.class);
+    org.apache.hadoop.mapreduce.JobID jid =
+        new org.apache.hadoop.mapreduce.JobID("test", 0);
+    when(mockjob.getJobID()).thenReturn(jid);
+    job_1.setJob(mockjob);
+    Assert.assertEquals("job_test_0000", job_1.getMapredJobID());
+    job_1.setMapredJobID("job_test_0001");
+    Assert.assertEquals("job_test_0000", job_1.getMapredJobID());
+    jc.stop();
+  }
+
+  @Test(timeout = 30000)
+  public void testAddingDependingJob() throws Exception {
+    Job job_1 = getCopyJob();
+    ArrayList<Job> dependingJobs = new ArrayList<Job>();
+    JobControl jc = new JobControl("Test");
+    jc.addJob(job_1);
+    Assert.assertEquals(Job.WAITING, job_1.getState());
+    Assert.assertTrue(job_1.addDependingJob(new Job(job_1.getJobConf(),
+      dependingJobs)));
+  }
+
+  public Job getCopyJob() throws Exception {
+    Configuration defaults = new Configuration();
+    FileSystem fs = FileSystem.get(defaults);
+    Path rootDataDir =
+        new Path(System.getProperty("test.build.data", "."),
+          "TestJobControlData");
+    Path indir = new Path(rootDataDir, "indir");
+    Path outdir_1 = new Path(rootDataDir, "outdir_1");
+
+    JobControlTestUtils.cleanData(fs, indir);
+    JobControlTestUtils.generateData(fs, indir);
+
+    JobControlTestUtils.cleanData(fs, outdir_1);
+
+    ArrayList<Job> dependingJobs = null;
+
+    ArrayList<Path> inPaths_1 = new ArrayList<Path>();
+    inPaths_1.add(indir);
+    JobConf jobConf_1 = JobControlTestUtils.createCopyJob(inPaths_1, outdir_1);
+    Job job_1 = new Job(jobConf_1, dependingJobs);
+    return job_1;
+  }
+  
+  @Test (timeout = 30000)
   public void testJobControl() throws Exception {
     doJobControlTest();
   }
   
+  @Test (timeout = 30000)
   public void testGetAssignedJobId() throws Exception {
     JobConf jc = new JobConf();
     Job j = new Job(jc);

+ 19 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java

@@ -22,12 +22,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import junit.framework.Assert;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.HadoopTestCase;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.Test;
 
 /**
  * This class performs unit test for Job/JobControl classes.
@@ -187,4 +190,20 @@ public class TestMapReduceJobControl extends HadoopTestCase {
     
     theControl.stop();
   }
+  
+  @Test(timeout = 30000)
+  public void testControlledJob() throws Exception {
+    Configuration conf = createJobConf();
+    cleanupData(conf);
+    Job job1 = MapReduceTestUtil.createCopyJob(conf, outdir_1, indir);
+    createDependencies(conf, job1);
+    while (cjob1.getJobState() != ControlledJob.State.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+    Assert.assertNotNull(cjob1.getMapredJobId());
+  }
 }