HDFS-6107. When a block cannot be cached due to limited space on the DataNode, it becomes uncacheable (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1578516 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 11 years ago
parent
commit
d50ebb1bfe

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -429,6 +429,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5981. PBImageXmlWriter generates malformed XML.
     (Haohui Mai via cnauroth)
 
+    HDFS-6107. When a block can't be cached due to limited space on the
+    DataNode, that block becomes uncacheable (cmccabe)
+
   BREAKDOWN OF HDFS-4685 SUBTASKS AND RELATED JIRAS
 
     HDFS-5596. Implement RPC stubs. (Haohui Mai via cnauroth)

+ 0 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -598,14 +598,12 @@ class BPOfferService {
         blockIdCmd.getBlockPoolId() + " of [" +
           blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
       dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
-      dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_UNCACHE:
       LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
         blockIdCmd.getBlockPoolId() + " of [" +
           blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
       dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
-      dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
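
The two deletions above move the BlocksCached/BlocksUncached metric updates out of the command handler: the dataset's cache() and uncache() calls only schedule asynchronous work, so counting blocks at command time would also count attempts that later fail. A minimal sketch of counting on completion instead, assuming a hypothetical AsyncCache class rather than the real Hadoop code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the counter is bumped where the asynchronous work
// finishes successfully, not where the command is received.
class AsyncCache {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicLong blocksCached = new AtomicLong(0);

  void cacheBlock(final long blockId) {
    executor.submit(new Runnable() {
      @Override
      public void run() {
        boolean success = false;
        try {
          // ... mmap and mlock the block, verify its checksum ...
          success = true;
        } finally {
          if (success) {
            blocksCached.incrementAndGet();  // counted only on completion
          }
        }
      }
    });
  }

  long getBlocksCached() {
    return blocksCached.get();
  }
}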

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1041,7 +1041,7 @@ public class DataNode extends Configured
     }
   }
     
-  DataNodeMetrics getMetrics() {
+  public DataNodeMetrics getMetrics() {
     return metrics;
   }
   

+ 14 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java

@@ -337,15 +337,16 @@ public class FsDatasetCache {
       ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
           key.getBlockId(), length, genstamp);
       long newUsedBytes = usedBytesCount.reserve(length);
-      if (newUsedBytes < 0) {
-        LOG.warn("Failed to cache " + key + ": could not reserve " + length +
-            " more bytes in the cache: " +
-            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
-            " of " + maxBytes + " exceeded.");
-        numBlocksFailedToCache.incrementAndGet();
-        return;
-      }
+      boolean reservedBytes = false;
       try {
+        if (newUsedBytes < 0) {
+          LOG.warn("Failed to cache " + key + ": could not reserve " + length +
+              " more bytes in the cache: " +
+              DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
+              " of " + maxBytes + " exceeded.");
+          return;
+        }
+        reservedBytes = true;
         try {
           blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0);
           metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
@@ -391,10 +392,13 @@ public class FsDatasetCache {
         }
         dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
         numBlocksCached.addAndGet(1);
+        dataset.datanode.getMetrics().incrBlocksCached(1);
         success = true;
       } finally {
         if (!success) {
-          newUsedBytes = usedBytesCount.release(length);
+          if (reservedBytes) {
+            newUsedBytes = usedBytesCount.release(length);
+          }
           if (LOG.isDebugEnabled()) {
             LOG.debug("Caching of " + key + " was aborted.  We are now " +
                 "caching only " + newUsedBytes + " + bytes in total.");
@@ -439,6 +443,7 @@ public class FsDatasetCache {
       long newUsedBytes =
           usedBytesCount.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
+      dataset.datanode.getMetrics().incrBlocksUncached(1);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Uncaching of " + key + " completed.  " +
             "usedBytes = " + newUsedBytes);

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -1143,6 +1143,11 @@ public class DFSTestUtil {
           }
           return false;
         }
+        LOG.info("verifyExpectedCacheUsage: got " +
+            curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
+            curBlocks + "/" + expectedBlocks + " blocks cached. " +
+            "memlock limit = " +
+            NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
         return true;
       }
     }, 100, 60000);

+ 83 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -40,12 +40,15 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -80,6 +83,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 
 import com.google.common.base.Supplier;
+import com.google.common.primitives.Ints;
 
 public class TestFsDatasetCache {
   private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
@@ -349,10 +353,13 @@ public class TestFsDatasetCache {
         fsd.getNumBlocksFailedToCache() > 0);
 
     // Uncache the n-1 files
+    int curCachedBlocks = 16;
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
-      total -= rounder.round(fileSizes[i]);
-      DFSTestUtil.verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i), fsd);
+      long uncachedBytes = rounder.round(fileSizes[i]);
+      total -= uncachedBytes;
+      curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
+      DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);
     }
     LOG.info("finishing testFilesExceedMaxLockedMemory");
   }
@@ -491,4 +498,78 @@ public class TestFsDatasetCache {
     MetricsAsserts.assertCounter("BlocksCached", 1l, dnMetrics);
     MetricsAsserts.assertCounter("BlocksUncached", 1l, dnMetrics);
   }
+
+  @Test(timeout=60000)
+  public void testReCacheAfterUncache() throws Exception {
+    final int TOTAL_BLOCKS_PER_CACHE =
+        Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
+    BlockReaderTestUtil.enableHdfsCachingTracing();
+    Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
+    
+    // Create a small file
+    final Path SMALL_FILE = new Path("/smallFile");
+    DFSTestUtil.createFile(fs, SMALL_FILE,
+        BLOCK_SIZE, (short)1, 0xcafe);
+
+    // Create a file that will take up the whole cache
+    final Path BIG_FILE = new Path("/bigFile");
+    DFSTestUtil.createFile(fs, BIG_FILE,
+        TOTAL_BLOCKS_PER_CACHE * BLOCK_SIZE, (short)1, 0xbeef);
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    dfs.addCachePool(new CachePoolInfo("pool"));
+    final long bigCacheDirectiveId = 
+        dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+        .setPool("pool").setPath(BIG_FILE).setReplication((short)1).build());
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+        long blocksCached =
+            MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
+        if (blocksCached != TOTAL_BLOCKS_PER_CACHE) {
+          LOG.info("waiting for " + TOTAL_BLOCKS_PER_CACHE + " to " +
+              "be cached.   Right now only " + blocksCached + " blocks are cached.");
+          return false;
+        }
+        LOG.info(TOTAL_BLOCKS_PER_CACHE + " blocks are now cached.");
+        return true;
+      }
+    }, 1000, 30000);
+    
+    // Try to cache a smaller file.  It should fail.
+    final long shortCacheDirectiveId =
+      dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+        .setPool("pool").setPath(SMALL_FILE).setReplication((short)1).build());
+    Thread.sleep(10000);
+    MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
+    Assert.assertEquals(TOTAL_BLOCKS_PER_CACHE,
+        MetricsAsserts.getLongCounter("BlocksCached", dnMetrics));
+    
+    // Uncache the big file and verify that the small file can now be
+    // cached (regression test for HDFS-6107)
+    dfs.removeCacheDirective(bigCacheDirectiveId);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RemoteIterator<CacheDirectiveEntry> iter;
+        try {
+          iter = dfs.listCacheDirectives(
+              new CacheDirectiveInfo.Builder().build());
+          CacheDirectiveEntry entry;
+          do {
+            entry = iter.next();
+          } while (entry.getInfo().getId() != shortCacheDirectiveId);
+          if (entry.getStats().getFilesCached() != 1) {
+            LOG.info("waiting for directive " + shortCacheDirectiveId +
+                " to be cached.  stats = " + entry.getStats());
+            return false;
+          }
+          LOG.info("directive " + shortCacheDirectiveId + " has been cached.");
+        } catch (IOException e) {
+          Assert.fail("unexpected exception" + e.toString());
+        }
+        return true;
+      }
+    }, 1000, 30000);
+  }
 }
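
The new regression test avoids fixed sleeps where it can and instead polls until the DataNode reports the expected state. A minimal sketch of that polling idiom with GenericTestUtils.waitFor, using a hypothetical in-memory counter rather than the real DataNode metrics:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.test.GenericTestUtils;
import com.google.common.base.Supplier;

// Hypothetical sketch of the polling idiom used by the new test: wait until an
// asynchronously updated counter reaches the expected value, checking every
// second and giving up after 30 seconds.
class WaitForCounterExample {
  static void waitForCount(final AtomicLong counter, final long expected)
      throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return counter.get() == expected;
      }
    }, 1000, 30000);
  }
}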