
Merge MAPREDUCE-3289 from trunk. Make use of fadvise in the NM's shuffle handler. (Contributed by Todd Lipcon and Siddharth Seth)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1368722 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth, 12 years ago
commit f7545f076e

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -68,6 +68,9 @@ Release 2.1.0-alpha - Unreleased
     MAPREDUCE-4427. Added an 'unmanaged' mode for AMs so as to ease
     development of new applications. (Bikas Saha via acmurthy) 
 
+    MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
+    (Todd Lipcon and Siddharth Seth via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES
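
For context, a minimal sketch of the pattern the two new classes implement, using only APIs that appear in this diff (ReadaheadPool.readaheadStream and NativeIO.posixFadviseIfPossible); path, start, end, readaheadLength, and sendBytes are hypothetical placeholders, not part of the commit:

    // Sketch only: prefetch ahead of the read position, then drop the pages.
    static void serveAndDrop(String path, long start, long end,
        int readaheadLength) throws Exception {
      RandomAccessFile file = new RandomAccessFile(path, "r");
      FileDescriptor fd = file.getFD();
      ReadaheadPool pool = ReadaheadPool.getInstance();
      ReadaheadRequest last = null;
      long pos = start;
      while (pos < end) {
        // Ask the kernel to prefetch the next window before reading it.
        last = pool.readaheadStream(path, fd, pos, readaheadLength, end, last);
        pos += sendBytes(file, pos, end);  // hypothetical send step
      }
      if (last != null) {
        last.cancel();
      }
      // Shuffle output is read once; evict it rather than more useful pages.
      NativeIO.posixFadviseIfPossible(fd, start, end - start,
          NativeIO.POSIX_FADV_DONTNEED);
      file.close();
    }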

+ 80 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  private ReadaheadRequest readaheadRequest;
+
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+      int chunkSize, boolean manageOsCache, int readaheadLength,
+      ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  @Override
+  public Object nextChunk() throws Exception {
+    if (manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest);
+    }
+    return super.nextChunk();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
+      try {
+        NativeIO.posixFadviseIfPossible(fd, getStartOffset(), getEndOffset()
+            - getStartOffset(), NativeIO.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.close();
+  }
+}
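
FadvisedChunkedFile keeps ChunkedFile's read-into-userspace behavior (needed on the SSL path, where zero copy is not possible) and layers a readahead request onto each nextChunk() call plus a DONTNEED hint in close(). A hedged usage sketch, assuming a Netty channel ch whose pipeline contains a ChunkedWriteHandler; offsets and buffer sizes are placeholders:

    RandomAccessFile spill = new RandomAccessFile("/path/to/file.out", "r");
    FadvisedChunkedFile chunk = new FadvisedChunkedFile(
        spill, startOffset, partLength, sslFileBufferSize,  // placeholders
        true /* manageOsCache */, 4 * 1024 * 1024 /* readahead bytes */,
        ReadaheadPool.getInstance(), "/path/to/file.out" /* identifier */);
    ChannelFuture writeFuture = ch.write(chunk);  // drained chunk by chunk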

+ 82 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+
+  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+  private final boolean manageOsCache;
+  private final int readaheadLength;
+  private final ReadaheadPool readaheadPool;
+  private final FileDescriptor fd;
+  private final String identifier;
+
+  private ReadaheadRequest readaheadRequest;
+
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+      boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+      String identifier) throws IOException {
+    super(file.getChannel(), position, count);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position)
+      throws IOException {
+    if (manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+          getPosition() + position, readaheadLength,
+          getPosition() + getCount(), readaheadRequest);
+    }
+    return super.transferTo(target, position);
+  }
+
+  @Override
+  public void releaseExternalResources() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (manageOsCache && getCount() > 0) {
+      try {
+        NativeIO.posixFadviseIfPossible(fd, getPosition(), getCount(),
+            NativeIO.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) {
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.releaseExternalResources();
+  }
+}
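
FadvisedFileRegion is the zero-copy counterpart: transferTo() issues readahead ahead of the sendfile position, and releaseExternalResources(), the point where DefaultFileRegion closes the underlying channel, first cancels any pending readahead and drops the transferred pages. A sketch of the write-then-release pattern, mirroring the ShuffleHandler hunk below (ch, spill, and spillPath are placeholders):

    final FadvisedFileRegion partition = new FadvisedFileRegion(
        spill, startOffset, partLength, true /* manageOsCache */,
        4 * 1024 * 1024, ReadaheadPool.getInstance(), spillPath);
    ch.write(partition).addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) {
        // Cancels readahead and issues POSIX_FADV_DONTNEED before close.
        partition.releaseExternalResources();
      }
    });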

+ 37 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -86,9 +87,7 @@ import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultFileRegion;
 import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.FileRegion;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.group.ChannelGroup;
@@ -104,7 +103,6 @@ import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
@@ -114,6 +112,12 @@ public class ShuffleHandler extends AbstractService
     implements AuxServices.AuxiliaryService {
 
   private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+  
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
 
   private int port;
   private ChannelFactory selector;
@@ -121,6 +125,15 @@ public class ShuffleHandler extends AbstractService
   private HttpPipelineFactory pipelineFact;
   private int sslFileBufferSize;
 
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile
+   */
+  private boolean manageOsCache;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+   
+
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce.shuffle";
 
@@ -242,6 +255,12 @@ public class ShuffleHandler extends AbstractService
 
   @Override
   public synchronized void init(Configuration conf) {
+    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+        DEFAULT_SHUFFLE_READAHEAD_BYTES);
+    
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("ShuffleHandler Netty Boss #%d")
       .build();
@@ -503,14 +522,14 @@ public class ShuffleHandler extends AbstractService
           base + "/file.out", conf);
       LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " +
           indexFileName);
-      IndexRecord info = 
+      final IndexRecord info = 
         indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
       final ShuffleHeader header =
         new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
       final DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
       ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-      File spillfile = new File(mapOutputFileName.toString());
+      final File spillfile = new File(mapOutputFileName.toString());
       RandomAccessFile spill;
       try {
         spill = new RandomAccessFile(spillfile, "r");
@@ -520,22 +539,25 @@ public class ShuffleHandler extends AbstractService
       }
       ChannelFuture writeFuture;
       if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FileRegion partition = new DefaultFileRegion(
-            spill.getChannel(), info.startOffset, info.partLength);
+        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+            info.startOffset, info.partLength, manageOsCache, readaheadLength,
+            readaheadPool, spillfile.getAbsolutePath());
         writeFuture = ch.write(partition);
         writeFuture.addListener(new ChannelFutureListener() {
             // TODO error handling; distinguish IO/connection failures,
             //      attribute to appropriate spill output
-            @Override
-            public void operationComplete(ChannelFuture future) {
-              partition.releaseExternalResources();
-            }
-          });
+          @Override
+          public void operationComplete(ChannelFuture future) {
+            partition.releaseExternalResources();
+          }
+        });
       } else {
         // HTTPS cannot be done with zero copy.
-        writeFuture = ch.write(new ChunkedFile(spill, info.startOffset,
-                                               info.partLength,
-                                               sslFileBufferSize));
+        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+            info.startOffset, info.partLength, sslFileBufferSize,
+            manageOsCache, readaheadLength, readaheadPool,
+            spillfile.getAbsolutePath());
+        writeFuture = ch.write(chunk);
       }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
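
The two new keys default to cache management enabled with a 4 MiB readahead window, and both can be overridden per NodeManager. A hedged sketch of tuning them through a standard Hadoop Configuration (the chosen values are illustrative only):

    Configuration conf = new Configuration();
    // Disable posix_fadvise management entirely (default: true).
    conf.setBoolean(ShuffleHandler.SHUFFLE_MANAGE_OS_CACHE, false);
    // Or widen the readahead window (default: 4 * 1024 * 1024 bytes).
    conf.setInt(ShuffleHandler.SHUFFLE_READAHEAD_BYTES, 8 * 1024 * 1024);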