Parcourir la source

Fix for HADOOP-2. The combiner now clones keys and values, so mappers may now safely reuse emitted keys and values. Contributed by Owen O'Malley.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@390231 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting il y a 19 ans
Parent
commit
ec87d7e0e3

+ 43 - 2
src/java/org/apache/hadoop/io/WritableUtils.java

@@ -17,7 +17,7 @@
 package org.apache.hadoop.io;
 
 import java.io.*;
-
+import org.apache.hadoop.mapred.JobConf;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -189,5 +189,46 @@ public final class WritableUtils  {
     System.out.println();
   }
 
-
+  /**
+   * A pair of input/output buffers that we use to clone writables.
+   */
+  private static class CopyInCopyOutBuffer {
+    DataOutputBuffer outBuffer = new DataOutputBuffer();
+    DataInputBuffer inBuffer = new DataInputBuffer();
+    /**
+     * Move the data from the output buffer to the input buffer.
+     */
+    void moveData() {
+      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
+    }
+  }
+  
+  /**
+   * Allocate a buffer for each thread that tries to clone objects.
+   */
+  private static ThreadLocal cloneBuffers = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new CopyInCopyOutBuffer();
+    }
+  };
+  
+  /**
+   * Make a copy of a writable object using serialization to a buffer.
+   * @param orig The object to copy
+   * @return The copied object
+   */
+  public static Writable clone(Writable orig, JobConf conf) {
+    try {
+      Writable newInst = (Writable)conf.newInstance(orig.getClass());
+      CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
+      buffer.outBuffer.reset();
+      orig.write(buffer.outBuffer);
+      buffer.moveData();
+      newInst.readFields(buffer.inBuffer);
+      return newInst;
+    } catch (IOException e) {
+      throw new RuntimeException("Error writing/reading clone buffer", e);
+    }
+  }
+  
 }

+ 9 - 5
src/java/org/apache/hadoop/mapred/CombiningCollector.java

@@ -51,12 +51,16 @@ class CombiningCollector implements OutputCollector {
 
     // buffer new value in map
     ArrayList values = (ArrayList)keyToValues.get(key);
-    if (values == null) {                         // no values yet for this key
-      values = new ArrayList(1);                  // make a new list
-      values.add(value);                          // add this value
-      keyToValues.put(key, values);               // add to map
+    Writable valueClone = WritableUtils.clone(value, job);
+    if (values == null) {
+      // this is a new key, so create a new list
+      values = new ArrayList(1);
+      values.add(valueClone);
+      Writable keyClone = WritableUtils.clone(key, job);
+      keyToValues.put(keyClone, values);
     } else {
-      values.add(value);                          // other values: just add new
+      // other values for this key, so just add.
+      values.add(valueClone);
     }
 
     count++;