HADOOP-5850. Fixes a problem to do with not being able to run jobs with 0 maps/reduces. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@777575 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
f2dca23bb3
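
The shape of the job this commit fixes, as a hedged sketch (not a complete driver; cluster and classpath setup omitted). It uses the 0.20-era JobConf API and the EmptyInputFormat referenced by the new TestEmptyJob below, which returns no input splits, so the job runs with 0 map tasks and, via setNumReduceTasks, 0 reduces:

    // Hedged sketch of a zero-input job; names follow the test sources below.
    JobConf conf = new JobConf();
    conf.setJobName("empty");
    conf.setInputFormat(EmptyInputFormat.class);  // yields no input splits
    conf.setNumMapTasks(0);
    conf.setNumReduceTasks(0);
    FileOutputFormat.setOutputPath(conf, new Path("/testing/empty/output"));
    RunningJob job = JobClient.runJob(conf);  // previously failed or misreported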

+ 3 - 0
CHANGES.txt

@@ -77,6 +77,9 @@ Release 0.20.1 - Unreleased
     HADOOP-5210. Solves a problem in the progress report of the reduce task.
     (Ravi Gummadi via ddas)
 
+    HADOOP-5850. Fixes a problem to do with not being able to run jobs with
+    0 maps/reduces. (Vinod K V via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

+ 32 - 46
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -392,7 +392,7 @@ class JobInProgress {
       jobInitKillStatus.initStarted = true;
     }
 
-    LOG.debug("initializing " + this.jobId);
+    LOG.info("Initializing " + jobId);
 
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
@@ -437,38 +437,15 @@ class JobInProgress {
                                    splits[i], 
                                    jobtracker, conf, this, i);
     }
-    LOG.info("Input size for job "+ jobId + " = " + inputLength);
+    LOG.info("Input size for job " + jobId + " = " + inputLength
+        + ". Number of splits = " + splits.length);
     if (numMapTasks > 0) { 
-      LOG.info("Split info for job:" + jobId + " with " + 
-               splits.length + " splits:");
       nonRunningMapCache = createCache(splits, maxLevel);
     }
         
     // set the launch time
     this.launchTime = System.currentTimeMillis();
 
-    // if no split is returned, job is considered completed and successful
-    if (numMapTasks == 0) {
-      // Finished time need to be setted here to prevent this job to be retired
-      // from the job tracker jobs at the next retire iteration.
-      this.finishTime = this.launchTime;
-      status.setSetupProgress(1.0f);
-      status.setMapProgress(1.0f);
-      status.setReduceProgress(1.0f);
-      status.setCleanupProgress(1.0f);
-      status.setRunState(JobStatus.SUCCEEDED);
-      tasksInited.set(true);
-      JobHistory.JobInfo.logInited(profile.getJobID(), 
-                                    this.launchTime, 0, 0);
-      JobHistory.JobInfo.logFinished(profile.getJobID(), 
-                                     this.finishTime, 0, 0, 0, 0,
-                                     getCounters());
-      // Special case because the Job is not queued
-      JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
-
-      return;
-    }
-
     //
     // Create reduce tasks
     //
@@ -490,9 +467,11 @@ class JobInProgress {
 
     // create two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];
-    // cleanup map tip. This map is doesn't use split. 
-    // Just assign splits[0]
-    cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+
+    // cleanup map tip. This map doesn't use any splits. Just assign an empty
+    // split.
+    JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks);
     cleanup[0].setJobCleanupTask();
 
@@ -503,9 +482,10 @@ class JobInProgress {
 
     // create two setup tips, one map and one reduce.
     setup = new TaskInProgress[2];
-    // setup map tip. This map is doesn't use split. 
-    // Just assign splits[0]
-    setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+
+    // setup map tip. This map doesn't use any split. Just assign an empty
+    // split.
+    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks + 1 );
     setup[0].setJobSetupTask();
 
@@ -898,20 +878,11 @@ class JobInProgress {
     if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
-        if (maps.length == 0) {
-          this.status.setMapProgress(1.0f);
-        } else {
           this.status.setMapProgress((float) (this.status.mapProgress() +
                                               progressDelta / maps.length));
-        }
       } else {
-        if (reduces.length == 0) {
-          this.status.setReduceProgress(1.0f);
-        } else {
-          this.status.setReduceProgress
-            ((float) (this.status.reduceProgress() +
-                      (progressDelta / reduces.length)));
-        }
+        this.status.setReduceProgress((float) (this.status.reduceProgress() + 
+                                           (progressDelta / reduces.length)));
       }
     }
   }
@@ -1138,8 +1109,10 @@ class JobInProgress {
         status.getRunState() != JobStatus.PREP) {
       return false;
     }
-    // check if cleanup task has been launched already. 
-    if (launchedCleanup) {
+    // check if cleanup task has been launched already or if setup hasn't
+    // finished yet. The latter check is useful when the number of maps is
+    // zero.
+    if (launchedCleanup || !isSetupFinished()) {
       return false;
     }
     // check if job has failed or killed
@@ -1173,7 +1146,6 @@ class JobInProgress {
       if (!canLaunchSetupTask()) {
         return null;
       }
-      
       String taskTracker = tts.getTrackerName();
       // Update the last-known clusterSize
       this.clusterSize = clusterSize;
@@ -2121,6 +2093,12 @@ class JobInProgress {
     if (this.status.getRunState() == JobStatus.RUNNING ) {
       this.status.setRunState(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
+      if (maps.length == 0) {
+        this.status.setMapProgress(1.0f);
+      }
+      if (reduces.length == 0) {
+        this.status.setReduceProgress(1.0f);
+      }
       this.finishTime = System.currentTimeMillis();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
@@ -2438,6 +2416,14 @@ class JobInProgress {
     }
   }
 
+  boolean isSetupFinished() {
+    if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
+        || setup[1].isFailed()) {
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Fail a task with a given reason, but without a status object.
    * 

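Why the maps.length == 0 and reduces.length == 0 guards in updateTaskProgress could be dropped: that method only runs for real map/reduce TIPs, so the divisor is never zero there, and the zero-task case is now handled once, at job completion. A standalone sketch of that invariant (hypothetical names, not Hadoop source):

    /**
     * Standalone sketch of the invariant behind the change above: per-task
     * progress updates only happen for real map TIPs, so the divisor is never
     * zero there; a job with no maps gets its map progress forced to 1.0
     * once, when the job completes.
     */
    class ProgressSketch {
      private final int numMaps;
      private float mapProgress = 0.0f;

      ProgressSketch(int numMaps) {
        this.numMaps = numMaps;
      }

      // Called once per progress report from a running map task.
      // Never invoked when numMaps == 0, since no map TIPs exist.
      void onMapProgress(double progressDelta) {
        mapProgress += (float) (progressDelta / numMaps);
      }

      // Called when the job succeeds; covers the zero-task case in one shot.
      void onJobComplete() {
        if (numMaps == 0) {
          mapProgress = 1.0f;
        }
      }
    }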
+ 18 - 8
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -30,6 +30,7 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.net.Node;
@@ -729,7 +730,10 @@ class TaskInProgress {
    * Get the split locations 
    */
   public String[] getSplitLocations() {
-    return rawSplit.getLocations();
+    if (isMapTask() && !jobSetup && !jobCleanup) {
+      return rawSplit.getLocations();
+    }
+    return new String[0];
   }
   
   /**
@@ -913,12 +917,18 @@ class TaskInProgress {
                              boolean taskCleanup) {
     // create the task
     Task t = null;
-    if (isMapTask()) {
-      LOG.debug("attempt "+  numTaskFailures   +
-          " sending skippedRecords "+failedRanges.getIndicesCount());
-      t = new MapTask(jobFile, taskid, partition, 
-          rawSplit.getClassName(), rawSplit.getBytes());
-    } else {
+    if (isMapTask() && !jobSetup && !jobCleanup) {
+      LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+          + failedRanges.getIndicesCount());
+
+      t =
+          new MapTask(jobFile, taskid, partition, rawSplit.getClassName(),
+              rawSplit.getBytes());
+
+    } else if (jobSetup || jobCleanup) {
+      t = new MapTask(jobFile, taskid, partition, null, new BytesWritable());
+    }
+    else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
     if (jobCleanup) {
@@ -1027,7 +1037,7 @@ class TaskInProgress {
    * Gets the Node list of input split locations sorted in rack order.
    */ 
   public String getSplitNodes() {
-    if ( rawSplit == null) {
+    if (!isMapTask() || jobSetup || jobCleanup) {
       return "";
     }
     String[] splits = rawSplit.getLocations();

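Setup and cleanup TIPs used to borrow splits[0], which does not exist for a 0-map job; they now carry an empty split, serialized as a default-constructed BytesWritable. A tiny standalone demo of why that is a safe "no split" stand-in (illustration only, not Hadoop source):

    import org.apache.hadoop.io.BytesWritable;

    // A default-constructed BytesWritable holds zero bytes of payload, so a
    // setup/cleanup MapTask built from it has no split data to read.
    public class EmptySplitDemo {
      public static void main(String[] args) {
        BytesWritable empty = new BytesWritable();
        System.out.println("split payload bytes = " + empty.getLength()); // 0
      }
    }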
+ 175 - 0
src/test/org/apache/hadoop/mapred/TestEmptyJob.java

@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * A JUnit test to test Map-Reduce empty jobs.
+ */
+public class TestEmptyJob extends TestCase {
+  private static final Log LOG =
+      LogFactory.getLog(TestEmptyJob.class.getName());
+
+  private static String TEST_ROOT_DIR =
+      new File(System.getProperty("test.build.data", "/tmp")).toURI()
+          .toString().replace(' ', '+');
+
+  MiniMRCluster mr = null;
+
+  /**
+   * Simple method running a MapReduce job with no input data. Used to test that
+   * such a job is successful.
+   * 
+   * @param fileSys
+   * @param numMaps
+   * @param numReduces
+   * @return true if the MR job is successful, otherwise false
+   * @throws IOException
+   */
+  private boolean launchEmptyJob(URI fileSys, int numMaps, int numReduces)
+      throws IOException {
+    // create an empty input dir
+    final Path inDir = new Path(TEST_ROOT_DIR, "testing/empty/input");
+    final Path outDir = new Path(TEST_ROOT_DIR, "testing/empty/output");
+    JobConf conf = mr.createJobConf();
+    FileSystem fs = FileSystem.get(fileSys, conf);
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      LOG.warn("Can't create " + inDir);
+      return false;
+    }
+
+    // use WordCount example
+    FileSystem.setDefaultUri(conf, fileSys);
+    conf.setJobName("empty");
+    // use an InputFormat which returns no split
+    conf.setInputFormat(EmptyInputFormat.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(IntWritable.class);
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReduces);
+
+    // run job and wait for completion
+    JobClient jc = new JobClient(conf);
+    RunningJob runningJob = jc.submitJob(conf);
+    while (true) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+      }
+      if (runningJob.isComplete()) {
+        break;
+      }
+    }
+
+    assertTrue(runningJob.isComplete());
+    assertTrue(runningJob.isSuccessful());
+    JobID jobID = runningJob.getID();
+
+    TaskReport[] jobSetupTasks = jc.getSetupTaskReports(jobID);
+    assertTrue("Number of job-setup tips is not 2!", jobSetupTasks.length == 2);
+    assertTrue("Setup progress is " + runningJob.setupProgress()
+        + " and not 1.0", runningJob.setupProgress() == 1.0);
+    assertTrue("Setup task is not finished!", mr.getJobTrackerRunner()
+        .getJobTracker().getJob(jobID).isSetupFinished());
+
+    assertTrue("Number of maps is not zero!", jc.getMapTaskReports(runningJob
+        .getID()).length == 0);
+    assertTrue(
+        "Map progress is " + runningJob.mapProgress() + " and not 1.0!",
+        runningJob.mapProgress() == 1.0);
+
+    assertTrue("Reduce progress is " + runningJob.reduceProgress()
+        + " and not 1.0!", runningJob.reduceProgress() == 1.0);
+    assertTrue("Number of reduces is not " + numReduces, jc
+        .getReduceTaskReports(runningJob.getID()).length == numReduces);
+
+    TaskReport[] jobCleanupTasks = jc.getCleanupTaskReports(jobID);
+    assertTrue("Number of job-cleanup tips is not 2!",
+        jobCleanupTasks.length == 2);
+    assertTrue("Cleanup progress is " + runningJob.cleanupProgress()
+        + " and not 1.0", runningJob.cleanupProgress() == 1.0);
+
+    assertTrue("Job output directory doesn't exist!", fs.exists(outDir));
+    FileStatus[] list = fs.listStatus(outDir, new OutputLogFilter());
+    assertTrue("Number of part-files is " + list.length + " and not "
+        + numReduces, list.length == numReduces);
+
+    // cleanup
+    fs.delete(outDir, true);
+
+    // return job result
+    LOG.info("job is complete: " + runningJob.isSuccessful());
+    return (runningJob.isSuccessful());
+  }
+
+  /**
+   * Test that a job with no input data (and thus with no input split and no map
+   * task to execute) is successful.
+   * 
+   * @throws IOException
+   */
+  public void testEmptyJob()
+      throws IOException {
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 1;
+      JobConf conf = new JobConf();
+      fileSys = FileSystem.get(conf);
+
+      conf.set("mapred.job.tracker.handler.count", "1");
+      conf.set("mapred.job.tracker", "127.0.0.1:0");
+      conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+      conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+
+      mr =
+          new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
+              null, null, conf);
+
+      assertTrue(launchEmptyJob(fileSys.getUri(), 3, 1));
+      assertTrue(launchEmptyJob(fileSys.getUri(), 0, 0));
+    } finally {
+      if (fileSys != null) {
+        fileSys.close();
+      }
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+}

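A note on exercising the new test: TestEmptyJob is JUnit 3 style (it extends TestCase), so outside the build it can be driven with the classic text runner. A hypothetical standalone runner, assuming JUnit 3 and the Hadoop test classpath; in the Hadoop tree the test is normally invoked through the build's JUnit target rather than a main() like this:

    public class RunTestEmptyJob {
      public static void main(String[] args) {
        // JUnit 3 text runner; prints results to stdout.
        junit.textui.TestRunner.run(org.apache.hadoop.mapred.TestEmptyJob.class);
      }
    }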
+ 0 - 141
src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java

@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-
-/**
- * A JUnit test to test Map-Reduce empty jobs Mini-DFS.
- */
-public class TestEmptyJobWithDFS extends TestCase {
-  private static final Log LOG =
-    LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
-  
-  /**
-   * Simple method running a MapReduce job with no input data. Used
-   * to test that such a job is successful.
-   * @param fileSys
-   * @param jobTracker
-   * @param conf
-   * @param numMaps
-   * @param numReduces
-   * @return true if the MR job is successful, otherwise false
-   * @throws IOException
-   */
-  public static boolean launchEmptyJob(String fileSys,
-                                       String jobTracker,
-                                       JobConf conf,
-                                       int numMaps,
-                                       int numReduces) throws IOException {
-    // create an empty input dir
-    final Path inDir = new Path("/testing/empty/input");
-    final Path outDir = new Path("/testing/empty/output");
-    FileSystem fs = FileSystem.getNamed(fileSys, conf);
-    fs.delete(outDir, true);
-    if (!fs.mkdirs(inDir)) {
-      LOG.warn("Can't create " + inDir);
-      return false;
-    }
-
-    // use WordCount example
-    FileSystem.setDefaultUri(conf, fileSys);
-    conf.set("mapred.job.tracker", jobTracker);
-    conf.setJobName("empty");
-    // use an InputFormat which returns no split
-    conf.setInputFormat(EmptyInputFormat.class);
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(IntWritable.class);
-    conf.setMapperClass(IdentityMapper.class);        
-    conf.setReducerClass(IdentityReducer.class);
-    FileInputFormat.setInputPaths(conf, inDir);
-    FileOutputFormat.setOutputPath(conf, outDir);
-    conf.setNumMapTasks(numMaps);
-    conf.setNumReduceTasks(numReduces);
-      
-    // run job and wait for completion
-    JobClient jc = new JobClient(conf);
-    RunningJob runningJob = jc.submitJob(conf);
-    while (true) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {}
-      if (runningJob.isComplete()) {
-        break;
-      }
-    }
-      
-    try {
-      assertTrue(runningJob.isComplete());
-      assertTrue(runningJob.isSuccessful());
-    } catch (NullPointerException npe) {
-      // This NPE should no more happens
-      fail("A NPE should not have happened.");
-    }
-          
-    // return job result
-    LOG.info("job is complete: " + runningJob.isSuccessful());
-    return (runningJob.isSuccessful());
-  }
-  
-  /**
-   * Test that a job with no input data (and thus with no input split and
-   * no map task to execute) is successful.
-   * @throws IOException
-   */
-  public void testEmptyJobWithDFS() throws IOException {
-    String namenode = null;
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-    try {
-      final int taskTrackers = 4;
-      final int jobTrackerPort = 60050;
-      Configuration conf = new Configuration();
-      dfs = new MiniDFSCluster(conf, 1, true, null);
-      fileSys = dfs.getFileSystem();
-      namenode = fileSys.getName();
-      mr = new MiniMRCluster(taskTrackers, namenode, 2);
-      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
-      JobConf jobConf = new JobConf();
-      boolean result;
-      result = launchEmptyJob(namenode, jobTrackerName, jobConf, 
-                              3, 1);
-      assertTrue(result);
-          
-    } finally {
-      if (fileSys != null) { fileSys.close(); }
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown(); }
-    }
-  }
-  
-}

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestJobDirCleanup.java

@@ -37,7 +37,7 @@ public class TestJobDirCleanup extends TestCase {
   //end of the job (indirectly testing whether all tasktrackers
   //got a KillJobAction).
   private static final Log LOG =
-        LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
+        LogFactory.getLog(TestEmptyJob.class.getName());
   private void runSleepJob(JobConf conf) throws Exception {
     String[] args = { "-m", "1", "-r", "10", "-mt", "1000", "-rt", "10000" };
     ToolRunner.run(conf, new SleepJob(), args);

+ 1 - 1
src/webapps/job/taskdetails.jsp

@@ -249,7 +249,7 @@
 </center>
 
 <%
-      if (ts[0].getIsMap()) {
+      if (ts[0].getIsMap() && !isCleanupOrSetup) {
 %>
 <h3>Input Split Locations</h3>
 <table border=2 cellpadding="5" cellspacing="2">