
HADOOP-1127. Fix AlreadyBeingCreatedException in namenode for jobs run with speculative execution.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@532078 13f79535-47bb-0310-9956-ffa450edef68
commit 9ee3cd355c
Author: Thomas White
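
In short: with speculative execution, two attempts of the same task could try to create the same output file in DFS at once, and the namenode would throw AlreadyBeingCreatedException at the second writer. This change retires the PhasedFileSystem workaround and instead points each task attempt at a private directory, "_&lt;taskid&gt;", under the job's output directory; when an attempt succeeds its files are moved up into the job output directory, and the output of any duplicate, failed, or killed attempt is deleted.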

+ 4 - 0
CHANGES.txt

@@ -248,6 +248,10 @@ Trunk (unreleased changes)
 74. HADOOP-1190.  Fix unchecked warnings in main Hadoop code.  
     (tomwhite)
 
+75. HADOOP-1127.  Fix AlreadyBeingCreatedException in namenode for 
+    jobs run with speculative execution.
+    (Arun C Murthy via tomwhite)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 6 - 5
src/examples/org/apache/hadoop/examples/RandomWriter.java

@@ -79,7 +79,7 @@ public class RandomWriter {
       }
       public boolean next(Writable key, Writable value) {
         if (name != null) {
-          ((Text) key).set(name.toString());
+          ((Text) key).set(name.getName());
           name = null;
           return true;
         }
@@ -118,6 +118,7 @@ public class RandomWriter {
     private Random random = new Random();
     private BytesWritable randomKey = new BytesWritable();
     private BytesWritable randomValue = new BytesWritable();
+    private Path outputDir = null;
     
     private void randomizeBytes(byte[] data, int offset, int length) {
       for(int i=offset + length - 1; i >= offset; --i) {
@@ -134,7 +135,8 @@ public class RandomWriter {
                     Reporter reporter) throws IOException {
       String filename = ((Text) key).toString();
       SequenceFile.Writer writer = 
-        SequenceFile.createWriter(fileSys, jobConf, new Path(filename), 
+        SequenceFile.createWriter(fileSys, jobConf, 
+                                  new Path(outputDir, filename), 
                                   BytesWritable.class, BytesWritable.class,
                                   CompressionType.NONE, reporter);
       int itemCount = 0;
@@ -171,6 +173,8 @@ public class RandomWriter {
       } catch (IOException e) {
         throw new RuntimeException("Can't get default file system", e);
       }
+      outputDir = job.getOutputPath();
+      
       numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
                                     1*1024*1024*1024);
       minKeySize = job.getInt("test.randomwrite.min_key", 10);
@@ -206,9 +210,6 @@ public class RandomWriter {
     job.setJobName("random-writer");
     job.setOutputPath(outDir);
     
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    job.setSpeculativeExecution(false);
     job.setOutputKeyClass(BytesWritable.class);
     job.setOutputValueClass(BytesWritable.class);
     

+ 22 - 0
src/java/org/apache/hadoop/fs/FileUtil.java

@@ -63,6 +63,28 @@ public class FileUtil {
     return dir.delete();
   }
 
+  /**
+   * Recursively delete a directory.
+   * 
+   * @param fs {@link FileSystem} on which the path is present
+   * @param dir directory to recursively delete 
+   * @throws IOException
+   */
+  public static void fullyDelete(FileSystem fs, Path dir) 
+  throws IOException {
+    Path[] paths = fs.listPaths(dir);
+    if (paths != null) {
+      for (Path p : paths) {
+        if (fs.isFile(p))  {
+          fs.delete(p);
+        } else {
+          fullyDelete(fs, p);
+        }
+      }
+    }
+    fs.delete(dir);
+  }
+
   /** Copy files between FileSystems. */
   public static boolean copy(FileSystem srcFS, Path src, 
                              FileSystem dstFS, Path dst, 
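
A minimal usage sketch for the new overload (the Configuration setup and the path are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class FullyDeleteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Resolves to the local file system unless fs.default.name points at DFS.
        FileSystem fs = FileSystem.get(conf);
        // Recursively remove a (hypothetical) task-attempt output directory.
        FileUtil.fullyDelete(fs, new Path("/tmp/out/_task_0001_r_000003_0"));
      }
    }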

+ 25 - 6
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.util.StringUtils;
 
 ///////////////////////////////////////////////////////
 // JobInProgress maintains all the info for keeping
@@ -333,8 +334,20 @@ class JobInProgress {
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                             );
+        try {
+          completedTask(tip, status, metrics);
+        } catch (IOException ioe) {
+          // Oops! Failed to copy the task's output to its final place;
+          // fail the task!
+          failedTask(tip, status.getTaskId(), 
+                     "Failed to copy reduce's output", 
+                     TaskStatus.Phase.REDUCE, TaskStatus.State.FAILED, 
+                     ttStatus.getHost(), status.getTaskTracker(), null);
+          LOG.info("Failed to copy the output of " + status.getTaskId() + 
+                   " with: " + StringUtils.stringifyException(ioe));
+          return;
+        }
         tip.setSuccessEventNumber(taskCompletionEventTracker);
-        completedTask(tip, status, metrics);
       } else if (state == TaskStatus.State.FAILED ||
                  state == TaskStatus.State.KILLED) {
         taskEvent = new TaskCompletionEvent(
@@ -641,7 +654,8 @@ class JobInProgress {
    */
   public synchronized void completedTask(TaskInProgress tip, 
                                          TaskStatus status,
-                                         JobTrackerMetrics metrics) {
+                                         JobTrackerMetrics metrics) 
+  throws IOException {
     String taskid = status.getTaskId();
         
     // Sanity check: is the TIP already complete?
@@ -650,7 +664,12 @@ class JobInProgress {
                " has completed task " + taskid);
           
       // Just mark this 'task' as complete
-      tip.completedTask(taskid);
+      try {
+        tip.alreadyCompletedTask(taskid);
+      } catch (IOException ioe) {
+        LOG.info("Failed to discard output of " + taskid + " : " + 
+                StringUtils.stringifyException(ioe));
+      }
           
       // Let the JobTracker cleanup this taskid if the job isn't running
       if (this.status.getRunState() != JobStatus.RUNNING) {
@@ -662,6 +681,9 @@ class JobInProgress {
     LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
              " successfully.");          
 
+    // Mark the TIP as complete
+    tip.completed(taskid);
+
     // Update jobhistory 
     String taskTrackerName = status.getTaskTracker();
     if (status.getIsMap()){
@@ -685,9 +707,6 @@ class JobInProgress {
                                   Values.REDUCE.name(), status.getFinishTime()); 
     }
         
-    // Mark the TIP as complete
-    tip.completed(taskid);
-        
     // Update the running/finished map/reduce counts
     if (tip.isMapTask()){
       runningMapTasks -= 1;

+ 2 - 0
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -124,6 +124,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
           map_tasks += 1;
           myMetrics.launchMap();
           map.run(localConf, this);
+          map.saveTaskOutput();
           myMetrics.completeMap();
           map_tasks -= 1;
           updateCounters(map);
@@ -153,6 +154,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
           reduce_tasks += 1;
           myMetrics.launchReduce();
           reduce.run(localConf, this);
+          reduce.saveTaskOutput();
           myMetrics.completeReduce();
           reduce_tasks -= 1;
           updateCounters(reduce);

+ 0 - 16
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -57,8 +57,6 @@ class MapTask extends Task {
 
   private BytesWritable split = new BytesWritable();
   private String splitClass;
-  private MapOutputFile mapOutputFile = new MapOutputFile();
-  private JobConf conf;
   private InputSplit instantiatedSplit = null;
 
   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
@@ -100,7 +98,6 @@ class MapTask extends Task {
     super.write(out);
     Text.writeString(out, splitClass);
     split.write(out);
-    
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -226,19 +223,6 @@ class MapTask extends Task {
     return sortProgress;
   }
 
-  public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
-    this.mapOutputFile.setConf(this.conf);
-  }
-
-  public Configuration getConf() {
-    return this.conf;
-  }
-
   class MapOutputBuffer implements OutputCollector {
 
     private final int partitions;

+ 4 - 1
src/java/org/apache/hadoop/mapred/PhasedFileSystem.java

@@ -24,7 +24,10 @@ import org.apache.hadoop.util.Progressable;
  * Temporary files are written in  ("mapred.system.dir")/<jobid>/<taskid>
  * If one tasks opens a large number of files in succession then its 
  * better to commit(Path) individual files when done. Otherwise
- * commit() can be used to commit all open files at once. 
+ * commit() can be used to commit all open files at once.
+ * 
+ * @deprecated {@link PhasedFileSystem} is no longer used
+ *             during speculative execution of tasks.
  */
 public class PhasedFileSystem extends FilterFileSystem {
   // Map from final file name to temporary file name

+ 0 - 25
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -89,8 +89,6 @@ class ReduceTask extends Task {
   private Progress copyPhase = getProgress().addPhase("copy");
   private Progress sortPhase  = getProgress().addPhase("sort");
   private Progress reducePhase = getProgress().addPhase("reduce");
-  private JobConf conf;
-  private MapOutputFile mapOutputFile = new MapOutputFile();
 
   public ReduceTask() {}
 
@@ -322,14 +320,8 @@ class ReduceTask extends Task {
     
     // make output collector
     String finalName = getOutputName(getPartition());
-    boolean runSpeculative = job.getSpeculativeExecution();
     FileSystem fs = FileSystem.get(job);
 
-    if (runSpeculative){
-      fs = new PhasedFileSystem (fs , 
-                                 getJobId(), getTipId(), getTaskId());
-    }
-    
     final RecordWriter out = 
       job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
     
@@ -360,10 +352,6 @@ class ReduceTask extends Task {
       reducer.close();
       out.close(reporter);
       //End of clean up.
-      
-      if (runSpeculative){
-        ((PhasedFileSystem)fs).commit(); 
-      }
     } catch (IOException ioe) {
       try {
         reducer.close();
@@ -392,19 +380,6 @@ class ReduceTask extends Task {
     return "part-" + NUMBER_FORMAT.format(partition);
   }
 
-  public void setConf(Configuration conf) {
-    if (conf instanceof JobConf) {
-      this.conf = (JobConf) conf;
-    } else {
-      this.conf = new JobConf(conf);
-    }
-    this.mapOutputFile.setConf(this.conf);
-  }
-
-  public Configuration getConf() {
-    return this.conf;
-  }
-  
   private class ReduceCopier implements MRConstants {
 
     /** Reference to the umbilical object */

+ 115 - 1
src/java/org/apache/hadoop/mapred/Task.java

@@ -21,10 +21,16 @@ package org.apache.hadoop.mapred;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progress;
@@ -58,8 +64,11 @@ abstract class Task implements Writable, Configurable {
   private String jobId;                           // unique jobid
   private String tipId;
   private int partition;                          // id within job
-  private TaskStatus.Phase phase;                 // current phase of the task 
+  private TaskStatus.Phase phase ;                // current phase of the task
+  private Path taskOutputPath;                    // task-specific output dir
   
+  protected JobConf conf;
+  protected MapOutputFile mapOutputFile = new MapOutputFile();
 
   ////////////////////////////////////////////
   // Constructors
@@ -125,6 +134,11 @@ abstract class Task implements Writable, Configurable {
     UTF8.writeString(out, taskId);
     UTF8.writeString(out, jobId);
     out.writeInt(partition);
+    if (taskOutputPath != null) {
+      Text.writeString(out, taskOutputPath.toString());
+    } else {
+      Text.writeString(out, new String(""));
+    }
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = UTF8.readString(in);
@@ -132,10 +146,20 @@ abstract class Task implements Writable, Configurable {
     taskId = UTF8.readString(in);
     jobId = UTF8.readString(in);
     partition = in.readInt();
+    String outPath = Text.readString(in);
+    if (outPath.length() != 0) {
+      taskOutputPath = new Path(outPath);
+    } else {
+      taskOutputPath = null;
+    }
   }
 
   public String toString() { return taskId; }
 
+  private Path getTaskOutputPath(JobConf conf) {
+    return new Path(conf.getOutputPath(), new String("_" + taskId));
+  }
+  
   /**
    * Localize the given JobConf to be specific for this task.
    */
@@ -145,6 +169,12 @@ abstract class Task implements Writable, Configurable {
     conf.setBoolean("mapred.task.is.map", isMapTask());
     conf.setInt("mapred.task.partition", partition);
     conf.set("mapred.job.id", jobId);
+    
+    // The task-specific output path
+    if (conf.getOutputPath() != null) {
+      taskOutputPath = getTaskOutputPath(conf);
+      conf.setOutputPath(taskOutputPath);
+    }
   }
   
   /** Run this task as a part of the named job.  This method is executed in the
@@ -250,4 +280,88 @@ abstract class Task implements Writable, Configurable {
       }
     }
   }
+  
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+
+      if (taskId != null && taskOutputPath == null && 
+              this.conf.getOutputPath() != null) {
+        taskOutputPath = getTaskOutputPath(this.conf);
+      }
+    } else {
+      this.conf = new JobConf(conf);
+    }
+    this.mapOutputFile.setConf(this.conf);
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Save the task's output on successful completion.
+   * 
+   * @throws IOException
+   */
+  void saveTaskOutput() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    if (taskOutputPath != null && fs.exists(taskOutputPath)) {
+      Path jobOutputPath = taskOutputPath.getParent();
+
+      // Move the task outputs to their final place
+      moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
+
+      // Delete the temporary task-specific output directory
+      if (!fs.delete(taskOutputPath)) {
+        LOG.info("Failed to delete the temporary output directory of task: " + 
+                getTaskId() + " - " + taskOutputPath);
+      }
+      
+      LOG.info("Saved output of task '" + getTaskId() + "' to " + jobOutputPath);
+    }
+  }
+  
+  private Path getFinalPath(Path jobOutputDir, Path taskOutput) {
+    URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri());
+    return new Path(jobOutputDir, relativePath.toString());
+  }
+  
+  private void moveTaskOutputs(FileSystem fs, Path jobOutputDir, Path taskOutput) 
+  throws IOException {
+    if (fs.isFile(taskOutput)) {
+      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
+      fs.mkdirs(finalOutputPath.getParent());
+      if (!fs.rename(taskOutput, finalOutputPath)) {
+        throw new IOException("Failed to save output of task: " + 
+                getTaskId());
+      }
+      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
+    } else if(fs.isDirectory(taskOutput)) {
+      Path[] paths = fs.listPaths(taskOutput);
+      if (paths != null) {
+        for (Path path : paths) {
+          moveTaskOutputs(fs, jobOutputDir, path);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Discard the task's output on failure.
+   * 
+   * @throws IOException
+   */
+  void discardTaskOutput() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    if (taskOutputPath != null && fs.exists(taskOutputPath)) {
+      // Delete the temporary task-specific output directory
+      FileUtil.fullyDelete(fs, taskOutputPath);
+      LOG.info("Discarded output of task '" + getTaskId() + "' - " 
+              + taskOutputPath);
+    }
+  }
+
 }
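
To see the promote step in isolation, here is a hedged, self-contained simulation of what saveTaskOutput()/moveTaskOutputs() above do, using java.io.File on the local disk instead of Hadoop's FileSystem API; the class name, method name, and paths are all illustrative:

    import java.io.File;
    import java.io.IOException;

    public class PromoteOutputSketch {
      // Mirrors Task.moveTaskOutputs(): walk the task directory and rename each
      // file to the same relative location under the job output directory.
      static void promote(File jobDir, File taskDir, File current) throws IOException {
        if (current.isFile()) {
          String rel = taskDir.toURI().relativize(current.toURI()).getPath();
          File dest = new File(jobDir, rel);
          dest.getParentFile().mkdirs();  // may already exist; return value ignored
          if (!current.renameTo(dest)) {
            throw new IOException("Failed to save output file: " + current);
          }
        } else if (current.isDirectory()) {
          File[] children = current.listFiles();
          if (children != null) {
            for (File child : children) {
              promote(jobDir, taskDir, child);
            }
          }
        }
      }

      public static void main(String[] args) throws IOException {
        File jobDir = new File("job-out");                        // job output dir
        File taskDir = new File(jobDir, "_task_0001_r_000003_0"); // per-attempt dir
        taskDir.mkdirs();
        new File(taskDir, "part-00003").createNewFile();          // the attempt's output
        promote(jobDir, taskDir, taskDir);                        // "saveTaskOutput"
        taskDir.delete();                                         // drop the now-empty temp dir
        System.out.println(new File(jobDir, "part-00003").isFile()); // prints true
      }
    }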

+ 45 - 2
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -30,6 +30,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.StringUtils;
 
 
 ////////////////////////////////////////////////////////
@@ -101,6 +102,10 @@ class TaskInProgress {
   private TreeMap<String,TaskStatus> taskStatuses = 
     new TreeMap<String,TaskStatus>();
 
+  // Map from taskId -> Task
+  private Map<String, Task> tasks = new TreeMap<String, Task>();
+  boolean savedTaskOutput = false;
+
   private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
   private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
     
@@ -410,6 +415,15 @@ class TaskInProgress {
       this.completes--;
     }
 
+    // Discard task output
+    Task t = tasks.get(taskid);
+    try {
+      t.discardTaskOutput();
+    } catch (IOException ioe) {
+      LOG.info("Failed to discard output of task '" + taskid + "' with " + 
+              StringUtils.stringifyException(ioe));
+    }
+
     if (taskState == TaskStatus.State.FAILED) {
       numTaskFailures++;
     } else {
@@ -431,18 +445,46 @@ class TaskInProgress {
    * TaskInProgress to be completed and hence we might not want to 
    * manipulate the TaskInProgress to note that it is 'complete' just-as-yet.
    */
+  void alreadyCompletedTask(String taskid) throws IOException {
+    Task t = tasks.get(taskid);
+    try {
+      t.discardTaskOutput();
+    } catch (IOException ioe) {
+      LOG.info("Failed to discard output of task '" + taskid + "' with " + 
+              StringUtils.stringifyException(ioe));
+    }
+    completedTask(taskid);
+  }
+
   void completedTask(String taskid) {
-    LOG.info("Task '" + taskid + "' has completed.");
     TaskStatus status = taskStatuses.get(taskid);
     status.setRunState(TaskStatus.State.SUCCEEDED);
     activeTasks.remove(taskid);
+    LOG.info("Task '" + taskid + "' has completed.");
   }
     
   /**
    * Indicate that one of the taskids in this TaskInProgress
    * has successfully completed!
    */
-  public void completed(String taskid) {
+  public void completed(String taskid) throws IOException {
+    //
+    // Finalize the task's output
+    //
+    Task t = tasks.get(taskid);
+    if (!savedTaskOutput) {
+      t.saveTaskOutput();
+      savedTaskOutput = true;
+    } else {
+      try {
+        t.discardTaskOutput();
+      } catch (IOException ioe) {
+        LOG.info("Failed to discard 'already-saved' output of task: " + 
+                t.getTaskId() + " with: " + 
+                StringUtils.stringifyException(ioe));
+      }
+    }
+
     //
     // Record that this taskid is complete
     //
@@ -597,6 +639,7 @@ class TaskInProgress {
       t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
     }
     t.setConf(conf);
+    tasks.put(taskid, t);
 
     activeTasks.put(taskid, taskTracker);
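
Note the effect of the savedTaskOutput flag: among the attempts of one TaskInProgress, only the first to reach completed() has its output promoted into the job's output directory; any later successful attempt, like any failed or killed one, has its private output directory discarded instead.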
 

+ 20 - 6
src/test/org/apache/hadoop/mapred/SortValidator.java

@@ -83,6 +83,12 @@ public class SortValidator {
     return pairData;
   }
 
+  private static final PathFilter sortPathsFilter = new PathFilter() {
+    public boolean accept(Path path) {
+      return (path.getName().startsWith("part-"));
+    }
+  };
+  
   /**
    * A simple map-reduce job which checks consistency of the
    * MapReduce framework's sort by checking:
@@ -132,6 +138,7 @@ public class SortValidator {
       private Partitioner partitioner = null;
       private int partition = -1;
       private int noSortReducers = -1;
+      private long recordId = -1;
 
       public void configure(JobConf job) {
         // 'key' == sortInput for sort-input; key == sortOutput for sort-output
@@ -161,6 +168,7 @@ public class SortValidator {
                       Reporter reporter) throws IOException {
         BytesWritable bwKey = (BytesWritable)key;
         BytesWritable bwValue = (BytesWritable)value;
+        ++recordId;
         
         if (this.key == sortOutput) {
           // Check if keys are 'sorted' if this  
@@ -170,7 +178,8 @@ public class SortValidator {
           } else {
             if (prevKey.compareTo(bwKey) > 0) {
               throw new IOException("The 'map-reduce' framework wrongly classifed"
-                                    + "(" + prevKey + ") > (" + bwKey + ")"); 
+                                    + "(" + prevKey + ") > (" + bwKey + ") for record# " 
+                                    + recordId); 
             }
             prevKey = bwKey;
           }
@@ -179,8 +188,9 @@ public class SortValidator {
           int keyPartition = 
             partitioner.getPartition(bwKey, bwValue, noSortReducers);
           if (partition != keyPartition) {
-            throw new IOException("Paritions do not match! - '" + partition + 
-                                  "' v/s '" + keyPartition + "'");
+            throw new IOException("Partitions do not match for record# " + 
+                                  recordId + " ! - '" + partition + "' v/s '" + 
+                                  keyPartition + "'");
           }
         }
 
@@ -225,8 +235,10 @@ public class SortValidator {
       JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
       jobConf.setJobName("sortvalidate-recordstats-checker");
 
-      int noSortReduceTasks = fs.listPaths(sortOutput).length;
+      int noSortReduceTasks = 
+        fs.listPaths(sortOutput, sortPathsFilter).length;
       jobConf.setInt("sortvalidate.sort.reduce.tasks", noSortReduceTasks);
+      int noSortInputpaths = fs.listPaths(sortInput).length;
 
       jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
       jobConf.setOutputFormat(SequenceFileOutputFormat.class);
@@ -253,8 +265,10 @@ public class SortValidator {
       //job_conf.set("mapred.job.tracker", "local");
       
       System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
-                         "from " + jobConf.getInputPaths()[0] + ", " + 
-                         jobConf.getInputPaths()[1] + " into " + jobConf.getOutputPath() + 
+                         "from " + jobConf.getInputPaths()[0] + " (" + 
+                         noSortInputpaths + " files), " + 
+                         jobConf.getInputPaths()[1] + " (" + noSortReduceTasks + 
+                         " files) into " + jobConf.getOutputPath() + 
                          " with 1 reducer.");
       Date startTime = new Date();
       System.out.println("Job started: " + startTime);