
Reverting MAPREDUCE-2264

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1439561 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur, 12 years ago
parent commit da4cab1099

+ 0 - 3
hadoop-mapreduce-project/CHANGES.txt

@@ -269,9 +269,6 @@ Release 2.0.3-alpha - Unreleased
     MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken.
     (Junping Du via sseth)
 
-    MAPREDUCE-2264. Job status exceeds 100% in some cases. 
-    (devaraj.k and sandyr via tucu)
-
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

+ 3 - 22
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java

@@ -218,7 +218,6 @@ public class Merger {
     CompressionCodec codec = null;
     long segmentOffset = 0;
     long segmentLength = -1;
-    long rawDataLength = -1;
     
     Counters.Counter mapOutputsCounter = null;
 
@@ -235,15 +234,6 @@ public class Merger {
       this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, 
            mergedMapOutputsCounter);
     }
-    
-    public Segment(Configuration conf, FileSystem fs, Path file,
-        CompressionCodec codec, boolean preserve,
-        Counters.Counter mergedMapOutputsCounter, long rawDataLength)
-            throws IOException {
-      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, 
-          mergedMapOutputsCounter);
-      this.rawDataLength = rawDataLength;
-    }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
                    long segmentOffset, long segmentLength,
@@ -271,11 +261,6 @@ public class Merger {
     public Segment(Reader<K, V> reader, boolean preserve) {
       this(reader, preserve, null);
     }
-
-    public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
-      this(reader, preserve, null);
-      this.rawDataLength = rawDataLength;
-    }
     
     public Segment(Reader<K, V> reader, boolean preserve, 
                    Counters.Counter mapOutputsCounter) {
@@ -315,10 +300,6 @@ public class Merger {
         segmentLength : reader.getLength();
     }
     
-    public long getRawDataLength() {
-      return (rawDataLength > 0) ? rawDataLength : getLength();
-    }
-
     boolean nextRawKey() throws IOException {
       return reader.nextRawKey(key);
     }
@@ -652,7 +633,7 @@ public class Merger {
             totalBytesProcessed = 0;
             totalBytes = 0;
             for (int i = 0; i < segmentsToMerge.size(); i++) {
-              totalBytes += segmentsToMerge.get(i).getRawDataLength();
+              totalBytes += segmentsToMerge.get(i).getLength();
             }
           }
           if (totalBytes != 0) //being paranoid
@@ -721,7 +702,7 @@ public class Merger {
           // size will match(almost) if combiner is not called in merge.
           long inputBytesOfThisMerge = totalBytesProcessed -
                                        bytesProcessedInPrevMerges;
-          totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
+          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
           if (totalBytes != 0) {
             progPerByte = 1.0f / (float)totalBytes;
           }
@@ -787,7 +768,7 @@ public class Merger {
       for (int i = 0; i < numSegments; i++) {
         // Not handling empty segments here assuming that it would not affect
         // much in calculation of mergeProgress.
-        segmentSizes.add(segments.get(i).getRawDataLength());
+        segmentSizes.add(segments.get(i).getLength());
       }
       
       // If includeFinalMerge is true, allow the following while loop iterate

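Note: a minimal sketch (not part of the patch) of how merge progress falls out of the segment sizes summed above. After the revert, the denominator is the sum of Segment.getLength() values, i.e. on-disk sizes, while totalBytesProcessed advances with the bytes actually read; with compressed segments the ratio can therefore exceed 1.0f, which is the ">100%" symptom MAPREDUCE-2264 had targeted.

  // Hedged sketch, not part of the patch: progress computed the way the reverted
  // Merger code does it, assuming segmentLengths holds Segment.getLength() values.
  static float mergeProgress(long[] segmentLengths, long totalBytesProcessed) {
    long totalBytes = 0;
    for (long length : segmentLengths) {
      totalBytes += length;                    // on-disk (possibly compressed) sizes
    }
    if (totalBytes == 0) {                     // "being paranoid", as the original comment puts it
      return 0.0f;
    }
    float progPerByte = 1.0f / (float) totalBytes;
    return totalBytesProcessed * progPerByte;  // can exceed 1.0f when lengths are compressed sizes
  }
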
+ 19 - 50
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

@@ -89,7 +89,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
   private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
   
-  Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
+  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
   private final OnDiskMerger onDiskMerger;
   
   private final long memoryLimit;
@@ -336,7 +336,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
              inMemoryMergedMapOutputs.size());
   }
   
-  public synchronized void closeOnDiskFile(CompressAwarePath file) {
+  public synchronized void closeOnDiskFile(Path file) {
     onDiskMapOutputs.add(file);
     
     if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
@@ -356,7 +356,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     List<InMemoryMapOutput<K, V>> memory = 
       new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
     memory.addAll(inMemoryMapOutputs);
-    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
+    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
     return finalMerge(jobConf, rfs, memory, disk);
   }
    
@@ -456,7 +456,6 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                         codec, null);
 
       RawKeyValueIterator rIter = null;
-      CompressAwarePath compressAwarePath;
       try {
         LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
                  " segments...");
@@ -475,8 +474,6 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
-        compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
         writer.close();
 
         LOG.info(reduceId +  
@@ -492,12 +489,12 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
       }
 
       // Note the output of the merge
-      closeOnDiskFile(compressAwarePath);
+      closeOnDiskFile(outputPath);
     }
 
   }
   
-  private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
+  private class OnDiskMerger extends MergeThread<Path,K,V> {
     
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
@@ -506,7 +503,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
     
     @Override
-    public void merge(List<CompressAwarePath> inputs) throws IOException {
+    public void merge(List<Path> inputs) throws IOException {
       // sanity check
       if (inputs == null || inputs.isEmpty()) {
         LOG.info("No ondisk files to merge...");
@@ -521,8 +518,8 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                " map outputs on disk. Triggering merge...");
       
       // 1. Prepare the list of files to be merged. 
-      for (CompressAwarePath file : inputs) {
-        approxOutputSize += localFS.getFileStatus(file.getPath()).getLen();
+      for (Path file : inputs) {
+        approxOutputSize += localFS.getFileStatus(file).getLen();
       }
 
       // add the checksum length
@@ -539,7 +536,6 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                         (Class<V>) jobConf.getMapOutputValueClass(),
                         codec, null);
       RawKeyValueIterator iter  = null;
-      CompressAwarePath compressAwarePath;
       Path tmpDir = new Path(reduceId.toString());
       try {
         iter = Merger.merge(jobConf, rfs,
@@ -552,15 +548,13 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
-        compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
         writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
       }
 
-      closeOnDiskFile(compressAwarePath);
+      closeOnDiskFile(outputPath);
 
       LOG.info(reduceId +
           " Finished merging " + inputs.size() + 
@@ -659,7 +653,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
 
   private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                        List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
-                                       List<CompressAwarePath> onDiskMapOutputs
+                                       List<Path> onDiskMapOutputs
                                        ) throws IOException {
     LOG.info("finalMerge called with " + 
              inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
@@ -718,8 +712,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
         try {
           Merger.writeFile(rIter, writer, reporter, job);
           // add to list of final disk outputs.
-          onDiskMapOutputs.add(new CompressAwarePath(outputPath,
-              writer.getRawLength()));
+          onDiskMapOutputs.add(outputPath);
         } catch (IOException e) {
           if (null != outputPath) {
             try {
@@ -749,19 +742,15 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     // segments on disk
     List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
     long onDiskBytes = inMemToDiskBytes;
-    long rawBytes = inMemToDiskBytes;
-    CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
-        new CompressAwarePath[onDiskMapOutputs.size()]);
-    for (CompressAwarePath file : onDisk) {
-      long fileLength = fs.getFileStatus(file.getPath()).getLen();
-      onDiskBytes += fileLength;
-      rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
-
-      LOG.debug("Disk file: " + file + " Length is " + fileLength);
-      diskSegments.add(new Segment<K, V>(job, fs, file.getPath(), codec, keepInputs,
+    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
+    for (Path file : onDisk) {
+      onDiskBytes += fs.getFileStatus(file).getLen();
+      LOG.debug("Disk file: " + file + " Length is " + 
+          fs.getFileStatus(file).getLen());
+      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
                                          (file.toString().endsWith(
                                              Task.MERGED_OUTPUT_PREFIX) ?
-                                          null : mergedMapOutputsCounter), file.getRawDataLength()
+                                          null : mergedMapOutputsCounter)
                                         ));
     }
     LOG.info("Merging " + onDisk.length + " files, " +
@@ -797,7 +786,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
         return diskMerge;
       }
       finalSegments.add(new Segment<K,V>(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
+            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
     }
     return Merger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
@@ -805,24 +794,4 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                  null);
   
   }
-
-  static class CompressAwarePath
-  {
-    private long rawDataLength;
-
-    private Path path;
-
-    public CompressAwarePath(Path path, long rawDataLength) {
-      this.path = path;
-      this.rawDataLength = rawDataLength;
-    }
-
-    public long getRawDataLength() {
-      return rawDataLength;
-    }
-
-    public Path getPath() {
-      return path;
-    }
-  }
 }

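For context, a hedged illustration of what the removed CompressAwarePath carried: the writer's raw (uncompressed) length alongside the on-disk Path, so that finalMerge could size disk segments in raw bytes rather than compressed bytes. The helper below only mirrors the removed Segment.getRawDataLength() fallback; it is not part of the reverted code.

  // Hedged sketch mirroring the removed getRawDataLength() fallback: prefer the
  // raw (uncompressed) length when it is known, otherwise use the on-disk length.
  static long effectiveSegmentSize(long rawDataLength, long onDiskLength) {
    return (rawDataLength > 0) ? rawDataLength : onDiskLength;
  }

With compression enabled, writer.getRawLength() is typically larger than fs.getFileStatus(outputPath).getLen(), which is why the two bookkeeping paths diverge and why the revert changes the unit the progress math is done in.
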
+ 1 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapOutputFile;
 
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -113,9 +112,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
-    CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
-        getSize());
-    merger.closeOnDiskFile(compressAwarePath);
+    merger.closeOnDiskFile(outputPath);
   }
   
   @Override

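After the revert, OnDiskMapOutput.commit() registers the plain output Path directly. A sketch of the resulting method, with comments on what each step does (identifiers as in the diff):

  @Override
  public void commit() throws IOException {
    // Promote the temporary spill file to its final location on the local filesystem.
    localFS.rename(tmpOutputPath, outputPath);
    // Hand the finished on-disk map output to the merge manager; with the
    // CompressAwarePath wrapper gone, a plain Path is all that gets recorded.
    merger.closeOnDiskFile(outputPath);
  }
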
+ 0 - 157
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMerger.java

@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doAnswer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.IFile.Reader;
-import org.apache.hadoop.mapred.Merger.Segment;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class TestMerger {
-
-  @Test
-  public void testCompressed() throws IOException {
-    testMergeShouldReturnProperProgress(getCompressedSegments());
-  }
-  
-  @Test
-  public void testUncompressed() throws IOException {
-    testMergeShouldReturnProperProgress(getUncompressedSegments());
-  }
-  
-  @SuppressWarnings( { "deprecation", "unchecked" })
-  public void testMergeShouldReturnProperProgress(
-      List<Segment<Text, Text>> segments) throws IOException {
-    Configuration conf = new Configuration();
-    JobConf jobConf = new JobConf();
-    FileSystem fs = FileSystem.getLocal(conf);
-    Path tmpDir = new Path("localpath");
-    Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
-    Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
-    RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
-    Counter readsCounter = new Counter();
-    Counter writesCounter = new Counter();
-    Progress mergePhase = new Progress();
-    RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
-        valueClass, segments, 2, tmpDir, comparator, getReporter(),
-        readsCounter, writesCounter, mergePhase);
-    Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
-  }
-
-  private Progressable getReporter() {
-    Progressable reporter = new Progressable() {
-      @Override
-      public void progress() {
-      }
-    };
-    return reporter;
-  }
-
-  private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
-    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
-      segments.add(getUncompressedSegment(i));
-      System.out.println("adding segment");
-    }
-    return segments;
-  }
-  
-  private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
-    List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
-      segments.add(getCompressedSegment(i));
-      System.out.println("adding segment");
-    }
-    return segments;
-  }
-  
-  private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false);
-  }
-
-  private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false, 3000l);
-  }
-
-  @SuppressWarnings("unchecked")
-  private Reader<Text, Text> getReader(int i) throws IOException {
-    Reader<Text, Text> readerMock = mock(Reader.class);
-    when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
-        20l);
-    when(
-        readerMock.nextRawKey(any(DataInputBuffer.class)))
-        .thenAnswer(getKeyAnswer("Segment" + i));
-    doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue(
-        any(DataInputBuffer.class));
-
-    return readerMock;
-  }
-
-  private Answer<?> getKeyAnswer(final String segmentName) {
-    return new Answer<Object>() {
-      int i = 0;
-
-      public Boolean answer(InvocationOnMock invocation) {
-        Object[] args = invocation.getArguments();
-        DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
-          return false;
-        }
-        key.reset(("Segement Key " + segmentName + i).getBytes(), 20);
-        return true;
-      }
-    };
-  }
-  
-  private Answer<?> getValueAnswer(final String segmentName) {
-    return new Answer<Void>() {
-      int i = 0;
-
-      public Void answer(InvocationOnMock invocation) {
-        Object[] args = invocation.getArguments();
-        DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
-          return null;
-        }
-        key.reset(("Segement Value " + segmentName + i).getBytes(), 20);
-        return null;
-      }
-    };
-  }
-}