
HADOOP-3443. Avoid copying map output across partitions when renaming a
single spill. Contributed by Owen O'Malley.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@662634 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 17 years ago
parent revision 2e0711d5fe
2 changed files with 24 additions and 10 deletions
  1. CHANGES.txt (+3, -0)
  2. src/java/org/apache/hadoop/mapred/MapTask.java (+21, -10)

+ 3 - 0
CHANGES.txt

@@ -391,6 +391,9 @@ Trunk (unreleased changes)
     HADOOP-3475. Fix MapTask to correctly size the accounting allocation of
     io.sort.mb. (cdouglas)
 
+    HADOOP-3443. Avoid copying map output across partitions when renaming a
+    single spill. (omalley via cdouglas)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 21 - 10
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -35,7 +35,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +57,6 @@ import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
@@ -174,7 +172,7 @@ class MapTask extends Task {
     public float getProgress() throws IOException {
       return rawIn.getProgress();
     }
-  };
+  }
 
   @Override
   @SuppressWarnings("unchecked")
@@ -366,6 +364,7 @@ class MapTask extends Task {
       if ((sortmb & 0x7FF) != sortmb) {
         throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
       }
+      LOG.info("io.sort.mb = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
       int recordCapacity = (int)(maxMemUsage * recper);
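
The `(sortmb & 0x7FF) != sortmb` guard rejects any io.sort.mb value outside 0..2047 (0x7FF = 2047), because `sortmb << 20` converts megabytes to bytes in a signed 32-bit int and would overflow at 2048. A minimal standalone sketch of the same check (hypothetical class, not part of this patch):

    // Standalone sketch of the io.sort.mb guard: the value must fit in
    // 11 bits (0..2047) so that sortmb << 20 stays within a signed int.
    public class SortMbCheck {
      static int bufferBytes(int sortmb) {
        if ((sortmb & 0x7FF) != sortmb) { // 0x7FF == 2047
          throw new IllegalArgumentException("Invalid \"io.sort.mb\": " + sortmb);
        }
        return sortmb << 20; // megabytes to bytes; 2047 << 20 still fits
      }

      public static void main(String[] args) {
        System.out.println(bufferBytes(100)); // 104857600
        // bufferBytes(2048) would throw: 2048 << 20 overflows a 32-bit int
      }
    }
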
@@ -377,6 +376,8 @@ class MapTask extends Task {
       kvindices = new int[recordCapacity * ACCTSIZE];
       softBufferLimit = (int)(kvbuffer.length * spillper);
       softRecordLimit = (int)(kvoffsets.length * spillper);
+      LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
+      LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
       comparator = job.getOutputKeyComparator();
       keyClass = job.getMapOutputKeyClass();
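
The new log lines report each soft limit against its buffer's full capacity. A worked example of the arithmetic, assuming an illustrative 64 MB data buffer and spillper = 0.8 (both values chosen for illustration, not taken from this patch):

    // Illustrative soft-limit arithmetic; the buffer size and spill fraction
    // below are assumptions for this example, not values from the patch.
    int kvbufferLength = 64 << 20;   // assume a 64 MB serialization buffer
    float spillper = 0.8f;           // assume io.sort.spill.percent = 0.80
    int softBufferLimit = (int)(kvbufferLength * spillper); // ~80% of 64 MB
    // The new LOG line would then read: "data buffer = <softBufferLimit>/67108864".
    // Crossing the soft limit starts a spill while the remaining ~20% keeps
    // absorbing new map output concurrently.
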
@@ -643,6 +644,12 @@ class MapTask extends Task {
                   ? bufindex - bufend > softBufferLimit
                   : bufend - bufindex < bufvoid - softBufferLimit;
                 if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) {
+                  LOG.info("Spilling map output: buffer full = " + bufsoftlimit+
+                           " and record full = " + kvsoftlimit);
+                  LOG.info("bufindex = " + bufindex + "; bufend = " + bufend +
+                           "; bufvoid = " + bufvoid);
+                  LOG.info("kvindex = " + kvindex + "; kvend = " + kvend +
+                           "; length = " + kvoffsets.length);
                   kvend = kvindex;
                   bufend = bufmark;
                   // TODO No need to recreate this thread every time
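
The two arms of the ternary handle the circular serialization buffer: while the write index sits ahead of the spill end, occupancy is a plain difference; once the index wraps, the used region straddles the end of the array and the test inverts. A simplified standalone sketch of the same occupancy check (the surrounding guard is reconstructed for illustration and may not match MapTask exactly):

    // Simplified circular-buffer occupancy test modeled on the two arms above.
    // bufindex: next write position; bufend: end of the region being spilled;
    // bufvoid: usable buffer length. Reconstructed for illustration.
    static boolean pastSoftLimit(int bufindex, int bufend, int bufvoid,
                                 int softBufferLimit) {
      return bufindex >= bufend
          ? bufindex - bufend > softBufferLimit            // no wrap: plain span
          : bufend - bufindex < bufvoid - softBufferLimit; // wrapped region
    }
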
@@ -693,6 +700,7 @@ class MapTask extends Task {
     }
 
     public synchronized void flush() throws IOException {
+      LOG.info("Starting flush of map output");
       synchronized (spillLock) {
         while (kvstart != kvend) {
           try {
@@ -815,6 +823,7 @@ class MapTask extends Task {
           }
         }
         ++numSpills;
+        LOG.info("Finished spill " + numSpills);
       } finally {
         if (out != null) out.close();
         if (indexOut != null) indexOut.close();
@@ -976,7 +985,15 @@ class MapTask extends Task {
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
         indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
-        finalOutFileSize += localFs.getLength(filename[i]);
+        finalOutFileSize += localFs.getFileStatus(filename[i]).getLen();
+      }
+      
+      if (numSpills == 1) { //the spill is the final output
+        localFs.rename(filename[0], 
+                       new Path(filename[0].getParent(), "file.out"));
+        localFs.rename(indexFileName[0], 
+                       new Path(indexFileName[0].getParent(),"file.out.index"));
+        return;
       }
       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -989,12 +1006,6 @@ class MapTask extends Task {
       Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
                             getTaskID(), finalIndexFileSize);
       
-      if (numSpills == 1) { //the spill is the final output
-        localFs.rename(filename[0], finalOutputFile);
-        localFs.rename(indexFileName[0], finalIndexFile);
-        return;
-      }
-      
       //The output stream for the final single output file
       FSDataOutputStream finalOut = localFs.create(finalOutputFile, true, 
                                                    4096);
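
Taken together, the two hunks relocate the single-spill fast path: instead of renaming the lone spill to a destination chosen by getOutputFileForWrite, which the local directory allocator may place on a different disk partition (turning the rename into a full copy), the spill and its index are now renamed to file.out and file.out.index inside their own parent directory, keeping the move on the same volume. A condensed sketch of the fast path as a hypothetical helper (illustrative, not verbatim from the patch):

    // Hypothetical helper condensing the patched fast path; FileSystem and
    // Path are org.apache.hadoop.fs types, and filename/indexFileName are
    // the per-spill paths gathered in the loop above.
    private void renameSingleSpill(FileSystem localFs, Path[] filename,
                                   Path[] indexFileName) throws IOException {
      // Rename within the spill's own directory so the move stays on the same
      // disk partition and never degrades into a byte-for-byte copy.
      localFs.rename(filename[0],
                     new Path(filename[0].getParent(), "file.out"));
      localFs.rename(indexFileName[0],
                     new Path(indexFileName[0].getParent(), "file.out.index"));
    }

With more than one spill, control falls through as before: the final output file is sized, the output streams are opened, and the spills are merged into a single output.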