Browse Source

HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles to one MapFile. Contributed by Vinayakumar B.

(cherry picked from commit 48c7ee7553af94a57952bca03b49c04b9bbfab45)
Tsuyoshi Ozawa 10 năm trước cách đây
mục cha
commit
02df51497f

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -33,6 +33,9 @@ Release 2.7.0 - UNRELEASED
 
     HADOOP-11510. Expose truncate API via FileContext. (yliu)
 
+    HADOOP-11569. Provide Merge API for MapFile to merge multiple similar MapFiles
+    to one MapFile. (Vinayakumar B via ozawa)
+
   IMPROVEMENTS
 
     HADOOP-11483. HardLink.java should use the jdk7 createLink method (aajisaka)

+ 143 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java

@@ -25,6 +25,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -824,6 +825,148 @@ public class MapFile {
     return cnt;
   }
 
+  /**
+   * Class to merge multiple MapFiles of same Key and Value types to one MapFile
+   */
+  public static class Merger {
+    private Configuration conf;
+    private WritableComparator comparator = null;
+    private Reader[] inReaders;
+    private Writer outWriter;
+    private Class<Writable> valueClass = null;
+    private Class<WritableComparable> keyClass = null;
+
+    public Merger(Configuration conf) throws IOException {
+      this.conf = conf;
+    }
+
+    /**
+     * Merge multiple MapFiles to one Mapfile
+     *
+     * @param inMapFiles
+     * @param outMapFile
+     * @throws IOException
+     */
+    public void merge(Path[] inMapFiles, boolean deleteInputs,
+        Path outMapFile) throws IOException {
+      try {
+        open(inMapFiles, outMapFile);
+        mergePass();
+      } finally {
+        close();
+      }
+      if (deleteInputs) {
+        for (int i = 0; i < inMapFiles.length; i++) {
+          Path path = inMapFiles[i];
+          delete(path.getFileSystem(conf), path.toString());
+        }
+      }
+    }
+
+    /*
+     * Open all input files for reading and verify the key and value types. And
+     * open Output file for writing
+     */
+    @SuppressWarnings("unchecked")
+    private void open(Path[] inMapFiles, Path outMapFile) throws IOException {
+      inReaders = new Reader[inMapFiles.length];
+      for (int i = 0; i < inMapFiles.length; i++) {
+        Reader reader = new Reader(inMapFiles[i], conf);
+        if (keyClass == null || valueClass == null) {
+          keyClass = (Class<WritableComparable>) reader.getKeyClass();
+          valueClass = (Class<Writable>) reader.getValueClass();
+        } else if (keyClass != reader.getKeyClass()
+            || valueClass != reader.getValueClass()) {
+          throw new HadoopIllegalArgumentException(
+              "Input files cannot be merged as they"
+                  + " have different Key and Value classes");
+        }
+        inReaders[i] = reader;
+      }
+
+      if (comparator == null) {
+        Class<? extends WritableComparable> cls;
+        cls = keyClass.asSubclass(WritableComparable.class);
+        this.comparator = WritableComparator.get(cls, conf);
+      } else if (comparator.getKeyClass() != keyClass) {
+        throw new HadoopIllegalArgumentException(
+            "Input files cannot be merged as they"
+                + " have different Key class compared to"
+                + " specified comparator");
+      }
+
+      outWriter = new MapFile.Writer(conf, outMapFile,
+          MapFile.Writer.keyClass(keyClass),
+          MapFile.Writer.valueClass(valueClass));
+    }
+
+    /**
+     * Merge all input files to output map file.<br>
+     * 1. Read first key/value from all input files to keys/values array. <br>
+     * 2. Select the least key and corresponding value. <br>
+     * 3. Write the selected key and value to output file. <br>
+     * 4. Replace the already written key/value in keys/values arrays with the
+     * next key/value from the selected input <br>
+     * 5. Repeat step 2-4 till all keys are read. <br>
+     */
+    private void mergePass() throws IOException {
+      // re-usable array
+      WritableComparable[] keys = new WritableComparable[inReaders.length];
+      Writable[] values = new Writable[inReaders.length];
+      // Read first key/value from all inputs
+      for (int i = 0; i < inReaders.length; i++) {
+        keys[i] = ReflectionUtils.newInstance(keyClass, null);
+        values[i] = ReflectionUtils.newInstance(valueClass, null);
+        if (!inReaders[i].next(keys[i], values[i])) {
+          // Handle empty files
+          keys[i] = null;
+          values[i] = null;
+        }
+      }
+
+      do {
+        int currentEntry = -1;
+        WritableComparable currentKey = null;
+        Writable currentValue = null;
+        for (int i = 0; i < keys.length; i++) {
+          if (keys[i] == null) {
+            // Skip Readers reached EOF
+            continue;
+          }
+          if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) {
+            currentEntry = i;
+            currentKey = keys[i];
+            currentValue = values[i];
+          }
+        }
+        if (currentKey == null) {
+          // Merge Complete
+          break;
+        }
+        // Write the selected key/value to merge stream
+        outWriter.append(currentKey, currentValue);
+        // Replace the already written key/value in keys/values arrays with the
+        // next key/value from the selected input
+        if (!inReaders[currentEntry].next(keys[currentEntry],
+            values[currentEntry])) {
+          // EOF for this file
+          keys[currentEntry] = null;
+          values[currentEntry] = null;
+        }
+      } while (true);
+    }
+
+    private void close() throws IOException {
+      for (int i = 0; i < inReaders.length; i++) {
+        IOUtils.closeStream(inReaders[i]);
+        inReaders[i] = null;
+      }
+      if (outWriter != null) {
+        outWriter.close();
+        outWriter = null;
+      }
+    }
+  }
 
   public static void main(String[] args) throws Exception {
     String usage = "Usage: MapFile inFile outFile";

+ 56 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java

@@ -21,6 +21,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -730,4 +734,56 @@ public class TestMapFile {
       reader.close();
     }
   }
+
+  @Test
+  public void testMerge() throws Exception {
+    final String TEST_METHOD_KEY = "testMerge.mapfile";
+    int SIZE = 10;
+    int ITERATIONS = 5;
+    Path[] in = new Path[5];
+    List<Integer> expected = new ArrayList<Integer>();
+    for (int j = 0; j < 5; j++) {
+      try (MapFile.Writer writer = createWriter(TEST_METHOD_KEY + "." + j,
+          IntWritable.class, Text.class)) {
+        in[j] = new Path(TEST_DIR, TEST_METHOD_KEY + "." + j);
+        for (int i = 0; i < SIZE; i++) {
+          expected.add(i + j);
+          writer.append(new IntWritable(i + j), new Text("Value:" + (i + j)));
+        }
+      }
+    }
+    // Sort expected values
+    Collections.sort(expected);
+    // Merge all 5 files
+    MapFile.Merger merger = new MapFile.Merger(conf);
+    merger.merge(in, true, new Path(TEST_DIR, TEST_METHOD_KEY));
+
+    try (MapFile.Reader reader = createReader(TEST_METHOD_KEY,
+        IntWritable.class)) {
+      int start = 0;
+      // test iteration
+      Text startValue = new Text("Value:" + start);
+      int i = 0;
+      while (i++ < ITERATIONS) {
+        Iterator<Integer> expectedIterator = expected.iterator();
+        IntWritable key = new IntWritable(start);
+        Text value = startValue;
+        IntWritable prev = new IntWritable(start);
+        while (reader.next(key, value)) {
+          assertTrue("Next key should be always equal or more",
+              prev.get() <= key.get());
+          assertEquals(expectedIterator.next().intValue(), key.get());
+          prev.set(key.get());
+        }
+        reader.reset();
+      }
+    }
+
+    // inputs should be deleted
+    for (int j = 0; j < in.length; j++) {
+      Path path = in[j];
+      assertFalse("inputs should be deleted",
+          path.getFileSystem(conf).exists(path));
+    }
+  }
 }