
HADOOP-2919. Reduce the number of memory copies done during the
map output sorting. Contributed by cdouglas.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@643195 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 17 years ago
Commit d27b11e955

+ 8 - 0
CHANGES.txt

@@ -202,6 +202,14 @@ Trunk (unreleased changes)
     tasks by replacing static arrays with lists of runnable tasks. 
     (Amar Kamat via omalley)
 
+    HADOOP-2919.  Reduce the number of memory copies done during the
+    map output sorting. Also adds two config variables:
+    io.sort.spill.percent - the percentage of io.sort.mb that should
+                            cause a spill (default 80%)
+    io.sort.record.percent - the percentage of io.sort.mb that should
+                             hold key/value indexes (default 5%)
+    (cdouglas via omalley)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

+ 18 - 0
conf/hadoop-default.xml

@@ -79,6 +79,24 @@ creations/deletions), or "all".</description>
   should minimize seeks.</description>
 </property>
 
+<property>
+  <name>io.sort.record.percent</name>
+  <value>0.05</value>
+  <description>The percentage of io.sort.mb dedicated to tracking record
+  boundaries. Let this value be r, io.sort.mb be x. The maximum number
+  of records collected before the collection thread must block is equal
+  to (r * x) / 4</description>
+</property>
+
+<property>
+  <name>io.sort.spill.percent</name>
+  <value>0.80</value>
+  <description>The soft limit in either the serialization buffer or the
+  record collection buffer. Once reached, a thread will begin to spill
+  the contents to disk in the background. Note that this does not imply
+  any chunking of data to the spill. A value less than 0.5 is not
+  recommended.</description>
+</property>
+
 <property>
   <name>io.file.buffer.size</name>
   <value>4096</value>
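
As a back-of-the-envelope illustration of how these two knobs combine with io.sort.mb, here is a small sketch (not part of this patch; the class name is made up, the (r * x) / 4 formula above is taken at face value, and x is assumed to be io.sort.mb expressed in bytes):

/** Hypothetical helper computing the collection limits for the defaults above. */
public class SortBufferMath {
  public static void main(String[] args) {
    final int ioSortMb = 100;            // io.sort.mb (default)
    final double recordPercent = 0.05;   // io.sort.record.percent (default)
    final double spillPercent = 0.80;    // io.sort.spill.percent (default)

    // Assumption: x in the description is io.sort.mb converted to bytes.
    final long bufferBytes = (long) ioSortMb << 20;

    // io.sort.record.percent: records collected before the collection
    // thread must block, per the (r * x) / 4 formula in the description.
    final long maxRecords = (long) (recordPercent * bufferBytes) / 4;

    // io.sort.spill.percent: the soft limit at which a background spill begins.
    final long spillThresholdBytes = (long) (spillPercent * bufferBytes);

    System.out.println("record slots before blocking: " + maxRecords);          // ~1.3 million
    System.out.println("spill threshold in bytes:     " + spillThresholdBytes); // ~80 MB
  }
}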

+ 14 - 17
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -584,6 +584,11 @@ public class SequenceFile {
      */
     public void writeCompressedBytes(DataOutputStream outStream) 
       throws IllegalArgumentException, IOException;
+
+    /**
+     * Size of stored data.
+     */
+    public int getSize();
   }
   
   private static class UncompressedBytes implements ValueBytes {
@@ -1018,14 +1023,12 @@ public class SequenceFile {
       out.write(buffer.getData(), 0, buffer.getLength()); // data
     }
 
-    public synchronized void appendRaw(
-                                       byte[] keyData, int keyOffset, int keyLength, ValueBytes val) 
-      throws IOException {
+    public synchronized void appendRaw(byte[] keyData, int keyOffset,
+        int keyLength, ValueBytes val) throws IOException {
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + keyLength);
 
-      UncompressedBytes value = (UncompressedBytes)val;
-      int valLength = value.getSize();
+      int valLength = val.getSize();
 
       checkAndWriteSync();
       
@@ -1144,16 +1147,13 @@ public class SequenceFile {
     }
 
     /** Append a key/value pair. */
-    public synchronized void appendRaw(
-                                       byte[] keyData, int keyOffset, int keyLength,
-                                       ValueBytes val
-                                       ) throws IOException {
+    public synchronized void appendRaw(byte[] keyData, int keyOffset,
+        int keyLength, ValueBytes val) throws IOException {
 
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
 
-      CompressedBytes value = (CompressedBytes)val;
-      int valLength = value.getSize();
+      int valLength = val.getSize();
       
       checkAndWriteSync();                        // sync
       out.writeInt(keyLength+valLength);          // total record length
@@ -1333,16 +1333,13 @@ public class SequenceFile {
     }
     
     /** Append a key/value pair. */
-    public synchronized void appendRaw(
-                                       byte[] keyData, int keyOffset, int keyLength,
-                                       ValueBytes val
-                                       ) throws IOException {
+    public synchronized void appendRaw(byte[] keyData, int keyOffset,
+        int keyLength, ValueBytes val) throws IOException {
       
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed");
 
-      UncompressedBytes value = (UncompressedBytes)val;
-      int valLength = value.getSize();
+      int valLength = val.getSize();
       
       // Save key/value data in relevant buffers
       WritableUtils.writeVInt(keyLenBuffer, keyLength);
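
Because getSize() is now declared on the ValueBytes interface itself, appendRaw no longer downcasts to UncompressedBytes or CompressedBytes, so any ValueBytes obtained from a Reader can be handed straight to a Writer. A rough sketch of that raw-copy pattern (my own illustration, not part of this patch; the class name is hypothetical, and it assumes the reader and the new writer use matching compression settings):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;

/** Hypothetical helper: copy records between SequenceFiles without deserializing them. */
public class RawSequenceFileCopy {
  public static void copy(Configuration conf, Path inPath, Path outPath) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    SequenceFile.Reader in = new SequenceFile.Reader(fs, inPath, conf);
    SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, outPath,
        in.getKeyClass(), in.getValueClass());
    try {
      DataOutputBuffer rawKey = new DataOutputBuffer();
      SequenceFile.ValueBytes rawVal = in.createValueBytes();
      while (in.nextRawKey(rawKey) >= 0) {       // -1 signals end of file
        in.nextRawValue(rawVal);
        // appendRaw sizes the record via ValueBytes.getSize(); no cast needed
        out.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawVal);
        rawKey.reset();
      }
    } finally {
      in.close();
      out.close();
    }
  }
}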

+ 1 - 1
src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java

@@ -222,7 +222,7 @@ class MRSortResultIterator implements RawKeyValueIterator {
       dataSize = length;
     }
             
-    public int getSize() throws IOException {
+    public int getSize() {
       return dataSize;
     }
             

+ 646 - 268
src/java/org/apache/hadoop/mapred/MapTask.java
(Diff content not shown: too large to display)


+ 36 - 0
src/java/org/apache/hadoop/util/IndexedSortable.java

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * Interface for collections capable of being sorted by {@link IndexedSorter}
+ * algorithms.
+ */
+public interface IndexedSortable {
+
+  /**
+   * Compare items at the given addresses consistent with the semantics of
+   * {@link java.util.Comparator#compare(Object, Object)}.
+   */
+  int compare(int i, int j);
+
+  /**
+   * Swap items at the given addresses.
+   */
+  void swap(int i, int j);
+}

+ 38 - 0
src/java/org/apache/hadoop/util/IndexedSorter.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * Interface for sort algorithms accepting {@link IndexedSortable} items.
+ *
+ * A sort algorithm implementing this interface may only call
+ * {@link IndexedSortable#compare} and {@link IndexedSortable#swap} on items
+ * within a given range of indices to effect a sort across that range.
+ */
+public interface IndexedSorter {
+
+  /**
+   * Sort the items accessed through the given IndexedSortable over the given
+   * range of logical indices. From the perspective of the sort algorithm,
+   * each index between l (inclusive) and r (exclusive) is an addressable
+   * entry.
+   * @see IndexedSortable#compare
+   * @see IndexedSortable#swap
+   */
+  void sort(IndexedSortable s, int l, int r);
+}
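
To make the contract concrete, here is a minimal sketch of an IndexedSortable over a plain int[] (not part of this patch; the class name is made up), sorted with the QuickSort implementation added later in this commit:

import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.QuickSort;

/** Hypothetical example: sort an int[] in place through the indexed interfaces. */
public class IntArraySortable implements IndexedSortable {
  private final int[] data;

  public IntArraySortable(int[] data) {
    this.data = data;
  }

  /** Compare the items at indices i and j, as a Comparator would. */
  public int compare(int i, int j) {
    return (data[i] < data[j]) ? -1 : ((data[i] == data[j]) ? 0 : 1);
  }

  /** Swap the items at indices i and j. */
  public void swap(int i, int j) {
    int tmp = data[i];
    data[i] = data[j];
    data[j] = tmp;
  }

  public static void main(String[] args) {
    int[] a = { 5, 3, 9, 1, 4 };
    IndexedSorter sorter = new QuickSort();
    sorter.sort(new IntArraySortable(a), 0, a.length);   // a is now { 1, 3, 4, 5, 9 }
  }
}

The rewritten collector in MapTask.java takes the same approach, sorting record indexes in place rather than copying the serialized map output.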

+ 86 - 0
src/java/org/apache/hadoop/util/QuickSort.java

@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * An implementation of the core algorithm of QuickSort.
+ * See "Median-of-Three Partitioning" in Sedgewick book.
+ */
+public class QuickSort implements IndexedSorter {
+
+  public QuickSort() { }
+
+  private void fix(IndexedSortable s, int p, int r) {
+    if (s.compare(p, r) > 0) {
+      s.swap(p, r);
+    }
+  }
+
+  public void sort(IndexedSortable s, int p, int r) {
+    sort(s, p, r, null);
+  }
+
+  /**
+   * Same as {@link #sort(IndexedSortable, int, int)}, but report progress
+   * to the given {@link Progressable} after each partition.
+   */
+  public void sort(IndexedSortable s, int p, int r, Progressable rep) {
+    if (null != rep) {
+      rep.progress();
+    }
+    if (r-p < 13) {
+      for (int i = p; i < r; ++i) {
+        for (int j = i; j > p && s.compare(j-1, j) > 0; --j) {
+          s.swap(j, j-1);
+        }
+      }
+      return;
+    }
+
+    // select, move pivot into first position
+    fix(s, (p+r) >>> 1, p);
+    fix(s, (p+r) >>> 1, r - 1);
+    fix(s, p, r-1);
+
+    // Divide
+    int x = p;
+    int i = p;
+    int j = r;
+    while(true) {
+      while (++i < r && s.compare(i, x) < 0) { } // advance left index while item < pivot
+      while (--j > x && s.compare(x, j) < 0) { } // retreat right index while item > pivot
+      if (i < j) s.swap(i, j);
+      else break;
+    }
+    // swap pivot into position
+    s.swap(x, i - 1);
+
+    // Conquer
+    // Recurse on smaller interval first to keep stack shallow
+    if (i - p - 1 < r - i) {
+      sort(s, p, i - 1, rep);
+      sort(s, i, r, rep);
+    } else {
+      sort(s, i, r, rep);
+      sort(s, p, i - 1, rep);
+    }
+  }
+
+}

+ 226 - 0
src/test/org/apache/hadoop/util/TestIndexedSort.java

@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+
+public class TestIndexedSort extends TestCase {
+
+  private static class SampleSortable implements IndexedSortable {
+    private int[] valindex;
+    private int[] valindirect;
+    private int[] values;
+
+    public SampleSortable() {
+      this(50);
+    }
+
+    public SampleSortable(int j) {
+      Random r = new Random();
+      values = new int[j];
+      valindex = new int[j];
+      valindirect = new int[j];
+      for (int i = 0; i < j; ++i) {
+        valindex[i] = valindirect[i] = i;
+        values[i] = r.nextInt(1000);
+      }
+    }
+
+    public SampleSortable(int[] values) {
+      this.values = values;
+      valindex = new int[values.length];
+      valindirect = new int[values.length];
+      for (int i = 0; i < values.length; ++i) {
+        valindex[i] = valindirect[i] = i;
+      }
+    }
+
+    public int compare(int i, int j) {
+      // values are small and non-negative, so the difference cannot overflow
+      return
+        values[valindirect[valindex[i]]] - values[valindirect[valindex[j]]];
+    }
+
+    public void swap(int i, int j) {
+      int tmp = valindex[i];
+      valindex[i] = valindex[j];
+      valindex[j] = tmp;
+    }
+
+    public int[] getSorted() {
+      int[] ret = new int[values.length];
+      for (int i = 0; i < ret.length; ++i) {
+        ret[i] = values[valindirect[valindex[i]]];
+      }
+      return ret;
+    }
+
+    public int[] getValues() {
+      int[] ret = new int[values.length];
+      System.arraycopy(values, 0, ret, 0, values.length);
+      return ret;
+    }
+
+  }
+
+  private static class WritableSortable implements IndexedSortable {
+
+    private static Random r = new Random();
+    private final int eob;
+    private final int[] indices;
+    private final int[] offsets;
+    private final byte[] bytes;
+    private final WritableComparator comparator;
+    private final String[] check;
+
+    public WritableSortable() throws IOException {
+      this(100);
+    }
+
+    public WritableSortable(int j) throws IOException {
+      Text t = new Text();
+      StringBuffer sb = new StringBuffer();
+      indices = new int[j];
+      offsets = new int[j];
+      check = new String[j];
+      DataOutputBuffer dob = new DataOutputBuffer();
+      for (int i = 0; i < j; ++i) {
+        indices[i] = i;
+        offsets[i] = dob.getLength();
+        genRandom(t, r.nextInt(15) + 1, sb);
+        t.write(dob);
+        check[i] = t.toString();
+      }
+      eob = dob.getLength();
+      bytes = dob.getData();
+      comparator = WritableComparator.get(Text.class);
+    }
+
+    private static void genRandom(Text t, int len, StringBuffer sb) {
+      sb.setLength(0);
+      for (int i = 0; i < len; ++i) {
+        sb.append(Integer.toString(r.nextInt(26) + 10, 36));
+      }
+      t.set(sb.toString());
+    }
+
+    public int compare(int i, int j) {
+      final int ii = indices[i];
+      final int ij = indices[j];
+      return comparator.compare(bytes, offsets[ii],
+        ((ii + 1 == indices.length) ? eob : offsets[ii + 1]) - offsets[ii],
+        bytes, offsets[ij],
+        ((ij + 1 == indices.length) ? eob : offsets[ij + 1]) - offsets[ij]);
+    }
+
+    public void swap(int i, int j) {
+      int tmp = indices[i];
+      indices[i] = indices[j];
+      indices[j] = tmp;
+    }
+
+    public String[] getValues() {
+      return check;
+    }
+
+    public String[] getSorted() throws IOException {
+      String[] ret = new String[indices.length];
+      Text t = new Text();
+      DataInputBuffer dib = new DataInputBuffer();
+      for (int i = 0; i < ret.length; ++i) {
+        int ii = indices[i];
+        dib.reset(bytes, offsets[ii],
+            ((ii + 1 == indices.length) ? eob : offsets[ii + 1]) - offsets[ii]);
+        t.readFields(dib);
+        ret[i] = t.toString();
+      }
+      return ret;
+    }
+
+  }
+
+  public void testAllEqual() throws Exception {
+    final int SAMPLE = 50;
+    int[] values = new int[SAMPLE];
+    Arrays.fill(values, 10);
+    SampleSortable s = new SampleSortable(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    int[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+  public void testSorted() throws Exception {
+    final int SAMPLE = 50;
+    int[] values = new int[SAMPLE];
+    Random r = new Random();
+    for (int i = 0; i < SAMPLE; ++i) {
+      values[i] = r.nextInt(100);
+    }
+    Arrays.sort(values);
+    SampleSortable s = new SampleSortable(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    int[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+  public void testSingleRecord() throws Exception {
+    final int SAMPLE = 1;
+    SampleSortable s = new SampleSortable(SAMPLE);
+    int[] values = s.getValues();
+    Arrays.sort(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    int[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+  public void testQuickSort() throws Exception {
+    final int SAMPLE = 10000;
+    SampleSortable s = new SampleSortable(SAMPLE);
+    int[] values = s.getValues();
+    Arrays.sort(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    int[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+  public void testWritable() throws Exception {
+    final int SAMPLE = 1000;
+    WritableSortable s = new WritableSortable(SAMPLE);
+    String[] values = s.getValues();
+    Arrays.sort(values);
+    IndexedSorter sorter = new QuickSort();
+    sorter.sort(s, 0, SAMPLE);
+    String[] check = s.getSorted();
+    assertTrue(Arrays.equals(values, check));
+  }
+
+}

Some files were not shown because too many files changed in this diff