소스 검색

HADOOP-2046. Improve mapred javadoc. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@588033 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 17 년 전
부모
커밋
5c36fd40a6
32개의 변경된 파일2309개의 추가작업 그리고 447개의 파일을 삭제
  1. 2 0
      CHANGES.txt
  2. 9 0
      src/contrib/streaming/src/java/org/apache/hadoop/streaming/package.html
  3. 276 101
      src/java/org/apache/hadoop/conf/Configuration.java
  4. 80 5
      src/java/org/apache/hadoop/filecache/DistributedCache.java
  5. 143 2
      src/java/org/apache/hadoop/io/SequenceFile.java
  6. 47 8
      src/java/org/apache/hadoop/io/Writable.java
  7. 32 1
      src/java/org/apache/hadoop/io/WritableComparable.java
  8. 49 5
      src/java/org/apache/hadoop/mapred/ClusterStatus.java
  9. 11 3
      src/java/org/apache/hadoop/mapred/Counters.java
  10. 12 1
      src/java/org/apache/hadoop/mapred/FileInputFormat.java
  11. 66 14
      src/java/org/apache/hadoop/mapred/InputFormat.java
  12. 15 4
      src/java/org/apache/hadoop/mapred/InputSplit.java
  13. 172 27
      src/java/org/apache/hadoop/mapred/JobClient.java
  14. 498 77
      src/java/org/apache/hadoop/mapred/JobConf.java
  15. 1 1
      src/java/org/apache/hadoop/mapred/JobEndNotifier.java
  16. 5 2
      src/java/org/apache/hadoop/mapred/MapReduceBase.java
  17. 18 5
      src/java/org/apache/hadoop/mapred/MapRunnable.java
  18. 127 15
      src/java/org/apache/hadoop/mapred/Mapper.java
  19. 12 4
      src/java/org/apache/hadoop/mapred/OutputCollector.java
  20. 37 12
      src/java/org/apache/hadoop/mapred/OutputFormat.java
  21. 21 7
      src/java/org/apache/hadoop/mapred/Partitioner.java
  22. 35 10
      src/java/org/apache/hadoop/mapred/RecordReader.java
  23. 21 7
      src/java/org/apache/hadoop/mapred/RecordWriter.java
  24. 161 9
      src/java/org/apache/hadoop/mapred/Reducer.java
  25. 26 11
      src/java/org/apache/hadoop/mapred/Reporter.java
  26. 61 19
      src/java/org/apache/hadoop/mapred/RunningJob.java
  27. 204 8
      src/java/org/apache/hadoop/mapred/package.html
  28. 77 60
      src/java/org/apache/hadoop/util/GenericOptionsParser.java
  29. 9 7
      src/java/org/apache/hadoop/util/Progressable.java
  30. 47 3
      src/java/org/apache/hadoop/util/Tool.java
  31. 34 18
      src/java/org/apache/hadoop/util/ToolRunner.java
  32. 1 1
      src/test/org/apache/hadoop/mapred/NotificationTestCase.java

+ 2 - 0
CHANGES.txt

@@ -464,6 +464,8 @@ Branch 0.15 (unreleased changes)
     edits log. Reduce the number of syncs by double-buffering the changes
     to the transaction log. (Dhruba Borthakur)
 
+    HADOOP-2046.  Improve mapred javadoc.  (Arun C. Murthy via cutting)
+
 
 Release 0.14.3 - 2007-10-19
 

+ 9 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/package.html

@@ -0,0 +1,9 @@
+<html>
+<body>
+
+<tt>Hadoop Streaming</tt> is a utility which allows users to create and run 
+Map-Reduce jobs with any executables (e.g. Unix shell utilities) as the mapper 
+and/or the reducer.
+
+</body>
+</html>

+ 276 - 101
src/java/org/apache/hadoop/conf/Configuration.java

@@ -38,43 +38,70 @@ import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-/** Provides access to configuration parameters.  Configurations are specified
- * by resources.  A resource contains a set of name/value pairs.
+/** 
+ * Provides access to configuration parameters.
  *
- * <p>Each resource is named by either a String or by a Path.  If named by a
- * String, then the classpath is examined for a file with that name.  If a
- * File, then the local filesystem is examined directly, without referring to
- * the CLASSPATH.
+ * <h4 id="Resources">Resources</h4>
  *
- * <p>Configuration parameters may be declared 'final'.  Once a resource
- * declares a value final, no subsequently-loaded resource may alter that
- * value.  For example, one might define a final parameter with:
- * <pre>
+ * <p>Configurations are specified by resources. A resource contains a set of
+ * name/value pairs as XML data. Each resource is named by either a 
+ * <code>String</code> or by a {@link Path}. If named by a <code>String</code>, 
+ * then the classpath is examined for a file with that name.  If named by a 
+ * <code>Path</code>, then the local filesystem is examined directly, without 
+ * referring to the classpath.
+ *
+ * <p>Hadoop by default specifies two resources, loaded in-order from the
+ * classpath: <ol>
+ * <li><tt><a href="{@docRoot}/../hadoop-default.html">hadoop-default.xml</a>
+ * </tt>: Read-only defaults for hadoop.</li>
+ * <li><tt>hadoop-site.xml</tt>: Site-specific configuration for a given hadoop
+ * installation.</li>
+ * </ol>
+ * Applications may add additional resources, which are loaded
+ * subsequent to these resources in the order they are added.
+ * 
+ * <h4 id="FinalParams">Final Parameters</h4>
+ *
+ * <p>Configuration parameters may be declared <i>final</i>. 
+ * Once a resource declares a value final, no subsequently-loaded 
+ * resource can alter that value.  
+ * For example, one might define a final parameter with:
+ * <tt><pre>
  *  &lt;property&gt;
  *    &lt;name&gt;dfs.client.buffer.dir&lt;/name&gt;
  *    &lt;value&gt;/tmp/hadoop/dfs/client&lt;/value&gt;
- *    &lt;final&gt;true&lt;/final&gt;
+ *    <b>&lt;final&gt;true&lt;/final&gt;</b>
+ *  &lt;/property&gt;</pre></tt>
+ *
+ * Administrators typically define parameters as final in 
+ * <tt>hadoop-site.xml</tt> for values that user applications may not alter.
+ *
+ * <h4 id="VariableExpansion">Variable Expansion</h4>
+ *
+ * <p>Value strings are first processed for <i>variable expansion</i>. The
+ * available properties are:<ol>
+ * <li>Other properties defined in this Configuration; and, if a name is
+ * undefined here,</li>
+ * <li>Properties in {@link System#getProperties()}.</li>
+ * </ol>
+ *
+ * <p>For example, if a configuration resource contains the following property
+ * definitions: 
+ * <tt><pre>
+ *  &lt;property&gt;
+ *    &lt;name&gt;basedir&lt;/name&gt;
+ *    &lt;value&gt;/user/${<i>user.name</i>}&lt;/value&gt;
  *  &lt;/property&gt;
- * </pre>
+ *  
+ *  &lt;property&gt;
+ *    &lt;name&gt;tempdir&lt;/name&gt;
+ *    &lt;value&gt;${<i>basedir</i>}/tmp&lt;/value&gt;
+ *  &lt;/property&gt;</pre></tt>
  *
- * <p>Hadoop by default specifies two resource strings: "hadoop-default.xml"
- * and "hadoop-site.xml".  Other tools built on Hadoop may specify additional
- * resources.
- * 
- * <p>The values returned by most <tt>get*</tt> methods are based on String representations. 
- * This String is processed for <b>variable expansion</b>. The available variables are the 
- * <em>System properties</em> and the <em>other properties</em> defined in this Configuration.
- * <p>The only <tt>get*</tt> method that is not processed for variable expansion is
- * {@link #getObject(String)} (as it cannot assume that the returned values are String). 
- * You can use <tt>getObject</tt> to obtain the raw value of a String property without 
- * variable expansion: if <tt>(String)conf.getObject("my.jdk")</tt> is <tt>"JDK ${java.version}"</tt>
- * then conf.get("my.jdk")</tt> is <tt>"JDK 1.5.0"</tt> 
- * <p> Example XML config using variables:<br><tt>
- * &lt;name>basedir&lt;/name>&lt;value>/user/${user.name}&lt;/value><br> 
- * &lt;name>tempdir&lt;/name>&lt;value>${basedir}/tmp&lt;/value><br>
- * </tt>When conf.get("tempdir") is called:<br>
- * <tt>${basedir}</tt> is resolved to another property in this Configuration.
- * Then <tt>${user.name}</tt> is resolved to a System property.
+ * When <tt>conf.get("tempdir")</tt> is called, then <tt>${<i>basedir</i>}</tt>
+ * will be resolved to another property in this Configuration, while
+ * <tt>${<i>user.name</i>}</tt> would then ordinarily be resolved to the value
+ * of the System property with that name.
  */
 public class Configuration implements Iterable<Map.Entry<String,String>> {
   private static final Log LOG =
@@ -120,7 +147,11 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     finalResources.add("hadoop-site.xml");
   }
 
-  /** A new configuration with the same settings cloned from another. */
+  /** 
+   * A new configuration with the same settings cloned from another.
+   * 
+   * @param other the configuration from which to clone settings.
+   */
   @SuppressWarnings("unchecked")
   public Configuration(Configuration other) {
     if (LOG.isDebugEnabled()) {
@@ -186,8 +217,13 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
   }
 
   /**
-   * Add a configuration resource.
-   * @param name resource to be added
+   * Add a configuration resource. 
+   * 
+   * The properties of this resource will override properties of previously 
+   * added resources, unless they were marked <a href="#Final>final</a>. 
+   * 
+   * @param name resource to be added, the classpath is examined for a file 
+   *             with that name.
    */
   public void addResource(String name) {
     addResource(resources, name);
@@ -195,7 +231,13 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
 
   /**
    * Add a configuration resource. 
-   * @param url url of the resource to be added
+   * 
+   * The properties of this resource will override properties of previously 
+   * added resources, unless they were marked <a href="#Final>final</a>. 
+   * 
+   * @param url url of the resource to be added, the local filesystem is 
+   *            examined directly to find the resource, without referring to 
+   *            the classpath.
    */
   public void addResource(URL url) {
     addResource(resources, url);
@@ -203,7 +245,13 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
 
   /**
    * Add a configuration resource. 
-   * @param file file-path of resource to be added
+   * 
+   * The properties of this resource will override properties of previously 
+   * added resources, unless they were marked <a href="#Final>final</a>. 
+   * 
+   * @param file file-path of resource to be added, the local filesystem is
+   *             examined directly to find the resource, without referring to 
+   *             the classpath.
    */
   public void addResource(Path file) {
     addResource(resources, file);
@@ -271,17 +319,28 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
                                     + MAX_SUBST + " " + expr);
   }
   
-  /** Returns the value of the <code>name</code> property, or null if no
-   * such property exists. */
+  /**
+   * Get the value of the <code>name</code> property, <code>null</code> if
+   * no such property exists.
+   * 
+   * Values are processed for <a href="#VariableExpansion">variable expansion</a> 
+   * before being returned. 
+   * 
+   * @param name the property name.
+   * @return the value of the <code>name</code> property, 
+   *         or null if no such property exists.
+   */
   public String get(String name) {
     return substituteVars(getProps().getProperty(name));
   }
 
   /**
-   * Get the value of the <code>name</code> property, without doing variable
-   * expansion.
-   * @param name the property name
-   * @return the result or null if no such property exists
+   * Get the value of the <code>name</code> property, without doing
+   * <a href="#VariableExpansion">variable expansion</a>.
+   * 
+   * @param name the property name.
+   * @return the value of the <code>name</code> property, 
+   *         or null if no such property exists.
    */
   public String getRaw(String name) {
     return getProps().getProperty(name);
@@ -295,7 +354,11 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     getProps().setProperty(name, value.toString());
   }
   
-  /** Sets the value of the <code>name</code> property. 
+  /** 
+   * Set the <code>value</code> of the <code>name</code> property.
+   * 
+   * @param name property name.
+   * @param value property value.
    */
   public void set(String name, String value) {
     getOverlay().setProperty(name, value);
@@ -309,16 +372,29 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     return overlay;
   }
 
-  /** Returns the value of the <code>name</code> property.  If no such property
+  /** 
+   * Get the value of the <code>name</code> property. If no such property 
    * exists, then <code>defaultValue</code> is returned.
+   * 
+   * @param name property name.
+   * @param defaultValue default value.
+   * @return property value, or <code>defaultValue</code> if the property 
+   *         doesn't exist.                    
    */
   public String get(String name, String defaultValue) {
     return substituteVars(getProps().getProperty(name, defaultValue));
   }
     
-  /** Returns the value of the <code>name</code> property as an integer.  If no
-   * such property is specified, or if the specified value is not a valid
-   * integer, then <code>defaultValue</code> is returned.
+  /** 
+   * Get the value of the <code>name</code> property as an <code>int</code>.
+   *   
+   * If no such property exists, or if the specified value is not a valid
+   * <code>int</code>, then <code>defaultValue</code> is returned.
+   * 
+   * @param name property name.
+   * @param defaultValue default value.
+   * @return property value as an <code>int</code>, 
+   *         or <code>defaultValue</code>. 
    */
   public int getInt(String name, int defaultValue) {
     String valueString = get(name);
@@ -331,15 +407,26 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     }
   }
 
-  /** Sets the value of the <code>name</code> property to an integer. */
+  /** 
+   * Set the value of the <code>name</code> property to an <code>int</code>.
+   * 
+   * @param name property name.
+   * @param value <code>int</code> value of the property.
+   */
   public void setInt(String name, int value) {
     set(name, Integer.toString(value));
   }
 
 
-  /** Returns the value of the <code>name</code> property as a long.  If no
-   * such property is specified, or if the specified value is not a valid
-   * long, then <code>defaultValue</code> is returned.
+  /** 
+   * Get the value of the <code>name</code> property as a <code>long</code>.  
+   * If no such property is specified, or if the specified value is not a valid
+   * <code>long</code>, then <code>defaultValue</code> is returned.
+   * 
+   * @param name property name.
+   * @param defaultValue default value.
+   * @return property value as a <code>long</code>, 
+   *         or <code>defaultValue</code>. 
    */
   public long getLong(String name, long defaultValue) {
     String valueString = get(name);
@@ -352,14 +439,25 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     }
   }
 
-  /** Sets the value of the <code>name</code> property to a long. */
+  /** 
+   * Set the value of the <code>name</code> property to a <code>long</code>.
+   * 
+   * @param name property name.
+   * @param value <code>long</code> value of the property.
+   */
   public void setLong(String name, long value) {
     set(name, Long.toString(value));
   }
 
-  /** Returns the value of the <code>name</code> property as a float.  If no
-   * such property is specified, or if the specified value is not a valid
-   * float, then <code>defaultValue</code> is returned.
+  /** 
+   * Get the value of the <code>name</code> property as a <code>float</code>.  
+   * If no such property is specified, or if the specified value is not a valid
+   * <code>float</code>, then <code>defaultValue</code> is returned.
+   * 
+   * @param name property name.
+   * @param defaultValue default value.
+   * @return property value as a <code>float</code>, 
+   *         or <code>defaultValue</code>. 
    */
   public float getFloat(String name, float defaultValue) {
     String valueString = get(name);
@@ -372,10 +470,15 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     }
   }
 
-  /** Returns the value of the <code>name</code> property as an boolean.  If no
-   * such property is specified, or if the specified value is not a valid
-   * boolean, then <code>defaultValue</code> is returned.  Valid boolean values
-   * are "true" and "false".
+  /** 
+   * Get the value of the <code>name</code> property as a <code>boolean</code>.  
+   * If no such property is specified, or if the specified value is not a valid
+   * <code>boolean</code>, then <code>defaultValue</code> is returned.
+   * 
+   * @param name property name.
+   * @param defaultValue default value.
+   * @return property value as a <code>boolean</code>, 
+   *         or <code>defaultValue</code>. 
    */
   public boolean getBoolean(String name, boolean defaultValue) {
     String valueString = get(name);
@@ -386,14 +489,24 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     else return defaultValue;
   }
 
-  /** Sets the value of the <code>name</code> property to an integer. */
+  /** 
+   * Set the value of the <code>name</code> property to a <code>boolean</code>.
+   * 
+   * @param name property name.
+   * @param value <code>boolean</code> value of the property.
+   */
   public void setBoolean(String name, boolean value) {
     set(name, Boolean.toString(value));
   }
 
-  /** Returns the value of the <code>name</code> property as an array of
-   * strings.  If no such property is specified, then <code>null</code>
-   * is returned.  Values are comma delimited.
+  /** 
+   * Get the comma delimited values of the <code>name</code> property as 
+   * an array of <code>String</code>s.  
+   * If no such property is specified then <code>null</code> is returned.
+   * 
+   * @param name property name.
+   * @return property value as an array of <code>String</code>s, 
+   *         or <code>null</code>. 
    */
   public String[] getStrings(String name) {
     String valueString = get(name);
@@ -402,16 +515,24 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
 
   /**
    * Load a class by name.
-   * @param name the class name
-   * @return the class object
-   * @throws ClassNotFoundException if the class is not found
+   * 
+   * @param name the class name.
+   * @return the class object.
+   * @throws ClassNotFoundException if the class is not found.
    */
   public Class<?> getClassByName(String name) throws ClassNotFoundException {
     return Class.forName(name, true, classLoader);
   }
-  
-  /** Returns the value of the <code>name</code> property as a Class.  If no
-   * such property is specified, then <code>defaultValue</code> is returned.
+
+  /** 
+   * Get the value of the <code>name</code> property as a <code>Class</code>.  
+   * If no such property is specified, then <code>defaultValue</code> is 
+   * returned.
+   * 
+   * @param name the class name.
+   * @param defaultValue default value.
+   * @return property value as a <code>Class</code>, 
+   *         or <code>defaultValue</code>. 
    */
   public Class<?> getClass(String name, Class<?> defaultValue) {
     String valueString = get(name);
@@ -424,15 +545,27 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     }
   }
 
-  /** Returns the value of the <code>name</code> property as a Class.  If no
-   * such property is specified, then <code>defaultValue</code> is returned.
-   * An error is thrown if the returned class does not implement the named
+  /** 
+   * Get the value of the <code>name</code> property as a <code>Class</code>
+   * implementing the interface specified by <code>xface</code>.
+   *   
+   * If no such property is specified, then <code>defaultValue</code> is 
+   * returned.
+   * 
+   * An exception is thrown if the returned class does not implement the named
    * interface. 
-   */
-  public <U> Class<? extends U> getClass(String propertyName, Class<? extends U> defaultValue,
-      Class<U> xface) {
+   * 
+   * @param name the class name.
+   * @param defaultValue default value.
+   * @param xface the interface implemented by the named class.
+   * @return property value as a <code>Class</code>, 
+   *         or <code>defaultValue</code>.
+   */
+  public <U> Class<? extends U> getClass(String name, 
+                                         Class<? extends U> defaultValue, 
+                                         Class<U> xface) {
     try {
-      Class<?> theClass = getClass(propertyName, defaultValue);
+      Class<?> theClass = getClass(name, defaultValue);
       if (theClass != null && !xface.isAssignableFrom(theClass))
         throw new RuntimeException(theClass+" not "+xface.getName());
       else if (theClass != null)
@@ -444,21 +577,32 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     }
   }
 
-  /** Sets the value of the <code>name</code> property to the name of a class.
-   * First checks that the class implements the named interface. 
-   */
-  public void setClass(String propertyName, Class<?> theClass,
-                       Class<?> xface) {
-    
+  /** 
+   * Set the value of the <code>name</code> property to the name of a 
+   * <code>theClass</code> implementing the given interface <code>xface</code>.
+   * 
+   * An exception is thrown if <code>theClass</code> does not implement the 
+   * interface <code>xface</code>. 
+   * 
+   * @param name property name.
+   * @param theClass property value.
+   * @param xface the interface implemented by the named class.
+   */
+  public void setClass(String name, Class<?> theClass, Class<?> xface) {
     if (!xface.isAssignableFrom(theClass))
       throw new RuntimeException(theClass+" not "+xface.getName());
-    set(propertyName, theClass.getName());
+    set(name, theClass.getName());
   }
 
-  /** Returns a local file under a directory named in <i>dirsProp</i> with
+  /** 
+   * Get a local file under a directory named by <i>dirsProp</i> with
    * the given <i>path</i>.  If <i>dirsProp</i> contains multiple directories,
    * then one is chosen based on <i>path</i>'s hash code.  If the selected
    * directory does not exist, an attempt is made to create it.
+   * 
+   * @param dirsProp directory in which to locate the file.
+   * @param path file-path.
+   * @return local file under the directory with the given path.
    */
   public Path getLocalPath(String dirsProp, String path)
     throws IOException {
@@ -482,10 +626,15 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     throw new IOException("No valid local directories in property: "+dirsProp);
   }
 
-  /** Returns a local file name under a directory named in <i>dirsProp</i> with
+  /** 
+   * Get a local file name under a directory named in <i>dirsProp</i> with
    * the given <i>path</i>.  If <i>dirsProp</i> contains multiple directories,
    * then one is chosen based on <i>path</i>'s hash code.  If the selected
    * directory does not exist, an attempt is made to create it.
+   * 
+   * @param dirsProp directory in which to locate the file.
+   * @param path file-path.
+   * @return local file under the directory with the given path.
    */
   public File getFile(String dirsProp, String path)
     throws IOException {
@@ -502,14 +651,22 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     throw new IOException("No valid local directories in property: "+dirsProp);
   }
 
-
-
-  /** Returns the URL for the named resource. */
+  /** 
+   * Get the {@link URL} for the named resource.
+   * 
+   * @param name resource name.
+   * @return the url for the named resource.
+   */
   public URL getResource(String name) {
     return classLoader.getResource(name);
   }
-  /** Returns an input stream attached to the configuration resource with the
+  
+  /** 
+   * Get an input stream attached to the configuration resource with the
    * given <code>name</code>.
+   * 
+   * @param name configuration resource name.
+   * @return an input stream attached to the resource.
    */
   public InputStream getConfResourceAsInputStream(String name) {
     try {
@@ -528,8 +685,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     }
   }
 
-  /** Returns a reader attached to the configuration resource with the
+  /** 
+   * Get a {@link Reader} attached to the configuration resource with the
    * given <code>name</code>.
+   * 
+   * @param name configuration resource name.
+   * @return a reader attached to the resource.
    */
   public Reader getConfResourceAsReader(String name) {
     try {
@@ -561,15 +722,17 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
   }
 
   /** @return Iterator&lt; Map.Entry&lt;String,String> >  
-   * @deprecated use <code>iterator()</code> instead 
+   * @deprecated Use {@link #iterator()} instead. 
    */
   public Iterator entries() {
     return iterator();
   }
 
   /**
-   * Go through the list of String key-value pairs in the configuration.
-   * @return an iterator over the entries
+   * Get an {@link Iterator} to go through the list of <code>String</code> 
+   * key-value pairs in the configuration.
+   * 
+   * @return an iterator over the entries.
    */
   public Iterator<Map.Entry<String, String>> iterator() {
     // Get a copy of just the string to string pairs. After the old object
@@ -689,7 +852,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     
   }
 
-  /** Writes non-default properties in this configuration.*/
+  /** 
+   * Write out the non-default properties in this configuration to the give
+   * {@link OutputStream}.
+   * 
+   * @param out the output stream to write to.
+   */
   public void write(OutputStream out) throws IOException {
     Properties properties = getProps();
     try {
@@ -732,8 +900,9 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
   }
 
   /**
-   * Get the class loader for this job.
-   * @return the correct class loader
+   * Get the {@link ClassLoader} for this job.
+   * 
+   * @return the correct class loader.
    */
   public ClassLoader getClassLoader() {
     return classLoader;
@@ -741,7 +910,8 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
   
   /**
    * Set the class loader that will be used to load the various objects.
-   * @param classLoader the new class loader
+   * 
+   * @param classLoader the new class loader.
    */
   public void setClassLoader(ClassLoader classLoader) {
     this.classLoader = classLoader;
@@ -767,11 +937,16 @@ public class Configuration implements Iterable<Map.Entry<String,String>> {
     }
   }
 
-  /** Make this class quiet. Error and informational
-   *  messages might not be logged.
-   */
-  public synchronized void setQuietMode(boolean value) {
-    quietmode = value;
+  /** 
+   * Set the quiteness-mode. 
+   * 
+   * In the the quite-mode error and informational messages might not be logged.
+   * 
+   * @param quietmode <code>true</code> to set quiet-mode on, <code>false</code>
+   *              to turn it off.
+   */
+  public synchronized void setQuietMode(boolean quietmode) {
+    this.quietmode = quietmode;
   }
 
   /** For debugging.  List non-default properties to the terminal and exit. */

+ 80 - 5
src/java/org/apache/hadoop/filecache/DistributedCache.java

@@ -24,14 +24,89 @@ import java.util.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
 
 import java.net.URI;
 
-/*******************************************************************************
- * The DistributedCache maintains all the caching information of cached archives
- * and unarchives all the files as well and returns the path
+/**
+ * Distribute application-specific large, read-only files efficiently.
+ * 
+ * <p><code>DistributedCache</code> is a facility provided by the Map-Reduce
+ * framework to cache files (text, archives, jars etc.) needed by applications.
+ * </p>
+ * 
+ * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
+ * via the {@link JobConf}. The <code>DistributedCache</code> assumes that the
+ * files specified via hdfs:// urls are already present on the 
+ * {@link FileSystem} at the path specified by the url.</p>
+ * 
+ * <p>The framework will copy the necessary files on to the slave node before 
+ * any tasks for the job are executed on that node. Its efficiency stems from 
+ * the fact that the files are only copied once per job and the ability to 
+ * cache archives which are un-archived on the slaves.</p> 
+ *
+ * <p><code>DistributedCache</code> can be used to distribute simple, read-only
+ * data/text files and/or more complex types such as archives, jars etc. 
+ * Archives (zip files) are un-archived at the slave nodes. Jars maybe be 
+ * optionally added to the classpath of the tasks, a rudimentary software
+ * distribution mechanism. Optionally users can also direct it to symlink the 
+ * distributed cache file(s) into the working directory of the task.</p>
  * 
- ******************************************************************************/
+ * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
+ * files. Clearly the cache files should not be modified by the application 
+ * or externally while the job is executing.</p>
+ * 
+ * <p>Here is an illustrative example on how to use the 
+ * <code>DistributedCache</code>:</p>
+ * <p><blockquote><pre>
+ *     // Setting up the cache for the application
+ *     
+ *     1. Copy the requisite files to the <code>FileSystem</code>:
+ *     
+ *     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat  
+ *     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip  
+ *     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
+ *     
+ *     2. Setup the application's <code>JobConf</code>:
+ *     
+ *     JobConf job = new JobConf();
+ *     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
+ *                                   job);
+ *     DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
+ *     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
+ *
+ *     3. Use the cached files in the {@link Mapper} or {@link Reducer}:
+ *     
+ *     public static class MapClass extends MapReduceBase  
+ *     implements Mapper&lt;K, V, K, V&gt; {
+ *     
+ *       private Path[] localArchives;
+ *       private Path[] localFiles;
+ *       
+ *       public void configure(JobConf job) {
+ *         // Get the cached archives/files
+ *         localArchives = DistributedCache.getLocalCacheArchives(job);
+ *         localFiles = DistributedCache.getLocalCacheFiles(job);
+ *       }
+ *       
+ *       public void map(K key, V value, 
+ *                       OutputCollector&lt;K, V&gt; output, Reporter reporter) 
+ *       throws IOException {
+ *         // Use data from the cached archives/files here
+ *         // ...
+ *         // ...
+ *         output.collect(k, v);
+ *       }
+ *     }
+ *     
+ * </pre></blockquote></p>
+ * 
+ * @see JobConf
+ * @see JobClient
+ */
 public class DistributedCache {
   // cacheID to cacheStatus mapping
   private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
@@ -47,7 +122,7 @@ public class DistributedCache {
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absoulte_path_to_file#LINKNAME). If no schema 
+   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
    * or hostname:port is provided the file is assumed to be in the filesystem
    * being used in the Configuration
    * @param conf The Confguration file which contains the filesystem

+ 143 - 2
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -40,7 +40,145 @@ import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.hadoop.util.MergeSort;
 import org.apache.hadoop.util.PriorityQueue;
 
-/** Support for flat files of binary key/value pairs. */
+/** 
+ * <code>SequenceFile</code>s are flat files consisting of binary key/value 
+ * pairs.
+ * 
+ * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
+ * {@link Sorter} classes for writing, reading and sorting respectively.</p>
+ * 
+ * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
+ * {@link CompressionType} used to compress key/value pairs:
+ * <ol>
+ *   <li>
+ *   <code>Writer</code> : Uncompressed records.
+ *   </li>
+ *   <li>
+ *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
+ *                                       values.
+ *   </li>
+ *   <li>
+ *   <code>BlockCompressWriter</code> : Block-compressed files, both keys & 
+ *                                      values are collected in 'blocks' 
+ *                                      separately and compressed. The size of 
+ *                                      the 'block' is configurable.
+ * </ol>
+ * 
+ * <p>The actual compression algorithm used to compress key and/or values can be
+ * specified by using the appropriate {@link CompressionCodec}.</p>
+ * 
+ * <p>The recommended way is to use the static <tt>createWriter</tt> methods
+ * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
+ *
+ * <p>The {@link Reader} acts as the bridge and can read any of the above 
+ * <code>SequenceFile</code> formats.</p>
+ *
+ * <h4 id="Formats">SequenceFile Formats</h4>
+ * 
+ * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
+ * depending on the <code>CompressionType</code> specified. All of them share a
+ * <a href="#Header">common header</a> described below.
+ * 
+ * <h5 id="Header">SequenceFile Header</h5>
+ * <ul>
+ *   <li>
+ *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
+ *             version number (e.g. SEQ4 or SEQ6)
+ *   </li>
+ *   <li>
+ *   keyClassName -key class
+ *   </li>
+ *   <li>
+ *   valueClassName - value class
+ *   </li>
+ *   <li>
+ *   compression - A boolean which specifies if compression is turned on for 
+ *                 keys/values in this file.
+ *   </li>
+ *   <li>
+ *   blockCompression - A boolean which specifies if block-compression is 
+ *                      turned on for keys/values in this file.
+ *   </li>
+ *   <li>
+ *   compression codec - <code>CompressionCodec</code> class which is used for  
+ *                       compression of keys and/or values (if compression is 
+ *                       enabled).
+ *   </li>
+ *   <li>
+ *   metadata - {@link Metadata} for this file.
+ *   </li>
+ *   <li>
+ *   sync - A sync marker to denote end of the header.
+ *   </li>
+ * </ul>
+ * 
+ * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5>
+ * <ul>
+ * <li>
+ * <a href="#Header">Header</a>
+ * </li>
+ * <li>
+ * Record
+ *   <ul>
+ *     <li>Record length</li>
+ *     <li>Key length</li>
+ *     <li>Key</li>
+ *     <li>Value</li>
+ *   </ul>
+ * </li>
+ * <li>
+ * A sync-marker every few <code>100</code> bytes or so.
+ * </li>
+ * </ul>
+ *
+ * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
+ * <ul>
+ * <li>
+ * <a href="#Header">Header</a>
+ * </li>
+ * <li>
+ * Record
+ *   <ul>
+ *     <li>Record length</li>
+ *     <li>Key length</li>
+ *     <li>Key</li>
+ *     <li><i>Compressed</i> Value</li>
+ *   </ul>
+ * </li>
+ * <li>
+ * A sync-marker every few <code>100</code> bytes or so.
+ * </li>
+ * </ul>
+ * 
+ * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
+ * <ul>
+ * <li>
+ * <a href="#Header">Header</a>
+ * </li>
+ * <li>
+ * Record <i>Block</i>
+ *   <ul>
+ *     <li>Compressed key-lengths block-size</li>
+ *     <li>Compressed key-lengths block</li>
+ *     <li>Compressed keys block-size</li>
+ *     <li>Compressed keys block</li>
+ *     <li>Compressed value-lengths block-size</li>
+ *     <li>Compressed value-lengths block</li>
+ *     <li>Compressed values block-size</li>
+ *     <li>Compressed values block</li>
+ *   </ul>
+ * </li>
+ * <li>
+ * A sync-marker every few <code>100</code> bytes or so.
+ * </li>
+ * </ul>
+ * 
+ * <p>The compressed blocks of key lengths and value lengths consist of the 
+ * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
+ * format.</p>
+ * 
+ * @see CompressionCodec
+ */
 public class SequenceFile {
   private static final Log LOG = LogFactory.getLog(SequenceFile.class);
 
@@ -60,7 +198,10 @@ public class SequenceFile {
   /** The number of bytes between sync points.*/
   public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
 
-  /** The type of compression.
+  /** 
+   * The compression type used to compress key/value pairs in the 
+   * {@link SequenceFile}.
+   * 
    * @see SequenceFile.Writer
    */
   public static enum CompressionType {

+ 47 - 8
src/java/org/apache/hadoop/io/Writable.java

@@ -22,20 +22,59 @@ import java.io.DataOutput;
 import java.io.DataInput;
 import java.io.IOException;
 
-/** A simple, efficient, serialization protocol, based on {@link DataInput} and
- * {@link DataOutput}.
+/**
+ * A serializable object which implements a simple, efficient, serialization 
+ * protocol, based on {@link DataInput} and {@link DataOutput}.
  *
+ * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
+ * framework implements this interface.</p>
+ * 
  * <p>Implementations typically implement a static <code>read(DataInput)</code>
- * method which constructs a new instance, calls {@link
- * #readFields(DataInput)}, and returns the instance.
+ * method which constructs a new instance, calls {@link #readFields(DataInput)} 
+ * and returns the instance.</p>
+ * 
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ *     public class MyWritable implements Writable {
+ *       // Some data     
+ *       private int counter;
+ *       private long timestamp;
+ *       
+ *       public void write(DataOutput out) throws IOException {
+ *         out.writeInt(counter);
+ *         out.writeLong(timestamp);
+ *       }
+ *       
+ *       public void readFields(DataInput in) throws IOException {
+ *         counter = in.readInt();
+ *         timestamp = in.readLong();
+ *       }
+ *       
+ *       public static MyWritable read(DataInput in) throws IOException {
+ *         MyWritable w = new MyWritable();
+ *         w.readFields(in);
+ *         return w;
+ *       }
+ *     }
+ * </pre></blockquote></p>
  */
 public interface Writable {
-  /** Writes the fields of this object to <code>out</code>. */
+  /** 
+   * Serialize the fields of this object to <code>out</code>.
+   * 
+   * @param out <code>DataOuput</code> to serialize this object into.
+   * @throws IOException
+   */
   void write(DataOutput out) throws IOException;
 
-  /** Reads the fields of this object from <code>in</code>.  For efficiency,
-   * implementations should attempt to re-use storage in the existing object
-   * where possible.
+  /** 
+   * Deserialize the fields of this object from <code>in</code>.  
+   * 
+   * <p>For efficiency, implementations should attempt to re-use storage in the 
+   * existing object where possible.</p>
+   * 
+   * @param in <code>DataInput</code> to deseriablize this object from.
+   * @throws IOException
    */
   void readFields(DataInput in) throws IOException;
 }

+ 32 - 1
src/java/org/apache/hadoop/io/WritableComparable.java

@@ -18,7 +18,38 @@
 
 package org.apache.hadoop.io;
 
-/** An interface which extends both {@link Writable} and {@link Comparable}.
+/**
+ * A {@link Writable} which is also {@link Comparable}. 
+ *
+ * <p><code>WritableComparable</code>s can be compared to each other, typically 
+ * via <code>Comparator</code>s. Any type which is to be used as a 
+ * <code>key</code> in the Hadoop Map-Reduce framework should implement this
+ * interface.</p>
+ *  
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ *     public class MyWritableComparable implements WritableComparable {
+ *       // Some data
+ *       private int counter;
+ *       private long timestamp;
+ *       
+ *       public void write(DataOutput out) throws IOException {
+ *         out.writeInt(counter);
+ *         out.writeLong(timestamp);
+ *       }
+ *       
+ *       public void readFields(DataInput in) throws IOException {
+ *         counter = in.readInt();
+ *         timestamp = in.readLong();
+ *       }
+ *       
+ *       public int compareTo(MyWritableComparable w) {
+ *         int thisValue = this.value;
+ *         int thatValue = ((IntWritable)o).value;
+ *         return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
+ *       }
+ *     }
+ * </pre></blockquote></p>
  */
 public interface WritableComparable extends Writable, Comparable {
 }

+ 49 - 5
src/java/org/apache/hadoop/mapred/ClusterStatus.java

@@ -26,7 +26,28 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
- * Summarizes the size and current state of the cluster.
+ * Status information on the current state of the Map-Reduce cluster.
+ * 
+ * <p><code>ClusterStatus</code> provides clients with information such as:
+ * <ol>
+ *   <li>
+ *   Size of the cluster. 
+ *   </li>
+ *   <li>
+ *   Task capacity of the cluster. 
+ *   </li>
+ *   <li>
+ *   The number of currently running map & reduce tasks.
+ *   </li>
+ *   <li>
+ *   State of the <code>JobTracker</code>.
+ *   </li>
+ * </ol></p>
+ * 
+ * <p>Clients can query for the latest <code>ClusterStatus</code>, via 
+ * {@link JobClient#getClusterStatus()}.</p>
+ * 
+ * @see JobClient
  */
 public class ClusterStatus implements Writable {
 
@@ -38,6 +59,15 @@ public class ClusterStatus implements Writable {
 
   ClusterStatus() {}
   
+  /**
+   * Construct a new cluster status.
+   * 
+   * @param trackers no. of tasktrackers in the cluster
+   * @param maps no. of currently running map-tasks in the cluster
+   * @param reduces no. of currently running reduce-tasks in the cluster
+   * @param max the maximum no. of tasks in the cluster
+   * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
+   */
   ClusterStatus(int trackers, int maps, int reduces, int max,
                 JobTracker.State state) {
     task_trackers = trackers;
@@ -49,33 +79,47 @@ public class ClusterStatus implements Writable {
   
 
   /**
-   * The number of task trackers in the cluster.
+   * Get the number of task trackers in the cluster.
+   * 
+   * @return the number of task trackers in the cluster.
    */
   public int getTaskTrackers() {
     return task_trackers;
   }
   
   /**
-   * The number of currently running map tasks.
+   * Get the number of currently running map tasks in the cluster.
+   * 
+   * @return the number of currently running map tasks in the cluster.
    */
   public int getMapTasks() {
     return map_tasks;
   }
   
   /**
-   * The number of current running reduce tasks.
+   * Get the number of currently running reduce tasks in the cluster.
+   * 
+   * @return the number of currently running reduce tasks in the cluster.
    */
   public int getReduceTasks() {
     return reduce_tasks;
   }
   
   /**
-   * The maximum capacity for running tasks in the cluster.
+   * Get the maximum capacity for running tasks in the cluster.
+   * 
+   * @return the maximum capacity for running tasks in the cluster.
    */
   public int getMaxTasks() {
     return max_tasks;
   }
 
+  /**
+   * Get the current state of the <code>JobTracker</code>, 
+   * as {@link JobTracker.State}
+   * 
+   * @return the current state of the <code>JobTracker</code>.
+   */
   public JobTracker.State getJobTrackerState() {
     return state;
   }

+ 11 - 3
src/java/org/apache/hadoop/mapred/Counters.java

@@ -36,6 +36,13 @@ import org.apache.hadoop.io.Writable;
 
 /**
  * A set of named counters.
+ * 
+ * <p><code>Counters</code> represent global counters, defined either by the 
+ * Map-Reduce framework or applications. Each <code>Counter</code> can be of
+ * any {@link Enum} type.</p>
+ * 
+ * <p><code>Counters</code> are bunched into {@link Group}s, each comprising of
+ * counters from a particular <code>Enum</code> class. 
  */
 public class Counters implements Writable {
   
@@ -57,10 +64,11 @@ public class Counters implements Writable {
   } // end class CounterRec
   
   /**
-   *  Represents a group of counters, comprising the counters from a particular 
-   *  counter enum class.  
+   *  <code>Group</code> of counters, comprising of counters from a particular 
+   *  counter {@link Enum} class.  
    *
-   *  This class handles localization of the class name and the counter names.
+   *  <p><code>Group</code>handles localization of the class name and the 
+   *  counter names.</p>
    */
   public static class Group {
     

+ 12 - 1
src/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -32,8 +32,14 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /** 
- * A base class for {@link InputFormat}. 
+ * A base class for file-based {@link InputFormat}.
  * 
+ * <p><code>FileInputFormat</code> is the base class for all file-based 
+ * <code>InputFormat</code>s. This provides generic implementations of
+ * {@link #validateInput(JobConf)} and {@link #getSplits(JobConf, int)}.
+ * Implementations fo <code>FileInputFormat</code> can also override the 
+ * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
+ * not split-up and are processed as a whole by {@link Mapper}s.
  */
 public abstract class FileInputFormat<K extends WritableComparable,
                                       V extends Writable>
@@ -58,6 +64,11 @@ public abstract class FileInputFormat<K extends WritableComparable,
   /**
    * Is the given filename splitable? Usually, true, but if the file is
    * stream compressed, it will not be.
+   * 
+   * <code>FileInputFormat</code> implementations can override this and return
+   * <code>false</code> to ensure that individual input files are never split-up
+   * so that {@link Mapper}s process entire files.
+   * 
    * @param fs the file system that the file is on
    * @param filename the file name to check
    * @return is this file splitable?

+ 66 - 14
src/java/org/apache/hadoop/mapred/InputFormat.java

@@ -21,36 +21,88 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** An input data format.  Input files are stored in a {@link FileSystem}.
- * The processing of an input file may be split across multiple machines.
- * Files are processed as sequences of records, implementing {@link
- * RecordReader}.  Files must thus be split on record boundaries. */
+/** 
+ * <code>InputFormat</code> describes the input-specification for a 
+ * Map-Reduce job. 
+ * 
+ * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the
+ * job to:<p>
+ * <ol>
+ *   <li>
+ *   Validate the input-specification of the job. 
+ *   <li>
+ *   Split-up the input file(s) into logical {@link InputSplit}s, each of 
+ *   which is then assigned to an individual {@link Mapper}.
+ *   </li>
+ *   <li>
+ *   Provide the {@link RecordReader} implementation to be used to glean
+ *   input records from the logical <code>InputSplit</code> for processing by 
+ *   the {@link Mapper}.
+ *   </li>
+ * </ol>
+ * 
+ * <p>The default behavior of file-based {@link InputFormat}s, typically 
+ * sub-classes of {@link FileInputFormat}, is to split the 
+ * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
+ * bytes, of the input files. However, the {@link FileSystem} blocksize of  
+ * the input files is treated as an upper bound for input splits. A lower bound 
+ * on the split size can be set via 
+ * <a href="{@docRoot}/../hadoop-default.html#mapred.min.split.size">
+ * mapred.min.split.size</a>.</p>
+ * 
+ * <p>Clearly, logical splits based on input-size is insufficient for many 
+ * applications since record boundaries are to respected. In such cases, the
+ * application has to also implement a {@link RecordReader} on whom lies the
+ * responsibilty to respect record-boundaries and present a record-oriented
+ * view of the logical <code>InputSplit</code> to the individual task.
+ *
+ * @see InputSplit
+ * @see RecordReader
+ * @see JobClient
+ * @see FileInputFormat
+ */
 public interface InputFormat<K extends WritableComparable,
                              V extends Writable> {
 
   /**
-   * Are the input directories valid? This method is used to test the input
-   * directories when a job is submitted so that the framework can fail early
-   * with a useful error message when the input directory does not exist.
-   * @param job the job to check
+   * Check for validity of the input-specification for the job. 
+   * 
+   * <p>This method is used to validate the input directories when a job is 
+   * submitted so that the {@link JobClient} can fail early, with an useful 
+   * error message, in case of errors. For e.g. input directory does not exist.
+   * </p>
+   * 
+   * @param job job configuration.
    * @throws InvalidInputException if the job does not have valid input
    */
   void validateInput(JobConf job) throws IOException;
   
-  /** Splits a set of input files.  One split is created per map task.
+  /** 
+   * Logically split the set of input files for the job.  
+   * 
+   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
+   * for processing.</p>
    *
-   * @param job the job whose input files are to be split
-   * @param numSplits the desired number of splits
-   * @return the splits
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. For e.g. a split could
+   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple.
+   * 
+   * @param job job configuration.
+   * @param numSplits the desired number of splits, a hint.
+   * @return an array of {@link InputSplit}s for the job.
    */
   InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 
-  /** Construct a {@link RecordReader} for a {@link FileSplit}.
+  /** 
+   * Get the {@link RecordReader} for the given {@link InputSplit}.
    *
+   * <p>It is the responsibility of the <code>RecordReader</code> to respect
+   * record boundaries while processing the logical split to present a 
+   * record-oriented view to the individual task.</p>
+   * 
    * @param split the {@link InputSplit}
    * @param job the job that this split belongs to
    * @return a {@link RecordReader}

+ 15 - 4
src/java/org/apache/hadoop/mapred/InputSplit.java

@@ -22,20 +22,31 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 
 /**
- * The description of the data for a single map task.
+ * <code>InputSplit</code> represents the data to be processed by an 
+ * individual {@link Mapper}. 
+ *
+ * <p>Typically, it presents a byte-oriented view on the input and is the 
+ * responsibility of {@link RecordReader} of the job to process this and present
+ * a record-oriented view.
+ * 
+ * @see InputFormat
+ * @see RecordReader
  */
 public interface InputSplit extends Writable {
 
   /**
-   * Get the number of input bytes in the split.
-   * @return the number of bytes in the input split
+   * Get the total number of bytes in the data of the <code>InputSplit</code>.
+   * 
+   * @return the number of bytes in the input split.
    * @throws IOException
    */
   long getLength() throws IOException;
   
   /**
    * Get the list of hostnames where the input split is located.
-   * @return A list of prefered hostnames
+   * 
+   * @return list of hostnames where data of the <code>InputSplit</code> is
+   *         located as an array of <code>String</code>s.
    * @throws IOException
    */
   String[] getLocations() throws IOException;

+ 172 - 27
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -60,13 +60,89 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-/*******************************************************
- * JobClient interacts with the JobTracker network interface.
- * This object implements the job-control interface, and
- * should be the primary method by which user programs interact
- * with the networked job system.
+/**
+ * <code>JobClient</code> is the primary interface for the user-job to interact
+ * with the {@link JobTracker}.
+ * 
+ * <code>JobClient</code> provides facilities to submit jobs, track their 
+ * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
+ * status information etc.
+ * 
+ * <p>The job submission process involves:
+ * <ol>
+ *   <li>
+ *   Checking the input and output specifications of the job.
+ *   </li>
+ *   <li>
+ *   Computing the {@link InputSplit}s for the job.
+ *   </li>
+ *   <li>
+ *   Setup the requisite accounting information for the {@link DistributedCache} 
+ *   of the job, if necessary.
+ *   </li>
+ *   <li>
+ *   Copying the job's jar and configuration to the map-reduce system directory 
+ *   on the distributed file-system. 
+ *   </li>
+ *   <li>
+ *   Submitting the job to the <code>JobTracker</code> and optionally monitoring
+ *   it's status.
+ *   </li>
+ * </ol></p>
+ *  
+ * Normally the user creates the application, describes various facets of the
+ * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
+ * the job and monitor its progress.
+ * 
+ * <p>Here is an example on how to use <code>JobClient</code>:</p>
+ * <p><blockquote><pre>
+ *     // Create a new JobConf
+ *     JobConf job = new JobConf(new Configuration(), MyJob.class);
+ *     
+ *     // Specify various job-specific parameters     
+ *     job.setJobName("myjob");
+ *     
+ *     job.setInputPath(new Path("in"));
+ *     job.setOutputPath(new Path("out"));
+ *     
+ *     job.setMapperClass(MyJob.MyMapper.class);
+ *     job.setReducerClass(MyJob.MyReducer.class);
  *
- *******************************************************/
+ *     // Submit the job, then poll for progress until the job is complete
+ *     JobClient.runJob(job);
+ * </pre></blockquote></p>
+ * 
+ * <h4 id="JobControl">Job Control</h4>
+ * 
+ * <p>At times clients would chain map-reduce jobs to accomplish complex tasks 
+ * which cannot be done via a single map-reduce job. This is fairly easy since 
+ * the output of the job, typically, goes to distributed file-system and that 
+ * can be used as the input for the next job.</p>
+ * 
+ * <p>However, this also means that the onus on ensuring jobs are complete 
+ * (success/failure) lies squarely on the clients. In such situations the 
+ * various job-control options are:
+ * <ol>
+ *   <li>
+ *   {@link #runJob(JobConf)} : submits the job and returns only after 
+ *   the job has completed.
+ *   </li>
+ *   <li>
+ *   {@link #submitJob(JobConf)} : only submits the job, then poll the 
+ *   returned handle to the {@link RunningJob} to query status and make 
+ *   scheduling decisions.
+ *   </li>
+ *   <li>
+ *   {@link JobConf#setJobEndNotificationURI(String)} : setup a notification
+ *   on job-completion, thus avoiding polling.
+ *   </li>
+ * </ol></p>
+ * 
+ * @see JobConf
+ * @see ClusterStatus
+ * @see Tool
+ * @see DistributedCache
+ */
 public class JobClient extends Configured implements MRConstants, Tool  {
   private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
@@ -239,16 +315,28 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   static Random r = new Random();
 
   /**
-   * Build a job client, connect to the default job tracker
+   * Create a job client.
    */
   public JobClient() {
   }
     
+  /**
+   * Build a job client with the given {@link JobConf}, and connect to the 
+   * default {@link JobTracker}.
+   * 
+   * @param conf the job configuration.
+   * @throws IOException
+   */
   public JobClient(JobConf conf) throws IOException {
     setConf(conf);
     init(conf);
   }
     
+  /**
+   * Connect to the default {@link JobTracker}.
+   * @param conf the job configuration.
+   * @throws IOException
+   */
   public void init(JobConf conf) throws IOException {
     String tracker = conf.get("mapred.job.tracker", "local");
     if ("local".equals(tracker)) {
@@ -260,9 +348,10 @@ public class JobClient extends Configured implements MRConstants, Tool  {
 
   /**
    * Create a proxy JobSubmissionProtocol that retries timeouts.
-   * @param addr the address to connect to
-   * @param conf the server's configuration
-   * @return a proxy object that will retry timeouts
+   * 
+   * @param addr the address to connect to.
+   * @param conf the server's configuration.
+   * @return a proxy object that will retry timeouts.
    * @throws IOException
    */
   private JobSubmissionProtocol createProxy(InetSocketAddress addr,
@@ -286,6 +375,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
 
   /**
    * Build a job client, connect to the indicated job tracker.
+   * 
+   * @param jobTrackAddr the job tracker to connect to.
+   * @param conf configuration.
    */
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
@@ -293,6 +385,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   }
 
   /**
+   * Close the <code>JobClient</code>.
    */
   public synchronized void close() throws IOException {
   }
@@ -300,6 +393,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   /**
    * Get a filesystem handle.  We need this to prepare jobs
    * for submission to the MapReduce system.
+   * 
+   * @return the filesystem handle.
    */
   public synchronized FileSystem getFs() throws IOException {
     if (this.fs == null) {
@@ -310,10 +405,21 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   }
 
   /**
-   * Submit a job to the MR system
+   * Submit a job to the MR system.
+   * 
+   * This returns a handle to the {@link RunningJob} which can be used to track
+   * the running-job.
+   * 
+   * @param jobFile the job configuration.
+   * @return a handle to the {@link RunningJob} which can be used to track the
+   *         running-job.
+   * @throws FileNotFoundException
+   * @throws InvalidJobConfException
+   * @throws IOException
    */
   public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
-                                                     InvalidJobConfException, IOException {
+                                                     InvalidJobConfException, 
+                                                     IOException {
     // Load in the submitted job details
     JobConf job = new JobConf(jobFile);
     return submitJob(job);
@@ -321,7 +427,16 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     
    
   /**
-   * Submit a job to the MR system
+   * Submit a job to the MR system.
+   * This returns a handle to the {@link RunningJob} which can be used to track
+   * the running-job.
+   * 
+   * @param job the job configuration.
+   * @return a handle to the {@link RunningJob} which can be used to track the
+   *         running-job.
+   * @throws FileNotFoundException
+   * @throws InvalidJobConfException
+   * @throws IOException
    */
   public RunningJob submitJob(JobConf job) throws FileNotFoundException, 
                                                   InvalidJobConfException, IOException {
@@ -551,8 +666,13 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   }
     
   /**
-   * Get an RunningJob object to track an ongoing job.  Returns
+   * Get an {@link RunningJob} object to track an ongoing job.  Returns
    * null if the id does not correspond to any known job.
+   * 
+   * @param jobid the jobid of the job.
+   * @return the {@link RunningJob} handle to track the job, null if the 
+   *         <code>jobid</code> doesn't correspond to any known job.
+   * @throws IOException
    */
   public RunningJob getJob(String jobid) throws IOException {
     JobStatus status = jobSubmitClient.getJobStatus(jobid);
@@ -565,8 +685,10 @@ public class JobClient extends Configured implements MRConstants, Tool  {
 
   /**
    * Get the information of the current state of the map tasks of a job.
-   * @param jobId the job to query
-   * @return the list of all of the map tips
+   * 
+   * @param jobId the job to query.
+   * @return the list of all of the map tips.
+   * @throws IOException
    */
   public TaskReport[] getMapTaskReports(String jobId) throws IOException {
     return jobSubmitClient.getMapTaskReports(jobId);
@@ -574,23 +696,44 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     
   /**
    * Get the information of the current state of the reduce tasks of a job.
-   * @param jobId the job to query
-   * @return the list of all of the map tips
+   * 
+   * @param jobId the job to query.
+   * @return the list of all of the reduce tips.
+   * @throws IOException
    */    
   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
     return jobSubmitClient.getReduceTaskReports(jobId);
   }
-    
+   
+  /**
+   * Get status information about the Map-Reduce cluster.
+   *  
+   * @return the status information about the Map-Reduce cluster as an object
+   *         of {@link ClusterStatus}.
+   * @throws IOException
+   */
   public ClusterStatus getClusterStatus() throws IOException {
     return jobSubmitClient.getClusterStatus();
   }
     
+
+  /** 
+   * Get the jobs that are not completed and not failed.
+   * 
+   * @return array of {@link JobStatus} for the running/to-be-run jobs.
+   * @throws IOException
+   */
   public JobStatus[] jobsToComplete() throws IOException {
     return jobSubmitClient.jobsToComplete();
   }
     
-  /** Utility that submits a job, then polls for progress until the job is
-   * complete. */
+  /** 
+   * Utility that submits a job, then polls for progress until the job is
+   * complete.
+   * 
+   * @param job the job configuration.
+   * @throws IOException
+   */
   public static RunningJob runJob(JobConf job) throws IOException {
     JobClient jc = new JobClient(job);
     boolean error = true;
@@ -764,9 +907,10 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   }
     
   /**
-   * Get the task output filter out of the JobConf
-   * @param job the JobConf to examine
-   * @return the filter level
+   * Get the task output filter out of the JobConf.
+   * 
+   * @param job the JobConf to examine.
+   * @return the filter level.
    */
   public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
     return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
@@ -774,9 +918,10 @@ public class JobClient extends Configured implements MRConstants, Tool  {
   }
     
   /**
-   * Modify the JobConf to set the task output filter
-   * @param job the JobConf to modify
-   * @param newValue the value to set
+   * Modify the JobConf to set the task output filter.
+   * 
+   * @param job the JobConf to modify.
+   * @param newValue the value to set.
    */
   public static void setTaskOutputFilter(JobConf job, 
                                          TaskStatusFilter newValue) {

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 498 - 77
src/java/org/apache/hadoop/mapred/JobConf.java


+ 1 - 1
src/java/org/apache/hadoop/mapred/JobEndNotifier.java

@@ -95,7 +95,7 @@ public class JobEndNotifier {
   private static JobEndStatusInfo createNotification(JobConf conf,
                                                      JobStatus status) {
     JobEndStatusInfo notification = null;
-    String uri = conf.get("job.end.notification.url");
+    String uri = conf.getJobEndNotificationURI();
     if (uri != null) {
       // +1 to make logic for first notification identical to a retry
       int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;

+ 5 - 2
src/java/org/apache/hadoop/mapred/MapReduceBase.java

@@ -23,8 +23,11 @@ import java.io.IOException;
 import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.mapred.JobConfigurable;
 
-/** Base class for {@link Mapper} and {@link Reducer} implementations.
- * Provides default implementations for a few methods.
+/** 
+ * Base class for {@link Mapper} and {@link Reducer} implementations.
+ * 
+ * <p>Provides default no-op implementations for a few methods, most non-trivial
+ * applications need to override some of them.</p>
  */
 public class MapReduceBase implements Closeable, JobConfigurable {
 

+ 18 - 5
src/java/org/apache/hadoop/mapred/MapRunnable.java

@@ -23,15 +23,28 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** Expert: Permits greater control of map processing. For example,
- * implementations might perform multi-threaded, asynchronous mappings. */
+/**
+ * Expert: Generic interface for {@link Mapper}s.
+ * 
+ * <p>Custom implementations of <code>MapRunnable</code> can exert greater 
+ * control on map processing e.g. multi-threaded, asynchronous mappers etc.</p>
+ * 
+ * @see Mapper
+ */
 public interface MapRunnable<K1 extends WritableComparable, V1 extends Writable,
                              K2 extends WritableComparable, V2 extends Writable>
     extends JobConfigurable {
   
-  /** Called to execute mapping.  Mapping is complete when this returns.
-   * @param input the {@link RecordReader} with input key/value pairs.
-   * @param output the {@link OutputCollector} for mapped key/value pairs.
+  /** 
+   * Start mapping input <tt>&lt;key, value&gt;</tt> pairs.
+   *  
+   * <p>Mapping of input records to output records is complete when this method 
+   * returns.</p>
+   * 
+   * @param input the {@link RecordReader} to read the input records.
+   * @param output the {@link OutputCollector} to collect the outputrecords.
+   * @param reporter {@link Reporter} to report progress, status-updates etc.
+   * @throws IOException
    */
   void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
            Reporter reporter)

+ 127 - 15
src/java/org/apache/hadoop/mapred/Mapper.java

@@ -20,29 +20,141 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
 
-/** Maps input key/value pairs to a set of intermediate key/value pairs.  All
- * intermediate values associated with a given output key are subsequently
- * grouped by the map/reduce system, and passed to a {@link Reducer} to
- * determine the final output.. */
+/** 
+ * Maps input key/value pairs to a set of intermediate key/value pairs.  
+ * 
+ * <p>Maps are the individual tasks which transform input records into a 
+ * intermediate records. The transformed intermediate records need not be of 
+ * the same type as the input records. A given input pair may map to zero or 
+ * many output pairs.</p> 
+ * 
+ * <p>The Hadoop Map-Reduce framework spawns one map task for each 
+ * {@link InputSplit} generated by the {@link InputFormat} for the job.
+ * <code>Mapper</code> implementations can access the {@link JobConf} for the 
+ * job via the {@link JobConfigurable#configure(JobConf)} and initialize
+ * themselves. Similarly they can use the {@link Closeable#close()} method for
+ * de-initialization.</p>
+ * 
+ * <p>The framework then calls 
+ * {@link #map(WritableComparable, Writable, OutputCollector, Reporter)} 
+ * for each key/value pair in the <code>InputSplit</code> for that task.</p>
+ * 
+ * <p>All intermediate values associated with a given output key are 
+ * subsequently grouped by the framework, and passed to a {@link Reducer} to  
+ * determine the final output. Users can control the grouping by specifying
+ * a <code>Comparator</code> via 
+ * {@link JobConf#setOutputKeyComparatorClass(Class)}.</p>
+ *
+ * <p>The grouped <code>Mapper</code> outputs are partitioned per 
+ * <code>Reducer</code>. Users can control which keys (and hence records) go to 
+ * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
+ * 
+ * <p>Users can optionally specify a <code>combiner</code>, via 
+ * {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the 
+ * intermediate outputs, which helps to cut down the amount of data transferred 
+ * from the <code>Mapper</code> to the <code>Reducer</code>.
+ * 
+ * <p>The intermediate, grouped outputs are always stored in 
+ * {@link SequenceFile}s. Applications can specify if and how the intermediate
+ * outputs are to be compressed and which {@link CompressionCodec}s are to be
+ * used via the <code>JobConf</code>.</p>
+ *  
+ * <p>If the job has 
+ * <a href="{@docRoot}/org/apache/hadoop/mapred/JobConf.html#ReducerNone">zero
+ * reduces</a> then the output of the <code>Mapper</code> is directly written
+ * to the {@link FileSystem} without grouping by keys.</p>
+ * 
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ *     public class MyMapper&lt;K extends WritableComparable, V extends Writable&gt; 
+ *     extends MapReduceBase implements Mapper&lt;K, V, K, V&gt; {
+ *     
+ *       static enum MyCounters { NUM_RECORDS }
+ *       
+ *       private String mapTaskId;
+ *       private String inputFile;
+ *       private int noRecords = 0;
+ *       
+ *       public void configure(JobConf job) {
+ *         mapTaskId = job.get("mapred.task.id");
+ *         inputFile = job.get("mapred.input.file");
+ *       }
+ *       
+ *       public void map(K key, V val,
+ *                       OutputCollector&lt;K, V&gt; output, Reporter reporter)
+ *       throws IOException {
+ *         // Process the &lt;key, value&gt; pair (assume this takes a while)
+ *         // ...
+ *         // ...
+ *         
+ *         // Let the framework know that we are alive, and kicking!
+ *         // reporter.progress();
+ *         
+ *         // Process some more
+ *         // ...
+ *         // ...
+ *         
+ *         // Increment the no. of &lt;key, value&gt; pairs processed
+ *         ++noRecords;
+ *
+ *         // Increment counters
+ *         reporter.incrCounter(NUM_RECORDS, 1);
+ *        
+ *         // Every 100 records update application-level status
+ *         if ((noRecords%100) == 0) {
+ *           reporter.setStatus(mapTaskId + " processed " + noRecords + 
+ *                              " from input-file: " + inputFile); 
+ *         }
+ *         
+ *         // Output the result
+ *         output.collect(key, val);
+ *       }
+ *     }
+ *
+ * <p>Applications may write a custom {@link MapRunnable} to exert greater
+ * control on map processing e.g. multi-threaded <code>Mapper</code>s etc.</p>
+ * 
+ * @see JobConf
+ * @see InputFormat
+ * @see Partitioner  
+ * @see Reducer
+ * @see MapReduceBase
+ * @see MapRunnable
+ * @see SequenceFile
+ */
 public interface Mapper<K1 extends WritableComparable, V1 extends Writable,
                         K2 extends WritableComparable, V2 extends Writable>
   extends JobConfigurable, Closeable {
   
-  /** Maps a single input key/value pair into intermediate key/value pairs.
-   * Output pairs need not be of the same types as input pairs.  A given input
-   * pair may map to zero or many output pairs.  Output pairs are collected
-   * with calls to {@link
-   * OutputCollector#collect(WritableComparable,Writable)}.
+  /** 
+   * Maps a single input key/value pair into an intermediate key/value pair.
+   * 
+   * <p>Output pairs need not be of the same types as input pairs.  A given 
+   * input pair may map to zero or many output pairs.  Output pairs are 
+   * collected with calls to 
+   * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
    *
-   * @param key the key
-   * @param value the values
-   * @param output collects mapped keys and values
+   * <p>Applications can use the {@link Reporter} provided to report progress 
+   * or just indicate that they are alive. In scenarios where the application 
+   * takes an insignificant amount of time to process individual key/value 
+   * pairs, this is crucial since the framework might assume that the task has 
+   * timed-out and kill that task. The other way of avoiding this is to set 
+   * <a href="{@docRoot}/../hadoop-default.html#mapred.task.timeout">
+   * mapred.task.timeout</a> to a high-enough value (or even zero for no 
+   * time-outs).</p>
+   * 
+   * @param key the input key.
+   * @param value the input value.
+   * @param output collects mapped keys and values.
+   * @param reporter facility to report progress.
    */
-  void map(K1 key, V1 value,
-           OutputCollector<K2, V2> output, Reporter reporter)
-    throws IOException;
+  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
+  throws IOException;
 }

+ 12 - 4
src/java/org/apache/hadoop/mapred/OutputCollector.java

@@ -24,15 +24,23 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 
-/** Passed to {@link Mapper} and {@link Reducer} implementations to collect
- * output data. */
+/**
+ * Collects the <code>&lt;key, value&gt;</code> pairs output by {@link Mapper}s
+ * and {@link Reducer}s.
+ *  
+ * <p><code>OutputCollector</code> is the generalization of the facility 
+ * provided by the Map-Reduce framework to collect data output by either the 
+ * <code>Mapper</code> or the <code>Reducer</code> i.e. intermediate outputs 
+ * or the output of the job.</p>  
+ */
 public interface OutputCollector<K extends WritableComparable,
                                  V extends Writable> {
   
   /** Adds a key/value pair to the output.
    *
-   * @param key the key to add
-   * @param value to value to add
+   * @param key the key to collect.
+   * @param value to value to collect.
+   * @throws IOException
    */
   void collect(K key, V value) throws IOException;
 }

+ 37 - 12
src/java/org/apache/hadoop/mapred/OutputFormat.java

@@ -25,28 +25,53 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
-/** An output data format.  Output files are stored in a {@link
- * FileSystem}. */
+/** 
+ * <code>OutputFormat</code> describes the output-specification for a 
+ * Map-Reduce job.
+ *
+ * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
+ * job to:<p>
+ * <ol>
+ *   <li>
+ *   Validate the output-specification of the job. For e.g. check that the 
+ *   output directory doesn't already exist. 
+ *   <li>
+ *   Provide the {@link RecordWriter} implementation to be used to write out
+ *   the output files of the job. Output files are stored in a 
+ *   {@link FileSystem}.
+ *   </li>
+ * </ol>
+ * 
+ * @see RecordWriter
+ * @see JobConf
+ */
 public interface OutputFormat<K extends WritableComparable,
                               V extends Writable> {
 
-  /** Construct a {@link RecordWriter} with Progressable.
+  /** 
+   * Get the {@link RecordWriter} for the given job.
    *
-   * @param job the job whose output is being written
-   * @param name the unique name for this part of the output
-   * @param progress mechanism for reporting progress while writing to file
-   * @return a {@link RecordWriter}
+   * @param ignored
+   * @param job configuration for the job whose output is being written.
+   * @param name the unique name for this part of the output.
+   * @param progress mechanism for reporting progress while writing to file.
+   * @return a {@link RecordWriter} to write the output for the job.
+   * @throws IOException
    */
   RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                      String name, Progressable progress)
-    throws IOException;
+  throws IOException;
 
-  /** Check whether the output specification for a job is appropriate.  Called
-   * when a job is submitted.  Typically checks that it does not already exist,
+  /** 
+   * Check for validity of the output-specification for the job.
+   *  
+   * <p>This is to validate the output specification for the job when it is
+   * a job is submitted.  Typically checks that it does not already exist,
    * throwing an exception when it already exists, so that output is not
-   * overwritten.
+   * overwritten.</p>
    *
-   * @param job the job whose output will be written
+   * @param ignored
+   * @param job job configuration.
    * @throws IOException when output should not be attempted
    */
   void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

+ 21 - 7
src/java/org/apache/hadoop/mapred/Partitioner.java

@@ -21,18 +21,32 @@ package org.apache.hadoop.mapred;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** Partitions the key space.  A partition is created for each reduce task. */
+/** 
+ * Partitions the key space.
+ * 
+ * <p><code>Partitioner</code> controls the partitioning of the keys of the 
+ * intermediate map-outputs. The key (or a subset of the key) is used to derive
+ * the partition, typically by a hash function. The total number of partitions
+ * is the same as the number of reduce tasks for the job. Hence this controls
+ * which of the <code>m</code> reduce tasks the intermediate key (and hence the 
+ * record) is sent for reduction.</p>
+ * 
+ * @see Reducer
+ */
 public interface Partitioner<K2 extends WritableComparable,
                              V2 extends Writable>
   extends JobConfigurable {
   
-  /** Returns the paritition number for a given entry given the total number of
-   * partitions.  Typically a hash function on a all or a subset of the key.
+  /** 
+   * Get the paritition number for a given key (hence record) given the total 
+   * number of partitions i.e. number of reduce-tasks for the job.
+   *   
+   * <p>Typically a hash function on a all or a subset of the key.</p>
    *
-   * @param key the entry key
-   * @param value the entry value
-   * @param numPartitions the number of partitions
-   * @return the partition number
+   * @param key the key to be paritioned.
+   * @param value the entry value.
+   * @param numPartitions the total number of partitions.
+   * @return the partition number for the <code>key</code>.
    */
   int getPartition(K2 key, V2 value, int numPartitions);
 }

+ 35 - 10
src/java/org/apache/hadoop/mapred/RecordReader.java

@@ -24,11 +24,23 @@ import java.io.DataInput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** Reads key/value pairs from an input file {@link FileSplit}.
- * Implemented by {@link InputFormat} implementations. */
+/**
+ * <code>RecordReader</code> reads &lt;key, value&gt; pairs from an 
+ * {@link InputSplit}.
+ *   
+ * <p><code>RecordReader</code>, typically, converts the byte-oriented view of 
+ * the input, provided by the <code>InputSplit</code>, and presents a 
+ * record-oriented view for the {@link Mapper} & {@link Reducer} tasks for 
+ * processing. It thus assumes the responsibility of processing record 
+ * boundaries and presenting the tasks with keys and values.</p>
+ * 
+ * @see InputSplit
+ * @see InputFormat
+ */
 public interface RecordReader<K extends WritableComparable,
                               V extends Writable> {
-  /** Reads the next key/value pair.
+  /** 
+   * Reads the next key/value pair from the input for processing.
    *
    * @param key the key to read data into
    * @param value the value to read data into
@@ -40,25 +52,38 @@ public interface RecordReader<K extends WritableComparable,
   
   /**
    * Create an object of the appropriate type to be used as a key.
-   * @return a new key object
+   * 
+   * @return a new key object.
    */
   K createKey();
   
   /**
-   * Create an object of the appropriate type to be used as the value.
-   * @return a new value object
+   * Create an object of the appropriate type to be used as a value.
+   * 
+   * @return a new value object.
    */
   V createValue();
 
-  /** Returns the current position in the input. */
+  /** 
+   * Returns the current position in the input.
+   * 
+   * @return the current position in the input.
+   * @throws IOException
+   */
   long getPos() throws IOException;
 
-  /** Close this to future operations.*/ 
+  /** 
+   * Close this {@link InputSplit} to future operations.
+   * 
+   * @throws IOException
+   */ 
   public void close() throws IOException;
 
   /**
-   * How far has the reader gone through the input.
-   * @return progress from 0.0 to 1.0
+   * How much of the input has the {@link RecordReader} consumed i.e.
+   * has been processed by?
+   * 
+   * @return progress from <code>0.0</code> to <code>1.0</code>.
    * @throws IOException
    */
   float getProgress() throws IOException;

+ 21 - 7
src/java/org/apache/hadoop/mapred/RecordWriter.java

@@ -21,22 +21,36 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.io.DataOutput;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
-/** Writes key/value pairs to an output file.  Implemented by {@link
- * OutputFormat} implementations. */
+/**
+ * <code>RecordWriter</code> writes the output &lt;key, value&gt; pairs 
+ * to an output file.
+ 
+ * <p><code>RecordWriter</code> implementations write the job outputs to the
+ * {@link FileSystem}.
+ * 
+ * @see OutputFormat
+ */
 public interface RecordWriter<K extends WritableComparable,
                               V extends Writable> {
-  /** Writes a key/value pair.
-   *
-   * @param key the key to write
-   * @param value the value to write
+  /** 
+   * Writes a key/value pair.
    *
+   * @param key the key to write.
+   * @param value the value to write.
+   * @throws IOException
    * @see Writable#write(DataOutput)
    */      
   void write(K key, V value) throws IOException;
 
-  /** Close this to future operations.*/ 
+  /** 
+   * Close this <code>RecordWriter</code> to future operations.
+   * 
+   * @param reporter facility to report progress.
+   * @throws IOException
+   */ 
   void close(Reporter reporter) throws IOException;
 }

+ 161 - 9
src/java/org/apache/hadoop/mapred/Reducer.java

@@ -22,24 +22,176 @@ import java.io.IOException;
 
 import java.util.Iterator;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-/** Reduces a set of intermediate values which share a key to a smaller set of
- * values.  Input values are the grouped output of a {@link Mapper}. */
+/** 
+ * Reduces a set of intermediate values which share a key to a smaller set of
+ * values.  
+ * 
+ * <p>The number of <code>Reducer</code>s for the job is set by the user via 
+ * {@link JobConf#setNumReduceTasks(int)}. <code>Reducer</code> implementations 
+ * can access the {@link JobConf} for the job via the 
+ * {@link JobConfigurable#configure(JobConf)} method and initialize themselves. 
+ * Similarly they can use the {@link Closeable#close()} method for
+ * de-initialization.</p>
+
+ * <p><code>Reducer</code> has 3 primary phases:</p>
+ * <ol>
+ *   <li>
+ *   
+ *   <h4 id="Shuffle">Shuffle</h4>
+ *   
+ *   <p><code>Reducer</code> is input the grouped output of a {@link Mapper}.
+ *   In the phase the framework, for each <code>Reducer</code>, fetches the 
+ *   relevant partition of the output of all the <code>Mapper</code>s, via HTTP. 
+ *   </p>
+ *   </li>
+ *   
+ *   <li>
+ *   <h4 id="Sort">Sort</h4>
+ *   
+ *   <p>The framework groups <code>Reducer</code> inputs by <code>key</code>s 
+ *   (since different <code>Mapper</code>s may have output the same key) in this
+ *   stage.</p>
+ *   
+ *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
+ *   being fetched they are merged.</p>
+ *      
+ *   <h5 id="SecondarySort">SecondarySort</h5>
+ *   
+ *   <p>If equivalence rules for keys while grouping the intermediates are 
+ *   different from those for grouping keys before reduction, then one may 
+ *   specify a <code>Comparator</code> via 
+ *   {@link JobConf#setOutputValueGroupingComparator(Class)}.Since 
+ *   {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to 
+ *   control how intermediate keys are grouped, these can be used in conjunction 
+ *   to simulate <i>secondary sort on values</i>.</p>
+ *   
+ *   
+ *   For example, say that you want to find duplicate web pages and tag them 
+ *   all with the url of the "best" known example. You would set up the job 
+ *   like:
+ *   <ul>
+ *     <li>Map Input Key: url</li>
+ *     <li>Map Input Value: document</li>
+ *     <li>Map Output Key: document checksum, url pagerank</li>
+ *     <li>Map Output Value: url</li>
+ *     <li>Partitioner: by checksum</li>
+ *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
+ *     <li>OutputValueGroupingComparator: by checksum</li>
+ *   </ul>
+ *   </li>
+ *   
+ *   <li>   
+ *   <h4 id="Reduce">Reduce</h4>
+ *   
+ *   <p>In this phase the 
+ *   {@link #reduce(WritableComparable, Iterator, OutputCollector, Reporter)}
+ *   method is called for each <code>&lt;key, (list of values)></code> pair in
+ *   the grouped inputs.</p>
+ *   <p>The output of the reduce task is typically written to the 
+ *   {@link FileSystem} via 
+ *   {@link OutputCollector#collect(WritableComparable, Writable)}.</p>
+ *   </li>
+ * </ol>
+ * 
+ * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
+ * 
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ *     public class MyReducer&lt;K extends WritableComparable, V extends Writable&gt; 
+ *     extends MapReduceBase implements Reducer&lt;K, V, K, V&gt; {
+ *     
+ *       static enum MyCounters { NUM_RECORDS }
+ *        
+ *       private String reduceTaskId;
+ *       private int noKeys = 0;
+ *       
+ *       public void configure(JobConf job) {
+ *         reduceTaskId = job.get("mapred.task.id");
+ *       }
+ *       
+ *       public void reduce(K key, Iterator&lt;V&gt; values,
+ *                          OutputCollector&lt;K, V&gt; output, 
+ *                          Reporter reporter)
+ *       throws IOException {
+ *       
+ *         // Process
+ *         int noValues = 0;
+ *         while (values.hasNext()) {
+ *           V value = values.next();
+ *           
+ *           // Increment the no. of values for this key
+ *           ++noValues;
+ *           
+ *           // Process the &lt;key, value&gt; pair (assume this takes a while)
+ *           // ...
+ *           // ...
+ *           
+ *           // Let the framework know that we are alive, and kicking!
+ *           if ((noValues%10) == 0) {
+ *             reporter.progress();
+ *           }
+ *         
+ *           // Process some more
+ *           // ...
+ *           // ...
+ *           
+ *           // Output the &lt;key, value&gt; 
+ *           output.collect(key, value);
+ *         }
+ *         
+ *         // Increment the no. of &lt;key, list of values&gt; pairs processed
+ *         ++noKeys;
+ *         
+ *         // Increment counters
+ *         reporter.incrCounter(NUM_RECORDS, 1);
+ *         
+ *         // Every 100 keys update application-level status
+ *         if ((noKeys%100) == 0) {
+ *           reporter.setStatus(reduceTaskId + " processed " + noKeys);
+ *         }
+ *       }
+ *     }
+ * </pre></blockquote></p>
+ * 
+ * @see Mapper
+ * @see Partitioner
+ * @see Reporter
+ * @see MapReduceBase
+ */
 public interface Reducer<K2 extends WritableComparable, V2 extends Writable,
                          K3 extends WritableComparable, V3 extends Writable>
     extends JobConfigurable, Closeable {
   
-  /** Combines values for a given key.  Output values must be of the same type
-   * as input values.  Input keys must not be altered.  Typically all values
-   * are combined into zero or one value.  Output pairs are collected with
-   * calls to {@link OutputCollector#collect(WritableComparable,Writable)}.
+  /** 
+   * <i>Reduces</i> values for a given key.  
+   * 
+   * <p>The framework calls this method for each 
+   * <code>&lt;key, (list of values)></code> pair in the grouped inputs.
+   * Output values must be of the same type as input values.  Input keys must 
+   * not be altered.  Typically all values are combined into zero or one value.
+   * </p>
+   *   
+   * <p>Output pairs are collected with calls to  
+   * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
    *
-   * @param key the key
-   * @param values the values to combine
-   * @param output to collect combined values
+   * <p>Applications can use the {@link Reporter} provided to report progress 
+   * or just indicate that they are alive. In scenarios where the application 
+   * takes an insignificant amount of time to process individual key/value 
+   * pairs, this is crucial since the framework might assume that the task has 
+   * timed-out and kill that task. The other way of avoiding this is to set 
+   * <a href="{@docRoot}/../hadoop-default.html#mapred.task.timeout">
+   * mapred.task.timeout</a> to a high-enough value (or even zero for no 
+   * time-outs).</p>
+   * 
+   * @param key the key.
+   * @param values the list of values to reduce.
+   * @param output to collect keys and combined values.
+   * @param reporter facility to report progress.
    */
   void reduce(K2 key, Iterator<V2> values,
               OutputCollector<K3, V3> output, Reporter reporter)

+ 26 - 11
src/java/org/apache/hadoop/mapred/Reporter.java

@@ -18,11 +18,24 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-
 import org.apache.hadoop.util.Progressable;
 
-/** Passed to application code to permit alteration of status. */
+/** 
+ * A facility for Map-Reduce applications to report progress and update 
+ * counters, status information etc.
+ * 
+ * <p>{@link Mapper} and {@link Reducer} can use the <code>Reporter</code>
+ * provided to report progress or just indicate that they are alive. In 
+ * scenarios where the application takes an insignificant amount of time to 
+ * process individual key/value pairs, this is crucial since the framework 
+ * might assume that the task has timed-out and kill that task.
+ *
+ * <p>Applications can also update {@link Counters} via the provided 
+ * <code>Reporter</code> .</p>
+ * 
+ * @see Progressable
+ * @see Counters
+ */
 public interface Reporter extends Progressable {
   
   /**
@@ -41,25 +54,27 @@ public interface Reporter extends Progressable {
     };
 
   /**
-   * Alter the application's status description.
+   * Set the status description for the task.
    * 
-   * @param status
-   *          a brief description of the current status
+   * @param status brief description of the current status.
    */
   public abstract void setStatus(String status);
   
   /**
    * Increments the counter identified by the key, which can be of
-   * any enum type, by the specified amount.
-   * @param key A value of any enum type
+   * any {@link Enum} type, by the specified amount.
+   * 
+   * @param key key to identify the counter to be incremented. The key can be
+   *            be any <code>Enum</code>. 
    * @param amount A non-negative amount by which the counter is to 
-   * be incremented
+   *               be incremented.
    */
   public abstract void incrCounter(Enum key, long amount);
   
   /**
-   * Get the InputSplit object for a map.
-   * @return the input split that the map is reading from
+   * Get the {@link InputSplit} object for a map.
+   * 
+   * @return the <code>InputSplit</code> that the map is reading from.
    * @throws UnsupportedOperationException if called outside a mapper
    */
   public abstract InputSplit getInputSplit() 

+ 61 - 19
src/java/org/apache/hadoop/mapred/RunningJob.java

@@ -21,78 +21,120 @@ package org.apache.hadoop.mapred;
 import java.io.*;
 
 /** 
- * Includes details on a running MapReduce job.  A client can
- * track a living job using this object.
+ * <code>RunningJob</code> is the user-interface to query for details on a 
+ * running Map-Reduce job.
+ * 
+ * <p>Clients can get hold of <code>RunningJob</code> via the {@link JobClient}
+ * and then query the running-job for details such as name, configuration, 
+ * progress etc.</p> 
+ * 
+ * @see JobClient
  */
 public interface RunningJob {
   /**
-   * Returns an identifier for the job
+   * Get the job identifier.
+   * 
+   * @return the job identifier.
    */
   public String getJobID();
   
   /**
-   * Returns the name of the job
+   * Get the name of the job.
+   * 
+   * @return the name of the job.
    */
   public String getJobName();
 
   /**
-   * Returns the path of the submitted job.
+   * Get the path of the submitted job configuration.
+   * 
+   * @return the path of the submitted job configuration.
    */
   public String getJobFile();
 
   /**
-   * Returns a URL where some job progress information will be displayed.
+   * Get the URL where some job progress information will be displayed.
+   * 
+   * @return the URL where some job progress information will be displayed.
    */
   public String getTrackingURL();
 
   /**
-   * Returns a float between 0.0 and 1.0, indicating progress on
-   * the map portion of the job.  When all map tasks have completed,
-   * the function returns 1.0.
+   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
+   * and 1.0.  When all map tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's map-tasks.
+   * @throws IOException
    */
   public float mapProgress() throws IOException;
 
   /**
-   * Returns a float between 0.0 and 1.0, indicating progress on
-   * the reduce portion of the job.  When all reduce tasks have completed,
-   * the function returns 1.0.
+   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
+   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's reduce-tasks.
+   * @throws IOException
    */
   public float reduceProgress() throws IOException;
 
   /**
-   * Non-blocking function to check whether the job is finished or not.
+   * Check if the job is finished or not. 
+   * This is a non-blocking call.
+   * 
+   * @return <code>true</code> if the job is complete, else <code>false</code>.
+   * @throws IOException
    */
   public boolean isComplete() throws IOException;
 
   /**
-   * True iff job completed successfully.
+   * Check if the job completed successfully. 
+   * 
+   * @return <code>true</code> if the job succeeded, else <code>false</code>.
+   * @throws IOException
    */
   public boolean isSuccessful() throws IOException;
 
   /**
    * Blocks until the job is complete.
+   * 
+   * @throws IOException
    */
   public void waitForCompletion() throws IOException;
 
   /**
    * Kill the running job.  Blocks until all job tasks have been
    * killed as well.  If the job is no longer running, it simply returns.
+   * 
+   * @throws IOException
    */
   public void killJob() throws IOException;
     
-  public TaskCompletionEvent[] getTaskCompletionEvents(
-                                                       int startFrom) throws IOException;
+  /**
+   * Get events indicating completion (success/failure) of component tasks.
+   *  
+   * @param startFrom index to start fetching events from
+   * @return an array of {@link TaskCompletionEvent}s
+   * @throws IOException
+   */
+  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) 
+  throws IOException;
   
   /**
    * Kill indicated task attempt.
-   * @param taskId the id of the task to kill.
-   * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
-   * it is just killed, w/o affecting job failure status.  
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @param shouldFail if true the task is failed and added to failed tasks 
+   *                   list, otherwise it is just killed, w/o affecting 
+   *                   job failure status.  
+   * @throws IOException
    */
   public void killTask(String taskId, boolean shouldFail) throws IOException;
     
   /**
    * Gets the counters for this job.
+   * 
+   * @return the counters for this job.
+   * @throws IOException
    */
   public Counters getCounters() throws IOException;
 }

+ 204 - 8
src/java/org/apache/hadoop/mapred/package.html

@@ -1,16 +1,212 @@
 <html>
 <body>
 
-<p>A system for scalable, fault-tolerant, distributed computation over
-large data collections.</p>
+<p>A software framework for easily writing applications which process vast 
+amounts of data (multi-terabyte data-sets) parallelly on large clusters 
+(thousands of nodes) built of commodity hardware in a reliable, fault-tolerant 
+manner.</p>
 
-<p>Applications implement {@link org.apache.hadoop.mapred.Mapper} and
-{@link org.apache.hadoop.mapred.Reducer} interfaces.  These are submitted
-as a {@link org.apache.hadoop.mapred.JobConf} and are applied to data
-stored in a {@link org.apache.hadoop.fs.FileSystem}.</p>
+<p>A Map-Reduce <i>job</i> usually splits the input data-set into independent 
+chunks which processed by <i>map</i> tasks in completely parallel manner, 
+followed by <i>reduce</i> tasks which aggregating their output. Typically both 
+the input and the output of the job are stored in a 
+{@link org.apache.hadoop.fs.FileSystem}. The framework takes care of monitoring 
+tasks and re-executing failed ones. Since, usually, the compute nodes and the 
+storage nodes are the same i.e. Hadoop's Map-Reduce framework and Distributed 
+FileSystem are running on the same set of nodes, tasks are effectively scheduled 
+on the nodes where data is already present, resulting in very high aggregate 
+bandwidth across the cluster.</p>
 
-<p>See <a href="http://labs.google.com/papers/mapreduce.html">Google's
-original Map/Reduce paper</a> for background information.</p>
+<p>The Map-Reduce framework operates exclusively on <tt>&lt;key, value&gt;</tt> 
+pairs i.e. the input to the job is viewed as a set of <tt>&lt;key, value&gt;</tt>
+pairs and the output as another, possibly different, set of 
+<tt>&lt;key, value&gt;</tt> pairs. The <tt>key</tt>s and <tt>value</tt>s have to 
+be serializable as {@link org.apache.hadoop.io.Writable}s and additionally the
+<tt>key</tt>s have to be {@link org.apache.hadoop.io.WritableComparable}s in 
+order to facilitate grouping by the framework.</p>
+
+<p>Data flow:</p>
+<pre>
+                                (input)
+                                <tt>&lt;k1, v1&gt;</tt>
+       
+                                   |
+                                   V
+       
+                                  <b>map</b>
+       
+                                   |
+                                   V
+
+                                <tt>&lt;k2, v2&gt;</tt>
+       
+                                   |
+                                   V
+       
+                                <b>combine</b>
+       
+                                   |
+                                   V
+       
+                                <tt>&lt;k2, v2&gt;</tt>
+       
+                                   |
+                                   V
+       
+                                 <b>reduce</b>
+       
+                                   |
+                                   V
+       
+                                <tt>&lt;k3, v3&gt;</tt>
+                                (output)
+</pre>
+
+<p>Applications typically implement 
+{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)} 
+and
+{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)} 
+methods.  The application-writer also specifies various facets of the job such
+as input and output locations, the <tt>Partitioner</tt>, <tt>InputFormat</tt> 
+&amp; <tt>OutputFormat</tt> implementations to be used etc. as 
+a {@link org.apache.hadoop.mapred.JobConf}. The client program, 
+{@link org.apache.hadoop.mapred.JobClient}, then submits the job to the framework 
+and optionally monitors it.</p>
+
+<p>The framework spawns one map task per 
+{@link org.apache.hadoop.mapred.InputSplit} generated by the 
+{@link org.apache.hadoop.mapred.InputFormat} of the job and calls 
+{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)} 
+with each &lt;key, value&gt; pair read by the 
+{@link org.apache.hadoop.mapred.RecordReader} from the <tt>InputSplit</tt> for 
+the task. The intermediate outputs of the maps are then grouped by <tt>key</tt>s
+and optionally aggregated by <i>combiner</i>. The key space of intermediate 
+outputs are paritioned by the {@link org.apache.hadoop.mapred.Partitioner}, where 
+the number of partitions is exactly the number of reduce tasks for the job.</p>
+
+<p>The reduce tasks fetch the sorted intermediate outputs of the maps, via http, 
+merge the &lt;key, value&gt; pairs and call 
+{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)} 
+for each &lt;key, list of values&gt; pair. The output of the reduce tasks' is 
+stored on the <tt>FileSystem</tt> by the 
+{@link org.apache.hadoop.mapred.RecordWriter} provided by the
+{@link org.apache.hadoop.mapred.OutputFormat} of the job.</p>
+
+<p>Map-Reduce application to perform a distributed <i>grep</i>:</p>
+<pre><tt>
+public class Grep extends Configured implements Tool {
+
+  // <i>map: Search for the pattern specified by 'grep.mapper.regex' &amp;</i>
+  //      <i>'grep.mapper.regex.group'</i>
+
+  class GrepMapper&lt;K extends WritableComparable, Text&gt; 
+  extends MapReduceBase  implements Mapper&lt;K, Text, Text, LongWritable&gt; {
+
+    private Pattern pattern;
+    private int group;
+
+    public void configure(JobConf job) {
+      pattern = Pattern.compile(job.get("grep.mapper.regex"));
+      group = job.getInt("grep.mapper.regex.group", 0);
+    }
+
+    public void map(K key, Text value,
+                    OutputCollector&lt;Text, LongWritable&gt; output,
+                    Reporter reporter)
+    throws IOException {
+      String text = value.toString();
+      Matcher matcher = pattern.matcher(text);
+      while (matcher.find()) {
+        output.collect(new Text(matcher.group(group)), new LongWritable(1));
+      }
+    }
+  }
+
+  // <i>reduce: Count the number of occurrences of the pattern</i>
+
+  class GrepReducer&lt;K extends WritableComparable&gt; extends MapReduceBase
+  implements Reducer&lt;K, LongWritable, K, LongWritable&gt; {
+
+    public void reduce(K key, Iterator&lt;LongWritable&gt; values,
+                       OutputCollector&lt;K, LongWritable&gt; output,
+                       Reporter reporter)
+    throws IOException {
+
+      // sum all values for this key
+      long sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+
+      // output sum
+      output.collect(key, new LongWritable(sum));
+    }
+  }
+  
+  public int run(String[] args) throws Exception {
+    if (args.length &lt; 3) {
+      System.out.println("Grep &lt;inDir&gt; &lt;outDir&gt; &lt;regex&gt; [&lt;group&gt;]");
+      ToolRunner.printGenericCommandUsage(System.out);
+      return -1;
+    }
+
+    JobConf grepJob = new JobConf(getConf(), Grep.class);
+    
+    grepJob.setJobName("grep");
+
+    grepJob.setInputPath(new Path(args[0]));
+    grepJob.setOutputPath(args[1]);
+
+    grepJob.setMapperClass(GrepMapper.class);
+    grepJob.setCombinerClass(GrepReducer.class);
+    grepJob.setReducerClass(GrepReducer.class);
+
+    grepJob.set("mapred.mapper.regex", args[2]);
+    if (args.length == 4)
+      grepJob.set("mapred.mapper.regex.group", args[3]);
+
+    grepJob.setOutputFormat(SequenceFileOutputFormat.class);
+    grepJob.setOutputKeyClass(Text.class);
+    grepJob.setOutputValueClass(LongWritable.class);
+
+    JobClient.runJob(grepJob);
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Grep(), args);
+    System.exit(res);
+  }
+
+}
+</tt></pre>
+
+<p>Notice how the data-flow of the above grep job is very similar to doing the
+same via the unix pipeline:</p>
+
+<pre>
+cat input/*   |   grep   |   sort    |   uniq -c   &gt;   out
+</pre>
+
+<pre>
+      input   |    map   |  shuffle  |   reduce    &gt;   out
+</pre>
+
+<p>Hadoop Map-Reduce applications need not be written in 
+Java<small><sup>TM</sup></small> only. 
+<a href="../streaming/package-summary.html">Hadoop Streaming</a> is a utility
+which allows users to create and run jobs with any executables (e.g. shell 
+utilities) as the mapper and/or the reducer. 
+<a href="pipes/package-summary.html">Hadoop Pipes</a> is a 
+<a href="http://www.swig.org/">SWIG</a>-compatible <em>C++ API</em> to implement
+Map-Reduce applications (non JNI<small><sup>TM</sup></small> based).</p>
+
+<p>See <a href="http://labs.google.com/papers/mapreduce.html">Google's original 
+Map/Reduce paper</a> for background information.</p>
+
+<p><i>Java and JNI are trademarks or registered trademarks of 
+Sun Microsystems, Inc. in the United States and other countries.</i></p>
 
 </body>
 </html>

+ 77 - 60
src/java/org/apache/hadoop/util/GenericOptionsParser.java

@@ -32,50 +32,54 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
-/*************************************************************
- * This class can be used as a utility to parse command line
- * arguments generic to the Hadoop framework. This class 
- * recognizes several command line arguments, which allow a user 
- * to specify a namenode, a job tracker etc. Generic options 
- * supported are 
- * <p>-conf <configuration file>     specify an application configuration file
- * <p>-D <property=value>            use value for given property
- * <p>-fs <local|namenode:port>      specify a namenode
- * <p>-jt <local|jobtracker:port>    specify a job tracker
- * <br>
- * <p>The general command line syntax is
- * <p>bin/hadoop command [genericOptions] [commandOptions]
- * <br>
- * Generic command line arguments <strong>might</strong> modify 
- * <code>Configuration </code> objects, given to constructors  
- * <br><br>
- * The functionality is implemented using Commons CLI.
- * <br>
- * <p>Examples using generic options are
- * <p>bin/hadoop dfs -fs darwin:8020 -ls /data
- * <p><blockquote><pre>
- *     list /data directory in dfs with namenode darwin:8020
- * </pre></blockquote>
- * <p>bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data
- * <p><blockquote><pre>
- *     list /data directory in dfs with namenode darwin:8020
- * </pre></blockquote>
- * <p>bin/hadoop dfs -conf hadoop-site.xml -ls /data
- * <p><blockquote><pre>
- *     list /data directory in dfs with conf specified in hadoop-site.xml
- * </pre></blockquote>
- * <p>bin/hadoop job -D mapred.job.tracker=darwin:50020 -submit job.xml
- * <p><blockquote><pre>
- *     submit a job to job tracker darwin:50020
- * </pre></blockquote>
- * <p>bin/hadoop job -jt darwin:50020 -submit job.xml
+/**
+ * <code>GenericOptionsParser</code> is a utility to parse command line
+ * arguments generic to the Hadoop framework. 
+ * 
+ * <code>GenericOptionsParser</code> recognizes several standarad command 
+ * line arguments, enabling applications to easily specify a namenode, a 
+ * jobtracker, additional configuration resources etc.
+ * 
+ * <h4 id="GenericOptions">Generic Options</h4>
+ * 
+ * <p>The supported generic options are:</p>
  * <p><blockquote><pre>
- *     submit a job to job tracker darwin:50020
- * </pre></blockquote>
- * <p>bin/hadoop job -jt local -submit job.xml
+ *     -conf &lt;configuration file&gt;     specify a configuration file
+ *     -D &lt;property=value&gt;            use value for given property
+ *     -fs &lt;local|namenode:port&gt;      specify a namenode
+ *     -jt &lt;local|jobtracker:port&gt;    specify a job tracker
+ * </pre></blockquote></p>
+ * 
+ * <p>The general command line syntax is:</p>
+ * <p><tt><pre>
+ * bin/hadoop command [genericOptions] [commandOptions]
+ * </pre></tt></p>
+ * 
+ * <p>Generic command line arguments <strong>might</strong> modify 
+ * <code>Configuration </code> objects, given to constructors.</p>
+ * 
+ * <p>The functionality is implemented using Commons CLI.</p>
+ *
+ * <p>Examples:</p>
  * <p><blockquote><pre>
- *     submit a job to local runner
- * </pre></blockquote>
+ * $ bin/hadoop dfs -fs darwin:8020 -ls /data
+ * list /data directory in dfs with namenode darwin:8020
+ * 
+ * $ bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data
+ * list /data directory in dfs with namenode darwin:8020
+ *     
+ * $ bin/hadoop dfs -conf hadoop-site.xml -ls /data
+ * list /data directory in dfs with conf specified in hadoop-site.xml
+ *     
+ * $ bin/hadoop job -D mapred.job.tracker=darwin:50020 -submit job.xml
+ * submit a job to job tracker darwin:50020
+ *     
+ * $ bin/hadoop job -jt darwin:50020 -submit job.xml
+ * submit a job to job tracker darwin:50020
+ *     
+ * $ bin/hadoop job -jt local -submit job.xml
+ * submit a job to local runner
+ * </pre></blockquote></p>
  *
  * @see Tool
  * @see ToolRunner
@@ -86,21 +90,27 @@ public class GenericOptionsParser {
 
   private CommandLine commandLine;
 
-  /** Instantinates a GenericOptionsParser to parse only
-   * the generic Hadoop  arguments. The array of string arguments 
-   * other than the generic arguments can be obtained by 
-   * {@link #getRemainingArgs()}
-   * @param conf the configuration to modify
-   * @param args User-specified arguments
+  /** 
+   * Create a <code>GenericOptionsParser<code> to parse only the generic Hadoop  
+   * arguments. 
+   * 
+   * The array of string arguments other than the generic arguments can be 
+   * obtained by {@link #getRemainingArgs()}.
+   * 
+   * @param conf the <code>Configuration</code> to modify.
+   * @param args command-line arguments.
    */
   public GenericOptionsParser(Configuration conf, String[] args) {
     this(conf, new Options(), args); 
   }
 
   /** 
-   * Instantinates a GenericOptionsParser to parse given options 
-   * as well as generic Hadoop options. The resulting <code>
-   * CommandLine</code> object can be obtained by {@link #getCommandLine()}
+   * Create a <code>GenericOptionsParser</code> to parse given options as well 
+   * as generic Hadoop options. 
+   * 
+   * The resulting <code>CommandLine</code> object can be obtained by 
+   * {@link #getCommandLine()}.
+   * 
    * @param conf the configuration to modify  
    * @param options options built by the caller 
    * @param args User-specified arguments
@@ -110,9 +120,9 @@ public class GenericOptionsParser {
   }
 
   /**
-   * Returns an array of Strings containing only command-specific 
-   * arguments.
-   * @return String array of remaining arguments not parsed
+   * Returns an array of Strings containing only application-specific arguments.
+   * 
+   * @return array of <code>String</code>s containing the un-parsed arguments.
    */
   public String[] getRemainingArgs() {
     return commandLine.getArgs();
@@ -120,12 +130,14 @@ public class GenericOptionsParser {
 
   /**
    * Returns the commons-cli <code>CommandLine</code> object 
-   * to process the parsed arguments. Note : if the object is 
-   * created with <code>GenericCommandLineParser(Configuration, String[])</code>, 
-   * then returned object will only contain parsed generic 
-   * options.
-   * @return CommandLine object representing list of arguments 
-   * parsed against Options descriptor.
+   * to process the parsed arguments. 
+   * 
+   * Note: If the object is created with 
+   * {@link #GenericOptionsParser(Configuration, String[])}, then returned 
+   * object will only contain parsed generic options.
+   * 
+   * @return <code>CommandLine</code> representing list of arguments 
+   *         parsed against Options descriptor.
    */
   public CommandLine getCommandLine() {
     return commandLine;
@@ -212,6 +224,11 @@ public class GenericOptionsParser {
     return args;
   }
 
+  /**
+   * Print the usage message for generic command-line options supported.
+   * 
+   * @param out stream to print the usage message to.
+   */
   public static void printGenericCommandUsage(PrintStream out) {
     out.println("Generic options supported are");
     out.println("-conf <configuration file>     specify an application configuration file");

+ 9 - 7
src/java/org/apache/hadoop/util/Progressable.java

@@ -18,16 +18,18 @@
 
 package org.apache.hadoop.util;
 
-import java.io.IOException;
-
-
-
 /**
- * An interface for callbacks when an method makes some progress.
+ * A facility for reporting progress.
+ * 
+ * <p>Clients and/or applications can use the provided <code>Progressable</code>
+ * to explicitly report progress to the Hadoop framework. This is especially
+ * important for operations which take an insignificant amount of time since,
+ * in-lieu of the reported progress, the framework has to assume that an error
+ * has occured and time-out the operation.</p>
  */
 public interface Progressable {
-  /** callback for reporting progress. Used by DFSclient to report
-   * progress while writing a block of DFS file.
+  /**
+   * Report progress to the Hadoop framework.
    */
   public void progress();
 }

+ 47 - 3
src/java/org/apache/hadoop/util/Tool.java

@@ -21,14 +21,58 @@ package org.apache.hadoop.util;
 import org.apache.hadoop.conf.Configurable;
 
 /**
- * A tool interface that support generic options handling
+ * A tool interface that supports handling of generic command-line options.
  * 
+ * <p><code>Tool</code>, is the standard for any Map-Reduce tool/application. 
+ * The tool/application should delegate the handling of 
+ * <a href="{@docRoot}/org/apache/hadoop/util/GenericOptionsParser.html#GenericOptions">
+ * standard command-line options</a> to {@link ToolRunner#run(Tool, String[])} 
+ * and only handle its custom arguments.</p>
+ * 
+ * <p>Here is how a typical <code>Tool</code> is implemented:</p>
+ * <p><blockquote><pre>
+ *     public class MyApp extends Configured implements Tool {
+ *     
+ *       public int run(String[] args) throws Exception {
+ *         // <code>Configuration</code> processed by <code>ToolRunner</code>
+ *         Configuration conf = getConf();
+ *         
+ *         // Create a JobConf using the processed <code>conf</code>
+ *         JobConf job = new JobConf(conf, MyApp.class);
+ *         
+ *         // Process custom command-line options
+ *         Path in = new Path(args[1]);
+ *         Path out = new Path(args[2]);
+ *         
+ *         // Specify various job-specific parameters     
+ *         job.setJobName("my-app");
+ *         job.setInputPath(in);
+ *         job.setOutputPath(out);
+ *         job.setMapperClass(MyApp.MyMapper.class);
+ *         job.setReducerClass(MyApp.MyReducer.class);
+ *
+ *         // Submit the job, then poll for progress until the job is complete
+ *         JobClient.runJob(job);
+ *       }
+ *       
+ *       public static void main(String[] args) throws Exception {
+ *         // Let <code>ToolRunner</code> handle generic command-line options 
+ *         int res = ToolRunner.run(new Configuration(), new Sort(), args);
+ *         
+ *         System.exit(res);
+ *       }
+ *     }
+ * </pre></blockquote></p>
+ * 
+ * @see GenericOptionsParser
+ * @see ToolRunner
  */
 public interface Tool extends Configurable {
   /**
    * Execute the command with the given arguments.
-   * @param args command specific arguments
-   * @return exit code
+   * 
+   * @param args command specific arguments.
+   * @return exit code.
    * @throws Exception
    */
   int run(String [] args) throws Exception;

+ 34 - 18
src/java/org/apache/hadoop/util/ToolRunner.java

@@ -22,23 +22,34 @@ import java.io.PrintStream;
 import org.apache.hadoop.conf.Configuration;
 
 /**
- * ToolRunner can be used to run classes implementing {@link Tool}
- * interface. Static method {@link #run(Tool, String[])} is used.
- * {@link GenericOptionsParser} is used to parse the hadoop generic 
- * arguments to modify the <code>Configuration</code>.
+ * A utility to help run {@link Tool}s.
+ * 
+ * <p><code>ToolRunner</code> can be used to run classes implementing 
+ * <code>Tool</code> interface. It works in conjunction with 
+ * {@link GenericOptionsParser} to parse the 
+ * <a href="{@docRoot}/org/apache/hadoop/util/GenericOptionsParser.html#GenericOptions">
+ * generic hadoop command line arguments</a> and modifies the 
+ * <code>Configuration</code> of the <code>Tool</code>. The 
+ * application-specific options are passed along without being modified.
+ * </p>
+ * 
+ * @see Tool
+ * @see GenericOptionsParser
  */
 public class ToolRunner {
  
   /**
-   * Runs the given Tool by {@link Tool#run(String[])}, with the 
-   * given arguments. Uses the given configuration, or builds one if null.
-   * Sets the possibly modified version of the conf by Tool#setConf()  
+   * Runs the given <code>Tool</code> by {@link Tool#run(String[])}, after 
+   * parsing with the given generic arguments. Uses the given 
+   * <code>Configuration</code>, or builds one if null.
    * 
-   * @param conf Configuration object to use
-   * @param tool The Tool to run
-   * @param args the arguments to the tool(including generic arguments
-   * , see {@link GenericOptionsParser})
-   * @return exit code of the {@link Tool#run(String[])} method
+   * Sets the <code>Tool</code>'s configuration with the possibly modified 
+   * version of the <code>conf</code>.  
+   * 
+   * @param conf <code>Configuration</code> for the <code>Tool</code>.
+   * @param tool <code>Tool</code> to run.
+   * @param args command-line arguments to the tool.
+   * @return exit code of the {@link Tool#run(String[])} method.
    */
   public static int run(Configuration conf, Tool tool, String[] args) 
     throws Exception{
@@ -55,19 +66,24 @@ public class ToolRunner {
   }
   
   /**
-   * Runs the tool with the tool's Configuration
+   * Runs the <code>Tool</code> with its <code>Configuration</code>.
+   * 
    * Equivalent to <code>run(tool.getConf(), tool, args)</code>.
-   * @param tool The Tool to run
-   * @param args the arguments to the tool(including generic arguments
-   * , see {@link GenericOptionsParser})
-   * @return exit code of the {@link Tool#run(String[])} method
+   * 
+   * @param tool <code>Tool</code> to run.
+   * @param args command-line arguments to the tool.
+   * @return exit code of the {@link Tool#run(String[])} method.
    */
   public static int run(Tool tool, String[] args) 
     throws Exception{
     return run(tool.getConf(), tool, args);
   }
   
-  /** Delegates to GenericOptionsParser#printGenericCommandUsage() */
+  /**
+   * Prints generic command-line argurments and usage information.
+   * 
+   *  @param out stream to write usage information to.
+   */
   public static void printGenericCommandUsage(PrintStream out) {
     GenericOptionsParser.printGenericCommandUsage(out);
   }

+ 1 - 1
src/test/org/apache/hadoop/mapred/NotificationTestCase.java

@@ -142,7 +142,7 @@ public abstract class NotificationTestCase extends HadoopTestCase {
 
   protected JobConf createJobConf() {
     JobConf conf = super.createJobConf();
-    conf.set("job.end.notification.url", getNotificationUrlTemplate());
+    conf.setJobEndNotificationURI(getNotificationUrlTemplate());
     conf.setInt("job.end.retry.attempts", 3);
     conf.setInt("job.end.retry.interval", 200);
     return conf;

이 변경점에서 너무 많은 파일들이 변경되어 몇몇 파일들은 표시되지 않았습니다.