
HADOOP-476. Rewrite contrib/streaming command-line processing, improving parameter validation. Contributed by Sanjay.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@507163 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 18 years ago
commit 62825831fa
2 changed files with 322 additions and 128 deletions:
  1. CHANGES.txt (+3, -0)
  2. src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (+319, -128)
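For orientation before the diffs: streaming builds a StreamJob from the raw argv and runs it, so the command-line processing rewritten below happens entirely inside that object. A minimal driver sketch; the go() call is an assumption based on the surrounding StreamJob code, not part of this diff:

    // Hypothetical driver, for illustration only: shows where the rewritten
    // argv parsing is triggered. go() is assumed from the rest of StreamJob.java.
    public class StreamDriver {
      public static void main(String[] args) throws Exception {
        // mayExit=true: a parse error prints usage and exits the JVM
        StreamJob job = new StreamJob(args, true);
        job.go();  // packages the job jar, then submits and monitors the job
      }
    }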

CHANGES.txt (+3, -0)

@@ -46,6 +46,9 @@ Trunk (unreleased changes)
     separate thread, to improve heartbeat processing time.
     (Dhruba Borthakur via cutting) 
 
+14. HADOOP-476.  Rewrite contrib/streaming command-line processing,
+    improving parameter validation.  (Sanjay Dahiya via cutting)
+
 
 Release 0.11.1 - 2007-02-09
 

src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (+319, -128)

@@ -26,16 +26,33 @@ import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.cli2.*; 
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.commons.cli2.option.PropertyOption;
+import org.apache.commons.cli2.resource.ResourceConstants;
+import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.commons.cli2.validation.FileValidator;
+import org.apache.commons.cli2.validation.InvalidArgumentException;
+import org.apache.commons.cli2.validation.Validator;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
@@ -45,6 +62,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
+import org.apache.log4j.helpers.OptionConverter;
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
@@ -55,7 +73,22 @@ public class StreamJob {
   final static String REDUCE_NONE = "NONE";
   private boolean reducerNone_;
 
+  /** -----------Streaming CLI Implementation  **/
+  private DefaultOptionBuilder builder = 
+    new DefaultOptionBuilder("-","-", false);
+  private ArgumentBuilder argBuilder = new ArgumentBuilder(); 
+  private Parser parser = new Parser(); 
+  private Group allOptions ; 
+  HelpFormatter helpFormatter = new HelpFormatter("  ", "  ", "  ", 900);
+  // need these two at class level to extract values later from 
+  // commons-cli command line
+  private MultiPropertyOption jobconf = new MultiPropertyOption(
+      "-jobconf", "(n=v) Optional. Add or override a JobConf property.", 'D'); 
+  private MultiPropertyOption cmdenv = new MultiPropertyOption(
+      "-cmdenv", "(n=v) Pass env.var to streaming commands.", 'E');  
+  
   public StreamJob(String[] argv, boolean mayExit) {
+    setupOptions();
     argv_ = argv;
     mayExit_ = mayExit;
   }
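The constructor now calls setupOptions() before anything else, so the commons-cli2 option group exists by the time parseArgv() runs. A self-contained sketch of the cli2 wiring this patch adopts, using only calls that appear in the diff (class name and flag are illustrative):

    import org.apache.commons.cli2.CommandLine;
    import org.apache.commons.cli2.Group;
    import org.apache.commons.cli2.Option;
    import org.apache.commons.cli2.builder.DefaultOptionBuilder;
    import org.apache.commons.cli2.builder.GroupBuilder;
    import org.apache.commons.cli2.commandline.Parser;

    public class Cli2Sketch {
      public static void main(String[] args) throws Exception {
        // "-" as both short and long prefix yields single-dash long options
        // like -verbose, matching streaming's existing flag style.
        DefaultOptionBuilder builder = new DefaultOptionBuilder("-", "-", false);
        Option verbose = builder.withLongName("verbose")
                                .withDescription("print verbose output")
                                .create();
        Group group = new GroupBuilder().withOption(verbose).create();
        Parser parser = new Parser();
        parser.setGroup(group);
        CommandLine cmdLine = parser.parse(args);  // throws on unknown options
        System.out.println("verbose: " + cmdLine.hasOption("-verbose"));
      }
    }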
@@ -119,15 +152,6 @@ public class StreamJob {
     redCmd_ = unqualifyIfLocalPath(redCmd_);
   }
 
-  String[] parseNameEqValue(String neqv) {
-    String[] nv = neqv.split("=", 2);
-    if (nv.length < 2) {
-      fail("Invalid name=value spec: " + neqv);
-    }
-    msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
-    return nv;
-  }
-
   String unqualifyIfLocalPath(String cmd) throws IOException {
     if (cmd == null) {
       //
@@ -168,109 +192,85 @@ public class StreamJob {
     return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
   }
 
-  /**
-   * This method parses the command line args
-   * to a hadoop streaming job
-   */
-  void parseArgv() {
-    if (argv_.length == 0) {
-      exitUsage(false);
-    }
-    int i = 0;
-    while (i < argv_.length) {
-      String s;
-      if (argv_[i].equals("-verbose")) {
-        verbose_ = true;
-      } else if (argv_[i].equals("-info")) {
-        detailedUsage_ = true;
-      } else if (argv_[i].equals("-debug")) {
-        debug_++;
-      } else if ((s = optionArg(argv_, i, "-input", false)) != null) {
-        i++;
-        inputSpecs_.add(s);
-      } else if (argv_[i].equals("-inputtagged")) {
-        inputTagged_ = true;
-      } else if ((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
-        i++;
-        output_ = s;
-      } else if ((s = optionArg(argv_, i, "-mapsideoutput", mapsideoutURI_ != null)) != null) {
-        i++;
-        mapsideoutURI_ = s;
-      } else if ((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
-        i++;
-        mapCmd_ = s;
-      } else if ((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
-        i++;
-        comCmd_ = s;
-      } else if ((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
-        i++;
-        redCmd_ = s;
-      } else if ((s = optionArg(argv_, i, "-file", false)) != null) {
-        i++;
-        packageFiles_.add(s);
-      } else if ((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
-        i++;
-        cluster_ = s;
-      } else if ((s = optionArg(argv_, i, "-config", false)) != null) {
-        i++;
-        configPath_.add(s);
-      } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
-        i++;
-        userJobConfProps_.put("fs.default.name", s);
-      } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
-        i++;
-        userJobConfProps_.put("mapred.job.tracker", s);
-      } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
-        i++;
-        String[] nv = parseNameEqValue(s);
-        userJobConfProps_.put(nv[0], nv[1]);
-      } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
-        i++;
-        parseNameEqValue(s);
-        if (addTaskEnvironment_.length() > 0) {
-          addTaskEnvironment_ += " ";
-        }
-        addTaskEnvironment_ += s;
-      } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
-        i++;
-        inReaderSpec_ = s;
-      } else if((s = optionArg(argv_, i, "-cacheArchive", false)) != null) {
-    	  i++;
-    	  if (cacheArchives == null)
-    		  cacheArchives = s;
-    	  else
-    		  cacheArchives = cacheArchives + "," + s;    	  
-      } else if((s = optionArg(argv_, i, "-cacheFile", false)) != null) {
-        i++;
-        System.out.println(" the val of s is " + s);
-        if (cacheFiles == null)
-          cacheFiles = s;
-        else
-          cacheFiles = cacheFiles + "," + s;
-        System.out.println(" the val of cachefiles is " + cacheFiles);
-      }
-      else {
-        System.err.println("Unexpected argument: " + argv_[i]);
+  void parseArgv(){
+    CommandLine cmdLine = null ; 
+    try{
+       cmdLine = parser.parse(argv_);
+    }catch(Exception oe){
+      LOG.error(oe.getMessage());
+      if (detailedUsage_) {
+        exitUsage(true);
+      } else {
         exitUsage(false);
       }
-      i++;
-    }
-    if (detailedUsage_) {
-      exitUsage(true);
     }
-  }
+    
+    if( cmdLine != null ){
+      verbose_ =  cmdLine.hasOption("-verbose") ;
+      detailedUsage_ = cmdLine.hasOption("-info") ;
+      debug_ = cmdLine.hasOption("-debug")? debug_ + 1 : debug_ ;
+      inputTagged_ = cmdLine.hasOption("-inputtagged"); 
+      
+      inputSpecs_.addAll(cmdLine.getValues("-input"));
+      output_ = (String) cmdLine.getValue("-output"); 
+      mapsideoutURI_ = (String) cmdLine.getValue("-mapsideoutput");
+      
+      mapCmd_ = (String)cmdLine.getValue("-mapper"); 
+      comCmd_ = (String)cmdLine.getValue("-combiner"); 
+      redCmd_ = (String)cmdLine.getValue("-reducer"); 
+      
+      packageFiles_.addAll(cmdLine.getValues("-file"));
+      
+      cluster_ = (String)cmdLine.getValue("-cluster");
+      
+      configPath_.addAll(cmdLine.getValues("-config"));
+      
+      String fsName = (String)cmdLine.getValue("-dfs");
+      if( null != fsName ){
+        userJobConfProps_.put("fs.default.name", fsName);        
+      }
+      
+      String jt = (String)cmdLine.getValue("-jt");
+      if( null != jt ){
+        userJobConfProps_.put("mapred.job.tracker", jt);        
+      }
+      
+      inReaderSpec_ = (String)cmdLine.getValue("-inputreader"); 
+      
+      List<String> car = cmdLine.getValues("-cacheArchive"); 
+      if( null != car ){
+        for( String s : car ){
+          cacheArchives = (cacheArchives == null)?s :cacheArchives + "," + s;  
+        }
+      }
 
-  String optionArg(String[] args, int index, String arg, boolean argSet) {
-    if (index >= args.length || !args[index].equals(arg)) {
-      return null;
-    }
-    if (argSet) {
-      throw new IllegalArgumentException("Can only have one " + arg + " option");
-    }
-    if (index >= args.length - 1) {
-      throw new IllegalArgumentException("Expected argument after option " + args[index]);
+      List<String> caf = cmdLine.getValues("-cacheFile"); 
+      if( null != caf ){
+        for( String s : caf ){
+          cacheFiles = (cacheFiles == null)?s :cacheFiles + "," + s;  
+        }
+      }
+      
+      List<String> jobConfArgs = (List<String>)cmdLine.getValue(jobconf); 
+      List<String> envArgs = (List<String>)cmdLine.getValue(cmdenv); 
+      
+      if( null != jobConfArgs ){
+        for( String s : jobConfArgs){
+          String[] parts = s.split("=", 2); 
+          userJobConfProps_.put(parts[0], parts[1]);
+        }
+      }
+      if( null != envArgs ){
+        for( String s : envArgs ){
+          if (addTaskEnvironment_.length() > 0) {
+            addTaskEnvironment_ += " ";
+          }
+          addTaskEnvironment_ += s;
+        }
+      }
+    }else if (detailedUsage_) {
+      exitUsage(true);
     }
-    return args[index + 1];
   }
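A note on the name=value handling above: the deleted parseNameEqValue used split("=", 2), and the limit matters because a -jobconf value may itself contain '='. For example:

    // "a=b=c".split("=")    -> {"a", "b", "c"}   value silently truncated to "b"
    // "a=b=c".split("=", 2) -> {"a", "b=c"}      value kept intact
    String[] parts = "mapred.child.java.opts=-Dfoo=bar".split("=", 2);
    // parts[0] = "mapred.child.java.opts", parts[1] = "-Dfoo=bar"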
 
   protected void msg(String msg) {
@@ -278,32 +278,173 @@ public class StreamJob {
       System.out.println("STREAM: " + msg);
       System.out.println("STREAM: " + msg);
     }
     }
   }
   }
+  
+  private Option createOption(String name, String desc, 
+      String argName, int max, boolean required){
+    Argument argument = argBuilder.
+                      withName(argName).
+                      withMinimum(1).
+                      withMaximum(max).
+                      create();
+    return builder.
+              withLongName(name).
+              withArgument(argument).
+              withDescription(desc).
+              withRequired(required).
+              create();
+  }
+  
+  private Option createOption(String name, String desc, 
+      String argName, int max, boolean required, Validator validator){
+    
+    Argument argument = argBuilder.
+                              withName(argName).
+                              withMinimum(1).
+                              withMaximum(max).
+                              withValidator(validator).
+                              create() ;
+   
+    return builder.
+              withLongName(name).
+              withArgument(argument).
+              withDescription(desc).
+              withRequired(required).
+              create();
+  }  
+  
+  private Option createBoolOption(String name, String desc){
+    return builder.withLongName(name).withDescription(desc).create();
+  }
+  
+  private void setupOptions(){
+
+    final Validator fileValidator = new Validator(){
+      public void validate(final List values) throws InvalidArgumentException {
+        // Note: this code doesn't belong here; it should be changed 
+        // to a can-exec check in Java 6.
+        for (String file : (List<String>)values) {
+          File f = new File(file);  
+          if ( ! f.exists() ) {
+            throw new InvalidArgumentException("Argument : " + 
+                f.getAbsolutePath() + " doesn't exist."); 
+          }
+          if ( ! f.isFile() ) {
+            throw new InvalidArgumentException("Argument : " + 
+                f.getAbsolutePath() + " is not a file."); 
+          }
+          if ( ! f.canRead() ) {
+            throw new InvalidArgumentException("Argument : " + 
+                f.getAbsolutePath() + " is not accessible"); 
+          }
+        }
+      }      
+    }; 
+
+    // Note: not extending CLI2's FileValidator, that overwrites 
+    // the String arg into File and causes ClassCastException 
+    // in inheritance tree. 
+    final Validator execValidator = new Validator(){
+      public void validate(final List values) throws InvalidArgumentException {
+        // Note: this code doesn't belong here; it should be changed 
+        // to a can-exec check in Java 6.
+        for (String file : (List<String>)values) {
+          try{
+            Runtime.getRuntime().exec("chmod 0777 " + (new File(file)).getAbsolutePath());
+          }catch(IOException ioe){
+            // ignore 
+          }
+        }
+        fileValidator.validate(values);
+      }
+    }; 
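Both validators carry the note that this is a stopgap until Java 6, where java.io.File gains canExecute() and setExecutable() and the external chmod becomes unnecessary. A hedged sketch of what the loop body could become on Java 6:

    // Java 6 sketch (hypothetical): no Runtime.exec() needed.
    for (String file : (List<String>) values) {
      File f = new File(file);
      f.setExecutable(true);       // best-effort, like the chmod above
      if (!f.canExecute()) {
        throw new InvalidArgumentException("Argument : " +
            f.getAbsolutePath() + " is not executable.");
      }
    }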
+
+    Option input   = createOption("input", 
+        "DFS input file(s) for the Map step", 
+        "path", 
+        Integer.MAX_VALUE, 
+        true);  
+    
+    Option output  = createOption("output", 
+        "DFS output directory for the Reduce step", 
+        "path", 1, true); 
+    Option mapper  = createOption("mapper", 
+        "The streaming command to run", "cmd", 1, true);
+    Option combiner = createOption("combiner", 
+        "The streaming command to run", "cmd",1, false);
+    // reducer could be NONE 
+    Option reducer = createOption("reducer", 
+        "The streaming command to run", "cmd", 1, true); 
+    Option file = createOption("file", 
+        "File/dir to be shipped in the Job jar file", 
+        "file", Integer.MAX_VALUE, false, execValidator); 
+    Option dfs = createOption("dfs", 
+        "Optional. Override DFS configuration", "<h:p>|local", 1, false); 
+    Option jt = createOption("jt", 
+        "Optional. Override JobTracker configuration", "<h:p>|local",1, false);
+    Option inputreader = createOption("inputreader", 
+        "Optional.", "spec",1, false );
+    Option cacheFile = createOption("cacheFile", 
+        "File name URI", "fileNameURI", 1, false);
+    Option cacheArchive = createOption("cacheArchive", 
+        "File name URI", "fileNameURI",1, false);
+    
+    // boolean properties
+    
+    Option verbose = createBoolOption("verbose", "print verbose output"); 
+    Option info = createBoolOption("info", "print detailed usage information"); 
+    Option help = createBoolOption("help", "print this help message"); 
+    Option debug = createBoolOption("debug", "print debug output"); 
+    Option inputtagged = createBoolOption("inputtagged", "inputtagged"); 
+    
+    allOptions = new GroupBuilder().
+                          withOption(input).
+                          withOption(output).
+                          withOption(mapper).
+                          withOption(combiner).
+                          withOption(reducer).
+                          withOption(file).
+                          withOption(dfs).
+                          withOption(jt).
+                          withOption(inputreader).
+                          withOption(jobconf).
+                          withOption(cmdenv).
+                          withOption(cacheFile).
+                          withOption(cacheArchive).
+                          withOption(verbose).
+                          withOption(info).
+                          withOption(debug).
+                          withOption(inputtagged).
+                          withOption(help).
+                          create();
+    parser.setGroup(allOptions);
+    
+  }
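The helpFormatter field declared earlier pairs with this group: commons-cli2 can render the registered options itself rather than relying on hand-maintained println calls. A sketch, assuming the stock cli2 HelpFormatter API:

    // Sketch: letting cli2 print usage for the assembled group.
    HelpFormatter fmt = new HelpFormatter("  ", "  ", "  ", 900);
    fmt.setGroup(allOptions);
    fmt.print();  // writes usage plus each option's name and description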
 
   public void exitUsage(boolean detailed) {
     //         1         2         3         4         5         6         7
     //1234567890123456789012345678901234567890123456789012345678901234567890123456789
-    System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
-    System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
-    System.out.println("Options:");
-    System.out.println("  -input    <path>     DFS input file(s) for the Map step");
-    System.out.println("  -output   <path>     DFS output directory for the Reduce step");
-    System.out.println("  -mapper   <cmd>      The streaming command to run");
-    System.out.println("  -combiner <cmd>      The streaming command to run");
-    System.out.println("  -reducer  <cmd>      The streaming command to run");
-    System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
-    //Only advertise the standard way: [--config dir] in our launcher 
-    //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
-    //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
-    System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
-    System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
-    System.out.println("  -inputreader <spec>  Optional.");
-    System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
-    System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
-    System.out.println("  -cacheFile fileNameURI");
-    System.out.println("  -cacheArchive fileNameURI");
-    System.out.println("  -verbose");
-    System.out.println();
     if (!detailed) {
+      System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
+      System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
+      System.out.println("Options:");
+      System.out.println("  -input    <path>     DFS input file(s) for the Map step");
+      System.out.println("  -output   <path>     DFS output directory for the Reduce step");
+      System.out.println("  -mapper   <cmd>      The streaming command to run");
+      System.out.println("  -combiner <cmd>      The streaming command to run");
+      System.out.println("  -reducer  <cmd>      The streaming command to run");
+      System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
+      //Only advertise the standard way: [--config dir] in our launcher 
+      //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
+      //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
+      System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
+      System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
+      System.out.println("  -inputreader <spec>  Optional.");
+      System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
+      System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
+      System.out.println("  -cacheFile fileNameURI");
+      System.out.println("  -cacheArchive fileNameURI");
+      System.out.println("  -verbose");
+      System.out.println();      
       System.out.println("For more details about these options:");
       System.out.println("For more details about these options:");
       System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
       System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
       fail("");
       fail("");
@@ -810,6 +951,56 @@ public class StreamJob {
       jc_.close();
     }
   }
+  /** Support -jobconf x=y x1=y1 type options **/
+  class MultiPropertyOption extends PropertyOption{
+    private String optionString ; 
+    MultiPropertyOption(){
+      super(); 
+    }
+    
+    MultiPropertyOption(final String optionString,
+        final String description,
+        final int id){
+      super(optionString, description, id) ; 
+      this.optionString = optionString;
+    }
+
+    public boolean canProcess(final WriteableCommandLine commandLine,
+        final String argument) {
+        boolean ret = (argument != null) && argument.startsWith(optionString);
+        
+        return ret;
+    }    
+    public void process(final WriteableCommandLine commandLine,
+        final ListIterator arguments) throws OptionException {
+      final String arg = (String) arguments.next();
+
+      if (!canProcess(commandLine, arg)) {
+          throw new OptionException(this, 
+              ResourceConstants.UNEXPECTED_TOKEN, arg);
+      }
+      
+      ArrayList properties = new ArrayList(); 
+      String next = "" ; 
+      while( arguments.hasNext()){
+        next = (String) arguments.next();
+        if( ! next.startsWith("-") ){
+          properties.add(next);
+        }else{
+          arguments.previous();
+          break; 
+        }
+      } 
+
+      // add to any existing values ( support specifying args multiple times)
+      List<String> oldVal = (List<String>)commandLine.getValue(this) ; 
+      if( oldVal == null ){
+        commandLine.addValue(this, properties);
+      }else{
+        oldVal.addAll(properties); 
+      }
+    }
+  }
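To make the greedy argument handling concrete: process() keeps consuming tokens until it sees one starting with "-", and repeated occurrences append to the same list. A walkthrough with invented values:

    // Hypothetical input:
    //   -jobconf mapred.reduce.tasks=2 stream.verbose=true -jobconf io.sort.mb=10
    // Each -jobconf gobbles the following non-"-" tokens, so afterwards:
    List<String> props = (List<String>) cmdLine.getValue(jobconf);
    // props -> [mapred.reduce.tasks=2, stream.verbose=true, io.sort.mb=10]
    // parseArgv() then splits each entry on its first '=' into userJobConfProps_.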
 
   protected boolean mayExit_;
   protected String[] argv_;