HDFS-5050. Add DataNode support for mlock and munlock (contributed by Andrew Wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1517106 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe, commit b992219fa1
17 changed files with 1027 additions and 2 deletions
  1. hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (+3, -0)
  2. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (+2, -0)
  3. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (+8, -0)
  4. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (+4, -0)
  5. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (+4, -0)
  6. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (+22, -0)
  7. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (+240, -0)
  8. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (+108, -0)
  9. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (+35, -1)
  10. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java (+249, -0)
  11. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (+15, -0)
  12. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (+2, -0)
  13. hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (+3, -1)
  14. hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (+23, -0)
  15. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java (+11, -0)
  16. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (+32, -0)
  17. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (+266, -0)

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt

@@ -15,6 +15,9 @@ HDFS-4949 (Unreleased)
     HDFS-5052.  Add cacheRequest/uncacheRequest support to NameNode.
     (contributed by Colin Patrick McCabe)
 
+    HDFS-5050.  Add DataNode support for mlock and munlock
+    (Andrew Wang via Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -100,6 +100,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
   public static final String  DFS_DATANODE_MAX_LOCKED_MEMORY_KEY = "dfs.datanode.max.locked.memory";
   public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
+  public static final String  DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
+  public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
 
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -549,6 +549,14 @@ class BPOfferService {
       }
       dn.metrics.incrBlocksRemoved(toDelete.length);
       break;
+    case DatanodeProtocol.DNA_CACHE:
+      LOG.info("DatanodeCommand action: DNA_CACHE");
+      dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks());
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      LOG.info("DatanodeCommand action: DNA_UNCACHE");
+      dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks());
+      break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
       // See HDFS-2987.

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -449,6 +449,10 @@ class BPServiceActor implements Runnable {
     long startTime = Time.monotonicNow();
     if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
       // TODO: Implement me!
+      String bpid = bpos.getBlockPoolId();
+      BlockListAsLongs blocks = dn.getFSDataset().getCacheReport(bpid);
+      cmd = bpNamenode.cacheReport(bpRegistration, bpid,
+          blocks.getBlockListAsLongs());
     }
     return cmd;
   }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -160,4 +160,8 @@ public class DNConf {
   public long getXceiverStopTimeout() {
     return xceiverStopTimeout;
   }
+
+  public long getMaxLockedMemory() {
+    return maxLockedMemory;
+  }
 }

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -269,6 +269,14 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public BlockListAsLongs getBlockReport(String bpid);
 
+  /**
+   * Returns the cache report - the full list of cached blocks of a
+   * block pool.
+   * @param bpid Block Pool Id
+   * @return the cache report - the full list of cached blocks
+   */
+  public BlockListAsLongs getCacheReport(String bpid);
+
   /** Does the dataset contain the block? */
   public boolean contains(ExtendedBlock block);
 
@@ -294,6 +302,20 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public void invalidate(String bpid, Block invalidBlks[]) throws IOException;
 
+  /**
+   * Caches the specified blocks
+   * @param bpid Block pool id
+   * @param cacheBlks - blocks to cache
+   */
+  public void cache(String bpid, Block[] cacheBlks);
+
+  /**
+   * Uncaches the specified blocks
+   * @param bpid Block pool id
+   * @param uncacheBlks - blocks to uncache
+   */
+  public void uncache(String bpid, Block[] uncacheBlks);
+
     /**
      * Check if all the data directories are healthy
      * @throws DiskErrorException

+ 240 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java

@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2)
+ * system calls to lock blocks into memory. Block checksums are verified upon
+ * entry into the cache.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FsDatasetCache {
+
+  private static final Log LOG = LogFactory.getLog(FsDatasetCache.class);
+
+  /**
+   * Map of cached blocks
+   */
+  private final ConcurrentMap<Long, MappableBlock> cachedBlocks;
+
+  private final FsDatasetImpl dataset;
+  /**
+   * Number of cached bytes
+   */
+  private AtomicLong usedBytes;
+  /**
+   * Total cache capacity in bytes
+   */
+  private final long maxBytes;
+
+  public FsDatasetCache(FsDatasetImpl dataset) {
+    this.dataset = dataset;
+    this.cachedBlocks = new ConcurrentHashMap<Long, MappableBlock>();
+    this.usedBytes = new AtomicLong(0);
+    this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
+  }
+
+  /**
+   * @return true if the block is cached, false otherwise
+   */
+  boolean isCached(String bpid, Block block) {
+    MappableBlock mapBlock = cachedBlocks.get(block.getBlockId());
+    if (mapBlock != null) {
+      return mapBlock.getBlockPoolId().equals(bpid);
+    }
+    return false;
+  }
+
+  /**
+   * @return List of cached blocks suitable for translation into a
+   * {@link BlockListAsLongs} for a cache report.
+   */
+  List<Block> getCachedBlocks(String bpid) {
+    List<Block> blocks = new ArrayList<Block>();
+    // ConcurrentHashMap iteration doesn't see latest updates, which is okay
+    for (MappableBlock mapBlock : cachedBlocks.values()) {
+      if (mapBlock.getBlockPoolId().equals(bpid)) {
+        blocks.add(mapBlock.getBlock());
+      }
+    }
+    return blocks;
+  }
+
+  /**
+   * Asynchronously attempts to cache a block. This is subject to the
+   * configured maximum locked memory limit.
+   * 
+   * @param block block to cache
+   * @param volume volume of the block
+   * @param blockIn stream of the block's data file
+   * @param metaIn stream of the block's meta file
+   */
+  void cacheBlock(String bpid, Block block, FsVolumeImpl volume,
+      FileInputStream blockIn, FileInputStream metaIn) {
+    if (isCached(bpid, block)) {
+      return;
+    }
+    MappableBlock mapBlock = null;
+    try {
+      mapBlock = new MappableBlock(bpid, block, volume, blockIn, metaIn);
+    } catch (IOException e) {
+      LOG.warn("Failed to cache replica " + block + ": Could not instantiate"
+          + " MappableBlock", e);
+      IOUtils.closeQuietly(blockIn);
+      IOUtils.closeQuietly(metaIn);
+      return;
+    }
+    // Check if there's sufficient cache capacity
+    boolean success = false;
+    long bytes = mapBlock.getNumBytes();
+    long used = usedBytes.get();
+    while (used+bytes < maxBytes) {
+      if (usedBytes.compareAndSet(used, used+bytes)) {
+        success = true;
+        break;
+      }
+      used = usedBytes.get();
+    }
+    if (!success) {
+      LOG.warn(String.format(
+          "Failed to cache replica %s: %s exceeded (%d + %d > %d)",
+          mapBlock.getBlock().toString(),
+          DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+          used, bytes, maxBytes));
+      mapBlock.close();
+      return;
+    }
+    // Submit it to the worker pool to be cached
+    volume.getExecutor().execute(new WorkerTask(mapBlock));
+  }
+
+  /**
+   * Uncaches a block if it is cached.
+   * @param block to uncache
+   */
+  void uncacheBlock(String bpid, Block block) {
+    MappableBlock mapBlock = cachedBlocks.get(block.getBlockId());
+    if (mapBlock != null &&
+        mapBlock.getBlockPoolId().equals(bpid) &&
+        mapBlock.getBlock().equals(block)) {
+      mapBlock.close();
+      cachedBlocks.remove(block.getBlockId());
+      long bytes = mapBlock.getNumBytes();
+      long used = usedBytes.get();
+      while (!usedBytes.compareAndSet(used, used - bytes)) {
+        used = usedBytes.get();
+      }
+    }
+  }
+
+  /**
+   * Background worker that mmaps, mlocks, and checksums a block
+   */
+  private class WorkerTask implements Runnable {
+
+    private MappableBlock block;
+    WorkerTask(MappableBlock block) {
+      this.block = block;
+    }
+
+    @Override
+    public void run() {
+      boolean success = false;
+      try {
+        block.map();
+        block.lock();
+        block.verifyChecksum();
+        success = true;
+      } catch (ChecksumException e) {
+        // Exception message is bogus since this wasn't caused by a file read
+        LOG.warn("Failed to cache block " + block.getBlock() + ": Checksum "
+            + "verification failed.");
+      } catch (IOException e) {
+        LOG.warn("Failed to cache block " + block.getBlock() + ": IOException",
+            e);
+      }
+      // If we failed or the block became uncacheable in the meantime,
+      // clean up and return the reserved cache allocation 
+      if (!success || 
+          !dataset.validToCache(block.getBlockPoolId(), block.getBlock())) {
+        block.close();
+        long used = usedBytes.get();
+        while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) {
+          used = usedBytes.get();
+        }
+      } else {
+        cachedBlocks.put(block.getBlock().getBlockId(), block);
+      }
+    }
+  }
+
+  // Stats related methods for FsDatasetMBean
+
+  public long getCacheUsed() {
+    return usedBytes.get();
+  }
+
+  public long getCacheCapacity() {
+    return maxBytes;
+  }
+
+  public long getCacheRemaining() {
+    return maxBytes - usedBytes.get();
+  }
+}
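
The capacity accounting above (cacheBlock reserving bytes, uncacheBlock and WorkerTask returning them) is a lock-free reserve/release loop over an AtomicLong rather than a synchronized counter. Below is a minimal, self-contained sketch of that pattern; the class and method names are illustrative, not part of the patch, and the boundary condition is simplified to "reject anything that would exceed the limit".

    import java.util.concurrent.atomic.AtomicLong;

    class ByteReservation {
      private final AtomicLong usedBytes = new AtomicLong(0);
      private final long maxBytes;

      ByteReservation(long maxBytes) {
        this.maxBytes = maxBytes;
      }

      /** Tries to reserve the given number of bytes; false if the limit would be exceeded. */
      boolean reserve(long bytes) {
        while (true) {
          long used = usedBytes.get();
          if (used + bytes > maxBytes) {
            return false;                         // over the configured limit
          }
          if (usedBytes.compareAndSet(used, used + bytes)) {
            return true;                          // reservation succeeded
          }
          // CAS lost a race with another caller; re-read usedBytes and retry
        }
      }

      /** Returns a prior reservation, e.g. after an uncache or a failed cache attempt. */
      void release(long bytes) {
        long used;
        do {
          used = usedBytes.get();
        } while (!usedBytes.compareAndSet(used, used - bytes));
      }
    }

A failed mmap/mlock/checksum in WorkerTask corresponds to calling release() after a successful reserve(), which is why the worker decrements usedBytes only when it cannot keep the block.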

+ 108 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -37,6 +37,7 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -170,6 +171,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   final FsVolumeList volumes;
   final ReplicaMap volumeMap;
   final FsDatasetAsyncDiskService asyncDiskService;
+  final FsDatasetCache cacheManager;
   private final int validVolsRequired;
 
   // Used for synchronizing access to usage stats
@@ -228,6 +230,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
     }
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
+    cacheManager = new FsDatasetCache(this);
     registerMBean(storage.getStorageID());
   }
 
@@ -287,6 +290,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return volumes.numberOfFailedVolumes();
   }
 
+  /**
+   * Returns the total cache used by the datanode (in bytes).
+   */
+  @Override // FSDatasetMBean
+  public long getCacheUsed() {
+    return cacheManager.getCacheUsed();
+  }
+
+  /**
+   * Returns the total cache capacity of the datanode (in bytes).
+   */
+  @Override // FSDatasetMBean
+  public long getCacheCapacity() {
+    return cacheManager.getCacheCapacity();
+  }
+
+  /**
+   * Returns the total amount of cache remaining (in bytes).
+   */
+  @Override // FSDatasetMBean
+  public long getCacheRemaining() {
+    return cacheManager.getCacheRemaining();
+  }
+
   /**
    * Find the block's on-disk length
    */
@@ -534,6 +561,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private synchronized ReplicaBeingWritten append(String bpid,
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
+    // uncache the block
+    cacheManager.uncacheBlock(bpid, replicaInfo);
     // unlink the finalized replica
     replicaInfo.unlinkBlock(1);
     
@@ -1001,6 +1030,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  @Override // FsDatasetSpi
+  public BlockListAsLongs getCacheReport(String bpid) {
+    return new BlockListAsLongs(cacheManager.getCachedBlocks(bpid), null);
+  }
+
   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
@@ -1143,6 +1177,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         volumeMap.remove(bpid, invalidBlks[i]);
       }
 
+      // Uncache the block synchronously
+      cacheManager.uncacheBlock(bpid, invalidBlks[i]);
       // Delete the block asynchronously to make sure we can do it fast enough
       asyncDiskService.deleteAsync(v, f,
           FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
@@ -1153,6 +1189,78 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  synchronized boolean validToCache(String bpid, Block blk) {
+    ReplicaInfo info = volumeMap.get(bpid, blk);
+    if (info == null) {
+      LOG.warn("Failed to cache replica " + blk + ": ReplicaInfo not found.");
+      return false;
+    }
+    FsVolumeImpl volume = (FsVolumeImpl)info.getVolume();
+    if (volume == null) {
+      LOG.warn("Failed to cache replica " + blk + ": Volume not found.");
+      return false;
+    }
+    if (info.getState() != ReplicaState.FINALIZED) {
+      LOG.warn("Failed to cache replica " + blk + ": Replica is not"
+          + " finalized.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
+   */
+  private void cacheBlock(String bpid, Block blk) {
+    ReplicaInfo info;
+    FsVolumeImpl volume;
+    synchronized (this) {
+      if (!validToCache(bpid, blk)) {
+        return;
+      }
+      info = volumeMap.get(bpid, blk);
+      volume = (FsVolumeImpl)info.getVolume();
+    }
+    // Try to open block and meta streams
+    FileInputStream blockIn = null;
+    FileInputStream metaIn = null;
+    boolean success = false;
+    try {
+      ExtendedBlock extBlk = new ExtendedBlock(bpid, blk);
+      blockIn = (FileInputStream)getBlockInputStream(extBlk, 0);
+      metaIn = (FileInputStream)getMetaDataInputStream(extBlk)
+          .getWrappedStream();
+      success = true;
+    } catch (ClassCastException e) {
+      LOG.warn("Failed to cache replica " + blk + ": Underlying blocks"
+          + " are not backed by files.", e);
+    } catch (IOException e) {
+      LOG.warn("Failed to cache replica " + blk + ": IOException while"
+          + " trying to open block or meta files.", e);
+    }
+    if (!success) {
+      IOUtils.closeQuietly(blockIn);
+      IOUtils.closeQuietly(metaIn);
+      return;
+    }
+    cacheManager.cacheBlock(bpid, blk, volume, blockIn, metaIn);
+  }
+
+  @Override // FsDatasetSpi
+  public void cache(String bpid, Block[] cacheBlks) {
+    for (int i=0; i<cacheBlks.length; i++) {
+      cacheBlock(bpid, cacheBlks[i]);
+    }
+  }
+
+  @Override // FsDatasetSpi
+  public void uncache(String bpid, Block[] uncacheBlks) {
+    for (int i=0; i<uncacheBlks.length; i++) {
+      Block blk = uncacheBlks[i];
+      cacheManager.uncacheBlock(bpid, blk);
+    }
+  }
+
   @Override // FsDatasetSpi
   public synchronized boolean contains(final ExtendedBlock block) {
     final long blockId = block.getLocalBlock().getBlockId();

+ 35 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -18,11 +18,17 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +40,8 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * The underlying volume used to store replica.
  * 
@@ -48,6 +56,13 @@ class FsVolumeImpl implements FsVolumeSpi {
   private final File currentDir;    // <StorageDirectory>/current
   private final DF usage;           
   private final long reserved;
+  /**
+   * Per-volume worker pool that processes new blocks to cache.
+   * The maximum number of workers per volume is bounded (configurable via
+   * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
+   * contention.
+   */
+  private final ThreadPoolExecutor cacheExecutor;
   
   FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
       Configuration conf) throws IOException {
@@ -59,6 +74,20 @@ class FsVolumeImpl implements FsVolumeSpi {
     this.currentDir = currentDir; 
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
+    final int maxNumThreads = dataset.datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
+        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT
+        );
+    ThreadFactory workerFactory = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
+        .build();
+    cacheExecutor = new ThreadPoolExecutor(
+        1, maxNumThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        workerFactory);
+    cacheExecutor.allowCoreThreadTimeOut(true);
   }
   
   File getCurrentDir() {
@@ -166,7 +195,11 @@ class FsVolumeImpl implements FsVolumeSpi {
   File addBlock(String bpid, Block b, File f) throws IOException {
     return getBlockPoolSlice(bpid).addBlock(b, f);
   }
-    
+
+  Executor getExecutor() {
+    return cacheExecutor;
+  }
+
   void checkDirs() throws DiskErrorException {
     // TODO:FEDERATION valid synchronization
     for(BlockPoolSlice s : bpSlices.values()) {
@@ -210,6 +243,7 @@ class FsVolumeImpl implements FsVolumeSpi {
   }
 
   void shutdown() {
+    cacheExecutor.shutdown();
     Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
     for (Entry<String, BlockPoolSlice> entry : set) {
       entry.getValue().shutdown();
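
The per-volume cacheExecutor added above bounds how much caching work a single volume performs concurrently. A standalone sketch of the same pool configuration follows; the helper and class names are illustrative, since the patch builds the pool inline in FsVolumeImpl's constructor.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    class CachePools {
      static ThreadPoolExecutor newBoundedDaemonPool(String name, int maxThreads) {
        ThreadFactory factory = new ThreadFactoryBuilder()
            .setDaemon(true)                      // worker threads must not block JVM shutdown
            .setNameFormat(name + "-%d")
            .build();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, maxThreads,                        // core pool size 1, maximum pool size maxThreads
            60, TimeUnit.SECONDS,                 // keep-alive for idle threads
            new LinkedBlockingQueue<Runnable>(),  // unbounded FIFO work queue
            factory);
        pool.allowCoreThreadTimeOut(true);        // allow even the core thread to exit when idle
        return pool;
      }
    }

Note that ThreadPoolExecutor only grows past its core size when the work queue is full, so with an unbounded LinkedBlockingQueue this pool effectively runs at most one worker per volume; the idle timeout still lets it shrink to zero threads between bursts of caching work.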

+ 249 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java

@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.BufferedInputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Low-level wrapper for a Block and its backing files that provides mmap,
+ * mlock, and checksum verification operations.
+ * 
+ * This could be a private class of FsDatasetCache, not meant for other users.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class MappableBlock implements Closeable {
+
+  private final String bpid;
+  private final Block block;
+  private final FsVolumeImpl volume;
+
+  private final FileInputStream blockIn;
+  private final FileInputStream metaIn;
+  private final FileChannel blockChannel;
+  private final FileChannel metaChannel;
+  private final long blockSize;
+
+  private boolean isMapped;
+  private boolean isLocked;
+  private boolean isChecksummed;
+
+  private MappedByteBuffer blockMapped = null;
+
+  public MappableBlock(String bpid, Block blk, FsVolumeImpl volume,
+      FileInputStream blockIn, FileInputStream metaIn) throws IOException {
+    this.bpid = bpid;
+    this.block = blk;
+    this.volume = volume;
+
+    this.blockIn = blockIn;
+    this.metaIn = metaIn;
+    this.blockChannel = blockIn.getChannel();
+    this.metaChannel = metaIn.getChannel();
+    this.blockSize = blockChannel.size();
+
+    this.isMapped = false;
+    this.isLocked = false;
+    this.isChecksummed = false;
+  }
+
+  public String getBlockPoolId() {
+    return bpid;
+  }
+
+  public Block getBlock() {
+    return block;
+  }
+
+  public FsVolumeImpl getVolume() {
+    return volume;
+  }
+
+  public boolean isMapped() {
+    return isMapped;
+  }
+
+  public boolean isLocked() {
+    return isLocked;
+  }
+
+  public boolean isChecksummed() {
+    return isChecksummed;
+  }
+
+  /**
+   * Returns the number of bytes on disk for the block file
+   */
+  public long getNumBytes() {
+    return blockSize;
+  }
+
+  /**
+   * Maps the block into memory. See mmap(2).
+   */
+  public void map() throws IOException {
+    if (isMapped) {
+      return;
+    }
+    blockMapped = blockChannel.map(MapMode.READ_ONLY, 0, blockSize);
+    isMapped = true;
+  }
+
+  /**
+   * Unmaps the block from memory. See munmap(2).
+   */
+  public void unmap() {
+    if (!isMapped) {
+      return;
+    }
+    if (blockMapped instanceof sun.nio.ch.DirectBuffer) {
+      sun.misc.Cleaner cleaner =
+          ((sun.nio.ch.DirectBuffer)blockMapped).cleaner();
+      cleaner.clean();
+    }
+    isMapped = false;
+    isLocked = false;
+    isChecksummed = false;
+  }
+
+  /**
+   * Locks the block into memory. This prevents the block from being paged out.
+   * See mlock(2).
+   */
+  public void lock() throws IOException {
+    Preconditions.checkArgument(isMapped,
+        "Block must be mapped before it can be locked!");
+    if (isLocked) {
+      return;
+    }
+    NativeIO.POSIX.mlock(blockMapped, blockSize);
+    isLocked = true;
+  }
+
+  /**
+   * Unlocks the block from memory, allowing it to be paged out. See munlock(2).
+   */
+  public void unlock() throws IOException {
+    if (!isLocked || !isMapped) {
+      return;
+    }
+    NativeIO.POSIX.munlock(blockMapped, blockSize);
+    isLocked = false;
+    isChecksummed = false;
+  }
+
+  /**
+   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   */
+  private int fillBuffer(FileChannel channel, ByteBuffer buf)
+      throws IOException {
+    int bytesRead = channel.read(buf);
+    if (bytesRead < 0) {
+      //EOF
+      return bytesRead;
+    }
+    while (buf.remaining() > 0) {
+      int n = channel.read(buf);
+      if (n < 0) {
+        //EOF
+        return bytesRead;
+      }
+      bytesRead += n;
+    }
+    return bytesRead;
+  }
+
+  /**
+   * Verifies the block's checksum. This is an I/O intensive operation.
+   */
+  public void verifyChecksum() throws IOException, ChecksumException {
+    Preconditions.checkArgument(isLocked && isMapped,
+        "Block must be mapped and locked before checksum verification!");
+    // skip if checksum has already been successfully verified
+    if (isChecksummed) {
+      return;
+    }
+    // Verify the checksum from the block's meta file
+    // Get the DataChecksum from the meta file header
+    metaChannel.position(0);
+    BlockMetadataHeader header =
+        BlockMetadataHeader.readHeader(new DataInputStream(
+            new BufferedInputStream(metaIn, BlockMetadataHeader
+                .getHeaderSize())));
+    DataChecksum checksum = header.getChecksum();
+    final int bytesPerChecksum = checksum.getBytesPerChecksum();
+    final int checksumSize = checksum.getChecksumSize();
+    final int numChunks = (8*1024*1024) / bytesPerChecksum;
+    ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
+    ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
+    // Verify the checksum
+    int bytesVerified = 0;
+    while (bytesVerified < blockChannel.size()) {
+      Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
+          "Unexpected partial chunk before EOF");
+      int bytesRead = fillBuffer(blockChannel, blockBuf);
+      if (bytesRead == -1) {
+        throw new IOException("Premature EOF");
+      }
+      blockBuf.flip();
+      // Number of read chunks, including partial chunk at end
+      int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
+      checksumBuf.limit(chunks*checksumSize);
+      fillBuffer(metaChannel, checksumBuf);
+      checksumBuf.flip();
+      checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(),
+          bytesVerified);
+      // Success
+      bytesVerified += bytesRead;
+      blockBuf.clear();
+      checksumBuf.clear();
+    }
+    isChecksummed = true;
+    // Can close the backing file since everything is safely in memory
+    blockChannel.close();
+  }
+
+  @Override
+  public void close() {
+    unmap();
+    IOUtils.closeQuietly(blockIn);
+    IOUtils.closeQuietly(metaIn);
+  }
+}
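
map() and verifyChecksum() boil down to two standard NIO operations: mapping the block file read-only, and streaming a channel through a fixed-size chunk buffer. A minimal, self-contained sketch of that I/O pattern follows; the file path, chunk size, and class name are illustrative, and the mlock(2)/munlock(2) calls (which go through NativeIO in the patch) are omitted.

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileChannel.MapMode;

    public class MmapAndScan {
      public static void main(String[] args) throws IOException {
        FileInputStream in = new FileInputStream(args[0]);
        try {
          FileChannel channel = in.getChannel();

          // The mmap(2) half of MappableBlock.map(): map the whole file read-only.
          MappedByteBuffer mapped = channel.map(MapMode.READ_ONLY, 0, channel.size());
          System.out.println("mapped " + mapped.remaining() + " bytes");

          // Chunked scan, the way verifyChecksum() walks the block channel:
          // read a fixed-size buffer at a time until EOF.
          channel.position(0);
          ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
          long total = 0;
          while (true) {
            int n = channel.read(chunk);
            if (n < 0) {
              break;                              // EOF
            }
            chunk.flip();
            total += chunk.remaining();           // bytes read into this chunk
            chunk.clear();
          }
          System.out.println("scanned " + total + " bytes in 64KB chunks");
        } finally {
          in.close();
        }
      }
    }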

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java

@@ -77,4 +77,19 @@ public interface FSDatasetMBean {
    * @return The number of failed volumes in the datanode.
    */
   public int getNumFailedVolumes();
+
+  /**
+   * Returns the total cache used by the datanode (in bytes).
+   */
+  public long getCacheUsed();
+
+  /**
+   * Returns the total cache capacity of the datanode (in bytes).
+   */
+  public long getCacheCapacity();
+
+  /**
+   * Returns the total amount of cache remaining (in bytes).
+   */
+  public long getCacheRemaining();
 }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -74,6 +74,8 @@ public interface DatanodeProtocol {
   final static int DNA_RECOVERBLOCK = 6;  // request a block recovery
   final static int DNA_ACCESSKEYUPDATE = 7;  // update access key
   final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
+  final static int DNA_CACHE = 9;      // cache blocks
+  final static int DNA_UNCACHE = 10;   // uncache blocks
 
   /** 
    * Register Datanode.

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

@@ -101,7 +101,9 @@ message BlockCommandProto {
   enum Action {  
     TRANSFER = 1;   // Transfer blocks to another datanode
     INVALIDATE = 2; // Invalidate blocks
-    SHUTDOWN = 3; // Shutdown the datanode
+    SHUTDOWN = 3;   // Shutdown the datanode
+    CACHE = 4;      // Cache blocks on the datanode
+    UNCACHE = 5;    // Uncache blocks on the datanode
   }
   required Action action = 1;
   required string blockPoolId = 2;

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1419,4 +1419,27 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.max.locked.memory</name>
+  <value>0</value>
+  <description>
+    The amount of memory in bytes to use for caching of block replicas in
+    memory on the datanode. The datanode's maximum locked memory soft ulimit
+    (RLIMIT_MEMLOCK) must be set to at least this value, else the datanode
+    will abort on startup.
+
+    By default, this parameter is set to 0, which disables in-memory caching.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
+  <value>4</value>
+  <description>
+    The maximum number of threads per volume to use for caching new data
+    on the datanode. These threads consume both I/O and CPU, so they can
+    affect normal datanode operations.
+  </description>
+</property>
+
 </configuration>
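
The two properties above are what the rest of the patch keys off: dfs.datanode.max.locked.memory sizes the cache (and must fit within the datanode's RLIMIT_MEMLOCK), while dfs.datanode.fsdatasetcache.max.threads.per.volume bounds the per-volume worker pool. A small sketch of setting them programmatically, as a test or embedded deployment might do; the values and class name here are illustrative:

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class CacheConfExample {
      public static void main(String[] args) {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Allow up to 64KB of mlocked block data on the datanode
        // (0, the default, disables in-memory caching entirely).
        conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, 64 * 1024);
        // Use at most two caching worker threads per volume.
        conf.setInt(
            DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY, 2);
        System.out.println(conf.get(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
      }
    }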

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java

@@ -61,4 +61,15 @@ public class LogVerificationAppender extends AppenderSkeleton {
     }
     return count;
   }
+
+  public int countLinesWithMessage(final String text) {
+    int count = 0;
+    for (LoggingEvent e: getLog()) {
+      String msg = e.getRenderedMessage();
+      if (msg != null && msg.contains(text)) {
+        count++;
+      }
+    }
+    return count;
+  }
 }

+ 32 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -465,6 +465,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return new BlockListAsLongs(blocks, null);
   }
 
+  @Override // FsDatasetSpi
+  public BlockListAsLongs getCacheReport(String bpid) {
+    return new BlockListAsLongs();
+  }
+
   @Override // FSDatasetMBean
   public long getCapacity() {
     return storage.getCapacity();
@@ -490,6 +495,21 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return storage.getNumFailedVolumes();
   }
 
+  @Override // FSDatasetMBean
+  public long getCacheUsed() {
+    return 0L;
+  }
+
+  @Override // FSDatasetMBean
+  public long getCacheCapacity() {
+    return 0L;
+  }
+
+  @Override // FSDatasetMBean
+  public long getCacheRemaining() {
+    return 0L;
+  }
+
   @Override // FsDatasetSpi
   public synchronized long getLength(ExtendedBlock b) throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
@@ -559,6 +579,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
   }
 
+  @Override // FSDatasetSpi
+  public void cache(String bpid, Block[] cacheBlks) {
+    throw new UnsupportedOperationException(
+        "SimulatedFSDataset does not support cache operation!");
+  }
+
+  @Override // FSDatasetSpi
+  public void uncache(String bpid, Block[] uncacheBlks) {
+    throw new UnsupportedOperationException(
+        "SimulatedFSDataset does not support uncache operation!");
+  }
+
   private BInfo getBInfo(final ExtendedBlock b) {
     final Map<Block, BInfo> map = blockMap.get(b.getBlockPoolId());
     return map == null? null: map.get(b.getLocalBlock());

+ 266 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doReturn;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
+import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFsDatasetCache {
+
+  // Most Linux installs allow a default of 64KB locked memory
+  private static final long CACHE_CAPACITY = 64 * 1024;
+  private static final long BLOCK_SIZE = 4096;
+
+  private static Configuration conf;
+  private static MiniDFSCluster cluster = null;
+  private static FileSystem fs;
+  private static NameNode nn;
+  private static FSImage fsImage;
+  private static DataNode dn;
+  private static FsDatasetSpi<?> fsd;
+  private static DatanodeProtocolClientSideTranslatorPB spyNN;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        CACHE_CAPACITY);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitActive();
+
+    fs = cluster.getFileSystem();
+    nn = cluster.getNameNode();
+    fsImage = nn.getFSImage();
+    dn = cluster.getDataNodes().get(0);
+    fsd = dn.getFSDataset();
+
+    spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private static void setHeartbeatResponse(DatanodeCommand[] cmds)
+      throws IOException {
+    HeartbeatResponse response = new HeartbeatResponse(
+        cmds,
+        new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
+        fsImage.getLastAppliedOrWrittenTxId()));
+    doReturn(response).when(spyNN).sendHeartbeat(
+        (DatanodeRegistration) any(),
+        (StorageReport[]) any(),
+        anyInt(), anyInt(), anyInt());
+  }
+
+  private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
+    return cacheBlocks(new HdfsBlockLocation[] {loc});
+  }
+
+  private static DatanodeCommand[] cacheBlocks(HdfsBlockLocation[] locs) {
+    return new DatanodeCommand[] {
+        getResponse(locs, DatanodeProtocol.DNA_CACHE)
+    };
+  }
+
+  private static DatanodeCommand[] uncacheBlock(HdfsBlockLocation loc) {
+    return uncacheBlocks(new HdfsBlockLocation[] {loc});
+  }
+
+  private static DatanodeCommand[] uncacheBlocks(HdfsBlockLocation[] locs) {
+    return new DatanodeCommand[] {
+        getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
+    };
+  }
+
+  /**
+   * Creates a cache or uncache DatanodeCommand from an array of locations
+   */
+  private static DatanodeCommand getResponse(HdfsBlockLocation[] locs,
+      int action) {
+    String bpid = locs[0].getLocatedBlock().getBlock().getBlockPoolId();
+    Block[] blocks = new Block[locs.length];
+    for (int i=0; i<locs.length; i++) {
+      blocks[i] = locs[i].getLocatedBlock().getBlock().getLocalBlock();
+    }
+    return new BlockCommand(action, bpid, blocks);
+  }
+
+  private static long[] getBlockSizes(HdfsBlockLocation[] locs)
+      throws Exception {
+    long[] sizes = new long[locs.length];
+    for (int i=0; i<locs.length; i++) {
+      HdfsBlockLocation loc = locs[i];
+      String bpid = loc.getLocatedBlock().getBlock().getBlockPoolId();
+      Block block = loc.getLocatedBlock().getBlock().getLocalBlock();
+      ExtendedBlock extBlock = new ExtendedBlock(bpid, block);
+      FileChannel blockChannel =
+          ((FileInputStream)fsd.getBlockInputStream(extBlock, 0)).getChannel();
+      sizes[i] = blockChannel.size();
+    }
+    return sizes;
+  }
+
+  /**
+   * Blocks until cache usage changes from the current value, then verifies
+   * against the expected new value.
+   */
+  private long verifyExpectedCacheUsage(final long current,
+      final long expected) throws Exception {
+    long cacheUsed = fsd.getCacheUsed();
+    while (cacheUsed == current) {
+      cacheUsed = fsd.getCacheUsed();
+      Thread.sleep(100);
+    }
+    long cacheCapacity = fsd.getCacheCapacity();
+    long cacheRemaining = fsd.getCacheRemaining();
+    assertEquals("Sum of used and remaining cache does not equal total",
+        cacheCapacity, cacheUsed+cacheRemaining);
+    assertEquals("Unexpected amount of cache used", expected, cacheUsed);
+    return cacheUsed;
+  }
+
+  @Test(timeout=60000)
+  public void testCacheAndUncacheBlock() throws Exception {
+    final int NUM_BLOCKS = 5;
+
+    // Write a test file
+    final Path testFile = new Path("/testCacheBlock");
+    final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
+    DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 0xABBAl);
+
+    // Get the details of the written file
+    HdfsBlockLocation[] locs =
+        (HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0, testFileLen);
+    assertEquals("Unexpected number of blocks", NUM_BLOCKS, locs.length);
+    final long[] blockSizes = getBlockSizes(locs);
+
+    // Check initial state
+    final long cacheCapacity = fsd.getCacheCapacity();
+    long cacheUsed = fsd.getCacheUsed();
+    long current = 0;
+    assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
+    assertEquals("Unexpected amount of cache used", current, cacheUsed);
+
+    // Cache each block in succession, checking each time
+    for (int i=0; i<NUM_BLOCKS; i++) {
+      setHeartbeatResponse(cacheBlock(locs[i]));
+      current = verifyExpectedCacheUsage(current, current + blockSizes[i]);
+    }
+
+    // Uncache each block in succession, again checking each time
+    for (int i=0; i<NUM_BLOCKS; i++) {
+      setHeartbeatResponse(uncacheBlock(locs[i]));
+      current = verifyExpectedCacheUsage(current, current - blockSizes[i]);
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testFilesExceedMaxLockedMemory() throws Exception {
+    // Create some test files that will exceed total cache capacity
+    // Don't forget that meta files take up space too!
+    final int numFiles = 4;
+    final long fileSize = CACHE_CAPACITY / numFiles;
+    final Path[] testFiles = new Path[4];
+    final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
+    final long[] fileSizes = new long[numFiles];
+    for (int i=0; i<numFiles; i++) {
+      testFiles[i] = new Path("/testFilesExceedMaxLockedMemory-" + i);
+      DFSTestUtil.createFile(fs, testFiles[i], fileSize, (short)1, 0xDFAl);
+      fileLocs[i] = (HdfsBlockLocation[])fs.getFileBlockLocations(
+          testFiles[i], 0, fileSize);
+      // Get the file size (sum of blocks)
+      long[] sizes = getBlockSizes(fileLocs[i]);
+      for (int j=0; j<sizes.length; j++) {
+        fileSizes[i] += sizes[j];
+      }
+    }
+
+    // Cache the first n-1 files
+    long current = 0;
+    for (int i=0; i<numFiles-1; i++) {
+      setHeartbeatResponse(cacheBlocks(fileLocs[i]));
+      current = verifyExpectedCacheUsage(current, current + fileSizes[i]);
+    }
+    final long oldCurrent = current;
+
+    // nth file should hit a capacity exception
+    final LogVerificationAppender appender = new LogVerificationAppender();
+    final Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    setHeartbeatResponse(cacheBlocks(fileLocs[numFiles-1]));
+    int lines = 0;
+    while (lines == 0) {
+      Thread.sleep(100);
+      lines = appender.countLinesWithMessage(
+          DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + " exceeded");
+    }
+
+    // Uncache the cached part of the nth file
+    setHeartbeatResponse(uncacheBlocks(fileLocs[numFiles-1]));
+    while (fsd.getCacheUsed() != oldCurrent) {
+      Thread.sleep(100);
+    }
+
+    // Uncache the n-1 files
+    for (int i=0; i<numFiles-1; i++) {
+      setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
+      current = verifyExpectedCacheUsage(current, current - fileSizes[i]);
+    }
+  }
+}
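
verifyExpectedCacheUsage and testFilesExceedMaxLockedMemory both poll a datanode metric every 100ms until it changes, relying on the JUnit timeout to bound the wait. A generic version of that wait loop with an explicit deadline, purely as an illustration; the helper class and probe interface are hypothetical and not part of the patch:

    import java.util.concurrent.TimeoutException;

    public class WaitFor {
      /** Hypothetical probe interface; the tests poll fsd.getCacheUsed() directly. */
      public interface LongProbe {
        long get();
      }

      /** Blocks until the probed value differs from current, or the deadline passes. */
      public static long waitForChange(LongProbe probe, long current, long timeoutMs)
          throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        long value = probe.get();
        while (value == current) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException("Value did not change from " + current);
          }
          Thread.sleep(100);                      // same poll interval as the test
          value = probe.get();
        }
        return value;
      }
    }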