
merge -r 411936:413147 from trunk, preparing for 0.3.2 release

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/branches/branch-0.3@413169 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent commit 2e2c31a0e0
25 changed files with 775 additions and 218 deletions
  1. +33  -0   CHANGES.txt
  2. +3   -1   bin/hadoop
  3. +1   -1   build.xml
  4. +17  -7   conf/log4j.properties
  5. +25  -8   site/index.html
  6. +31  -9   site/index.pdf
  7. +23  -2   src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
  8. +56  -13  src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
  9. +0   -1   src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
 10. +0   -1   src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
 11. +44  -27  src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
 12. +15  -9   src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
 13. +128 -52  src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
 14. +1   -1   src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
 15. +18  -2   src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
 16. +246 -25  src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
 17. +2   -2   src/java/org/apache/hadoop/dfs/DFSClient.java
 18. +38  -14  src/java/org/apache/hadoop/dfs/DataNode.java
 19. +56  -32  src/java/org/apache/hadoop/dfs/FSDirectory.java
 20. +4   -2   src/java/org/apache/hadoop/dfs/FSNamesystem.java
 21. +2   -1   src/java/org/apache/hadoop/fs/FileSystem.java
 22. +10  -3   src/java/org/apache/hadoop/fs/LocalFileSystem.java
 23. +16  -0   src/site/src/documentation/content/xdocs/index.xml
 24. +4   -4   src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
 25. +2   -1   src/test/org/apache/hadoop/test/AllTestDriver.java

+ 33 - 0
CHANGES.txt

@@ -1,6 +1,39 @@
 Hadoop Change Log
 
 
+Release 0.3.2 - 2006-06-09
+
+ 1. HADOOP-275.  Update the streaming contrib module to use log4j for
+    its logging.  (Michel Tourn via cutting)
+
+ 2. HADOOP-279.  Provide defaults for log4j logging parameters, so
+    that things still work reasonably when Hadoop-specific system
+    properties are not provided.  (omalley via cutting)
+
+ 3. HADOOP-280.  Fix a typo in AllTestDriver which caused the wrong
+    test to be run when "DistributedFSCheck" was specified.
+   (Konstantin Shvachko via cutting)
+
+ 4. HADOOP-240.  DFS's mkdirs() implementation no longer logs a warning
+    when the directory already exists. (Hairong Kuang via cutting)
+
+ 5. HADOOP-285.  Fix DFS datanodes to be able to re-join the cluster
+    after the connection to the namenode is lost.  (omalley via cutting)
+
+ 6. HADOOP-277.  Fix a race condition when creating directories.
+   (Sameer Paranjpye via cutting)
+
+ 7. HADOOP-289.  Improved exception handling in DFS datanode.
+    (Konstantin Shvachko via cutting)
+
+ 8. HADOOP-292.  Fix client-side logging to go to standard error
+    rather than standard output, so that it can be distinguished from
+    application output.  (omalley via cutting)
+
+ 9. HADOOP-294.  Fixed bug where conditions for retrying after errors
+    in the DFS client were reversed.  (omalley via cutting)
+
+
 Release 0.3.1 - 2006-06-05
 
  1. HADOOP-272.  Fix a bug in bin/hadoop setting log

+ 3 - 1
bin/hadoop

@@ -13,6 +13,8 @@
 #
 #   HADOOP_CONF_DIR  Alternate conf dir. Default is ${HADOOP_HOME}/conf.
 #
+#   HADOOP_ROOT_LOGGER The root appender. Default is INFO,console
+#
 
 # resolve links - $0 may be a softlink
 THIS="$0"
@@ -162,7 +164,7 @@ HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
 HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.log.file=$HADOOP_LOGFILE"
 HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.home.dir=$HADOOP_HOME"
 HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_IDENT_STRING"
-HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.root.logger=${HADOOP_ROOT_LOGGER:-INFO,stdout}"
+HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.root.logger=${HADOOP_ROOT_LOGGER:-INFO,console}"
 
 # run it
 exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"

+ 1 - 1
build.xml

@@ -9,7 +9,7 @@
  
   <property name="Name" value="Hadoop"/>
   <property name="name" value="hadoop"/>
-  <property name="version" value="0.3.2-dev"/>
+  <property name="version" value="0.3.3-dev"/>
   <property name="final.name" value="${name}-${version}"/>
   <property name="year" value="2006"/>
   <property name="libhdfs.version" value="1"/>

+ 17 - 7
conf/log4j.properties

@@ -1,10 +1,14 @@
-# RootLogger - DailyRollingFileAppender
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=INFO,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
 log4j.rootLogger=${hadoop.root.logger}
 
 # Logging Threshold
 log4j.threshhold=ALL
 
-
 #
 # Daily Rolling File Appender
 #
@@ -26,13 +30,14 @@ log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
 
 
 #
-# stdout
-# Add *stdout* to rootlogger above if you want to use this 
+# console
+# Add "console" to rootlogger above if you want to use this 
 #
 
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
 
 #
 # Rolling File Appender
@@ -49,3 +54,8 @@ log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %
 #log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
 #log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
 
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+
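Taken together with the bin/hadoop change above, these defaults mean logging still works when no Hadoop-specific system properties are passed (HADOOP-279), and client-side messages go to standard error (HADOOP-292). A minimal sketch, assuming conf/log4j.properties is on the classpath and the property is set before the first logger is created; the class name here is purely illustrative:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LoggingDefaultsDemo {
      public static void main(String[] args) {
        // bin/hadoop normally passes this as -Dhadoop.root.logger=...; when it is
        // absent, the defaults at the top of log4j.properties (INFO,console) apply.
        System.setProperty("hadoop.root.logger", "DEBUG,console");

        Log log = LogFactory.getLog(LoggingDefaultsDemo.class);
        // With the console appender targeting System.err, this message is written
        // to standard error and stays separate from application output on stdout.
        log.info("client-side message");
      }
    }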

+ 25 - 8
site/index.html

@@ -122,6 +122,12 @@ document.write("<text>Last Published:</text> " + document.lastModified);
 <a href="#News">News</a>
 <ul class="minitoc">
 <li>
+<a href="#9+June%2C+2006%3A+release+0.3.2+available">9 June, 2006: release 0.3.2 available</a>
+</li>
+<li>
+<a href="#8+June%2C+2006%3A+FAQ+added+to+Wiki">8 June, 2006: FAQ added to Wiki</a>
+</li>
+<li>
 <a href="#5+June%2C+2006%3A+release+0.3.1+available">5 June, 2006: release 0.3.1 available</a>
 </li>
 <li>
@@ -154,26 +160,37 @@ document.write("<text>Last Published:</text> " + document.lastModified);
 <a name="N1000C"></a><a name="News"></a>
 <h2 class="h3">News</h2>
 <div class="section">
-<a name="N10012"></a><a name="5+June%2C+2006%3A+release+0.3.1+available"></a>
+<a name="N10012"></a><a name="9+June%2C+2006%3A+release+0.3.2+available"></a>
+<h3 class="h4">9 June, 2006: release 0.3.2 available</h3>
+<p>This is a bugfix release.  For details see the <a href="http://tinyurl.com/k9g5c">change log</a>. The release can
+      be obtained from <a href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a
+      nearby mirror</a>.
+      </p>
+<a name="N10024"></a><a name="8+June%2C+2006%3A+FAQ+added+to+Wiki"></a>
+<h3 class="h4">8 June, 2006: FAQ added to Wiki</h3>
+<p>Hadoop now has a <a href="http://wiki.apache.org/lucene-hadoop/FAQ">FAQ</a>.  Please
+      help make this more complete!
+      </p>
+<a name="N10032"></a><a name="5+June%2C+2006%3A+release+0.3.1+available"></a>
 <h3 class="h4">5 June, 2006: release 0.3.1 available</h3>
 <p>This is a bugfix release.  For details see the <a href="http://tinyurl.com/l6on4">change log</a>. The release can
       be obtained from <a href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a
       nearby mirror</a>.
       </p>
-<a name="N10024"></a><a name="2+June%2C+2006%3A+release+0.3.0+available"></a>
+<a name="N10044"></a><a name="2+June%2C+2006%3A+release+0.3.0+available"></a>
 <h3 class="h4">2 June, 2006: release 0.3.0 available</h3>
 <p>This includes many fixes, improving performance, scalability
       and reliability and adding new features.  For details see the <a href="http://tinyurl.com/rq3f7">change log</a>. The release can
       be obtained from <a href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a
       nearby mirror</a>.
       </p>
-<a name="N10036"></a><a name="12+May%2C+2006%3A+release+0.2.1+available"></a>
+<a name="N10056"></a><a name="12+May%2C+2006%3A+release+0.2.1+available"></a>
 <h3 class="h4">12 May, 2006: release 0.2.1 available</h3>
 <p>This fixes a few bugs in release 0.2.0, listed in the <a href="http://tinyurl.com/rnnvz">change log</a>. The
       release can be obtained from <a href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a
       nearby mirror</a>.
       </p>
-<a name="N10048"></a><a name="5+May%2C+2006%3A+release+0.2.0+available"></a>
+<a name="N10068"></a><a name="5+May%2C+2006%3A+release+0.2.0+available"></a>
 <h3 class="h4">5 May, 2006: release 0.2.0 available</h3>
 <p>We are now aiming for monthly releases.  There have been many
       bug fixes and improvements in the past month.  MapReduce and DFS
@@ -182,24 +199,24 @@ document.write("<text>Last Published:</text> " + document.lastModified);
       details. The release can be obtained from <a href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a
       nearby mirror</a>.
       </p>
-<a name="N1005A"></a><a name="2+April%2C+2006%3A+release+0.1.0+available"></a>
+<a name="N1007A"></a><a name="2+April%2C+2006%3A+release+0.1.0+available"></a>
 <h3 class="h4">2 April, 2006: release 0.1.0 available</h3>
 <p>This is the first Hadoop release.  The release is available
       <a href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/">
       here</a>.</p>
-<a name="N10068"></a><a name="6+February%2C+2006%3A+nightly+builds"></a>
+<a name="N10088"></a><a name="6+February%2C+2006%3A+nightly+builds"></a>
 <h3 class="h4">6 February, 2006: nightly builds</h3>
 <p>Hadoop now has nightly builds.  This automatically creates a
       <a href="http://cvs.apache.org/dist/lucene/hadoop/nightly/">downloadable version of Hadoop every
       night</a>.  All unit tests must pass, or a message is sent to
       the developers mailing list and no new version is created.  This
       also updates the <a href="docs/api/">javadoc</a>.</p>
-<a name="N1007A"></a><a name="3+February%2C+2006%3A+Hadoop+code+moved+out+of+Nutch"></a>
+<a name="N1009A"></a><a name="3+February%2C+2006%3A+Hadoop+code+moved+out+of+Nutch"></a>
 <h3 class="h4">3 February, 2006: Hadoop code moved out of Nutch</h3>
 <p>The Hadoop code has now been moved into its own Subversion
       tree, renamed into packages under <span class="codefrag">org.apache.hadoop</span>.
       All unit tests pass, but little else has yet been tested.</p>
-<a name="N10087"></a><a name="30+March%2C+2006%3A+Hadoop+project+approved"></a>
+<a name="N100A7"></a><a name="30+March%2C+2006%3A+Hadoop+project+approved"></a>
 <h3 class="h4">30 March, 2006: Hadoop project approved</h3>
 <p>The Lucene PMC has elected to split the Nutch MapReduce and
       distributed filesytem code into a new project named Hadoop.</p>

File diff suppressed because the file is too large
+ 31 - 9
site/index.pdf


+ 23 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java

@@ -17,8 +17,12 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.net.InetAddress;
 import java.util.*;
 
+/*
+ * If we move to Java 1.5, we can get rid of this class and just use System.getenv
+ */
 public class Environment extends Properties
 {
    public Environment()
@@ -26,13 +30,15 @@ public class Environment extends Properties
    {
       // Extend this code to fit all operating
       // environments that you expect to run in
-
       String command = null;
       String OS = System.getProperty("os.name");
+      String lowerOs = OS.toLowerCase();
       if (OS.equals("Windows NT")) {
          command = "cmd /C set";
       } else if (OS.indexOf("ix") > -1 || OS.indexOf("inux") > -1) {
          command = "env";
+      } else if(lowerOs.startsWith("mac os x")) {
+         command = "env";
       } else {
          // Add others here
       }
@@ -83,4 +89,19 @@ public class Environment extends Properties
      }     
      return arr;
    }
-} 
+   
+   public String getHost()
+   {
+     String host = getProperty("HOST");
+     if(host == null) {
+       // HOST isn't always in the environment
+       try {
+         host = InetAddress.getLocalHost().getHostName();
+       } catch(IOException io) {
+         io.printStackTrace();
+       }
+     }
+     return host;
+   }
+   
+} 

+ 56 - 13
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -22,9 +22,12 @@ import java.io.IOException;
 import java.util.Date;
 import java.util.Map;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Properties;
 import java.util.regex.*;
 
+import org.apache.commons.logging.*;
+
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
@@ -43,6 +46,8 @@ import org.apache.hadoop.io.LongWritable;
  */
 public abstract class PipeMapRed {
 
+  protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());  
+  
   /** The command to be spawned as a subprocess.
    * Mapper/Reducer operations will delegate to it
    */
@@ -53,9 +58,9 @@ public abstract class PipeMapRed {
   
 
   /**
-   * @returns ow many TABS before the end of the key part 
+   * @returns how many TABS before the end of the key part 
    * usually: 1 or "ALL"
-   * used both for tool output of both Map and Reduce
+   * used for tool output of both Map and Reduce
    * configured via tool's argv: splitKeyVal=ALL or 1..
    * although it is interpreted here, not by tool
    */
@@ -91,20 +96,57 @@ public abstract class PipeMapRed {
     return cols;
   }
   
-  String[] splitArgs(String args)
+  final static int OUTSIDE = 1;
+  final static int SINGLEQ = 2;
+  final static int DOUBLEQ = 3;
+  
+  static String[] splitArgs(String args)
   {
-    String regex = "\\s(?=(?:[^\"]*\"[^\"]*\")*[^\"]*\\z)";
-    String[] split = args.split(regex);
-    // remove outer quotes
-    for(int i=0; i<split.length; i++) {
-        String si = split[i].trim();
-        if(si.charAt(0)=='"' && si.charAt(si.length()-1)=='"') {
-            si = si.substring(1, si.length()-1);
-            split[i] = si;
+    ArrayList argList = new ArrayList();
+    char[] ch = args.toCharArray();
+    int clen = ch.length;
+    int state = OUTSIDE;
+    int argstart = 0;
+    for(int c=0; c<=clen; c++) {
+        boolean last = (c==clen);
+        int lastState = state;
+        boolean endToken = false;
+        if(!last) {
+          if(ch[c]=='\'') {
+            if(state == OUTSIDE) {
+              state = SINGLEQ;
+            } else if(state == SINGLEQ) {
+              state = OUTSIDE;  
+            }
+            endToken = (state != lastState);
+          } else if(ch[c]=='"') {
+            if(state == OUTSIDE) {
+              state = DOUBLEQ;
+            } else if(state == DOUBLEQ) {
+              state = OUTSIDE;  
+            }          
+            endToken = (state != lastState);
+          } else if(ch[c]==' ') {
+            if(state == OUTSIDE) {
+              endToken = true;
+            }            
+          }
+        }
+        if(last || endToken) {
+          if(c == argstart) {
+            // unquoted space
+          } else {
+            String a;
+            a = args.substring(argstart, c); 
+            argList.add(a);
+          }
+          argstart = c+1;
+          lastState = state;
         }
     }
-    return split;
+    return (String[])argList.toArray(new String[0]);
   }
+
   public void configure(JobConf job)
   {
 
@@ -132,7 +174,7 @@ public abstract class PipeMapRed {
 	  // A  relative path should match in the unjarred Job data
       // In this case, force an absolute path to make sure exec finds it.
       argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
-      log_.println("PipeMapRed exec " + Arrays.toString(argvSplit));
+      log_.println("PipeMapRed exec " + Arrays.asList(argvSplit));
             
       
       Environment childEnv = (Environment)StreamUtil.env().clone();
@@ -440,4 +482,5 @@ public abstract class PipeMapRed {
       }
     }    
   }
+  
 }
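The new splitArgs() replaces the old regex-based split with a small state machine, so spaces inside single or double quotes no longer break a command apart and the surrounding quotes are stripped. A quick sketch of the expected behaviour, assuming same-package access (the method is package-private static); the expected tokens are my reading of the state machine above, not output taken from the patch:

    // Quotes group words into one token and are removed from the result.
    String[] parts = PipeMapRed.splitArgs("/usr/local/bin/perl5 filter.pl \"a b\" 'c d'");
    for (int i = 0; i < parts.length; i++) {
      System.out.println(i + ": " + parts[i]);
    }
    // Expected: 0: /usr/local/bin/perl5   1: filter.pl   2: a b   3: c d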

+ 0 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java

@@ -25,7 +25,6 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableComparable;

+ 0 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

@@ -25,7 +25,6 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableComparable;

+ 44 - 27
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java

@@ -20,14 +20,14 @@ import java.io.*;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.LogFormatter;
+import org.apache.commons.logging.*;
+
 
 /** 
  * Shared functionality for hadoopStreaming formats.
@@ -40,7 +40,10 @@ import org.apache.hadoop.util.LogFormatter;
 public abstract class StreamBaseRecordReader implements RecordReader
 {
     
-  protected static final Logger LOG = LogFormatter.getLogger(StreamBaseRecordReader.class.getName());
+  protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName());
+  
+  // custom JobConf properties for this class are prefixed with this namespace
+  final String CONF_NS = "stream.recordreader.";
 
   public StreamBaseRecordReader(
     FSDataInputStream in, long start, long end, 
@@ -49,15 +52,45 @@ public abstract class StreamBaseRecordReader implements RecordReader
   {
     in_ = in;
     start_ = start;
-    splitName_ = splitName;
     end_ = end;
+    length_ = end_ - start_;
+    splitName_ = splitName;
     reporter_ = reporter;
     job_ = job;
+    
+    statusMaxRecordChars_ = job_.getInt(CONF_NS + "statuschars", 200);
+  }
+
+  /// RecordReader API
+  
+  /** Read a record. Implementation should call numRecStats at the end
+   */  
+  public abstract boolean next(Writable key, Writable value) throws IOException;
+
+  /** Returns the current position in the input. */
+  public synchronized long getPos() throws IOException 
+  { 
+    return in_.getPos(); 
+  }
+
+  /** Close this to future operations.*/
+  public synchronized void close() throws IOException 
+  { 
+    in_.close(); 
   }
+  
+  /// StreamBaseRecordReader API
 
-  /** Called once before the first call to next */
   public void init() throws IOException
   {
+    LOG.info("StreamBaseRecordReader.init: " +
+    " start_=" + start_ + " end_=" + end_ + " length_=" + length_ +
+    " start_ > in_.getPos() =" 
+        + (start_ > in_.getPos()) + " " + start_ 
+        + " > " + in_.getPos() );
+    if (start_ > in_.getPos()) {
+      in_.seek(start_);
+    }  
     seekNextRecordBoundary();
   }
   
@@ -66,17 +99,12 @@ public abstract class StreamBaseRecordReader implements RecordReader
    */
   public abstract void seekNextRecordBoundary() throws IOException;
   
-  
-  /** Read a record. Implementation should call numRecStats at the end
-   */  
-  public abstract boolean next(Writable key, Writable value) throws IOException;
-
-  
+    
   void numRecStats(CharSequence record) throws IOException
   {
     numRec_++;          
     if(numRec_ == nextStatusRec_) {
-      nextStatusRec_ +=100000;//*= 10;
+      nextStatusRec_ +=100;//*= 10;
       String status = getStatus(record);
       LOG.info(status);
       reporter_.setStatus(status);
@@ -91,10 +119,9 @@ public abstract class StreamBaseRecordReader implements RecordReader
       pos = getPos();
     } catch(IOException io) {
     }
-    final int M = 2000;
     String recStr;
-    if(record.length() > M) {
-    	recStr = record.subSequence(0, M) + "...";
+    if(record.length() > statusMaxRecordChars_) {
+        recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
     } else {
     	recStr = record.toString();
     }
@@ -103,25 +130,15 @@ public abstract class StreamBaseRecordReader implements RecordReader
     return status;
   }
 
-  /** Returns the current position in the input. */
-  public synchronized long getPos() throws IOException 
-  { 
-    return in_.getPos(); 
-  }
-
-  /** Close this to future operations.*/
-  public synchronized void close() throws IOException 
-  { 
-    in_.close(); 
-  }
-
   FSDataInputStream in_;
   long start_;
   long end_;
+  long length_;
   String splitName_;
   Reporter reporter_;
   JobConf job_;
   int numRec_ = 0;
   int nextStatusRec_ = 1;
+  int statusMaxRecordChars_;
   
 }

+ 15 - 9
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java

@@ -23,6 +23,8 @@ import java.util.Arrays;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.logging.*;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PathFilter;
@@ -30,11 +32,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.UTF8;
 
 import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.LogFormatter;
-
 
 /** An input format that performs globbing on DFS paths and 
  * selects a RecordReader based on a JobConf property.
@@ -46,7 +45,8 @@ public class StreamInputFormat extends InputFormatBase
   // an InputFormat should be public with the synthetic public default constructor
   // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
   
-  protected static final Logger LOG = LogFormatter.getLogger(StreamInputFormat.class.getName());
+  protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
+  
   static {
     //LOG.setLevel(Level.FINE);
   }
@@ -59,7 +59,7 @@ public class StreamInputFormat extends InputFormatBase
     int dsup = globs.length;
     for(int d=0; d<dsup; d++) {
       String leafName = globs[d].getName();
-      LOG.fine("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
+      LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
       Path[] paths; Path dir;
 	  PathFilter filter = new GlobFilter(fs, leafName);
 	  dir = new Path(globs[d].getParent().toString());
@@ -79,7 +79,13 @@ public class StreamInputFormat extends InputFormatBase
     }
     String globToRegexp(String glob)
 	{
-	  return glob.replaceAll("\\*", ".*");
+      String re = glob;
+      re = re.replaceAll("\\.", "\\\\.");
+      re = re.replaceAll("\\+", "\\\\+");
+	  re = re.replaceAll("\\*", ".*");
+      re = re.replaceAll("\\?", ".");
+      LOG.info("globToRegexp: |" + glob + "|  ->  |" + re + "|");
+      return re;
 	}
 
     public boolean accept(Path pathname)
@@ -88,7 +94,7 @@ public class StreamInputFormat extends InputFormatBase
       if(acc) {
       	acc = pat_.matcher(pathname.getName()).matches();
       }
-      LOG.finer("matches " + pat_ + ", " + pathname + " = " + acc);
+      LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
       return acc;
     }
 	
@@ -99,7 +105,7 @@ public class StreamInputFormat extends InputFormatBase
   public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
                                       JobConf job, Reporter reporter)
     throws IOException {
-    LOG.finer("getRecordReader start.....");
+    LOG.info("getRecordReader start.....");
     reporter.setStatus(split.toString());
 
     final long start = split.getStart();
@@ -143,5 +149,5 @@ public class StreamInputFormat extends InputFormatBase
     
     return reader;
   }
-  
+
 }
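globToRegexp() now escapes "." and "+" and maps "?" to a single-character wildcard instead of only rewriting "*". A small illustration using the same replaceAll calls as the diff; the sample glob is made up:

    String glob = "0604*.log";
    String re = glob.replaceAll("\\.", "\\\\.")
                    .replaceAll("\\+", "\\\\+")
                    .replaceAll("\\*", ".*")
                    .replaceAll("\\?", ".");
    System.out.println(re);                              // 0604.*\.log
    System.out.println("06042-access.log".matches(re));  // true
    System.out.println("0604-accessXlog".matches(re));   // false: "." is now literal
                                                          // (the old conversion would have matched)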

+ 128 - 52
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import org.apache.commons.logging.*;
+
 import org.apache.hadoop.conf.Configuration;
 
 import org.apache.hadoop.io.UTF8;
@@ -32,16 +34,14 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
 
-import org.apache.hadoop.util.LogFormatter;
-
 /** All the client-side work happens here. 
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
  */
 public class StreamJob
 {
-  protected static final Logger LOG = LogFormatter.getLogger(StreamJob.class.getName());
-    
+  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());    
+  
   public StreamJob(String[] argv, boolean mayExit)
   {
     argv_ = argv;
@@ -72,9 +72,10 @@ public class StreamJob
   void preProcessArgs()
   {
     verbose_ = false;
+    addTaskEnvironment_ = "";
   }
   
-  void postProcessArgs()
+  void postProcessArgs() throws IOException
   {
     if(cluster_ == null) {
         // hadoop-default.xml is standard, hadoop-local.xml is not.
@@ -87,22 +88,35 @@ public class StreamJob
     if(output_ == null) {
         fail("Required argument: -output ");
     }
-    // careful with class names..
-    mapCmd_ = packageOrTrimNoShip(mapCmd_);
-    redCmd_ = packageOrTrimNoShip(redCmd_);
+    msg("addTaskEnvironment=" + addTaskEnvironment_);
+
+    Iterator it = packageFiles_.iterator();
+    while(it.hasNext()) {
+      File f = new File((String)it.next());    
+      if(f.isFile()) {
+        shippedCanonFiles_.add(f.getCanonicalPath());
+      }
+    }
+    msg("shippedCanonFiles_=" + shippedCanonFiles_);
     
-    // TBD -D format or sthg on cmdline. 
-    // Plus maybe a standard list originating on client or server    
-    addTaskEnvironment_ = ""; 
+    // careful with class names..
+    mapCmd_ = unqualifyIfLocalPath(mapCmd_);
+    redCmd_ = unqualifyIfLocalPath(redCmd_);    
+  }
+  
+  void validateNameEqValue(String neqv)
+  {
+    String[] nv = neqv.split("=", 2);
+    if(nv.length < 2) {
+        fail("Invalid name=value spec: " + neqv);
+    }
+    msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
   }
   
-  String packageOrTrimNoShip(String cmd)
+  String unqualifyIfLocalPath(String cmd) throws IOException
   {
     if(cmd == null) {
       //    
-    } else if(cmd.startsWith(NOSHIP)) {
-      // don't package the file, but keep the abolute path
-      cmd = cmd.substring(NOSHIP.length());
     } else {
       String prog = cmd;
       String args = "";
@@ -111,18 +125,23 @@ public class StreamJob
         prog = cmd.substring(0, s);
         args = cmd.substring(s+1);
       }
-      packageFiles_.add(new File(prog).getAbsolutePath());
-      // Change path to simple filename. 
-      // That way when PipeMapRed calls Runtime.exec(), 
-      // it will look for the excutable in Task's working dir.
-      // And this is where TaskRunner unjars our job jar.
-      prog = new File(prog).getName();
-      if(args.length() > 0) {
-        cmd = prog + " " + args;
-      } else {
-        cmd = prog;
+      String progCanon = new File(prog).getCanonicalPath();
+      boolean shipped = shippedCanonFiles_.contains(progCanon);
+      msg("shipped: " + shipped + " " + progCanon);
+      if(shipped) {
+        // Change path to simple filename. 
+        // That way when PipeMapRed calls Runtime.exec(), 
+        // it will look for the excutable in Task's working dir.
+        // And this is where TaskRunner unjars our job jar.
+        prog = new File(prog).getName();
+        if(args.length() > 0) {
+          cmd = prog + " " + args;
+        } else {
+          cmd = prog;
+        }
       }
     }
+    msg("cmd=" + cmd);
     return cmd;
   }
   
@@ -130,17 +149,20 @@ public class StreamJob
   {
     return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
   }
+   
   
   void parseArgv()
   {
     if(argv_.length==0) {
-      exitUsage();
+      exitUsage(false);
     }
     int i=0; 
     while(i < argv_.length) {
       String s;
       if(argv_[i].equals("-verbose")) {
         verbose_ = true;      
+      } else if(argv_[i].equals("-info")) {
+        detailedUsage_ = true;      
       } else if(argv_[i].equals("-debug")) {
         debug_++;
       } else if((s = optionArg(argv_, i, "-input", false)) != null) {
@@ -155,7 +177,7 @@ public class StreamJob
       } else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
         i++;
         redCmd_ = s;
-      } else if((s = optionArg(argv_, i, "-files", false)) != null) {
+      } else if((s = optionArg(argv_, i, "-file", false)) != null) {
         i++;
         packageFiles_.add(s);
       } else if((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
@@ -164,15 +186,35 @@ public class StreamJob
       } else if((s = optionArg(argv_, i, "-config", false)) != null) {
         i++;
         configPath_.add(s);
+      } else if((s = optionArg(argv_, i, "-dfs", false)) != null) {
+        i++;
+        userJobConfProps_.add("fs.default.name="+s);
+      } else if((s = optionArg(argv_, i, "-jt", false)) != null) {
+        i++;
+        userJobConfProps_.add("mapred.job.tracker="+s);
+      } else if((s = optionArg(argv_, i, "-jobconf", false)) != null) {
+        i++;
+        validateNameEqValue(s);
+        userJobConfProps_.add(s);
+      } else if((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
+        i++;
+        validateNameEqValue(s);
+        if(addTaskEnvironment_.length() > 0) {
+            addTaskEnvironment_ += " ";
+        }
+        addTaskEnvironment_ += s;
       } else if((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
         i++;
         inReaderSpec_ = s;
       } else {
         System.err.println("Unexpected argument: " + argv_[i]);
-        exitUsage();
+        exitUsage(false);
       }
       i++;
     }
+    if(detailedUsage_) {
+        exitUsage(true);
+    }
   }
   
   String optionArg(String[] args, int index, String arg, boolean argSet)
@@ -196,22 +238,32 @@ public class StreamJob
     }
   }
 
-  public void exitUsage()
+  public void exitUsage(boolean detailed)
   {
                       //         1         2         3         4         5         6         7         
                       //1234567890123456789012345678901234567890123456789012345678901234567890123456789
-    System.out.println("Usage: bin/hadoop jar build/hadoop-streaming.jar [options]");
+    System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar [options]");
     System.out.println("Options:");
-    System.out.println("  -input   <path>     DFS input file(s) for the Map step");
-    System.out.println("  -output  <path>     DFS output directory for the Reduce step");
-    System.out.println("  -mapper  <cmd>      The streaming command to run");
-    System.out.println("  -reducer <cmd>      The streaming command to run");
-    System.out.println("  -files   <file>     Additional files to be shipped in the Job jar file");
-    System.out.println("  -cluster <name>     Default uses hadoop-default.xml and hadoop-site.xml");
-    System.out.println("  -config  <file>     Optional. One or more paths to xml config files");
-    System.out.println("  -inputreader <spec> Optional. See below");
+    System.out.println("  -input    <path>     DFS input file(s) for the Map step");
+    System.out.println("  -output   <path>     DFS output directory for the Reduce step");
+    System.out.println("  -mapper   <cmd>      The streaming command to run");
+    System.out.println("  -combiner <cmd>      Not implemented. But you can pipe the mapper output");
+    System.out.println("  -reducer  <cmd>      The streaming command to run");
+    System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
+    System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
+    System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
+    System.out.println("  -dfs      <h:p>      Optional. Override DFS configuration");
+    System.out.println("  -jt       <h:p>      Optional. Override JobTracker configuration");
+    System.out.println("  -inputreader <spec>  Optional.");
+    System.out.println("  -jobconf  <n>=<v>    Optional.");
+    System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
     System.out.println("  -verbose");
     System.out.println();
+    if(!detailed) {    
+    System.out.println("For more details about these options:");
+    System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
+        fail("");
+    }
     System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
     System.out.println("Default Map input format: a line is a record in UTF-8");
     System.out.println("  the key part ends at first TAB, the rest of the line is the value");
@@ -220,21 +272,34 @@ public class StreamJob
     System.out.println("  Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
     System.out.println("Map output format, reduce input/output format:");
     System.out.println("  Format defined by what mapper command outputs. Line-oriented");
-    System.out.println("Mapper and Reducer <cmd> syntax: ");
-    System.out.println("  If the mapper or reducer programs are prefixed with " + NOSHIP + " then ");
-    System.out.println("  the paths are assumed to be valid absolute paths on the task tracker machines");
-    System.out.println("  and are NOT packaged with the Job jar file.");
+    System.out.println();
     System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
     System.out.println("  Hadoop clusters. ");
     System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
     System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
     System.out.println();
-    System.out.println("Example: hadoopStreaming -mapper \"noship:/usr/local/bin/perl5 filter.pl\"");
-    System.out.println("           -files /local/filter.pl -input \"/logs/0604*/*\" [...]");
+    System.out.println("To set the number of reduce tasks (num. of output files):");
+    System.out.println("  -jobconf mapred.reduce.tasks=10");
+    System.out.println("To change the local temp directory:");
+    System.out.println("  -jobconf dfs.data.dir=/tmp");
+    System.out.println("Additional local temp directories with -cluster local:");
+    System.out.println("  -jobconf mapred.local.dir=/tmp/local");
+    System.out.println("  -jobconf mapred.system.dir=/tmp/system");
+    System.out.println("  -jobconf mapred.temp.dir=/tmp/temp");
+    System.out.println("For more details about jobconf parameters see:");
+    System.out.println("  http://wiki.apache.org/lucene-hadoop/JobConfFile");
+    System.out.println("To set an environement variable in a streaming command:");
+    System.out.println("   -cmdenv EXAMPLE_DIR=/home/example/dictionaries/");
+    System.out.println();
+    System.out.println("Shortcut to run from any directory:");
+    System.out.println("   setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/build/hadoop-streaming.jar\"");
+    System.out.println();
+    System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\"");
+    System.out.println("           -file /local/filter.pl -input \"/logs/0604*/*\" [...]");
     System.out.println("  Ships a script, invokes the non-shipped perl interpreter");
     System.out.println("  Shipped files go to the working directory so filter.pl is found by perl");
     System.out.println("  Input files are all the daily logs for days in month 2006-04");
-    fail("");    
+    fail("");
   }
   
   public void fail(String message)
@@ -291,7 +356,7 @@ public class StreamJob
         msg("Found runtime classes in: " + runtimeClasses);
     }
     if(isLocalHadoop()) {
-      // don't package class files (they might get unpackaged in . and then 
+      // don't package class files (they might get unpackaged in "." and then 
       //  hide the intended CLASSPATH entry)
       // we still package everything else (so that scripts and executable are found in 
       //  Task workdir like distributed Hadoop)
@@ -393,7 +458,17 @@ public class StreamJob
     if(jar_ != null) {
         jobConf_.setJar(jar_);
     }
-    //jobConf_.mtdump();System.exit(1);
+
+    // last, allow user to override anything 
+    // (although typically used with properties we didn't touch)
+    it = userJobConfProps_.iterator();
+    while(it.hasNext()) {
+        String prop = (String)it.next();
+        String[] nv = prop.split("=", 2);
+        msg("JobConf: set(" + nv[0] + ", " + nv[1]+")");
+        jobConf_.set(nv[0], nv[1]);
+    }   
+    
   }
   
   protected String getJobTrackerHostPort()
@@ -432,7 +507,7 @@ public class StreamJob
       running_ = jc_.submitJob(jobConf_);
       jobId_ = running_.getJobID();
 
-      LOG.info("getLocalDirs(): " + Arrays.toString(jobConf_.getLocalDirs()));     
+      LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));     
       LOG.info("Running job: " + jobId_);      
       jobInfo();
 
@@ -467,11 +542,10 @@ public class StreamJob
   }
   
 
-  public final static String NOSHIP = "noship:";
-  
   protected boolean mayExit_;
   protected String[] argv_;
   protected boolean verbose_;
+  protected boolean detailedUsage_;
   protected int debug_;
 
   protected Environment env_;
@@ -483,8 +557,10 @@ public class StreamJob
   protected JobClient jc_;
 
   // command-line arguments
-  protected ArrayList inputGlobs_   = new ArrayList(); // <String>
-  protected ArrayList packageFiles_ = new ArrayList(); // <String>
+  protected ArrayList inputGlobs_       = new ArrayList(); // <String>
+  protected ArrayList packageFiles_     = new ArrayList(); // <String>
+  protected ArrayList shippedCanonFiles_= new ArrayList(); // <String>  
+  protected ArrayList userJobConfProps_ = new ArrayList(); // <String>
   protected String output_;
   protected String mapCmd_;
   protected String redCmd_;

+ 1 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java

@@ -69,7 +69,7 @@ public class StreamLineRecordReader extends StreamBaseRecordReader
       return false;
 
     //((LongWritable)key).set(pos);           // key is position
-    //((UTF8)value).set(readLine(in));        // value is line
+    //((UTF8)value).set(readLine(in));   // value is line
     String line = readLine(in_);
 
     // key is line up to TAB, value is rest

+ 18 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java

@@ -198,7 +198,7 @@ public class StreamUtil
   static {
     try {
       env = new Environment();
-      HOST = env.get("HOST").toString();
+      HOST = env.getHost();
     } catch(IOException io) {
       io.printStackTrace();
     }
@@ -275,6 +275,22 @@ public class StreamUtil
     }
   }
   
+  static final String regexpSpecials = "[]()?*+|.!^-\\~@";
+  
+  public static String regexpEscape(String plain)
+  {
+    StringBuffer buf = new StringBuffer();
+    char[] ch = plain.toCharArray();
+    int csup = ch.length;
+    for(int c=0; c<csup; c++) {
+      if(regexpSpecials.indexOf(ch[c]) != -1) {
+        buf.append("\\");    
+      }
+      buf.append(ch[c]);
+    }
+    return buf.toString();
+  }
+  
   static String slurp(File f) throws IOException
   {
     FileInputStream in = new FileInputStream(f);
@@ -298,5 +314,5 @@ public class StreamUtil
     }
     return env_;
   }
-
+  
 }
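The new regexpEscape() helper backslash-escapes any character found in regexpSpecials, so literal record markers can be embedded safely in a java.util.regex Pattern (StreamXmlRecordReader uses it for the CDATA markers). A tiny illustration; the expected output is inferred from the character list above:

    System.out.println(StreamUtil.regexpEscape("]]>"));   // \]\]>   (']' is a regex special)
    System.out.println(StreamUtil.regexpEscape("<doc>")); // <doc>   (no special characters)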

+ 246 - 25
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java

@@ -17,10 +17,12 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.util.regex.*;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.mapred.Reporter;
@@ -32,6 +34,14 @@ import org.apache.hadoop.mapred.JobConf;
  *  Values are XML subtrees delimited by configurable tags.
  *  Keys could be the value of a certain attribute in the XML subtree, 
  *  but this is left to the stream processor application.
+ *
+ *  The name-value properties that StreamXmlRecordReader understands are:
+ *    String begin (chars marking beginning of record)
+ *    String end   (chars marking end of record)
+ *    int maxrec   (maximum record size)
+ *    int lookahead(maximum lookahead to sync CDATA)
+ *    boolean slowmatch
+ *
  *  @author Michel Tourn
  */
 public class StreamXmlRecordReader extends StreamBaseRecordReader 
@@ -42,67 +52,278 @@ public class StreamXmlRecordReader extends StreamBaseRecordReader
     throws IOException
   {
     super(in, start, end, splitName, reporter, job);
-    beginMark_ = checkJobGet("stream.recordreader.begin");
-    endMark_   = checkJobGet("stream.recordreader.end");
-  }
+    
+    beginMark_ = checkJobGet(CONF_NS + "begin");
+    endMark_   = checkJobGet(CONF_NS + "end");
 
-  String checkJobGet(String prop) throws IOException
-  {
-  	String val = job_.get(prop);
-  	if(val == null) {
-  		throw new IOException("JobConf: missing required property: " + prop);
-  	}
-  	return val;
+    maxRecSize_= job_.getInt(CONF_NS + "maxrec", 50*1000);
+    lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2*maxRecSize_);
+    synched_ = false;
+    
+    slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
+    if(slowMatch_) {
+      beginPat_  = makePatternCDataOrMark(beginMark_);
+      endPat_    = makePatternCDataOrMark(endMark_);
+    }
   }
   
-  public void seekNextRecordBoundary() throws IOException
-  {
-  System.out.println("@@@start seekNext " + in_.getPos());
-    readUntilMatch(beginMark_, null);      
-  System.out.println("@@@end   seekNext " + in_.getPos());
-  }
-    
+  int numNext = 0;
   public synchronized boolean next(Writable key, Writable value)
    throws IOException
   {
     long pos = in_.getPos();
-    if (pos >= end_)
+    numNext++;
+    if (pos >= end_) {
       return false;
+    }
     
     StringBuffer buf = new StringBuffer();
-    readUntilMatch(endMark_, buf);
+    if(!readUntilMatchBegin()) {
+        return false;
+    }
+    if(!readUntilMatchEnd(buf)) {
+        return false;
+    }
     numRecStats(buf);
+    
+    // There is only one elem..key/value splitting is not done here.
+    ((UTF8)key).set(buf.toString());
+    ((UTF8)value).set("");
+    
+    /*if(numNext < 5) {
+        System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
+        + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ") + "|");
+    }*/
+
     return true;
   }
+  
+  public void seekNextRecordBoundary() throws IOException
+  {
+    readUntilMatchBegin();
+  }
+  
+  boolean readUntilMatchBegin() throws IOException
+  {
+    if(slowMatch_) {
+        return slowReadUntilMatch(beginPat_, false, null);
+    } else {
+        return fastReadUntilMatch(beginMark_, false, null);
+    }
+  }
+  
+  boolean readUntilMatchEnd(StringBuffer buf) throws IOException
+  {
+    if(slowMatch_) {
+      return slowReadUntilMatch(endPat_, true, buf);
+    } else {
+      return fastReadUntilMatch(endMark_, true, buf);
+    }
+  }
+  
+  
+  boolean slowReadUntilMatch(Pattern markPattern, boolean includePat, StringBuffer outBufOrNull) 
+    throws IOException   
+  {
+    try {
+      long inStart = in_.getPos();
+      byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
+      int read = 0;
+      boolean success = true;
+      in_.mark(lookAhead_ + 2);
+      read = in_.read(buf);
+      String sbuf = new String(buf);        
+      Matcher match = markPattern.matcher(sbuf);
 
-  void readUntilMatch(String pat, StringBuffer outBuf) throws IOException 
+      firstMatchStart_ = NA;
+      firstMatchEnd_ = NA;
+      int bufPos = 0;
+      int state = synched_ ? CDATA_OUT : CDATA_UNK;
+      int s=0;
+      int matchLen = 0;
+      while(match.find(bufPos)) {
+        int input;
+        matchLen = match.group(0).length();
+        if(match.group(1) != null) {
+          input = CDATA_BEGIN;
+        } else if(match.group(2) != null) {
+          input = CDATA_END;
+          firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
+        } else {
+          input = RECORD_MAYBE;
+        }
+        if(input == RECORD_MAYBE) {
+            if(firstMatchStart_ == NA) {
+              firstMatchStart_ = match.start();
+              firstMatchEnd_   = match.end();
+            }
+        }
+        state = nextState(state, input, match.start());
+        /*System.out.println("@@@" +
+         s + ". Match " + match.start() + " " + match.groupCount() +
+         " state=" + state + " input=" + input + 
+         " firstMatchStart_=" + firstMatchStart_ + " startinstream=" + (inStart+firstMatchStart_) + 
+         " match=" + match.group(0) + " in=" + in_.getPos());*/
+        if(state == RECORD_ACCEPT) {
+          break;
+        }
+        bufPos = match.end();
+        s++;
+      }
+      if(state != CDATA_UNK) {
+        synched_ = true;
+      }
+      boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK); 
+      if(matched) {
+        int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
+        //System.out.println("firstMatchStart_=" + firstMatchStart_ + " firstMatchEnd_=" + firstMatchEnd_);
+        String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_);
+        //System.out.println(" match snip=|" + snip + "| markPattern=" + markPattern);
+        if(outBufOrNull != null) {
+          buf = new byte[endPos];
+          in_.reset();      
+          read = in_.read(buf);
+          if(read != endPos) {
+              //System.out.println("@@@ BAD re-read less: " + read + " < " + endPos);
+          }          
+          outBufOrNull.append(new String(buf));
+        } else {
+          //System.out.println("Skip to " + (inStart + endPos));
+          in_.seek(inStart + endPos);
+        }
+      }
+      return matched;
+    } catch(Exception e) {
+      e.printStackTrace();
+    } finally {
+      // in_ ?
+    }
+    return false;
+  }  
+  
+  // states
+  final static int CDATA_IN  = 10;
+  final static int CDATA_OUT = 11;
+  final static int CDATA_UNK = 12;
+  final static int RECORD_ACCEPT = 13;
+  // inputs
+  final static int CDATA_BEGIN = 20;
+  final static int CDATA_END   = 21;
+  final static int RECORD_MAYBE= 22;
+  
+  /* also updates firstMatchStart_;*/
+  int nextState(int state, int input, int bufPos)
   {
+    switch(state) {
+      case CDATA_UNK:
+      case CDATA_OUT:
+        switch(input) {
+          case CDATA_BEGIN:
+            return CDATA_IN;
+          case CDATA_END:
+            if(state==CDATA_OUT) {
+              //System.out.println("buggy XML " + bufPos);
+            }
+            return CDATA_OUT;
+          case RECORD_MAYBE:
+            return (state==CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
+        }
+      break;
+      case CDATA_IN:
+       return (input==CDATA_END) ? CDATA_OUT : CDATA_IN;
+    }
+    throw new IllegalStateException(state + " " + input + " " + bufPos + " " + splitName_);
+  }
+  
     
-    char[] cpat = pat.toCharArray();
+  Pattern makePatternCDataOrMark(String escapedMark)
+  {
+    StringBuffer pat = new StringBuffer();
+    addGroup(pat, StreamUtil.regexpEscape("CDATA["));   // CDATA_BEGIN
+    addGroup(pat, StreamUtil.regexpEscape("]]>"));      // CDATA_END
+    addGroup(pat, escapedMark);                         // RECORD_MAYBE
+    return Pattern.compile(pat.toString());
+  }
+  void addGroup(StringBuffer pat, String escapedGroup)
+  {
+    if(pat.length() > 0) {
+        pat.append("|");
+    }
+    pat.append("(");
+    pat.append(escapedGroup);
+    pat.append(")");
+  }
+  
+  
+  
+  boolean fastReadUntilMatch(String textPat, boolean includePat, StringBuffer outBufOrNull) throws IOException 
+  {
+    //System.out.println("@@@BEGIN readUntilMatch inPos=" + in_.getPos());  
+    char[] cpat = textPat.toCharArray();
     int m = 0;
+    boolean match = false;
+    long markPos = -1;
     int msup = cpat.length;
+    if(!includePat) {
+      int LL = 120000 * 10;
+      markPos = in_.getPos();
+      in_.mark(LL); // lookAhead_
+    }
     while (true) {
       int b = in_.read();
       if (b == -1)
         break;
 
       char c = (char)b; // this assumes eight-bit matching. OK with UTF-8
+      if(outBufOrNull != null) {
+        outBufOrNull.append(c);
+      }
       if (c == cpat[m]) {
         m++;
-        if(m==msup-1) {
+        if(m==msup) {
+          match = true;
           break;
         }
       } else {
         m = 0;
       }
-      if(outBuf != null) {
-        outBuf.append(c);
+    }
+    if(!includePat && match) {
+      if(outBufOrNull != null) {
+        outBufOrNull.setLength(outBufOrNull.length() - textPat.length());
       }
+      long pos = in_.getPos() - textPat.length();
+      in_.reset();
+      in_.seek(pos);
     }
-System.out.println("@@@START readUntilMatch(" + pat + ", " + outBuf + "\n@@@END readUntilMatch");
+    //System.out.println("@@@DONE  readUntilMatch inPos=" + in_.getPos() + " includePat=" + includePat + " pat=" + textPat + ", buf=|" + outBufOrNull + "|");
+    return match;
+  }
+  
+  String checkJobGet(String prop) throws IOException
+  {
+    String val = job_.get(prop);
+    if(val == null) {
+        throw new IOException("JobConf: missing required property: " + prop);
+    }
+    return val;
   }
   
   
   String beginMark_;
   String endMark_;
+  
+  Pattern beginPat_;
+  Pattern endPat_;
+
+  boolean slowMatch_;  
+  int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size
+  int maxRecSize_;
+
+  final static int NA = -1;  
+  int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.
+  int firstMatchEnd_ = 0;
+  
+  boolean isRecordMatch_;
+  boolean synched_;
 }
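The stream.recordreader.* properties listed in the class comment are normally derived from the -inputreader spec shown in the StreamJob usage text (e.g. -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'), but they can also be placed on the JobConf directly. A minimal sketch; the values are illustrative and only the property names come from the patch:

    JobConf job = new JobConf();
    job.set("stream.recordreader.begin", "<doc>");     // chars marking the beginning of a record
    job.set("stream.recordreader.end", "</doc>");      // chars marking the end of a record
    job.set("stream.recordreader.maxrec", "50000");    // maximum record size, read via getInt()
    job.set("stream.recordreader.slowmatch", "true");  // use the CDATA-aware matcher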

+ 2 - 2
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -808,7 +808,7 @@ class DFSClient implements FSConstants {
                     localName, overwrite, replication, blockSize);
               } catch (RemoteException e) {
                 if (--retries == 0 || 
-                    AlreadyBeingCreatedException.class.getName().
+                    !AlreadyBeingCreatedException.class.getName().
                         equals(e.getClassName())) {
                   throw e;
                 } else {
@@ -838,7 +838,7 @@ class DFSClient implements FSConstants {
                                          clientName.toString());
               } catch (RemoteException e) {
                 if (--retries == 0 || 
-                    NotReplicatedYetException.class.getName().
+                    !NotReplicatedYetException.class.getName().
                         equals(e.getClassName())) {
                   throw e;
                 } else {

+ 38 - 14
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import java.io.*;
 import java.net.*;
-import java.nio.channels.FileLock;
 import java.util.*;
 
 /**********************************************************
@@ -173,7 +172,20 @@ public class DataNode implements FSConstants, Runnable {
      * @throws IOException
      */
     private void register() throws IOException {
-      dnRegistration = namenode.register( dnRegistration );
+      while (shouldRun) {
+        try {
+          dnRegistration = namenode.register( dnRegistration );
+          break;
+        } catch( ConnectException se ) {  // namenode has not been started
+          LOG.info("Namenode not available yet, Zzzzz...");
+        } catch( SocketTimeoutException te ) {  // namenode is busy
+          LOG.info("Problem connecting to Namenode: " + 
+                   StringUtils.stringifyException(te));
+        }
+        try {
+          Thread.sleep(10 * 1000);
+        } catch (InterruptedException ie) {}
+      }
       if( storage.getStorageID().equals("") ) {
         storage.setStorageID( dnRegistration.getStorageID());
         storage.write();
@@ -194,7 +206,7 @@ public class DataNode implements FSConstants, Runnable {
     }
 
     void handleDiskError( String errMsgr ) {
-        LOG.warn( "Shuting down DataNode because "+errMsgr );
+        LOG.warn( "DataNode is shutting down.\n" + errMsgr );
         try {
             namenode.errorReport(
                     dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
@@ -208,9 +220,7 @@ public class DataNode implements FSConstants, Runnable {
      * forever calling remote NameNode functions.
      */
     public void offerService() throws Exception {
-      // start dataXceiveServer  
-      dataXceiveServer.start();
-      
+     
       long lastHeartbeat = 0, lastBlockReport = 0;
       LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
 
@@ -325,13 +335,16 @@ public class DataNode implements FSConstants, Runnable {
           } // synchronized
         } // while (shouldRun)
       } catch(DiskErrorException e) {
-        handleDiskError(e.getMessage());
-      }
-      
-      // wait for dataXceiveServer to terminate
-      try {
-          this.dataXceiveServer.join();
-      } catch (InterruptedException ie) {
+        handleDiskError(e.getLocalizedMessage());
+      } catch( RemoteException re ) {
+        String reClass = re.getClassName();
+        if( UnregisteredDatanodeException.class.getName().equals( reClass )) {
+          LOG.warn( "DataNode is shutting down: " + 
+                    StringUtils.stringifyException(re));
+          shutdown();
+          return;
+        }
+        throw re;
       }
     } // offerService
 
@@ -818,6 +831,10 @@ public class DataNode implements FSConstants, Runnable {
      */
     public void run() {
         LOG.info("Starting DataNode in: "+data.data);
+        
+        // start dataXceiveServer
+        dataXceiveServer.start();
+        
         while (shouldRun) {
             try {
                 offerService();
@@ -832,7 +849,14 @@ public class DataNode implements FSConstants, Runnable {
               }
             }
         }
-      LOG.info("Finishing DataNode in: "+data.data);
+        
+        // wait for dataXceiveServer to terminate
+        try {
+            this.dataXceiveServer.join();
+        } catch (InterruptedException ie) {
+        }
+        
+        LOG.info("Finishing DataNode in: "+data.data);
     }
 
     /** Start datanode daemons.

+ 56 - 32
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -143,18 +143,22 @@ class FSDirectory implements FSConstants {
          * @param path file path
          * @param newNode INode to be added
          * @return null if the node already exists; inserted INode, otherwise
+         * @throws FileNotFoundException 
          * @author shv
          */
-        INode addNode(String path, INode newNode) {
+        INode addNode(String path, INode newNode) throws FileNotFoundException {
           File target = new File( path );
           // find parent
           Path parent = new Path(path).getParent();
-          if (parent == null)
-            return null;
+          if (parent == null) { // add root
+              return null;
+          }
           INode parentNode = getNode(parent.toString());
-          if (parentNode == null)
-            return null;
-          // check whether the parent already has a node with that name
+          if (parentNode == null) {
+              throw new FileNotFoundException(
+                      "Parent path does not exist: "+path);
+          }
+           // check whether the parent already has a node with that name
           String name = newNode.name = target.getName();
           if( parentNode.getChild( name ) != null )
             return null;
@@ -688,11 +692,19 @@ class FSDirectory implements FSConstants {
      */
     boolean unprotectedAddFile(UTF8 path, INode newNode) {
       synchronized (rootDir) {
-        int nrBlocks = (newNode.blocks == null) ? 0 : newNode.blocks.length;
-        // Add file->block mapping
-        for (int i = 0; i < nrBlocks; i++)
-            activeBlocks.put(newNode.blocks[i], newNode);
-        return (rootDir.addNode(path.toString(), newNode) != null);
+         try {
+            if( rootDir.addNode(path.toString(), newNode ) != null ) {
+                int nrBlocks = (newNode.blocks == null) ? 0 : newNode.blocks.length;
+                // Add file->block mapping
+                for (int i = 0; i < nrBlocks; i++)
+                    activeBlocks.put(newNode.blocks[i], newNode);
+                return true;
+            } else {
+                return false;
+            }
+        } catch (FileNotFoundException e ) {
+            return false;
+        }
       }
     }
 
@@ -720,23 +732,36 @@ class FSDirectory implements FSConstants {
             INode renamedNode = rootDir.getNode(srcStr);
             if (renamedNode == null) {
                 NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                        +"failed to rename "+src+" to "+dst+ " because "+ src+" does not exist" );
+                        +"failed to rename "+src+" to "+dst+ " because source does not exist" );
                 return false;
             }
-            renamedNode.removeNode();
             if (isDir(dst)) {
               dstStr += "/" + new File(srcStr).getName();
             }
+            if( rootDir.getNode(dstStr.toString()) != null ) {
+                NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+                        +"failed to rename "+src+" to "+dstStr+ " because destination exists" );
+                return false;
+            }
+            renamedNode.removeNode();
+            
             // the renamed node can be reused now
-            if( rootDir.addNode(dstStr, renamedNode ) == null ) {
+            try {
+                if( rootDir.addNode(dstStr, renamedNode ) != null ) {
+                    NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
+                        +src+" is renamed to "+dst );
+                    return true;
+                }
+            } catch (FileNotFoundException e ) {
                 NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                         +"failed to rename "+src+" to "+dst );
-              rootDir.addNode(srcStr, renamedNode); // put it back
-              return false;
+                try {
+                    rootDir.addNode(srcStr, renamedNode); // put it back
+                }catch(FileNotFoundException e2) {                
+                }
             }
-            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
-                     +src+" is renamed to "+dst );
-            return true;
+
+            return false;
         }
     }
 
@@ -977,29 +1002,28 @@ class FSDirectory implements FSConstants {
 
         // Now go backwards through list of dirs, creating along
         // the way
-        boolean lastSuccess = false;
         int numElts = v.size();
         for (int i = numElts - 1; i >= 0; i--) {
             String cur = (String) v.elementAt(i);
-            INode inserted = unprotectedMkdir(cur);
-            if (inserted != null) {
-                NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
+            try {
+               INode inserted = unprotectedMkdir(cur);
+               if (inserted != null) {
+                   NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
                         +"created directory "+cur );
-                logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null);
-                lastSuccess = true;
-            } else {
-                lastSuccess = false;
+                   logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null);
+               } // otherwise cur exists, continue
+            } catch (FileNotFoundException e ) {
+                NameNode.stateChangeLog.debug("DIR* FSDirectory.mkdirs: "
+                        +"failed to create directory "+src);
+                return false;
             }
         }
-/*        if( !lastSuccess )
-            NameNode.stateChangeLog.warn("DIR* FSDirectory.mkdirs: "
-                    +"failed to create directory "+src );*/
-        return lastSuccess;
+        return true;
     }
 
     /**
      */
-    INode unprotectedMkdir(String src) {
+    INode unprotectedMkdir(String src) throws FileNotFoundException {
         synchronized (rootDir) {
             return rootDir.addNode(src, new INode(new File(src).getName()));
         }
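
After this change INode.addNode() distinguishes its two failure modes: it still returns null when a node with that name already exists, but a missing parent now surfaces as a FileNotFoundException. Callers such as unprotectedAddFile(), unprotectedRenameTo() and mkdirs() map both cases back to a boolean. A rough caller-side sketch (the node type and addNode() body here are placeholders, not the real FSDirectory code):

    import java.io.FileNotFoundException;

    class AddNodeCallerSketch {

        /** Placeholder for rootDir.addNode(): returns null if the name is taken,
         *  throws FileNotFoundException if the parent path does not exist. */
        Object addNode(String path, Object newNode) throws FileNotFoundException {
            return newNode;
        }

        boolean unprotectedAddFile(String path, Object newNode) {
            try {
                return addNode(path, newNode) != null;    // false: name already exists
            } catch (FileNotFoundException e) {
                return false;                             // false: parent is missing
            }
        }
    }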

+ 4 - 2
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1266,7 +1266,7 @@ class FSNamesystem implements FSConstants {
 
             if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
                 obsolete.add(b);
-                NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+                NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
                         +"ask "+nodeID.getName()+" to delete "+b.getBlockName() );
             }
         }
@@ -1329,6 +1329,8 @@ class FSNamesystem implements FSConstants {
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
       TreeSet containingNodes = (TreeSet) blocksMap.get(block);
+      if( containingNodes == null )
+        return;
       Vector nonExcess = new Vector();
       for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
           DatanodeInfo cur = (DatanodeInfo) it.next();
@@ -1509,7 +1511,7 @@ class FSNamesystem implements FSConstants {
                 blockList.append(' ');
                 blockList.append(((Block)invalidateSet.elementAt(i)).getBlockName());
             }
-            NameNode.stateChangeLog.info("BLOCK* NameSystem.blockToInvalidate: "
+            NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockToInvalidate: "
                    +"ask "+nodeID.getName()+" to delete " + blockList );
         }
         return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
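
The added null check in proccessOverReplicatedBlock() covers blocks that are not (or no longer) tracked in blocksMap; without it the following iteration would throw a NullPointerException. The guard is the standard look-up-then-check idiom, sketched here with typed java.util collections rather than the raw ones used in 0.3:

    import java.util.Map;
    import java.util.Set;

    class OverReplicationSketch {
        void processOverReplicated(Map<String, Set<String>> blocksMap, String blockName) {
            Set<String> containingNodes = blocksMap.get(blockName);
            if (containingNodes == null) {
                return;                                   // block is not tracked; nothing to trim
            }
            for (String node : containingNodes) {
                // choose replicas that may be invalidated ...
            }
        }
    }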

+ 2 - 1
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -459,7 +459,8 @@ public abstract class FileSystem extends Configured {
 
     /**
      * Make the given file and all non-existent parents into
-     * directories.
+     * directories. Has the semantics of Unix 'mkdir -p'.
+     * Existence of the directory hierarchy is not an error.
      */
     public abstract boolean mkdirs(Path f) throws IOException;
 

+ 10 - 3
src/java/org/apache/hadoop/fs/LocalFileSystem.java

@@ -223,11 +223,18 @@ public class LocalFileSystem extends FileSystem {
         }
         return results;
     }
-
+    
+    /**
+     * Creates the specified directory hierarchy. Does not
+     * treat existence as an error.
+     */
     public boolean mkdirs(Path f) throws IOException {
-      return pathToFile(f).mkdirs();
+      Path parent = f.getParent();
+      File p2f = pathToFile(f);
+      return (parent == null || mkdirs(parent)) &&
+             (p2f.mkdir() || p2f.isDirectory());
     }
-
+    
     /**
      * Set the working directory to the given directory.
      */
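
The rewritten LocalFileSystem.mkdirs() above recurses towards the root and accepts an already-existing directory as success, which is exactly the 'mkdir -p' contract now documented on FileSystem.mkdirs(). The same idea expressed directly against java.io.File, as an illustration rather than the Hadoop code path:

    import java.io.File;

    class MkdirsSketch {
        /** Create f and any missing parents; an existing directory is not an error. */
        static boolean mkdirs(File f) {
            File parent = f.getParentFile();
            return (parent == null || mkdirs(parent)) &&
                   (f.mkdir() || f.isDirectory());
        }

        public static void main(String[] args) {
            File dir = new File("tmp-mkdirs-sketch/a/b/c");  // hypothetical path
            System.out.println(mkdirs(dir));                 // true: directories created
            System.out.println(mkdirs(dir));                 // true again: existence is not an error
        }
    }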

+ 16 - 0
src/site/src/documentation/content/xdocs/index.xml

@@ -14,6 +14,22 @@
     <section>
       <title>News</title>
 
+      <section>
+      <title>9 June, 2006: release 0.3.2 available</title>
+      <p>This is a bugfix release.  For details see the <a
+      href="http://tinyurl.com/k9g5c">change log</a>. The release can
+      be obtained from <a
+      href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a
+      nearby mirror</a>.
+      </p> </section>
+
+      <section>
+      <title>8 June, 2006: FAQ added to Wiki</title>
+      <p>Hadoop now has a <a
+      href="http://wiki.apache.org/lucene-hadoop/FAQ">FAQ</a>.  Please
+      help make this more complete!
+      </p> </section>
+
       <section>
       <title>5 June, 2006: release 0.3.1 available</title>
       <p>This is a bugfix release.  For details see the <a

+ 4 - 4
src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java

@@ -73,7 +73,7 @@ public class ClusterTestDFSNamespaceLogging extends TestCase implements FSConsta
   private static final int BLOCK_LOG_HEADER_LEN = 32;
   /** DFS block size
    */
-  private static final int BLOCK_SIZE = 32*1000*1000;
+  private static final int BLOCK_SIZE = 32*1024*1024;
   
   /** Buffer size
    */
@@ -158,7 +158,7 @@ public class ClusterTestDFSNamespaceLogging extends TestCase implements FSConsta
     
       // create a file with 2 data blocks
       try {
-        createFile("/data/yy",BLOCK_SIZE+1);
+        createFile("/data/yy", BLOCK_SIZE+1);
         assertCreate( "/data/yy", BLOCK_SIZE+1, false );
       } catch( IOException ioe ) {
     	assertCreate( "/data/yy", BLOCK_SIZE+1, true );
@@ -326,9 +326,9 @@ public class ClusterTestDFSNamespaceLogging extends TestCase implements FSConsta
   //
   private void configureDFS() throws IOException {
 	// set given config param to override other config settings
-	conf.setInt("test.dfs.block_size", BLOCK_SIZE);
+	conf.setInt("dfs.block.size", BLOCK_SIZE);
 	// verify that config changed
-	assertTrue(BLOCK_SIZE == conf.getInt("test.dfs.block_size", 2)); // 2 is an intentional obviously-wrong block size
+	assertTrue(BLOCK_SIZE == conf.getInt("dfs.block.size", 2)); // 2 is an intentional obviously-wrong block size
 	// downsize for testing (just to save resources)
 	conf.setInt("dfs.namenode.handler.count", 3);
 	conf.setLong("dfs.blockreport.intervalMsec", 50*1000L);
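
The test now overrides the key the DFS actually reads, dfs.block.size, instead of the stale test.dfs.block_size, and switches to a power-of-two block size. Assuming Configuration.setInt()/getInt() behave as in later Hadoop releases, the override-and-verify step looks roughly like this:

    import org.apache.hadoop.conf.Configuration;

    class BlockSizeOverrideSketch {
        static final int BLOCK_SIZE = 32 * 1024 * 1024;      // 32 MB

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setInt("dfs.block.size", BLOCK_SIZE);
            // 2 is an obviously wrong default, so equality proves the override took effect.
            System.out.println(conf.getInt("dfs.block.size", 2) == BLOCK_SIZE);
        }
    }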

+ 2 - 1
src/test/org/apache/hadoop/test/AllTestDriver.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.io.TestSetFile;
 import org.apache.hadoop.io.TestSequenceFile;
 import org.apache.hadoop.ipc.TestIPC;
 import org.apache.hadoop.ipc.TestRPC;
+import org.apache.hadoop.fs.DistributedFSCheck;
 import org.apache.hadoop.fs.TestDFSIO;
 import org.apache.hadoop.fs.DFSCIOTest;
 
@@ -52,7 +53,7 @@ public class AllTestDriver {
 	    pgd.addClass("testtextinputformat", TestTextInputFormat.class, "A test for text input format.");
       pgd.addClass("TestDFSIO", TestDFSIO.class, "Distributed i/o benchmark.");
       pgd.addClass("DFSCIOTest", DFSCIOTest.class, "Distributed i/o benchmark of libhdfs.");
-      pgd.addClass("DistributedFSCheck", TestDFSIO.class, "Distributed checkup of the file system consistency.");
+      pgd.addClass("DistributedFSCheck", DistributedFSCheck.class, "Distributed checkup of the file system consistency.");
 	    pgd.driver(argv);
 	}
 	catch(Throwable e){

Some files were not shown because too many files changed in this diff