HDFS-4817. Make HDFS advisory caching configurable on a per-file basis. (Contributed by Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1526961 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe, 11 years ago
commit 9fb4379807
35 changed files with 883 additions and 106 deletions
  1. + 3 - 0    hadoop-common-project/hadoop-common/CHANGES.txt
  2. + 41 - 0   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java
  3. + 40 - 0   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetReadahead.java
  4. + 25 - 1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
  5. + 17 - 2   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
  6. + 14 - 2   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
  7. + 1 - 1    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
  8. + 13 - 1   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
  9. + 7 - 3    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
  10. + 26 - 1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  11. + 3 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  12. + 40 - 5  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
  13. + 14 - 2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  14. + 5 - 3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
  15. + 4 - 2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
  16. + 6 - 2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
  17. + 18 - 2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
  18. + 20 - 3  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
  19. + 2 - 1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
  20. + 1 - 1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
  21. + 43 - 18 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  22. + 76 - 29 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  23. + 5 - 2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  24. + 13 - 7  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  25. + 3 - 2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
  26. + 7 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
  27. + 45 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  28. + 2 - 1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
  29. + 12 - 10 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
  30. + 3 - 1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
  31. + 369 - 0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java
  32. + 2 - 1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
  33. + 1 - 1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
  34. + 1 - 1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
  35. + 1 - 1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -18,6 +18,9 @@ Release 2.1.2 - UNRELEASED
 
   NEW FEATURES
 
+    HDFS-4817.  Make HDFS advisory caching configurable on a per-file basis.
+    (Contributed by Colin Patrick McCabe)
+
   IMPROVEMENTS
 
     HADOOP-9948. Add a config value to CLITestHelper to skip tests on Windows.

+ 41 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CanSetDropBehind {
+  /**
+   * Configure whether the stream should drop the cache.
+   *
+   * @param dropCache     Whether to drop the cache.  null means to use the
+   *                      default value.
+   * @throws IOException  If there was an error changing the dropBehind
+   *                      setting.
+   *         UnsupportedOperationException  If this stream doesn't support
+   *                                        setting the drop-behind.
+   */
+  public void setDropBehind(Boolean dropCache) 
+      throws IOException, UnsupportedOperationException;
+}

+ 40 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetReadahead.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface CanSetReadahead {
+  /**
+   * Set the readahead on this stream.
+   *
+   * @param readahead     The readahead to use.  null means to use the default.
+   * @throws IOException  If there was an error changing the dropBehind
+   *                      setting.
+   *         UnsupportedOperationException  If this stream doesn't support
+   *                                        setting readahead. 
+   */
+  public void setReadahead(Long readahead)
+    throws IOException, UnsupportedOperationException;
+}

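For context, the two new interfaces above are deliberately tiny: a stream that can honor the hints implements them, and both arguments use wrapper types so that null means "fall back to the default". The sketch below is not part of the commit; it shows one way a hypothetical wrapping stream (ForwardingCachingStream is an invented name) could opt in by delegating to its inner stream. FSDataInputStream in the next file does essentially this, using a ClassCastException check instead of instanceof.

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;

// Hypothetical wrapper that forwards the caching hints to an underlying
// stream when that stream supports them.
class ForwardingCachingStream extends FilterInputStream
    implements CanSetDropBehind, CanSetReadahead {

  ForwardingCachingStream(InputStream in) {
    super(in);
  }

  @Override
  public void setReadahead(Long readahead)
      throws IOException, UnsupportedOperationException {
    if (in instanceof CanSetReadahead) {
      ((CanSetReadahead) in).setReadahead(readahead);   // null = use default
    } else {
      throw new UnsupportedOperationException("readahead not supported");
    }
  }

  @Override
  public void setDropBehind(Boolean dropCache)
      throws IOException, UnsupportedOperationException {
    if (in instanceof CanSetDropBehind) {
      ((CanSetDropBehind) in).setDropBehind(dropCache);  // null = use default
    } else {
      throw new UnsupportedOperationException("drop-behind not supported");
    }
  }
}
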
+ 25 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

@@ -28,7 +28,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FSDataInputStream extends DataInputStream
-    implements Seekable, PositionedReadable, Closeable, ByteBufferReadable, HasFileDescriptor {
+    implements Seekable, PositionedReadable, Closeable,
+    ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
 
   public FSDataInputStream(InputStream in)
     throws IOException {
@@ -143,4 +144,27 @@ public class FSDataInputStream extends DataInputStream
       return null;
     }
   }
+
+  @Override
+  public void setReadahead(Long readahead)
+      throws IOException, UnsupportedOperationException {
+    try {
+      ((CanSetReadahead)in).setReadahead(readahead);
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException(
+          "this stream does not support setting the readahead " +
+          "caching strategy.");
+    }
+  }
+
+  @Override
+  public void setDropBehind(Boolean dropBehind)
+      throws IOException, UnsupportedOperationException {
+    try {
+      ((CanSetDropBehind)in).setDropBehind(dropBehind);
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("this stream does not " +
+          "support setting the drop-behind caching setting.");
+    }
+  }
 }

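With the change above, FSDataInputStream simply forwards the hints to the wrapped stream and converts a ClassCastException into UnsupportedOperationException. A hedged usage sketch from the caller's side (the path and sizes are made up for illustration):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadWithCachingHints {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // "/data/large-scan.dat" is a placeholder path for illustration.
    try (FSDataInputStream in = fs.open(new Path("/data/large-scan.dat"))) {
      try {
        in.setReadahead(4L * 1024 * 1024); // ask for roughly 4 MB of readahead
        in.setDropBehind(true);            // don't pollute the OS page cache
      } catch (UnsupportedOperationException e) {
        // The wrapped stream doesn't support the hints; reads still work.
      }
      byte[] buf = new byte[64 * 1024];
      while (in.read(buf) != -1) {
        // ... process the data ...
      }
    }
  }
}
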
+ 17 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java

@@ -18,6 +18,10 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.io.DataOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -27,8 +31,9 @@ import org.apache.hadoop.classification.InterfaceStability;
  * file. */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class FSDataOutputStream extends DataOutputStream implements Syncable {
-  private OutputStream wrappedStream;
+public class FSDataOutputStream extends DataOutputStream
+    implements Syncable, CanSetDropBehind {
+  private final OutputStream wrappedStream;
 
   private static class PositionCache extends FilterOutputStream {
     private FileSystem.Statistics statistics;
@@ -134,4 +139,14 @@ public class FSDataOutputStream extends DataOutputStream implements Syncable {
       wrappedStream.flush();
     }
   }
+
+  @Override
+  public void setDropBehind(Boolean dropBehind) throws IOException {
+    try {
+      ((CanSetDropBehind)wrappedStream).setDropBehind(dropBehind);
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("the wrapped stream does " +
+          "not support setting the drop-behind caching setting.");
+    }
+  }
 }

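The output side gets the same treatment, but only for drop-behind. A short hedged sketch of the write path (placeholder path, sizes invented); the hint only has an effect when the wrapped stream implements CanSetDropBehind, as DFSOutputStream does later in this change:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteWithDropBehind {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStream out = fs.create(new Path("/tmp/bulk-output.dat"));
    try {
      out.setDropBehind(true); // one-pass bulk write: don't keep it cached
      for (int i = 0; i < 64; i++) {
        out.write(new byte[1024 * 1024]); // illustrative throwaway data
      }
    } catch (UnsupportedOperationException e) {
      // Wrapped stream has no drop-behind support; the write still succeeds.
    } finally {
      out.close();
    }
  }
}
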
+ 14 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java

@@ -824,7 +824,8 @@ public class HarFileSystem extends FilterFileSystem {
     /**
      * Create an input stream that fakes all the reads/positions/seeking.
      */
-    private static class HarFsInputStream extends FSInputStream {
+    private static class HarFsInputStream extends FSInputStream
+        implements CanSetDropBehind, CanSetReadahead {
       private long position, start, end;
       //The underlying data input stream that the
       // underlying filesystem will return.
@@ -971,7 +972,18 @@ public class HarFileSystem extends FilterFileSystem {
       public void readFully(long pos, byte[] b) throws IOException {
           readFully(pos, b, 0, b.length);
       }
-      
+
+      @Override
+      public void setReadahead(Long readahead)
+          throws IOException, UnsupportedEncodingException {
+        underLyingStream.setReadahead(readahead);
+      }
+
+      @Override
+      public void setDropBehind(Boolean dropBehind)
+          throws IOException, UnsupportedEncodingException {
+        underLyingStream.setDropBehind(dropBehind);
+      }
     }
   
     /**

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java

@@ -203,7 +203,7 @@ public class ReadaheadPool {
       // It's also possible that we'll end up requesting readahead on some
       // other FD, which may be wasted work, but won't cause a problem.
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(fd, off, len,
+        NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, off, len,
            NativeIO.POSIX.POSIX_FADV_WILLNEED);
      } catch (IOException ioe) {
        if (canceled) {

+ 13 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

@@ -37,6 +37,8 @@ import org.apache.hadoop.util.Shell;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * JNI wrappers for various native IO-related calls not available in Java.
  * These functions should generally be used alongside a fallback to another
@@ -92,6 +94,9 @@ public class NativeIO {
 
     private static final Log LOG = LogFactory.getLog(NativeIO.class);
 
+    @VisibleForTesting
+    public static CacheTracker cacheTracker = null;
+    
     private static boolean nativeLoaded = false;
     private static boolean fadvisePossible = true;
     private static boolean syncFileRangePossible = true;
@@ -102,6 +107,10 @@
 
     private static long cacheTimeout = -1;
 
+    public static interface CacheTracker {
+      public void fadvise(String identifier, long offset, long len, int flags);
+    }
+    
     static {
       if (NativeCodeLoader.isNativeCodeLoaded()) {
         try {
@@ -178,9 +187,12 @@ public class NativeIO {
      *
      * @throws NativeIOException if there is an error with the syscall
      */
-    public static void posixFadviseIfPossible(
+    public static void posixFadviseIfPossible(String identifier,
        FileDescriptor fd, long offset, long len, int flags)
        throws NativeIOException {
+      if (cacheTracker != null) {
+        cacheTracker.fadvise(identifier, offset, len, flags);
+      }
       if (nativeLoaded && fadvisePossible) {
         try {
           posix_fadvise(fd, offset, len, flags);

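The new static cacheTracker field is a test hook: posixFadviseIfPossible now reports every advisory-cache call (with the new identifier argument) before touching native code, whether or not native fadvise is actually available. A rough sketch of how a test might use it, along the lines of what the new TestCachingStrategy presumably does; this assumes the field and interface live on NativeIO.POSIX as the hunk context suggests, and RecordingCacheTracker is an invented name:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.nativeio.NativeIO;

// Hypothetical recorder used to observe advisory-cache calls in a test.
class RecordingCacheTracker implements NativeIO.POSIX.CacheTracker {
  static class Event {
    final String identifier;
    final long offset, len;
    final int flags;
    Event(String identifier, long offset, long len, int flags) {
      this.identifier = identifier;
      this.offset = offset;
      this.len = len;
      this.flags = flags;
    }
  }

  final List<Event> events = new ArrayList<Event>();

  @Override
  public synchronized void fadvise(String identifier, long offset, long len,
      int flags) {
    events.add(new Event(identifier, offset, len, flags));
  }
}

// In a test setup (sketch):
//   RecordingCacheTracker tracker = new RecordingCacheTracker();
//   NativeIO.POSIX.cacheTracker = tracker;
//   ... perform reads/writes, then assert on tracker.events ...
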
+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.unix.DomainSocket;
@@ -85,7 +86,8 @@ public class BlockReaderFactory {
                                      DomainSocketFactory domSockFactory,
                                      PeerCache peerCache,
                                      FileInputStreamCache fisCache,
-                                     boolean allowShortCircuitLocalReads)
+                                     boolean allowShortCircuitLocalReads,
+                                     CachingStrategy cachingStrategy)
   throws IOException {
     peer.setReadTimeout(conf.socketTimeout);
     peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
@@ -122,12 +124,14 @@ public class BlockReaderFactory {
       @SuppressWarnings("deprecation")
       RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
           block, blockToken, startOffset, len, conf.ioBufferSize,
-          verifyChecksum, clientName, peer, datanodeID, peerCache);
+          verifyChecksum, clientName, peer, datanodeID, peerCache,
+          cachingStrategy);
       return reader;
     } else {
       return RemoteBlockReader2.newBlockReader(
           file, block, blockToken, startOffset, len,
-          verifyChecksum, clientName, peer, datanodeID, peerCache);
+          verifyChecksum, clientName, peer, datanodeID, peerCache,
+          cachingStrategy);
     }
   }
 

+ 26 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -44,6 +44,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIR
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -136,6 +139,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -202,6 +206,8 @@ public class DFSClient implements java.io.Closeable {
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
   private boolean shouldUseLegacyBlockReaderLocal;
+  private final CachingStrategy defaultReadCachingStrategy;
+  private final CachingStrategy defaultWriteCachingStrategy;
   
   /**
    * DFSClient configuration 
@@ -520,6 +526,16 @@ public class DFSClient implements java.io.Closeable {
     }
     
     this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
+    Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
+        null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
+    Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
+        null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
+    Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
+        null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
+    this.defaultReadCachingStrategy =
+        new CachingStrategy(readDropBehind, readahead);
+    this.defaultWriteCachingStrategy =
+        new CachingStrategy(writeDropBehind, readahead);
   }
   
   /**
@@ -1990,7 +2006,8 @@ public class DFSClient implements java.io.Closeable {
           HdfsConstants.SMALL_BUFFER_SIZE));
       DataInputStream in = new DataInputStream(pair.in);
   
-      new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
+      new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
+          0, 1, true, CachingStrategy.newDefaultStrategy());
       final BlockOpResponseProto reply =
           BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
       
@@ -2489,4 +2506,12 @@ public class DFSClient implements java.io.Closeable {
   public boolean useLegacyBlockReaderLocal() {
     return shouldUseLegacyBlockReaderLocal;
   }
+
+  public CachingStrategy getDefaultReadCachingStrategy() {
+    return defaultReadCachingStrategy;
+  }
+
+  public CachingStrategy getDefaultWriteCachingStrategy() {
+    return defaultWriteCachingStrategy;
+  }
 }

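CachingStrategy itself is not shown in this excerpt, but the call sites above pin down its shape: a nullable drop-behind flag plus a nullable readahead length, a duplicate() copy used for per-stream overrides, and factory methods for the defaults. The following is a rough reconstruction for orientation only, with field and method names inferred from this diff rather than taken from the actual file:

// Sketch only: inferred from the call sites in this commit, not the real class.
public class CachingStrategy {
  private Boolean dropBehind; // null means "use the datanode-side default"
  private Long readahead;     // null means "use the datanode-side default"

  public CachingStrategy(Boolean dropBehind, Long readahead) {
    this.dropBehind = dropBehind;
    this.readahead = readahead;
  }

  public static CachingStrategy newDefaultStrategy() {
    return new CachingStrategy(null, null);   // defer everything to defaults
  }

  public static CachingStrategy newDropBehind() {
    return new CachingStrategy(true, null);   // e.g. block scanner reads
  }

  public CachingStrategy duplicate() {
    return new CachingStrategy(dropBehind, readahead);
  }

  public Boolean getDropBehind() { return dropBehind; }
  public void setDropBehind(Boolean dropBehind) { this.dropBehind = dropBehind; }
  public Long getReadahead() { return readahead; }
  public void setReadahead(Long readahead) { this.readahead = readahead; }
}
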
+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -54,6 +54,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
   public static final String  DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
   public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+  public static final String  DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes";
+  public static final String  DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads";
+  public static final String  DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
   public static final String  DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
   public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
   public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";

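These three keys are read in the DFSClient constructor shown earlier with a deliberate tri-state: a key that is absent stays null, so neither drop-behind nor readahead is forced and the datanode-side defaults win. A hedged example of setting client-wide defaults programmatically (the values are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ClientCachingDefaults {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Leave a key unset to keep the "null = use the default" behavior.
    conf.setBoolean("dfs.client.cache.drop.behind.reads", true);
    conf.setBoolean("dfs.client.cache.drop.behind.writes", true);
    conf.setLong("dfs.client.cache.readahead", 4L * 1024 * 1024);

    // Streams opened through this FileSystem start from these defaults;
    // setDropBehind()/setReadahead() can still override them per stream.
    FileSystem fs = FileSystem.get(conf);
    // ... use fs ...
  }
}
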
+ 40 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -65,7 +68,8 @@ import com.google.common.annotations.VisibleForTesting;
  * negotiation of the namenode and various datanodes as necessary.
 ****************************************************************/
 @InterfaceAudience.Private
-public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
+public class DFSInputStream extends FSInputStream
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
   @VisibleForTesting
   static boolean tcpReadsDisabledForTesting = false;
   private final PeerCache peerCache;
@@ -80,6 +84,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
   private LocatedBlock currentLocatedBlock = null;
   private long pos = 0;
   private long blockEnd = -1;
+  private CachingStrategy cachingStrategy;
   private final ReadStatistics readStatistics = new ReadStatistics();
 
   public static class ReadStatistics {
@@ -185,6 +190,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     this.fileInputStreamCache = new FileInputStreamCache(
         dfsClient.getConf().shortCircuitStreamsCacheSize,
         dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
+    this.cachingStrategy =
+        dfsClient.getDefaultReadCachingStrategy().duplicate();
     openInfo();
   }
 
@@ -1035,7 +1042,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads);
+            allowShortCircuitLocalReads, cachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@@ -1058,7 +1065,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads);
+            allowShortCircuitLocalReads, cachingStrategy);
         return reader;
       } catch (IOException e) {
         DFSClient.LOG.warn("failed to connect to " + domSock, e);
@@ -1081,7 +1088,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
         reader = BlockReaderFactory.newBlockReader(
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
-            dsFactory, peerCache, fileInputStreamCache, false);
+            dsFactory, peerCache, fileInputStreamCache, false,
+            cachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@@ -1100,7 +1108,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
     return BlockReaderFactory.newBlockReader(
         dfsClient.getConf(), file, block, blockToken, startOffset,
         len, verifyChecksum, clientName, peer, chosenNode, 
-        dsFactory, peerCache, fileInputStreamCache, false);
+        dsFactory, peerCache, fileInputStreamCache, false,
+        cachingStrategy);
   }
 
 
@@ -1358,4 +1367,30 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
   public synchronized ReadStatistics getReadStatistics() {
     return new ReadStatistics(readStatistics);
   }
+
+  private synchronized void closeCurrentBlockReader() {
+    if (blockReader == null) return;
+    // Close the current block reader so that the new caching settings can 
+    // take effect immediately.
+    try {
+      blockReader.close();
+    } catch (IOException e) {
+      DFSClient.LOG.error("error closing blockReader", e);
+    }
+    blockReader = null;
+  }
+
+  @Override
+  public synchronized void setReadahead(Long readahead)
+      throws IOException {
+    this.cachingStrategy.setReadahead(readahead);
+    closeCurrentBlockReader();
+  }
+
+  @Override
+  public synchronized void setDropBehind(Boolean dropBehind)
+      throws IOException {
+    this.cachingStrategy.setDropBehind(dropBehind);
+    closeCurrentBlockReader();
+  }
 }

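Because setReadahead() and setDropBehind() close the current block reader, a new setting reaches the datanode with the next block request instead of waiting for the stream to be reopened. A hedged sketch of changing the hints mid-read on an HDFS stream (the path and sizes are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PerFileCachingDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataInputStream in = fs.open(new Path("/logs/huge.log"));
    try {
      byte[] buf = new byte[1024 * 1024];

      // Phase 1: a few probing reads; keep the default caching behavior.
      in.read(0L, buf, 0, buf.length);

      // Phase 2: a long sequential scan; ask for more readahead plus
      // drop-behind. The block reader is re-created, so the hints are
      // sent with the next block that is fetched.
      in.setReadahead(8L * 1024 * 1024);
      in.setDropBehind(true);
      in.seek(0);
      while (in.read(buf) != -1) {
        // ... scan ...
      }
    } finally {
      in.close();
    }
  }
}
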
+ 14 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -83,6 +85,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
+import org.mortbay.log.Log;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -115,7 +118,8 @@ import com.google.common.cache.RemovalNotification;
  * starts sending packets from the dataQueue.
 ****************************************************************/
 @InterfaceAudience.Private
-public class DFSOutputStream extends FSOutputSummer implements Syncable {
+public class DFSOutputStream extends FSOutputSummer
+    implements Syncable, CanSetDropBehind {
   private final DFSClient dfsClient;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
   private Socket s;
@@ -147,6 +151,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   private Progressable progress;
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
+  private CachingStrategy cachingStrategy;
   
   private class Packet {
     long    seqno;               // sequencenumber of buffer in block
@@ -1143,7 +1148,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
           // send the request
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
-              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
+              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
+              cachingStrategy);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1340,6 +1346,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
     this.progress = progress;
+    this.cachingStrategy =
+        dfsClient.getDefaultWriteCachingStrategy().duplicate();
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);
@@ -1937,4 +1945,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
     return streamer.getBlockToken();
   }
 
+  @Override
+  public void setDropBehind(Boolean dropBehind) throws IOException {
+    this.cachingStrategy.setDropBehind(dropBehind);
+  }
 }

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -381,13 +382,14 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
                                      int bufferSize, boolean verifyChecksum,
                                      String clientName, Peer peer,
                                      DatanodeID datanodeID,
-                                     PeerCache peerCache)
-                                     throws IOException {
+                                     PeerCache peerCache,
+                                     CachingStrategy cachingStrategy)
+                                       throws IOException {
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out =
        new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum);
+        verifyChecksum, cachingStrategy);
    
    //
    // Get bytes in block, set streams

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -375,12 +376,13 @@ public class RemoteBlockReader2  implements BlockReader {
                                      boolean verifyChecksum,
                                      String clientName,
                                      Peer peer, DatanodeID datanodeID,
-                                     PeerCache peerCache) throws IOException {
+                                     PeerCache peerCache,
+                                     CachingStrategy cachingStrategy) throws IOException {
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
           peer.getOutputStream()));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
-        verifyChecksum);
+        verifyChecksum, cachingStrategy);
 
     //
     // Get bytes in block

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -57,13 +58,15 @@ public interface DataTransferProtocol {
    * @param length maximum number of bytes for this read.
    * @param sendChecksum if false, the DN should skip reading and sending
    *        checksums
+   * @param cachingStrategy  The caching strategy to use.
    */
   public void readBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException;
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException;
 
   /**
    * Write a block to a datanode pipeline.
@@ -89,7 +92,8 @@ public interface DataTransferProtocol {
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      final DataChecksum requestedChecksum) throws IOException;
+      final DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy) throws IOException;
 
   /**
    * Transfer a block to another datanode.

+ 18 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java

@@ -31,8 +31,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 
 /** Receiver */
 @InterfaceAudience.Private
@@ -85,6 +87,14 @@ public abstract class Receiver implements DataTransferProtocol {
     }
   }
 
+  static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) {
+    Boolean dropBehind = strategy.hasDropBehind() ?
+        strategy.getDropBehind() : null;
+    Long readahead = strategy.hasReadahead() ?
+        strategy.getReadahead() : null;
+    return new CachingStrategy(dropBehind, readahead);
+  }
+
   /** Receive OP_READ_BLOCK */
   private void opReadBlock() throws IOException {
     OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
@@ -93,7 +103,10 @@ public abstract class Receiver implements DataTransferProtocol {
         proto.getHeader().getClientName(),
         proto.getOffset(),
         proto.getLen(),
-        proto.getSendChecksums());
+        proto.getSendChecksums(),
+        (proto.hasCachingStrategy() ?
+            getCachingStrategy(proto.getCachingStrategy()) :
+          CachingStrategy.newDefaultStrategy()));
   }
   
   /** Receive OP_WRITE_BLOCK */
@@ -108,7 +121,10 @@ public abstract class Receiver implements DataTransferProtocol {
         proto.getPipelineSize(),
         proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
         proto.getLatestGenerationStamp(),
-        fromProto(proto.getRequestedChecksum()));
+        fromProto(proto.getRequestedChecksum()),
+        (proto.hasCachingStrategy() ?
+            getCachingStrategy(proto.getCachingStrategy()) :
+          CachingStrategy.newDefaultStrategy()));
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */

+ 20 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java

@@ -35,9 +35,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -72,19 +74,32 @@ public class Sender implements DataTransferProtocol {
     out.flush();
   }
 
+  static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
+    CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
+    if (cachingStrategy.getReadahead() != null) {
+      builder.setReadahead(cachingStrategy.getReadahead().longValue());
+    }
+    if (cachingStrategy.getDropBehind() != null) {
+      builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
+    }
+    return builder.build();
+  }
+
   @Override
   public void readBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException {
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
 
     OpReadBlockProto proto = OpReadBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
       .setOffset(blockOffset)
      .setLen(length)
      .setSendChecksums(sendChecksum)
+      .setCachingStrategy(getCachingStrategy(cachingStrategy))
      .build();
 
     send(out, Op.READ_BLOCK, proto);
@@ -102,7 +117,8 @@ public class Sender implements DataTransferProtocol {
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      DataChecksum requestedChecksum) throws IOException {
+      DataChecksum requestedChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
@@ -117,7 +133,8 @@ public class Sender implements DataTransferProtocol {
       .setMinBytesRcvd(minBytesRcvd)
       .setMaxBytesRcvd(maxBytesRcvd)
       .setLatestGenerationStamp(latestGenerationStamp)
-      .setRequestedChecksum(checksumProto);
+      .setRequestedChecksum(checksumProto)
+      .setCachingStrategy(getCachingStrategy(cachingStrategy));
     
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
@@ -217,7 +218,7 @@ public class JspHelper {
         "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
         new DatanodeID(addr.getAddress().getHostAddress(),
             addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
-            null, null, false);
+            null, null, false, CachingStrategy.newDefaultStrategy());
         
     final byte[] buf = new byte[amtToRead];
     int readOffset = 0;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java

@@ -417,7 +417,7 @@ class BlockPoolSliceScanner {
         adjustThrottler();
         
         blockSender = new BlockSender(block, 0, -1, false, true, true, 
-            datanode, null);
+            datanode, null, CachingStrategy.newDropBehind());
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());

+ 43 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -53,6 +53,8 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** A class that receives a block and writes to its own disk, meanwhile
  * may copies it to another site. If a throttler is provided,
  * streaming throttling is also supported.
@@ -61,7 +63,8 @@ class BlockReceiver implements Closeable {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
 
-  private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
+  @VisibleForTesting
+  static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
   
   private DataInputStream in = null; // from where data are read
   private DataChecksum clientChecksum; // checksum used by client
@@ -97,8 +100,8 @@ class BlockReceiver implements Closeable {
 
   // Cache management state
   private boolean dropCacheBehindWrites;
+  private long lastCacheManagementOffset = 0;
   private boolean syncBehindWrites;
-  private long lastCacheDropOffset = 0;
 
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
@@ -120,8 +123,8 @@ class BlockReceiver implements Closeable {
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode, DataChecksum requestedChecksum)
-      throws IOException {
+      final DataNode datanode, DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -146,6 +149,7 @@ class BlockReceiver implements Closeable {
             + "\n  isClient  =" + isClient + ", clientname=" + clientname
             + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
             + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
+            + "\n  cachingStrategy = " + cachingStrategy
             );
       }
 
@@ -192,7 +196,9 @@ class BlockReceiver implements Closeable {
               " while receiving block " + block + " from " + inAddr);
         }
       }
-      this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
+      this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
+        datanode.getDnConf().dropCacheBehindWrites :
+          cachingStrategy.getDropBehind();
       this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
       
       final boolean isCreate = isDatanode || isTransfer 
@@ -598,7 +604,7 @@ class BlockReceiver implements Closeable {
 
           datanode.metrics.incrBytesWritten(len);
 
-          dropOsCacheBehindWriter(offsetInBlock);
+          manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -620,25 +626,44 @@ class BlockReceiver implements Closeable {
     return lastPacketInBlock?-1:len;
   }
 
-  private void dropOsCacheBehindWriter(long offsetInBlock) {
+  private void manageWriterOsCache(long offsetInBlock) {
     try {
       if (outFd != null &&
-          offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
-        long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
-        if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
-          NativeIO.POSIX.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
-              NativeIO.POSIX.POSIX_FADV_DONTNEED);
-        }
-        
+          offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
+        //
+        // For SYNC_FILE_RANGE_WRITE, we want to sync from
+        // lastCacheManagementOffset to a position "two windows ago"
+        //
+        //                         <========= sync ===========>
+        // +-----------------------O--------------------------X
+        // start                  last                      curPos
+        // of file                 
+        //
         if (syncBehindWrites) {
-          NativeIO.POSIX.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+          NativeIO.POSIX.syncFileRangeIfPossible(outFd,
+              lastCacheManagementOffset,
+              offsetInBlock - lastCacheManagementOffset,
              NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
        }
-        
-        lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+        //
+        // For POSIX_FADV_DONTNEED, we want to drop from the beginning 
+        // of the file to a position prior to the current position.
+        //
+        // <=== drop =====> 
+        //                 <---W--->
+        // +--------------+--------O--------------------------X
+        // start        dropPos   last                      curPos
+        // of file             
+        //                     
+        long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
+        if (dropPos > 0 && dropCacheBehindWrites) {
+          NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+              outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        }
+        lastCacheManagementOffset = offsetInBlock;
       }
     } catch (Throwable t) {
-      LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+      LOG.warn("Error managing cache for writer of block " + block, t);
    }
  }
 

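On the client side, the write-path behavior above is driven by the per-stream drop-behind flag. A minimal usage sketch, relying on the FSDataOutputStream.setDropBehind() API introduced alongside this change; the path and write size are illustrative only and assume an already-configured FileSystem.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DropBehindWriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataOutputStream out = fs.create(new Path("/tmp/example-file"));
        try {
          // Ask the write pipeline to drop this file's data from the page
          // cache behind the writer; DataNodes that honor the CachingStrategy
          // end up in the posix_fadvise(POSIX_FADV_DONTNEED) path shown above.
          out.setDropBehind(true);
          out.write(new byte[64 * 1024]);
        } finally {
          out.close();
        }
      }
    }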
+ 76 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -141,13 +142,22 @@ class BlockSender implements java.io.Closeable {
 
   // Cache-management related fields
   private final long readaheadLength;
-  private boolean shouldDropCacheBehindRead;
+
   private ReadaheadRequest curReadahead;
+
+  private final boolean alwaysReadahead;
+  
+  private final boolean dropCacheBehindLargeReads;
+  
+  private final boolean dropCacheBehindAllReads;
+  
   private long lastCacheDropOffset;
-  private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  
+  @VisibleForTesting
+  static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  
   /**
-   * Minimum length of read below which management of the OS
-   * buffer cache is disabled.
+   * See {{@link BlockSender#isLongRead()}
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
   
@@ -167,16 +177,42 @@ class BlockSender implements java.io.Closeable {
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean verifyChecksum,
-              boolean sendChecksum,
-              DataNode datanode, String clientTraceFmt)
+              boolean sendChecksum, DataNode datanode, String clientTraceFmt,
+              CachingStrategy cachingStrategy)
      throws IOException {
    try {
      this.block = block;
      this.corruptChecksumOk = corruptChecksumOk;
      this.verifyChecksum = verifyChecksum;
      this.clientTraceFmt = clientTraceFmt;
-      this.readaheadLength = datanode.getDnConf().readaheadLength;
-      this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
+
+      /*
+       * If the client asked for the cache to be dropped behind all reads,
+       * we honor that.  Otherwise, we use the DataNode defaults.
+       * When using DataNode defaults, we use a heuristic where we only
+       * drop the cache for large reads.
+       */
+      if (cachingStrategy.getDropBehind() == null) {
+        this.dropCacheBehindAllReads = false;
+        this.dropCacheBehindLargeReads =
+            datanode.getDnConf().dropCacheBehindReads;
+      } else {
+        this.dropCacheBehindAllReads =
+            this.dropCacheBehindLargeReads =
+                 cachingStrategy.getDropBehind().booleanValue();
+      }
+      /*
+       * Similarly, if readahead was explicitly requested, we always do it.
+       * Otherwise, we read ahead based on the DataNode settings, and only
+       * when the reads are large.
+       */
+      if (cachingStrategy.getReadahead() == null) {
+        this.alwaysReadahead = false;
+        this.readaheadLength = datanode.getDnConf().readaheadLength;
+      } else {
+        this.alwaysReadahead = true;
+        this.readaheadLength = cachingStrategy.getReadahead().longValue();
+      }
       this.datanode = datanode;
       
       if (verifyChecksum) {
@@ -335,10 +371,11 @@ class BlockSender implements java.io.Closeable {
    */
   @Override
   public void close() throws IOException {
-    if (blockInFd != null && shouldDropCacheBehindRead && isLongRead()) {
-      // drop the last few MB of the file from cache
+    if (blockInFd != null &&
+        ((dropCacheBehindAllReads) ||
+         (dropCacheBehindLargeReads && isLongRead()))) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(
+        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
            blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
            NativeIO.POSIX.POSIX_FADV_DONTNEED);
      } catch (Exception e) {
@@ -637,7 +674,7 @@ class BlockSender implements java.io.Closeable {
 
     if (isLongRead() && blockInFd != null) {
       // Advise that this file descriptor will be accessed sequentially.
-      NativeIO.POSIX.posixFadviseIfPossible(
+      NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
          blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
     }
     
@@ -705,37 +742,47 @@ class BlockSender implements java.io.Closeable {
    * and drop-behind.
    */
   private void manageOsCache() throws IOException {
-    if (!isLongRead() || blockInFd == null) {
-      // don't manage cache manually for short-reads, like
-      // HBase random read workloads.
-      return;
-    }
+    // We can't manage the cache for this block if we don't have a file
+    // descriptor to work with.
+    if (blockInFd == null) return;
 
     // Perform readahead if necessary
-    if (readaheadLength > 0 && datanode.readaheadPool != null) {
+    if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
+          (alwaysReadahead || isLongRead())) {
       curReadahead = datanode.readaheadPool.readaheadStream(
-          clientTraceFmt, blockInFd,
-          offset, readaheadLength, Long.MAX_VALUE,
+          clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
          curReadahead);
    }
 
     // Drop what we've just read from cache, since we aren't
     // likely to need it again
-    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
-    if (shouldDropCacheBehindRead &&
-        offset >= nextCacheDropOffset) {
-      long dropLength = offset - lastCacheDropOffset;
-      if (dropLength >= 1024) {
-        NativeIO.POSIX.posixFadviseIfPossible(blockInFd,
-            lastCacheDropOffset, dropLength,
+    if (dropCacheBehindAllReads ||
+        (dropCacheBehindLargeReads && isLongRead())) {
+      long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+      if (offset >= nextCacheDropOffset) {
+        long dropLength = offset - lastCacheDropOffset;
+        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
+            blockInFd, lastCacheDropOffset, dropLength,
            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        lastCacheDropOffset = offset;
      }
-      lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
    }
  }
 
+  /**
+   * Returns true if we have done a long enough read for this block to qualify
+   * for the DataNode-wide cache management defaults.  We avoid applying the
+   * cache management defaults to smaller reads because the overhead would be
+   * too high.
+   *
+   * Note that if the client explicitly asked for dropBehind, we will do it
+   * even on short reads.
+   * 
+   * This is also used to determine when to invoke
+   * posix_fadvise(POSIX_FADV_SEQUENTIAL).
+   */
   private boolean isLongRead() {
-    return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+    return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
  }
 
   /**

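The constructor logic above means the DataNode-wide defaults only apply to long reads, while an explicit client request is honored for reads of any size. A minimal read-side sketch, assuming the FSDataInputStream.setDropBehind()/setReadahead() methods introduced with this change; the path, buffer size, and readahead value are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CachingStrategyReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/tmp/example-file"));
        try {
          // Explicitly request drop-behind and a 4 MB readahead window.
          // Per the BlockSender changes above, an explicit request applies
          // even to reads shorter than LONG_READ_THRESHOLD_BYTES.
          in.setDropBehind(true);
          in.setReadahead(4L * 1024 * 1024);
          byte[] buf = new byte[8192];
          in.readFully(0, buf);
        } finally {
          in.close();
        }
      }
    }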
+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1517,6 +1517,7 @@ public class DataNode extends Configured
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
     final String clientname;
+    final CachingStrategy cachingStrategy;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
@@ -1537,6 +1538,8 @@ public class DataNode extends Configured
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
       bpReg = bpos.bpRegistration;
       this.clientname = clientname;
+      this.cachingStrategy =
+          new CachingStrategy(true, getDnConf().readaheadLength);
     }
 
     /**
@@ -1579,7 +1582,7 @@ public class DataNode extends Configured
             HdfsConstants.SMALL_BUFFER_SIZE));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, true, DataNode.this, null);
+            false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //
@@ -1592,7 +1595,7 @@ public class DataNode extends Configured
         }
 
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum());
+            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);

+ 13 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -299,7 +299,8 @@ class DataXceiver extends Receiver implements Runnable {
       final String clientName,
       final long blockOffset,
       final long length,
-      final boolean sendChecksum) throws IOException {
+      final boolean sendChecksum,
+      final CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientName;
 
     OutputStream baseStream = getOutputStream();
@@ -324,7 +325,8 @@ class DataXceiver extends Receiver implements Runnable {
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, false, sendChecksum, datanode, clientTraceFmt);
+            true, false, sendChecksum, datanode, clientTraceFmt,
+            cachingStrategy);
       } catch(IOException e) {
         String msg = "opReadBlock " + block + " received exception " + e; 
         LOG.info(msg);
@@ -393,7 +395,8 @@ class DataXceiver extends Receiver implements Runnable {
       final long minBytesRcvd,
       final long maxBytesRcvd,
       final long latestGenerationStamp,
-      DataChecksum requestedChecksum) throws IOException {
+      DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -452,7 +455,8 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode, requestedChecksum);
+            clientname, srcDataNode, datanode, requestedChecksum,
+            cachingStrategy);
       } else {
         datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
@@ -497,7 +501,8 @@ class DataXceiver extends Receiver implements Runnable {
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
+              cachingStrategy);
 
           mirrorOut.flush();
 
@@ -715,7 +720,7 @@ class DataXceiver extends Receiver implements Runnable {
     try {
       // check if the block exists or not
       blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, 
-          null);
+          null, CachingStrategy.newDropBehind());
 
       // set up response stream
       OutputStream baseStream = getOutputStream();
@@ -846,7 +851,8 @@ class DataXceiver extends Receiver implements Runnable {
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode, remoteChecksum);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum,
+          CachingStrategy.newDropBehind());
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
@@ -574,8 +575,8 @@ public class NamenodeFsck {
         blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
             file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
             TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                getDataEncryptionKey()),
-            chosenNode, null, null, null, false);
+                getDataEncryptionKey()), chosenNode, null, null, null, 
+                false, CachingStrategy.newDropBehind());
         
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -54,11 +54,17 @@ message ClientOperationHeaderProto {
   required string clientName = 2;
 }
 
+message CachingStrategyProto {
+  optional bool dropBehind = 1;
+  optional int64 readahead = 2;
+}
+
 message OpReadBlockProto {
   required ClientOperationHeaderProto header = 1;
   required uint64 offset = 2;
   required uint64 len = 3;
   optional bool sendChecksums = 4 [default = true];
+  optional CachingStrategyProto cachingStrategy = 5;
 }
 
 
@@ -100,6 +106,7 @@ message OpWriteBlockProto {
    * The requested checksum mechanism for this block write.
    */
   required ChecksumProto requestedChecksum = 9;
+  optional CachingStrategyProto cachingStrategy = 10;
 }
  
 message OpTransferBlockProto {

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1331,6 +1331,51 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.cache.drop.behind.writes</name>
+  <value></value>
+  <description>
+    Just like dfs.datanode.drop.cache.behind.writes, this setting causes the
+    page cache to be dropped behind HDFS writes, potentially freeing up more
+    memory for other uses.  Unlike dfs.datanode.drop.cache.behind.writes, this
+    is a client-side setting rather than a setting for the entire datanode.
+    If present, this setting will override the DataNode default.
+
+    If the native libraries are not available to the DataNode, this
+    configuration has no effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.cache.drop.behind.reads</name>
+  <value></value>
+  <description>
+    Just like dfs.datanode.drop.cache.behind.reads, this setting causes the
+    page cache to be dropped behind HDFS reads, potentially freeing up more
+    memory for other uses.  Unlike dfs.datanode.drop.cache.behind.reads, this
+    is a client-side setting rather than a setting for the entire datanode.  If
+    present, this setting will override the DataNode default.
+
+    If the native libraries are not available to the DataNode, this
+    configuration has no effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.cache.readahead</name>
+  <value></value>
+  <description>
+    Just like dfs.datanode.readahead.bytes, this setting causes the datanode to
+    read ahead in the block file using posix_fadvise, potentially decreasing
+    I/O wait times.  Unlike dfs.datanode.readahead.bytes, this is a client-side
+    setting rather than a setting for the entire datanode.  If present, this
+    setting will override the DataNode default.
+
+    If the native libraries are not available to the DataNode, this
+    configuration has no effect.
+  </description>
+</property>
+
 <property>
 	<name>dfs.namenode.enable.retrycache</name>
 	<value>true</value>

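The three client-side keys above can also be set programmatically; when present, they override the corresponding DataNode-wide dfs.datanode.* defaults for that client's streams. A short sketch follows; the drop-behind constant names are the ones used in the new TestCachingStrategy test below, while the readahead constant name is assumed to follow the same pattern, and the values are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ClientCachingConfExample {
      public static Configuration buildConf() {
        Configuration conf = new HdfsConfiguration();
        // Drop the page cache behind this client's reads and writes,
        // regardless of the DataNode-wide dfs.datanode.* defaults.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
        // Ask DataNodes to read ahead 4 MB when serving this client's reads
        // (constant name assumed; the key is dfs.client.cache.readahead).
        conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, 4L * 1024 * 1024);
        return conf;
      }
    }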
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.net.NetUtils;
 
@@ -155,7 +156,7 @@ public class BlockReaderTestUtil {
       testBlock.getBlockToken(), 
       offset, lenToRead,
       true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
-      nodes[0], null, null, null, false);
+      nodes[0], null, null, null, false, CachingStrategy.newDefaultStrategy());
   }
 
   /**

+ 12 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
@@ -194,7 +195,7 @@ public class TestDataTransferProtocol {
     sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
     if (eofExcepted) {
       sendResponse(Status.ERROR, null, null, recvOut);
       sendRecvData(description, true);
@@ -391,7 +392,7 @@ public class TestDataTransferProtocol {
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE,
         0, 0L, 0L, 0L,
-        badChecksum);
+        badChecksum, CachingStrategy.newDefaultStrategy());
     recvBuf.reset();
     sendResponse(Status.ERROR, null, null, recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);
@@ -402,7 +403,7 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
 
     PacketHeader hdr = new PacketHeader(
       4,     // size of packet
@@ -425,7 +426,7 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
-        DEFAULT_CHECKSUM);
+        DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
 
     hdr = new PacketHeader(
       8,     // size of packet
@@ -452,21 +453,21 @@ public class TestDataTransferProtocol {
     recvBuf.reset();
     blk.setBlockId(blkid-1);
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen, true);
+        0L, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset -1L
     sendBuf.reset();
     blk.setBlockId(blkid);
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        -1L, fileLen, true);
+        -1L, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
     // bad block start offset
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        fileLen, fileLen, true);
+        fileLen, fileLen, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -483,7 +484,8 @@ public class TestDataTransferProtocol {
     
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, -1L-random.nextInt(oneMil), true);
+        0L, -1L-random.nextInt(oneMil), true,
+        CachingStrategy.newDefaultStrategy());
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
     
@@ -496,14 +498,14 @@ public class TestDataTransferProtocol {
         recvOut);
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen+1, true);
+        0L, fileLen+1, true, CachingStrategy.newDefaultStrategy());
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
     sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-        0L, fileLen, true);
+        0L, fileLen, true, CachingStrategy.newDefaultStrategy());
     readFile(fileSys, file, fileLen);
     } finally {
       cluster.shutdown();

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.NetUtils;
@@ -148,7 +149,8 @@ public class TestBlockTokenWithDFS {
       blockReader = BlockReaderFactory.newBlockReader(
           new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
           true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
-          nodes[0], null, null, null, false);
+          nodes[0], null, null, null, false,
+          CachingStrategy.newDefaultStrategy());
 
     } catch (IOException ex) {
       if (ex instanceof InvalidBlockTokenException) {

+ 369 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java

@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCachingStrategy {
+  private static final Log LOG = LogFactory.getLog(TestCachingStrategy.class);
+  private static int MAX_TEST_FILE_LEN = 1024 * 1024;
+  private static int WRITE_PACKET_SIZE = DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+  
+  private final static TestRecordingCacheTracker tracker =
+      new TestRecordingCacheTracker();
+
+  @BeforeClass
+  public static void setupTest() {
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+
+    // Track calls to posix_fadvise.
+    NativeIO.POSIX.cacheTracker = tracker;
+    
+    // Normally, we wait for a few megabytes of data to be read or written 
+    // before dropping the cache.  This is to avoid an excessive number of
+    // JNI calls to the posix_fadvise function.  However, for the purpose
+    // of this test, we want to use small files and see all fadvise calls
+    // happen.
+    BlockSender.CACHE_DROP_INTERVAL_BYTES = 4096;
+    BlockReceiver.CACHE_DROP_LAG_BYTES = 4096;
+  }
+
+  private static class Stats {
+    private final String fileName;
+    private final boolean dropped[] = new boolean[MAX_TEST_FILE_LEN];
+
+    Stats(String fileName) {
+      this.fileName = fileName;
+    }
+    
+    synchronized void fadvise(int offset, int len, int flags) {
+      LOG.debug("got fadvise(offset=" + offset + ", len=" + len +
+          ",flags=" + flags + ")");
+      if (flags == NativeIO.POSIX.POSIX_FADV_DONTNEED) {
+        for (int i = 0; i < (int)len; i++) {
+          dropped[(int)(offset + i)] = true;
+        }
+      }
+    }
+
+    synchronized void assertNotDroppedInRange(int start, int end) {
+      for (int i = start; i < end; i++) {
+        if (dropped[i]) {
+          throw new RuntimeException("in file " + fileName + ", we " +
+              "dropped the cache at offset " + i);
+        }
+      }
+    }
+    
+    synchronized void assertDroppedInRange(int start, int end) {
+      for (int i = start; i < end; i++) {
+        if (!dropped[i]) {
+          throw new RuntimeException("in file " + fileName + ", we " +
+              "did not drop the cache at offset " + i);
+        }
+      }
+    }
+    
+    synchronized void clear() {
+      Arrays.fill(dropped, false);
+    }
+  }
+
+  private static class TestRecordingCacheTracker implements CacheTracker {
+    private final Map<String, Stats> map = new TreeMap<String, Stats>();
+
+    @Override
+    synchronized public void fadvise(String name,
+        long offset, long len, int flags) {
+      if ((len < 0) || (len > Integer.MAX_VALUE)) {
+        throw new RuntimeException("invalid length of " + len +
+            " passed to posixFadviseIfPossible");
+      }
+      if ((offset < 0) || (offset > Integer.MAX_VALUE)) {
+        throw new RuntimeException("invalid offset of " + offset +
+            " passed to posixFadviseIfPossible");
+      }
+      Stats stats = map.get(name);
+      if (stats == null) {
+        stats = new Stats(name);
+        map.put(name, stats);
+      }
+      stats.fadvise((int)offset, (int)len, flags);
+    }
+
+    synchronized void clear() {
+      map.clear();
+    }
+    
+    synchronized Stats getStats(String fileName) {
+      return map.get(fileName);
+    }
+    
+    synchronized public String toString() {
+      StringBuilder bld = new StringBuilder();
+      bld.append("TestRecordingCacheManipulator{");
+      String prefix = "";
+      for (String fileName : map.keySet()) {
+        bld.append(prefix);
+        prefix = ", ";
+        bld.append(fileName);
+      }
+      bld.append("}");
+      return bld.toString();
+    }
+  }
+
+  static void createHdfsFile(FileSystem fs, Path p, long length,
+        Boolean dropBehind) throws Exception {
+    FSDataOutputStream fos = null;
+    try {
+      // create file with replication factor of 1
+      fos = fs.create(p, (short)1);
+      if (dropBehind != null) {
+        fos.setDropBehind(dropBehind);
+      }
+      byte buf[] = new byte[8196];
+      while (length > 0) {
+        int amt = (length > buf.length) ? (int)buf.length : (int)length;
+        fos.write(buf, 0, amt);
+        length -= amt;
+      }
+    } catch (IOException e) {
+      LOG.error("ioexception", e);
+    } finally {
+      if (fos != null) {
+        fos.close();
+      }
+    }
+  }
+  
+  static long readHdfsFile(FileSystem fs, Path p, long length,
+      Boolean dropBehind) throws Exception {
+    FSDataInputStream fis = null;
+    long totalRead = 0;
+    try {
+      fis = fs.open(p);
+      if (dropBehind != null) {
+        fis.setDropBehind(dropBehind);
+      }
+      byte buf[] = new byte[8196];
+      while (length > 0) {
+        int amt = (length > buf.length) ? (int)buf.length : (int)length;
+        int ret = fis.read(buf, 0, amt);
+        if (ret == -1) {
+          return totalRead;
+        }
+        totalRead += ret;
+        length -= ret;
+      }
+    } catch (IOException e) {
+      LOG.error("ioexception", e);
+    } finally {
+      if (fis != null) {
+        fis.close();
+      }
+    }
+    throw new RuntimeException("unreachable");
+  }
+ 
+  @Test(timeout=120000)
+  public void testFadviseAfterWriteThenRead() throws Exception {
+    // start a cluster
+    LOG.info("testFadviseAfterWriteThenRead");
+    tracker.clear();
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    String TEST_PATH = "/test";
+    int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create new file
+      createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, true);
+      // verify that we dropped everything from the cache during file creation.
+      ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+          TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+      String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+      Stats stats = tracker.getStats(fadvisedFileName);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+      stats.clear();
+      
+      // read file
+      readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, true);
+      // verify that we dropped everything from the cache.
+      Assert.assertNotNull(stats);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /***
+   * Test the scenario where the DataNode defaults to not dropping the cache,
+   * but our client defaults are set.
+   */
+  @Test(timeout=120000)
+  public void testClientDefaults() throws Exception {
+    // start a cluster
+    LOG.info("testClientDefaults");
+    tracker.clear();
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, false);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
+    MiniDFSCluster cluster = null;
+    String TEST_PATH = "/test";
+    int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      
+      // create new file
+      createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
+      // verify that we dropped everything from the cache during file creation.
+      ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+          TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+      String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+      Stats stats = tracker.getStats(fadvisedFileName);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+      stats.clear();
+      
+      // read file
+      readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, null);
+      // verify that we dropped everything from the cache.
+      Assert.assertNotNull(stats);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testFadviseSkippedForSmallReads() throws Exception {
+    // start a cluster
+    LOG.info("testFadviseSkippedForSmallReads");
+    tracker.clear();
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, true);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, true);
+    MiniDFSCluster cluster = null;
+    String TEST_PATH = "/test";
+    int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+    FSDataInputStream fis = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create new file
+      createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
+      // Since the DataNode was configured with drop-behind, and we didn't
+      // specify any policy, we should have done drop-behind.
+      ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+          TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+      String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+      Stats stats = tracker.getStats(fadvisedFileName);
+      stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+      stats.clear();
+      stats.assertNotDroppedInRange(0, TEST_PATH_LEN);
+
+      // read file
+      fis = fs.open(new Path(TEST_PATH));
+      byte buf[] = new byte[17];
+      fis.readFully(4096, buf, 0, buf.length);
+
+      // we should not have dropped anything because of the small read.
+      stats = tracker.getStats(fadvisedFileName);
+      stats.assertNotDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
+    } finally {
+      IOUtils.cleanup(null, fis);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @Test(timeout=120000)
+  public void testNoFadviseAfterWriteThenRead() throws Exception {
+    // start a cluster
+    LOG.info("testNoFadviseAfterWriteThenRead");
+    tracker.clear();
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    String TEST_PATH = "/test";
+    int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+          .build();
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      // create new file
+      createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
+      // verify that we did not drop everything from the cache during file creation.
+      ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
+          TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
+      String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
+      Stats stats = tracker.getStats(fadvisedFileName);
+      Assert.assertNull(stats);
+      
+      // read file
+      readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, false);
+      // verify that we dropped everything from the cache.
+      Assert.assertNull(stats);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}

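The test above observes fadvise activity by installing a NativeIO.POSIX.CacheTracker, the same hook it uses for its assertions. A minimal stand-alone sketch of such a tracker, assuming only the CacheTracker interface and the public cacheTracker field shown in the test; this variant just logs calls instead of recording them.

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.io.nativeio.NativeIO;
    import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;

    public class LoggingCacheTracker implements CacheTracker {
      private static final Log LOG = LogFactory.getLog(LoggingCacheTracker.class);

      @Override
      public void fadvise(String name, long offset, long len, int flags) {
        // Record every posix_fadvise issued through NativeIO so cache
        // behavior can be inspected while a test or tool is running.
        LOG.info("fadvise(" + name + ", offset=" + offset +
            ", len=" + len + ", flags=" + flags + ")");
      }

      public static void install() {
        NativeIO.POSIX.cacheTracker = new LoggingCacheTracker();
      }
    }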
+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -287,7 +287,8 @@ public class TestDataNodeVolumeFailure {
     BlockReader blockReader =
       BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
         lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
-        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
+        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false,
+        CachingStrategy.newDefaultStrategy());
     blockReader.close();
   }
   

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -148,7 +148,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum);
+        checksum, CachingStrategy.newDefaultStrategy());
     out.flush();
 
     // close the connection before sending the content of the block

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java

@@ -69,7 +69,7 @@ public class FadvisedChunkedFile extends ChunkedFile {
     }
     if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(
+        NativeIO.POSIX.posixFadviseIfPossible(identifier,
             fd,
             getStartOffset(), getEndOffset() - getStartOffset(),
             NativeIO.POSIX.POSIX_FADV_DONTNEED);

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

@@ -71,7 +71,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
     }
     if (manageOsCache && getCount() > 0) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(
+        NativeIO.POSIX.posixFadviseIfPossible(identifier,
            fd, getPosition(), getCount(),
            NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Throwable t) {