MAPREDUCE-6174. Combine common stream code into parent class for InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)

(cherry picked from commit d90c13e2da8867661bf19a802add70145ab9a462)
Gera Shegalov, 10 years ago
parent commit 25c1e54d3f
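
In short: InMemoryMapOutput.shuffle() and OnDiskMapOutput.shuffle() each wrapped the fetched map output in a checksum-verifying IFileInputStream before consuming it. This patch hoists that duplicated stream handling into a new abstract parent, IFileWrappedMapOutput, whose shuffle() builds the IFileInputStream, delegates the type-specific copy to a new doShuffle() hook, and closes the stream when done.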

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -73,6 +73,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5248. Let NNBenchWithoutMR specify the replication factor for
     its test (Erik Paulson via jlowe)
 
+    MAPREDUCE-6174. Combine common stream code into parent class for
+    InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 72 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.IFileInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Common code for allowing MapOutput classes to handle streams.
+ *
+ * @param <K> key type for map output
+ * @param <V> value type for map output
+ */
+public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
+  private final Configuration conf;
+  private final MergeManagerImpl<K, V> merger;
+
+  public IFileWrappedMapOutput(
+      Configuration c, MergeManagerImpl<K, V> m, TaskAttemptID mapId,
+      long size, boolean primaryMapOutput) {
+    super(mapId, size, primaryMapOutput);
+    conf = c;
+    merger = m;
+  }
+
+  /**
+   * @return the merger
+   */
+  protected MergeManagerImpl<K, V> getMerger() {
+    return merger;
+  }
+
+  protected abstract void doShuffle(
+      MapHost host, IFileInputStream iFileInputStream,
+      long compressedLength, long decompressedLength,
+      ShuffleClientMetrics metrics, Reporter reporter) throws IOException;
+
+  @Override
+  public void shuffle(MapHost host, InputStream input,
+                      long compressedLength, long decompressedLength,
+                      ShuffleClientMetrics metrics,
+                      Reporter reporter) throws IOException {
+    IFileInputStream iFin =
+        new IFileInputStream(input, compressedLength, conf);
+    try {
+      this.doShuffle(host, iFin, compressedLength,
+                    decompressedLength, metrics, reporter);
+    } finally {
+      iFin.close();
+    }
+  }
+}
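
The class above makes shuffle() a template method: it wraps the raw input in a checksum-verifying IFileInputStream, hands it to doShuffle(), and closes it in the finally block regardless of outcome. A minimal sketch of what a new subclass would need to supply (DiscardingMapOutput is hypothetical and not part of this patch; it assumes the hadoop-mapreduce-client-core classes as they stand at this commit):

package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;

// Hypothetical subclass for illustration only: reads and discards the
// map output. MapHost, ShuffleClientMetrics, and MergeManagerImpl come
// from this same package.
class DiscardingMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {

  DiscardingMapOutput(Configuration conf, MergeManagerImpl<K, V> merger,
                      TaskAttemptID mapId, long size,
                      boolean primaryMapOutput) {
    super(conf, merger, mapId, size, primaryMapOutput);
  }

  @Override
  protected void doShuffle(MapHost host, IFileInputStream input,
                           long compressedLength, long decompressedLength,
                           ShuffleClientMetrics metrics,
                           Reporter reporter) throws IOException {
    // The parent already wrapped the raw stream and will close it in its
    // finally block; this hook only has to consume the bytes.
    byte[] buf = new byte[64 * 1024];
    long bytesLeft = compressedLength;
    while (bytesLeft > 0) {
      int n = input.readWithChecksum(buf, 0,
          (int) Math.min(bytesLeft, buf.length));
      if (n < 0) {
        throw new IOException("read past end of stream reading " + getMapId());
      }
      bytesLeft -= n;
    }
  }

  @Override
  public void commit() throws IOException {
    // Nothing to hand to the merger in this sketch.
  }

  @Override
  public void abort() {
    // No resources were reserved, so nothing to release.
  }

  @Override
  public String getDescription() {
    return "DISCARD";
  }
}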

+ 6 - 20
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java

@@ -42,10 +42,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
+class InMemoryMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
   private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class);
-  private Configuration conf;
-  private final MergeManagerImpl<K, V> merger;
   private final byte[] memory;
   private BoundedByteArrayOutputStream byteStream;
   // Decompression of map-outputs
@@ -56,9 +54,7 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
                            MergeManagerImpl<K, V> merger,
                            int size, CompressionCodec codec,
                            boolean primaryMapOutput) {
-    super(mapId, (long)size, primaryMapOutput);
-    this.conf = conf;
-    this.merger = merger;
+    super(conf, merger, mapId, (long)size, primaryMapOutput);
     this.codec = codec;
     byteStream = new BoundedByteArrayOutputStream(size);
     memory = byteStream.getBuffer();
@@ -78,15 +74,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
   }
 
   @Override
-  public void shuffle(MapHost host, InputStream input,
+  protected void doShuffle(MapHost host, IFileInputStream iFin,
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
-    IFileInputStream checksumIn = 
-      new IFileInputStream(input, compressedLength, conf);
+    InputStream input = iFin;
 
-    input = checksumIn;       
-  
     // Are map-outputs compressed?
     if (codec != null) {
       decompressor.reset();
@@ -111,13 +104,6 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
         throw new IOException("Unexpected extra bytes from input stream for " +
                                getMapId());
       }
-
-    } catch (IOException ioe) {      
-      // Close the streams
-      IOUtils.cleanup(LOG, input);
-
-      // Re-throw
-      throw ioe;
     } finally {
       CodecPool.returnDecompressor(decompressor);
     }
@@ -125,12 +111,12 @@ class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
 
   @Override
   public void commit() throws IOException {
-    merger.closeInMemoryFile(this);
+    getMerger().closeInMemoryFile(this);
   }
   
   @Override
   public void abort() {
-    merger.unreserve(memory.length);
+    getMerger().unreserve(memory.length);
   }
 
   @Override
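
Two things disappear from InMemoryMapOutput relative to the old shuffle(): the local IFileInputStream construction, which the parent now performs, and the catch block that called IOUtils.cleanup(LOG, input) before re-throwing, since the parent's finally block closes the underlying IFileInputStream on both the success and error paths. Only returning the decompressor to the CodecPool remains in the subclass's finally.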

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

@@ -263,8 +263,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
       LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
                " is greater than maxSingleShuffleLimit (" + 
                maxSingleShuffleLimit + ")");
-      return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
-                                      jobConf, mapOutputFile, fetcher, true);
+      return new OnDiskMapOutput<K,V>(mapId, this, requestedSize, jobConf,
+         fetcher, true, FileSystem.getLocal(jobConf).getRaw(),
+         mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize));
     }
     
     // Stall shuffle if we are above the memory limit

+ 18 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java

@@ -18,13 +18,11 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -46,41 +44,46 @@ import com.google.common.annotations.VisibleForTesting;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
+class OnDiskMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
   private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class);
   private final FileSystem fs;
   private final Path tmpOutputPath;
   private final Path outputPath;
-  private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk; 
   private long compressedSize;
-  private final Configuration conf;
 
+  @Deprecated
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K,V> merger, long size,
                          JobConf conf,
                          MapOutputFile mapOutputFile,
                          int fetcher, boolean primaryMapOutput)
       throws IOException {
-    this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
+    this(mapId, merger, size, conf, fetcher,
         primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
         mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
   }
 
-  @VisibleForTesting
+  @Deprecated
   OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K,V> merger, long size,
                          JobConf conf,
                          MapOutputFile mapOutputFile,
                          int fetcher, boolean primaryMapOutput,
                          FileSystem fs, Path outputPath) throws IOException {
-    super(mapId, size, primaryMapOutput);
+    this(mapId, merger, size, conf, fetcher, primaryMapOutput, fs, outputPath);
+  }
+
+  OnDiskMapOutput(TaskAttemptID mapId,
+                  MergeManagerImpl<K, V> merger, long size,
+                  JobConf conf,
+                  int fetcher, boolean primaryMapOutput,
+                  FileSystem fs, Path outputPath) throws IOException {
+    super(conf, merger, mapId, size, primaryMapOutput);
     this.fs = fs;
-    this.merger = merger;
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
     disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
-    this.conf = conf;
   }
 
   @VisibleForTesting
@@ -89,18 +92,18 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
   }
 
   @Override
-  public void shuffle(MapHost host, InputStream input,
+  protected void doShuffle(MapHost host, IFileInputStream input,
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
-    input = new IFileInputStream(input, compressedLength, conf);
     // Copy data to local-disk
     long bytesLeft = compressedLength;
     try {
       final int BYTES_TO_READ = 64 * 1024;
       byte[] buf = new byte[BYTES_TO_READ];
       while (bytesLeft > 0) {
-        int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        int n = input.readWithChecksum(buf, 0,
+                                      (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " + 
                                 getMapId());
@@ -117,7 +120,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
       disk.close();
     } catch (IOException ioe) {
       // Close the streams
-      IOUtils.cleanup(LOG, input, disk);
+      IOUtils.cleanup(LOG, disk);
 
       // Re-throw
       throw ioe;
@@ -139,7 +142,7 @@ class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
     fs.rename(tmpOutputPath, outputPath);
     CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
         getSize(), this.compressedSize);
-    merger.closeOnDiskFile(compressAwarePath);
+    getMerger().closeOnDiskFile(compressAwarePath);
   }
   
   @Override
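
In OnDiskMapOutput, doShuffle() now receives the IFileInputStream directly, so the (IFileInputStream) cast in the read loop goes away, and the error path only cleans up the local disk stream because the parent owns the input stream's lifecycle. The constructors taking a reduceId and a MapOutputFile are kept only as @Deprecated shims; callers such as MergeManagerImpl (above) now resolve the output path themselves via mapOutputFile.getInputFileForWrite(...) and pass in a FileSystem and Path, which also lets tests construct the object without mocking MapOutputFile (see TestFetcher below).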

+ 12 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -19,9 +19,7 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.FilterInputStream;
-
 import java.lang.Void;
-
 import java.net.HttpURLConnection;
 
 import org.apache.hadoop.fs.ChecksumException;
@@ -30,13 +28,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
-import static org.junit.Assert.*;
 
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
@@ -65,10 +62,11 @@ import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.nimbusds.jose.util.StringUtils;
+
 /**
  * Test that the Fetcher does what we expect it to.
  */
@@ -453,9 +451,9 @@ public class TestFetcher {
     ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
     when(connection.getInputStream()).thenReturn(in);
     // 8 < 10 therefore there appear to be extra bytes in the IFileInputStream
-    InMemoryMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>(
+    IFileWrappedMapOutput<Text,Text> mapOut = new InMemoryMapOutput<Text, Text>(
         job, map1ID, mm, 8, null, true );
-    InMemoryMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
+    IFileWrappedMapOutput<Text,Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
         job, map2ID, mm, 10, null, true );
 
     when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut);
@@ -478,9 +476,9 @@ public class TestFetcher {
     Path shuffledToDisk =
         OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
     fs = FileSystem.getLocal(job).getRaw();
-    MapOutputFile mof = mock(MapOutputFile.class);
-    OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
-        id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+    IFileWrappedMapOutput<Text,Text> odmo =
+        new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, fetcher, true,
+                                       fs, onDiskMapOutputPath);
 
     String mapData = "MAPDATA12345678901234567890";
 
@@ -538,7 +536,7 @@ public class TestFetcher {
   @Test(timeout=10000)
   public void testInterruptInMemory() throws Exception {
     final int FETCHER = 2;
-    InMemoryMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
+    IFileWrappedMapOutput<Text,Text> immo = spy(new InMemoryMapOutput<Text,Text>(
           job, id, mm, 100, null, true));
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
         .thenReturn(immo);
@@ -584,10 +582,9 @@ public class TestFetcher {
     Path p = new Path("file:///tmp/foo");
     Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER);
     FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS);
-    MapOutputFile mof = mock(MapOutputFile.class);
-    when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn(p);
-    OnDiskMapOutput<Text,Text> odmo = spy(new OnDiskMapOutput<Text,Text>(map1ID,
-        id, mm, 100L, job, mof, FETCHER, true, mFs, p));
+    IFileWrappedMapOutput<Text,Text> odmo =
+        spy(new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job,
+                                           FETCHER, true, mFs, p));
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
         .thenReturn(odmo);
     doNothing().when(mm).waitForResource();