Browse Source

Revert MAPREDUCE-4488 and MAPREDUCE-4567.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1379467 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 12 years ago
parent
commit
ba962d213a

+ 0 - 5
CHANGES.txt

@@ -27,9 +27,6 @@ Release 1.2.0 - unreleased
     module external to HDFS to specify how HDFS blocks should be placed.
     module external to HDFS to specify how HDFS blocks should be placed.
     (Sumadhur Reddy Bolli via szetszwo)
     (Sumadhur Reddy Bolli via szetszwo)
 
 
-    MAPREDUCE-4488. Port MAPREDUCE-463 (The job setup and cleanup tasks
-    should be optional) to branch-1. (tomwhite)
-
     HADOOP-7754. Expose file descriptors from Hadoop-wrapped local 
     HADOOP-7754. Expose file descriptors from Hadoop-wrapped local 
     FileSystems (todd and ahmed via tucu)
     FileSystems (todd and ahmed via tucu)
 
 
@@ -206,8 +203,6 @@ Release 1.2.0 - unreleased
     HADOOP-8611. Allow fall-back to the shell-based implementation when
     HADOOP-8611. Allow fall-back to the shell-based implementation when
     JNI-based users-group mapping fails (Robert Parker via bobby)
     JNI-based users-group mapping fails (Robert Parker via bobby)
 
 
-    MAPREDUCE-4567. Fix failing TestJobKillAndFail in branch-1. (tomwhite)
-
     MAPREDUCE-2374. "Text File Busy" errors launching MR tasks. (Andy Isaacson
     MAPREDUCE-2374. "Text File Busy" errors launching MR tasks. (Andy Isaacson
     via atm)
     via atm)
 
 

+ 0 - 8
src/mapred/mapred-default.xml

@@ -35,14 +35,6 @@
   </description>
   </description>
 </property>
 </property>
 
 
-<property>
-  <name>mapred.committer.job.setup.cleanup.needed</name>
-  <value>true</value>
-  <description> true, if job needs job-setup and job-cleanup.
-                false, otherwise  
-  </description>
-</property>
-
 <!-- i/o properties -->
 <!-- i/o properties -->
 
 
 <property>
 <property>

+ 27 - 66
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -131,7 +131,6 @@ public class JobInProgress {
   private volatile boolean launchedSetup = false;
   private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
   private volatile boolean jobFailed = false;
-  private boolean jobSetupCleanupNeeded = true;
 
 
   JobPriority priority = JobPriority.NORMAL;
   JobPriority priority = JobPriority.NORMAL;
   final JobTracker jobtracker;
   final JobTracker jobtracker;
@@ -362,8 +361,6 @@ public class JobInProgress {
 
 
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       (numMapTasks + numReduceTasks + 10);
       (numMapTasks + numReduceTasks + 10);
-    JobContext jobContext = new JobContext(conf, jobId);
-    this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
     try {
     try {
       this.userUGI = UserGroupInformation.getCurrentUser();
       this.userUGI = UserGroupInformation.getCurrentUser();
     } catch (IOException ie){
     } catch (IOException ie){
@@ -452,9 +449,6 @@ public class JobInProgress {
 
 
       this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       (numMapTasks + numReduceTasks + 10);
       (numMapTasks + numReduceTasks + 10);
-      
-      JobContext jobContext = new JobContext(conf, jobId);
-      this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
 
 
       // Construct the jobACLs
       // Construct the jobACLs
       status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
       status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
@@ -763,35 +757,7 @@ public class JobInProgress {
     
     
     // ... use the same for estimating the total output of all maps
     // ... use the same for estimating the total output of all maps
     resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
     resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
-
-    initSetupCleanupTasks(jobFile);
-
-    synchronized(jobInitKillStatus){
-      jobInitKillStatus.initDone = true;
-
-      // set this before the throw to make sure cleanup works properly
-      tasksInited = true;
-
-      if(jobInitKillStatus.killed) {
-        throw new KillInterruptedException("Job " + jobId + " killed in init");
-      }
-    }
-    
-    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
-                                 numMapTasks, numReduceTasks);
     
     
-    // if setup is not needed, mark it complete
-    if (!jobSetupCleanupNeeded) {
-      setupComplete();
-    }
-  }
-     
-  private void initSetupCleanupTasks(String jobFile) {
-    if (!jobSetupCleanupNeeded) {
-      // nothing to initialize
-      return;
-    }
-
     // create cleanup two cleanup tips, one map and one reduce.
     // create cleanup two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];
     cleanup = new TaskInProgress[2];
 
 
@@ -821,23 +787,25 @@ public class JobInProgress {
                        numReduceTasks + 1, jobtracker, conf, this, 1);
                        numReduceTasks + 1, jobtracker, conf, this, 1);
     setup[1].setJobSetupTask();
     setup[1].setJobSetupTask();
     
     
+    synchronized(jobInitKillStatus){
+      jobInitKillStatus.initDone = true;
+
+      // set this before the throw to make sure cleanup works properly
+      tasksInited = true;
+
+      if(jobInitKillStatus.killed) {
+        throw new KillInterruptedException("Job " + jobId + " killed in init");
+      }
+    }
+    
+    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
+                                 numMapTasks, numReduceTasks);
+    
    // Log the number of map and reduce tasks
    // Log the number of map and reduce tasks
    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
             + " map tasks and " + numReduceTasks + " reduce tasks.");
             + " map tasks and " + numReduceTasks + " reduce tasks.");
   }
   }
 
 
-  private void setupComplete() {
-    status.setSetupProgress(1.0f);
-    if (maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded) {
-      jobComplete();
-      return;
-    }
-    if (this.status.getRunState() == JobStatus.PREP) {
-      changeStateTo(JobStatus.RUNNING);
-      JobHistory.JobInfo.logStarted(profile.getJobID());
-    }
-  }
-
   TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
   TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
   throws IOException {
   throws IOException {
     TaskSplitMetaInfo[] allTaskSplitMetaInfo =
     TaskSplitMetaInfo[] allTaskSplitMetaInfo =
@@ -2652,7 +2620,13 @@ public class JobInProgress {
     if (tip.isJobSetupTask()) {
     if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
       killSetupTip(!tip.isMapTask());
-      setupComplete();
+      // Job can start running now.
+      this.status.setSetupProgress(1.0f);
+      // move the job to running state if the job is in prep state
+      if (this.status.getRunState() == JobStatus.PREP) {
+        changeStateTo(JobStatus.RUNNING);
+        JobHistory.JobInfo.logStarted(profile.getJobID());
+      }
     } else if (tip.isJobCleanupTask()) {
     } else if (tip.isJobCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
       if (tip.isMapTask()) {
@@ -2711,9 +2685,6 @@ public class JobInProgress {
         }
         }
       }
       }
     }
     }
-    if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) {
-      jobComplete();
-    }
     return true;
     return true;
   }
   }
   
   
@@ -2774,8 +2745,7 @@ public class JobInProgress {
     //
     //
     // All tasks are complete, then the job is done!
     // All tasks are complete, then the job is done!
     //
     //
-    if (this.status.getRunState() == JobStatus.RUNNING ||
-        this.status.getRunState() == JobStatus.PREP) {
+    if (this.status.getRunState() == JobStatus.RUNNING ) {
       changeStateTo(JobStatus.SUCCEEDED);
       changeStateTo(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
       this.status.setCleanupProgress(1.0f);
       if (maps.length == 0) {
       if (maps.length == 0) {
@@ -2909,9 +2879,6 @@ public class JobInProgress {
       for (int i = 0; i < reduces.length; i++) {
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
         reduces[i].kill();
       }
       }
-      if (!jobSetupCleanupNeeded) {
-        terminateJob(jobTerminationState);
-      }
     }
     }
   }
   }
 
 
@@ -3200,9 +3167,7 @@ public class JobInProgress {
   }
   }
 
 
   boolean isSetupFinished() {
   boolean isSetupFinished() {
-    // if there is no setup to be launched, consider setup is finished.  
-    if ((tasksInited && setup.length == 0) || 
-        setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
+    if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
         || setup[1].isFailed()) {
         || setup[1].isFailed()) {
       return true;
       return true;
     }
     }
@@ -3316,12 +3281,10 @@ public class JobInProgress {
    */
    */
   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
     if (tipid.isMap()) {
     if (tipid.isMap()) {
-      // cleanup map tip
-      if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) {
+      if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
         return cleanup[0]; 
         return cleanup[0]; 
       }
       }
-      // setup map tip
-      if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) { 
+      if (tipid.equals(setup[0].getTIPId())) { //setup map tip
         return setup[0];
         return setup[0];
       }
       }
       for (int i = 0; i < maps.length; i++) {
       for (int i = 0; i < maps.length; i++) {
@@ -3330,12 +3293,10 @@ public class JobInProgress {
         }
         }
       }
       }
     } else {
     } else {
-      // cleanup reduce tip
-      if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) { 
+      if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
         return cleanup[1]; 
         return cleanup[1]; 
       }
       }
-      // setup reduce tip
-      if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) { 
+      if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
         return setup[1];
         return setup[1];
       }
       }
       for (int i = 0; i < reduces.length; i++) {
       for (int i = 0; i < reduces.length; i++) {

+ 0 - 12
src/mapred/org/apache/hadoop/mapreduce/Job.java

@@ -348,18 +348,6 @@ public class Job extends JobContext {
     conf.setReduceSpeculativeExecution(speculativeExecution);
     conf.setReduceSpeculativeExecution(speculativeExecution);
   }
   }
 
 
-  /**
-   * Specify whether job-setup and job-cleanup is needed for the job 
-   * 
-   * @param needed If <code>true</code>, job-setup and job-cleanup will be
-   *               considered from {@link OutputCommitter} 
-   *               else ignored.
-   */
-  public void setJobSetupCleanupNeeded(boolean needed) {
-    ensureState(JobState.DEFINE);
-    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", needed);
-  }
-
   /**
   /**
    * Get the URL where some job progress information will be displayed.
    * Get the URL where some job progress information will be displayed.
    * 
    * 

+ 0 - 9
src/mapred/org/apache/hadoop/mapreduce/JobContext.java

@@ -275,13 +275,4 @@ public class JobContext {
   public RawComparator<?> getGroupingComparator() {
   public RawComparator<?> getGroupingComparator() {
     return conf.getOutputValueGroupingComparator();
     return conf.getOutputValueGroupingComparator();
   }
   }
-  
-  /**
-   * Get whether job-setup and job-cleanup is needed for the job 
-   * 
-   * @return boolean 
-   */
-  public boolean getJobSetupCleanupNeeded() {
-    return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
-  }
 }
 }

+ 0 - 105
src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java

@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.HadoopTestCase;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-public class TestNoJobSetupCleanup extends HadoopTestCase {
-  private static String TEST_ROOT_DIR =
-    new File(System.getProperty("test.build.data","/tmp"))
-    .toURI().toString().replace(' ', '+');
-  private final Path inDir = new Path(TEST_ROOT_DIR, "./wc/input");
-  private final Path outDir = new Path(TEST_ROOT_DIR, "./wc/output");
-
-  public TestNoJobSetupCleanup() throws IOException {
-    super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 2, 2);
-  }
-
-  private Job submitAndValidateJob(Configuration conf, int numMaps, int numReds) 
-      throws IOException, InterruptedException, ClassNotFoundException {
-    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 
-                numMaps, numReds);
-    job.setJobSetupCleanupNeeded(false);
-    job.setOutputFormatClass(MyOutputFormat.class);
-    job.waitForCompletion(true);
-    assertTrue(job.isSuccessful());
-    JobID jobid = (org.apache.hadoop.mapred.JobID) job.getJobID();
-    JobClient jc = new JobClient(new JobConf(conf));
-    assertTrue(jc.getSetupTaskReports(jobid).length == 0);
-    assertTrue(jc.getCleanupTaskReports(jobid).length == 0);
-    assertTrue(jc.getMapTaskReports(jobid).length == numMaps);
-    assertTrue(jc.getReduceTaskReports(jobid).length == numReds);
-    FileSystem fs = FileSystem.get(conf);
-    assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
-    FileStatus[] list = fs.listStatus(outDir, new OutputFilter());
-    int numPartFiles = numReds == 0 ? numMaps : numReds;
-    assertTrue("Number of part-files is " + list.length + " and not "
-        + numPartFiles, list.length == numPartFiles);
-    return job;
-  }
-  
-  public void testNoJobSetupCleanup() throws Exception {
-    try {
-      Configuration conf = createJobConf();
- 
-      // run a job without job-setup and cleanup
-      submitAndValidateJob(conf, 1, 1);
-
-      // run a map only job.
-      submitAndValidateJob(conf, 1, 0);
-
-      // run empty job without job setup and cleanup 
-      submitAndValidateJob(conf, 0, 0);
-
-      // run empty job without job setup and cleanup, with non-zero reduces 
-      submitAndValidateJob(conf, 0, 1);
-    } finally {
-      tearDown();
-    }
-  }
-  
-  static class MyOutputFormat extends TextOutputFormat {
-    public void checkOutputSpecs(JobContext job) 
-        throws FileAlreadyExistsException, IOException{
-      super.checkOutputSpecs(job);
-      // creating dummy TaskAttemptID
-      TaskAttemptID tid = new TaskAttemptID("jt", 1, false, 0, 0);
-      getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), tid)).
-        setupJob(job);
-    }
-  }
-
-  private static class OutputFilter implements PathFilter {
-    public boolean accept(Path path) {
-      return !(path.getName().startsWith("_"));
-    }
-  }
-}