
HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)

(cherry picked from commit 6b39ad0865cb2a7960dd59d68178f0bf28865ce2)
(cherry picked from commit e35788aa5a41940651af0b73dfeaeca011556904)
Colin Patrick Mccabe 10 years ago
parent
commit
8ef73cd4ce

+ 36 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * FSDataInputStreams implement this interface to indicate that they can clear
+ * their buffers on request.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface CanUnbuffer {
+  /**
+   * Reduce the buffering.  This will also free sockets and file descriptors
+   * held by the stream, if possible.
+   */
+  public void unbuffer();
+}
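
Downstream code can probe for the new capability with an instanceof check before downcasting; a minimal sketch (the helper class is hypothetical, not part of this commit):

    import java.io.InputStream;

    import org.apache.hadoop.fs.CanUnbuffer;

    public class UnbufferUtil {
      /**
       * Free the buffers, sockets, and file descriptors held by a stream
       * that will stay open but unread for a while, if the stream supports it.
       */
      public static void unbufferIfPossible(InputStream in) {
        if (in instanceof CanUnbuffer) {
          ((CanUnbuffer) in).unbuffer();
        }
      }
    }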

+ 11 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

@@ -35,7 +35,7 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable, 
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess {
+      HasEnhancedByteBufferAccess, CanUnbuffer {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
    * objects
@@ -220,4 +220,14 @@ public class FSDataInputStream extends DataInputStream
       bufferPool.putBuffer(buffer);
     }
   }
+
+  @Override
+  public void unbuffer() {
+    try {
+      ((CanUnbuffer)in).unbuffer();
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("this stream does not " +
+          "support unbuffering.");
+    }
+  }
 }
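
In application code the call looks like the sketch below (the header size and method are illustrative). As the delegation above shows, FSDataInputStream.unbuffer() throws UnsupportedOperationException when the wrapped stream does not implement CanUnbuffer, so callers running against arbitrary filesystems should be prepared to catch it:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UnbufferExample {
      /**
       * Read a header now, but keep the stream open for later reads without
       * pinning a socket or file descriptor in the meantime.
       */
      static byte[] readHeaderAndIdle(FileSystem fs, Path path)
          throws IOException {
        FSDataInputStream in = fs.open(path);
        byte[] header = new byte[16];
        in.readFully(header);
        try {
          in.unbuffer();  // drop buffers; a later read re-creates them
        } catch (UnsupportedOperationException e) {
          // the wrapped stream is not a CanUnbuffer; nothing to release
        }
        return header;
      }
    }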

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -115,6 +115,8 @@ Release 2.6.1 - 2015-09-23
     HDFS-8384. Allow NN to startup if there are files having a lease but are not
     under construction. (jing9)
 
+    HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-8480. Fix performance and timeout issues in HDFS-7929 by using

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -86,7 +87,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
 implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-    HasEnhancedByteBufferAccess {
+    HasEnhancedByteBufferAccess, CanUnbuffer {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1767,4 +1768,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       ((ByteBufferPool)val).putBuffer(buffer);
     }
   }
+
+  @Override
+  public synchronized void unbuffer() {
+    closeCurrentBlockReader();
+  }
 }
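
Note that unbuffer() is synchronized, like DFSInputStream's other stateful operations, so it cannot race with an in-flight read. closeCurrentBlockReader() tears down the current block reader and, as TestUnbuffer below verifies, the underlying socket is returned to the PeerCache so a later read can reuse it.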

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java

@@ -29,16 +29,21 @@ import com.google.common.collect.LinkedListMultimap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
 /**
  * A cache of input stream sockets to Data Node.
  */
-class PeerCache {
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+@VisibleForTesting
+public class PeerCache {
   private static final Log LOG = LogFactory.getLog(PeerCache.class);
   
   private static class Key {

+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c

@@ -1006,6 +1006,34 @@ done:
     return file;
 }
 
+int hdfsUnbufferFile(hdfsFile file)
+{
+    int ret;
+    jthrowable jthr;
+    JNIEnv *env = getJNIEnv();
+
+    if (!env) {
+        ret = EINTERNAL;
+        goto done;
+    }
+    if (file->type != HDFS_STREAM_INPUT) {
+        ret = ENOTSUP;
+        goto done;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, file->file, HADOOP_ISTRM,
+                     "unbuffer", "()V");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                HADOOP_ISTRM "#unbuffer failed:");
+        goto done;
+    }
+    ret = 0;
+
+done:
+    errno = ret;
+    return ret;
+}
+
 int hdfsCloseFile(hdfsFS fs, hdfsFile file)
 {
     int ret;

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h

@@ -334,6 +334,15 @@ extern  "C" {
     hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
                           int bufferSize, short replication, tSize blocksize);
 
+    /**
+     * hdfsUnbufferFile - Reduce the buffering done on a file.
+     *
+     * @param file  The file to unbuffer.
+     * @return      0 on success
+     *              ENOTSUP if the file does not support unbuffering;
+     *              errno will also be set to this value.
+     */
+    int hdfsUnbufferFile(hdfsFile file);
 
     /** 
      * hdfsCloseFile - Close an open file. 
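
For libhdfs users, a hedged C sketch of the new call (the function and buffer sizes are illustrative, not from this commit). Note that hdfsUnbufferFile() returns ENOTSUP, and sets errno accordingly, on streams that cannot unbuffer:

    #include <errno.h>
    #include <fcntl.h>
    #include <stdio.h>

    #include "hdfs.h"

    /* Sketch: read a header, then release buffers and sockets while the
     * file sits idle; later reads reacquire them transparently. */
    int readHeaderAndIdle(hdfsFS fs, const char *path)
    {
        char header[16];
        hdfsFile file = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
        if (!file)
            return -1;
        if (hdfsRead(fs, file, header, sizeof(header)) < 0) {
            hdfsCloseFile(fs, file);
            return -1;
        }
        if (hdfsUnbufferFile(file) != 0) {
            /* ENOTSUP: this stream cannot unbuffer; not fatal */
            fprintf(stderr, "unbuffer not supported (errno=%d)\n", errno);
        }
        return hdfsCloseFile(fs, file);
    }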

+ 127 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.PeerCache;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnbuffer {
+  private static final Log LOG =
+      LogFactory.getLog(TestUnbuffer.class.getName());
+
+  /**
+   * Test that calling unbuffer() closes sockets.
+   */
+  @Test
+  public void testUnbufferClosesSockets() throws Exception {
+    Configuration conf = new Configuration();
+    // Set a new ClientContext.  This way, we will have our own PeerCache,
+    // rather than sharing one with other unit tests.
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        "testUnbufferClosesSocketsContext");
+
+    // Disable short-circuit reads.  With short-circuit, we wouldn't hold open a
+    // TCP socket.
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+
+    // Set a really long socket timeout to avoid test timing issues.
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        100000000L);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+        100000000L);
+
+    MiniDFSCluster cluster = null;
+    FSDataInputStream stream = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      DistributedFileSystem dfs = (DistributedFileSystem)
+          FileSystem.newInstance(conf);
+      final Path TEST_PATH = new Path("/test1");
+      DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1);
+      stream = dfs.open(TEST_PATH);
+      // Read a byte.  This will trigger the creation of a block reader.
+      stream.seek(2);
+      int b = stream.read();
+      Assert.assertTrue(-1 != b);
+
+      // The Peer cache should start off empty.
+      PeerCache cache = dfs.getClient().getClientContext().getPeerCache();
+      Assert.assertEquals(0, cache.size());
+
+      // Unbuffer should clear the block reader and return the socket to the
+      // cache.
+      stream.unbuffer();
+      stream.seek(2);
+      Assert.assertEquals(1, cache.size());
+      int b2 = stream.read();
+      Assert.assertEquals(b, b2);
+    } finally {
+      if (stream != null) {
+        IOUtils.cleanup(null, stream);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test opening many files via TCP (not short-circuit).
+   *
+   * This is only practical when using unbuffer, because unbuffering reduces
+   * the number of open sockets and the amount of memory that we use.
+   */
+  @Test
+  public void testOpenManyFilesViaTcp() throws Exception {
+    final int NUM_OPENS = 500;
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+    MiniDFSCluster cluster = null;
+    FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS];
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path TEST_PATH = new Path("/testFile");
+      DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1);
+
+      for (int i = 0; i < NUM_OPENS; i++) {
+        streams[i] = dfs.open(TEST_PATH);
+        LOG.info("opening file " + i + "...");
+        Assert.assertTrue(-1 != streams[i].read());
+        streams[i].unbuffer();
+      }
+    } finally {
+      for (FSDataInputStream stream : streams) {
+        IOUtils.cleanup(null, stream);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}