HDFS-6208. Merging change r1586154 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1586160 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth, 11 years ago
commit 129bf91c5e

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -111,6 +111,8 @@ Release 2.4.1 - UNRELEASED
     HDFS-6209. TestValidateConfigurationSettings should use random ports.
     (Arpit Agarwal via szetszwo) 
 
+    HDFS-6208. DataNode caching can leak file descriptors. (cnauroth)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java

@@ -395,6 +395,8 @@ public class FsDatasetCache {
         dataset.datanode.getMetrics().incrBlocksCached(1);
         success = true;
       } finally {
+        IOUtils.closeQuietly(blockIn);
+        IOUtils.closeQuietly(metaIn);
         if (!success) {
           if (reservedBytes) {
             newUsedBytes = usedBytesCount.release(length);
@@ -403,8 +405,6 @@ public class FsDatasetCache {
             LOG.debug("Caching of " + key + " was aborted.  We are now " +
                 "caching only " + newUsedBytes + " + bytes in total.");
           }
-          IOUtils.closeQuietly(blockIn);
-          IOUtils.closeQuietly(metaIn);
           if (mappableBlock != null) {
             mappableBlock.close();
           }

+ 40 - 32
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java

@@ -28,6 +28,7 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
@@ -76,8 +77,9 @@ public class MappableBlock implements Closeable {
       String blockFileName) throws IOException {
     MappableBlock mappableBlock = null;
     MappedByteBuffer mmap = null;
+    FileChannel blockChannel = null;
     try {
-      FileChannel blockChannel = blockIn.getChannel();
+      blockChannel = blockIn.getChannel();
       if (blockChannel == null) {
         throw new IOException("Block InputStream has no FileChannel.");
       }
@@ -86,6 +88,7 @@ public class MappableBlock implements Closeable {
       verifyChecksum(length, metaIn, blockChannel, blockFileName);
       mappableBlock = new MappableBlock(mmap, length);
     } finally {
+      IOUtils.closeQuietly(blockChannel);
       if (mappableBlock == null) {
         if (mmap != null) {
           NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
@@ -108,38 +111,43 @@ public class MappableBlock implements Closeable {
         BlockMetadataHeader.readHeader(new DataInputStream(
             new BufferedInputStream(metaIn, BlockMetadataHeader
                 .getHeaderSize())));
-    FileChannel metaChannel = metaIn.getChannel();
-    if (metaChannel == null) {
-      throw new IOException("Block InputStream meta file has no FileChannel.");
-    }
-    DataChecksum checksum = header.getChecksum();
-    final int bytesPerChecksum = checksum.getBytesPerChecksum();
-    final int checksumSize = checksum.getChecksumSize();
-    final int numChunks = (8*1024*1024) / bytesPerChecksum;
-    ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
-    ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
-    // Verify the checksum
-    int bytesVerified = 0;
-    while (bytesVerified < length) {
-      Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
-          "Unexpected partial chunk before EOF");
-      assert bytesVerified % bytesPerChecksum == 0;
-      int bytesRead = fillBuffer(blockChannel, blockBuf);
-      if (bytesRead == -1) {
-        throw new IOException("checksum verification failed: premature EOF");
+    FileChannel metaChannel = null;
+    try {
+      metaChannel = metaIn.getChannel();
+      if (metaChannel == null) {
+        throw new IOException("Block InputStream meta file has no FileChannel.");
       }
-      blockBuf.flip();
-      // Number of read chunks, including partial chunk at end
-      int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
-      checksumBuf.limit(chunks*checksumSize);
-      fillBuffer(metaChannel, checksumBuf);
-      checksumBuf.flip();
-      checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
-          bytesVerified);
-      // Success
-      bytesVerified += bytesRead;
-      blockBuf.clear();
-      checksumBuf.clear();
+      DataChecksum checksum = header.getChecksum();
+      final int bytesPerChecksum = checksum.getBytesPerChecksum();
+      final int checksumSize = checksum.getChecksumSize();
+      final int numChunks = (8*1024*1024) / bytesPerChecksum;
+      ByteBuffer blockBuf = ByteBuffer.allocate(numChunks*bytesPerChecksum);
+      ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
+      // Verify the checksum
+      int bytesVerified = 0;
+      while (bytesVerified < length) {
+        Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
+            "Unexpected partial chunk before EOF");
+        assert bytesVerified % bytesPerChecksum == 0;
+        int bytesRead = fillBuffer(blockChannel, blockBuf);
+        if (bytesRead == -1) {
+          throw new IOException("checksum verification failed: premature EOF");
+        }
+        blockBuf.flip();
+        // Number of read chunks, including partial chunk at end
+        int chunks = (bytesRead+bytesPerChecksum-1) / bytesPerChecksum;
+        checksumBuf.limit(chunks*checksumSize);
+        fillBuffer(metaChannel, checksumBuf);
+        checksumBuf.flip();
+        checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
+            bytesVerified);
+        // Success
+        bytesVerified += bytesRead;
+        blockBuf.clear();
+        checksumBuf.clear();
+      }
+    } finally {
+      IOUtils.closeQuietly(metaChannel);
     }
   }
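In MappableBlock the same discipline is applied to the channels: load() now holds blockChannel in a local that its finally closes, and verifyChecksum() wraps metaChannel in try/finally. What makes the early close safe is that a mapping, once established, is independent of the FileChannel that created it. A small sketch under assumed names (not the actual load() body):

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

import org.apache.commons.io.IOUtils;

public class ChannelLifetimeSketch {
  // Hypothetical helper illustrating the pattern.
  static MappedByteBuffer mapAndRelease(FileInputStream blockIn, long length)
      throws IOException {
    FileChannel blockChannel = null;
    try {
      blockChannel = blockIn.getChannel();
      // The MappedByteBuffer stays valid after its FileChannel is closed,
      // so the descriptor can be released as soon as mapping (and, in the
      // real code, checksum verification) is done.
      return blockChannel.map(MapMode.READ_ONLY, 0, length);
    } finally {
      // Closing the channel also closes the FileInputStream it came from.
      IOUtils.closeQuietly(blockChannel);
    }
  }
}

With this shape, a successfully cached block ends up holding only the mmap, with no open descriptor pinned behind it, which is exactly the leak HDFS-6208 describes.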
 

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java

@@ -147,6 +147,12 @@ public class TestCacheDirectives {
 
   @After
   public void teardown() throws Exception {
+    // Remove cache directives left behind by tests so that we release mmaps.
+    RemoteIterator<CacheDirectiveEntry> iter = dfs.listCacheDirectives(null);
+    while (iter.hasNext()) {
+      dfs.removeCacheDirective(iter.next().getInfo().getId());
+    }
+    waitForCachedBlocks(namenode, 0, 0, "teardown");
     if (cluster != null) {
       cluster.shutdown();
     }
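
The test change prevents cross-test leakage: cache directives left behind by one test kept blocks mmapped (and their descriptors open) into the next. The teardown idiom, pulled out as a standalone helper for illustration (it assumes an open DistributedFileSystem handle and mirrors the @After body above):

import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;

public class TeardownSketch {
  // Remove every cache directive so the DataNodes drop their mmaps
  // before the cluster shuts down.
  static void clearCacheDirectives(DistributedFileSystem dfs)
      throws Exception {
    RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
    while (it.hasNext()) {
      dfs.removeCacheDirective(it.next().getInfo().getId());
    }
  }
}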