
Fixed HADOOP-20: permit mappers and reducers to clean up. Add a close() method to the Mapper and Reducer interfaces by having them extend a Closeable interface. Update all implementations to define close(). Patch by Michel Tourn.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@376355 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 19 years ago
parent commit 8b48a08eeb
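
For context, the lifecycle this change enables looks roughly like the sketch below. It is not part of the commit, and ResourceMapper and its log file are hypothetical: a mapper may now acquire a resource in configure() and reliably release it in close(), which the framework calls once after the last map().

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical mapper illustrating the new cleanup hook. */
public class ResourceMapper implements Mapper {
  private PrintWriter log;                   // side resource to release

  public void configure(JobConf job) {
    try {
      log = new PrintWriter(new FileWriter("map-side.log"));
    } catch (IOException e) {
      throw new RuntimeException(e);         // configure() declares no IOException
    }
  }

  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter)
    throws IOException {
    log.println("saw " + key);               // use the resource per record
    output.collect(key, value);
  }

  /** Called after the last map(); flush and release the resource. */
  public void close() {
    log.close();
  }
}

Before this commit there was no hook guaranteed to run after the last record, so a resource like this could leak.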

src/examples/org/apache/hadoop/examples/WordCount.java (+6, -0)

@@ -68,6 +68,9 @@ public class WordCount {
     public void configure(JobConf job) {
     }
     
+    public void close() {
+    }
+
   }
   
   /**
@@ -88,6 +91,9 @@ public class WordCount {
     public void configure(JobConf job) {
     }
     
+    public void close() {
+    }
+    
   }
   
   static void printUsage() {

src/java/org/apache/hadoop/mapred/Closeable.java (+24, -0)

@@ -0,0 +1,24 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/** That which can be closed. */
+public interface Closeable {
+  /** Called after the last call to any other method on this object to free
+   * and/or flush resources.  Typical implementations do nothing. */
+  void close();
+}
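
Note that, unlike java.io.Closeable (whose close() is declared to throw IOException), this interface declares close() without any checked exception, so an implementation whose cleanup can fail must handle or wrap the error itself. A minimal sketch, assuming a hypothetical BufferedHelper:

import java.io.IOException;
import java.io.Writer;

/** Hypothetical helper whose cleanup can fail; it must wrap the error. */
class BufferedHelper implements org.apache.hadoop.mapred.Closeable {
  private final Writer out;

  BufferedHelper(Writer out) {
    this.out = out;
  }

  public void close() {
    try {
      out.close();                     // may throw IOException
    } catch (IOException e) {
      throw new RuntimeException(e);   // no checked exception is allowed here
    }
  }
}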

src/java/org/apache/hadoop/mapred/CombiningCollector.java (+5, -0)

@@ -77,5 +77,10 @@ class CombiningCollector implements OutputCollector {
     keyToValues.clear();
     count = 0;
   }
+  
+  public synchronized void close()
+  {
+    combiner.close();
+  }
 
 }

src/java/org/apache/hadoop/mapred/MapRunner.java (+16, -12)

@@ -38,18 +38,22 @@ public class MapRunner implements MapRunnable {
   public void run(RecordReader input, OutputCollector output,
                   Reporter reporter)
     throws IOException {
-    while (true) {
-      // allocate new key & value instances
-      WritableComparable key =
-        (WritableComparable)job.newInstance(inputKeyClass);
-      Writable value = (Writable)job.newInstance(inputValueClass);
-
-      // read next key & value
-      if (!input.next(key, value))
-        return;
-
-      // map pair to output
-      mapper.map(key, value, output, reporter);
+    try {
+      while (true) {
+        // allocate new key & value instances
+        WritableComparable key =
+          (WritableComparable)job.newInstance(inputKeyClass);
+        Writable value = (Writable)job.newInstance(inputValueClass);
+
+        // read next key & value
+        if (!input.next(key, value))
+          return;
+
+        // map pair to output
+        mapper.map(key, value, output, reporter);
+      }
+    } finally {
+      mapper.close();
     }
   }
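
The try/finally introduced here is what makes the new contract dependable: mapper.close() runs whether the input is exhausted normally or map() throws. A self-contained illustration of the guarantee (plain Java, not Hadoop code):

public class CloseGuarantee {
  public static void main(String[] args) {
    try {
      try {
        throw new RuntimeException("simulated map() failure");
      } finally {
        // mirrors mapper.close() in MapRunner: runs even when the body throws
        System.out.println("close() still runs");
      }
    } catch (RuntimeException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}

Running it prints "close() still runs" before "caught: simulated map() failure", since the finally block executes before the exception propagates.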
 

src/java/org/apache/hadoop/mapred/MapTask.java (+3, -0)

@@ -133,6 +133,9 @@ class MapTask extends Task {
         }
 
       } finally {
+        if (combining) { 
+          ((CombiningCollector)collector).close(); 
+        } 
         in.close();                               // close input
       }
     } finally {

src/java/org/apache/hadoop/mapred/Mapper.java (+1, -1)

@@ -25,7 +25,7 @@ import org.apache.hadoop.io.WritableComparable;
  * intermediate values associated with a given output key are subsequently
  * grouped by the map/reduce system, and passed to a {@link Reducer} to
  * determine the final output. */
-public interface Mapper extends JobConfigurable {
+public interface Mapper extends JobConfigurable, Closeable {
   /** Maps a single input key/value pair into intermediate key/value pairs.
    * Output pairs need not be of the same types as input pairs.  A given input
    * pair may map to zero or many output pairs.  Output pairs are collected

src/java/org/apache/hadoop/mapred/ReduceTask.java (+1, -0)

@@ -285,6 +285,7 @@ class ReduceTask extends Task {
       }
 
     } finally {
+      reducer.close();
       in.close();
       lfs.delete(new File(sortedFile));           // remove sorted
       out.close(reporter);

src/java/org/apache/hadoop/mapred/Reducer.java (+2, -1)

@@ -25,7 +25,7 @@ import org.apache.hadoop.io.WritableComparable;
 
 /** Reduces a set of intermediate values which share a key to a smaller set of
  * values.  Input values are the grouped output of a {@link Mapper}. */
-public interface Reducer extends JobConfigurable {
+public interface Reducer extends JobConfigurable, Closeable {
   /** Combines values for a given key.  Output values must be of the same type
    * as input values.  Input keys must not be altered.  Typically all values
    * are combined into zero or one value.  Output pairs are collected with
@@ -38,4 +38,5 @@ public interface Reducer extends JobConfigurable {
   void reduce(WritableComparable key, Iterator values,
               OutputCollector output, Reporter reporter)
     throws IOException;
+
 }

src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java (+1, -1)

@@ -38,5 +38,5 @@ public class IdentityMapper implements Mapper {
     throws IOException {
     output.collect(key, val);
   }
-
+  public void close() {}
 }

src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java (+3, -1)

@@ -41,5 +41,7 @@ public class IdentityReducer implements Reducer {
       output.collect(key, (Writable)values.next());
     }
   }
-
+
+  public void close() {}
+
 }

src/java/org/apache/hadoop/mapred/lib/InverseMapper.java (+3, -0)

@@ -38,4 +38,7 @@ public class InverseMapper implements Mapper {
     throws IOException {
     output.collect((WritableComparable)value, key);
   }
+  
+  public void close() {}
+  
 }

src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java (+3, -0)

@@ -45,4 +45,7 @@ public class LongSumReducer implements Reducer {
     // output sum
     output.collect(key, new LongWritable(sum));
   }
+  
+  public void close() {}
+  
 }

src/java/org/apache/hadoop/mapred/lib/RegexMapper.java (+3, -0)

@@ -53,4 +53,7 @@ public class RegexMapper implements Mapper {
       output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
     }
   }
+  
+  public void close() {}
+  
 }

src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java (+4, -1)

@@ -48,6 +48,9 @@ public class TokenCountMapper implements Mapper {
     while (st.hasMoreTokens()) {
       // output <token,1> pairs
       output.collect(new UTF8(st.nextToken()), new LongWritable(1));
-    }
+    }  
   }
+  
+  public void close() {}
+  
 }

src/java/overview.html (+1, -1)

@@ -120,7 +120,7 @@ way, put the following in conf/hadoop-site.xml:
 </configuration></xmp>
 
 <p>(We also set the DFS replication level to 1 in order to
-reduce the number of warnings.)</p>
+reduce warnings when running on a single node.)</p>
 
 <p>Now check that the command <br><tt>ssh localhost</tt><br> does not
 require a password.  If it does, execute the following commands:</p>

src/test/org/apache/hadoop/fs/TestFileSystem.java (+12, -0)

@@ -155,6 +155,10 @@ public class TestFileSystem extends TestCase {
 
       reporter.setStatus("wrote " + name);
     }
+    
+    public void close() {
+    }
+    
   }
 
   public static void writeTest(FileSystem fs, boolean fastCheck)
@@ -247,6 +251,10 @@ public class TestFileSystem extends TestCase {
 
       reporter.setStatus("read " + name);
     }
+    
+    public void close() {
+    }
+    
   }
 
   public static void readTest(FileSystem fs, boolean fastCheck)
@@ -339,6 +347,10 @@ public class TestFileSystem extends TestCase {
         in.close();
       }
     }
+    
+    public void close() {
+    }
+    
   }
 
   public static void seekTest(FileSystem fs, boolean fastCheck)

src/test/org/apache/hadoop/mapred/MapredLoadTest.java (+9, -0)

@@ -69,6 +69,9 @@ public class MapredLoadTest {
                 out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
             }
         }
+        public void close() {
+        }
+
     }
     static class RandomGenReducer implements Reducer {
         public void configure(JobConf job) {
@@ -81,6 +84,8 @@ public class MapredLoadTest {
                 out.collect(new UTF8("" + val), new UTF8(""));
             }
         }
+        public void close() {
+        }
     }
     static class RandomCheckMapper implements Mapper {
         public void configure(JobConf job) {
@@ -92,6 +97,8 @@ public class MapredLoadTest {
 
             out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
         }
+        public void close() {
+        }
     }
     static class RandomCheckReducer implements Reducer {
         public void configure(JobConf job) {
@@ -106,6 +113,8 @@ public class MapredLoadTest {
             }
             out.collect(new IntWritable(keyint), new IntWritable(count));
         }
+        public void close() {
+        }
     }
 
     int range;