Browse Source

HADOOP-728. Fix contrib/streaming issues, including '-reducer NONE'. Contributed by Sanjay Dahiya.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@481430 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
64adfedcc2

+ 3 - 0
CHANGES.txt

@@ -146,6 +146,9 @@ Trunk (unreleased changes)
 43. HADOOP-750.  Fix a potential race condition during mapreduce
     shuffle.  (omalley via cutting)
 
+44. HADOOP-728.  Fix contrib/streaming-related issues, including
+    '-reducer NONE'.  (Sanjay Dahiya via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 14 - 21
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -36,6 +36,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.PhasedFileSystem;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
@@ -192,10 +193,6 @@ public abstract class PipeMapRed {
     }
   }
 
-  String makeUniqueFileSuffix() {
-    return "." + System.currentTimeMillis() + "." + job_.get("mapred.task.id");
-  }
-
   public void configure(JobConf job) {
     try {
       String argv = getPipeCommand(job);
@@ -259,20 +256,21 @@ public abstract class PipeMapRed {
         // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then: 
         // client has renamed outputPath and saved the argv's original output path as:
         if (useSingleSideOutputURI_) {
-          sideEffectURI_ = new URI(sideOutputURI_);
+          finalOutputURI = new URI(sideOutputURI_);
           sideEffectPathFinal_ = null; // in-place, no renaming to final
         } else {
+          sideFs_ = new PhasedFileSystem(sideFs_, job);
           String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath() 
           String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale
           sideEffectPathFinal_ = new Path(sideOutputPath, fileName);
-          sideEffectURI_ = new URI(sideEffectPathFinal_ + makeUniqueFileSuffix()); // implicit dfs: 
+          finalOutputURI = new URI(sideEffectPathFinal_.toString()); // implicit dfs: 
         }
         // apply default scheme
-        if(sideEffectURI_.getScheme() == null) {
-          sideEffectURI_ = new URI("file", sideEffectURI_.getSchemeSpecificPart(), null);
+        if(finalOutputURI.getScheme() == null) {
+          finalOutputURI = new URI("file", finalOutputURI.getSchemeSpecificPart(), null);
         }
         boolean allowSocket = useSingleSideOutputURI_;
-        sideEffectOut_ = getURIOutputStream(sideEffectURI_, allowSocket);
+        sideEffectOut_ = getURIOutputStream(finalOutputURI, allowSocket);
       }
 
       // 
@@ -292,7 +290,7 @@ public abstract class PipeMapRed {
           f = null;
       }
       logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
-      logprintln("sideEffectURI_=" + sideEffectURI_);
+      logprintln("sideEffectURI_=" + finalOutputURI);
 
       Environment childEnv = (Environment) StreamUtil.env().clone();
       addJobConfToEnvironment(job_, childEnv);
@@ -505,6 +503,7 @@ public abstract class PipeMapRed {
           if (optSideEffect_) {
             sideEffectOut_.write(answer);
             sideEffectOut_.write('\n');
+            sideEffectOut_.flush();
           } else {
             splitKeyVal(answer, key, val);
             output.collect(key, val);
@@ -576,17 +575,11 @@ public abstract class PipeMapRed {
       waitOutputThreads();
       try {
         if (optSideEffect_) {
-          logprintln("closing " + sideEffectURI_);
+          logprintln("closing " + finalOutputURI);
           if (sideEffectOut_ != null) sideEffectOut_.close();
-          logprintln("closed  " + sideEffectURI_);
-          if (useSingleSideOutputURI_) {
-            // With sideEffectPath_ we wrote in-place. 
-            // Possibly a named pipe set up by user or a socket.
-          } else {
-            boolean del = sideFs_.delete(sideEffectPathFinal_);
-            logprintln("deleted  (" + del + ") " + sideEffectPathFinal_);
-            sideFs_.rename(new Path(sideEffectURI_.getSchemeSpecificPart()), sideEffectPathFinal_);
-            logprintln("renamed  " + sideEffectPathFinal_);
+          logprintln("closed  " + finalOutputURI);
+          if ( ! useSingleSideOutputURI_) {
+            ((PhasedFileSystem)sideFs_).commit(); 
           }
         }
       } catch (IOException io) {
@@ -725,7 +718,7 @@ public abstract class PipeMapRed {
   boolean optUseKey_ = true;
 
   private boolean optSideEffect_;
-  private URI sideEffectURI_;
+  private URI finalOutputURI;
   private Path sideEffectPathFinal_;
 
   private boolean useSingleSideOutputURI_;

+ 0 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -701,8 +701,6 @@ public class StreamJob {
         } catch (URISyntaxException e) {
           throw (IOException) new IOException().initCause(e);
         }
-      } else {
-        mapsideoutURI_ = primary;
       }
       // an empty reduce output named "part-00002" will go here and not collide.
       channel0 = primary + ".NONE";

+ 4 - 2
src/java/org/apache/hadoop/mapred/PhasedFileSystem.java

@@ -25,7 +25,7 @@ import org.apache.hadoop.util.Progressable;
  * better to commit(Path) individual files when done. Otherwise
  * commit() can be used to commit all open files at once. 
  */
-class PhasedFileSystem extends FileSystem {
+public class PhasedFileSystem extends FileSystem {
 
   private FileSystem baseFS ;
   // Map from final file name to temporary file name
@@ -93,7 +93,9 @@ class PhasedFileSystem extends FileSystem {
         }catch(IOException ioe){
           // ignore if already closed
         }
-        baseFS.delete( fInfo.getTempPath() ); 
+        if( baseFS.exists(fInfo.getTempPath())){
+          baseFS.delete( fInfo.getTempPath() );
+        }
         finalNameToFileInfo.remove(finalFile); 
       }
     }

+ 5 - 3
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1030,9 +1030,11 @@ public class TaskTracker
             // Delete temp directory in case any task used PhasedFileSystem.
             try{
               String systemDir = task.getConf().get("mapred.system.dir");
-              String taskTempDir = systemDir + "/" + 
-                  task.getJobId() + "/" + task.getTipId();
-              fs.delete(new Path(taskTempDir)) ;
+              Path taskTempDir = new Path(systemDir + "/" + 
+                  task.getJobId() + "/" + task.getTipId());
+              if( fs.exists(taskTempDir)){
+                fs.delete(taskTempDir) ;
+              }
             }catch(IOException e){
               LOG.warn("Error in deleting reduce temporary output",e); 
             }