
HDFS-5049. Add JNI mlock support. (Andrew Wang via Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1512427 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe, 12 years ago
commit 2a4031940c

+ 85 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

@@ -23,6 +23,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -145,6 +146,12 @@ public class NativeIO {
       return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
     }
 
+    private static void assertCodeLoaded() throws IOException {
+      if (!isAvailable()) {
+        throw new IOException("NativeIO was not loaded");
+      }
+    }
+
     /** Wrapper around open(2) */
     public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
     /** Wrapper around fstat(2) */
@@ -225,6 +232,84 @@ public class NativeIO {
       }
     }
 
+    static native void mlock_native(
+        ByteBuffer buffer, long len) throws NativeIOException;
+    static native void munlock_native(
+        ByteBuffer buffer, long len) throws NativeIOException;
+
+    /**
+     * Locks the provided direct ByteBuffer into memory, preventing it from
+     * swapping out. After a buffer is locked, future accesses will not incur
+     * a page fault.
+     * 
+     * See the mlock(2) man page for more information.
+     * 
+     * @throws NativeIOException
+     */
+    public static void mlock(ByteBuffer buffer, long len)
+        throws IOException {
+      assertCodeLoaded();
+      if (!buffer.isDirect()) {
+        throw new IOException("Cannot mlock a non-direct ByteBuffer");
+      }
+      mlock_native(buffer, len);
+    }
+
+    /**
+     * Unlocks a locked direct ByteBuffer, allowing it to swap out of memory.
+     * This is a no-op if the ByteBuffer was not previously locked.
+     * 
+     * See the munlock(2) man page for more information.
+     * 
+     * @throws NativeIOException
+     */
+    public static void munlock(ByteBuffer buffer, long len)
+        throws IOException {
+      assertCodeLoaded();
+      if (!buffer.isDirect()) {
+        throw new IOException("Cannot munlock a non-direct ByteBuffer");
+      }
+      munlock_native(buffer, len);
+    }
+
+    /**
+     * Resource limit types copied from <sys/resource.h>
+     */
+    private static class ResourceLimit {
+      public static final int RLIMIT_CPU        = 0;
+      public static final int RLIMIT_FSIZE      = 1;
+      public static final int RLIMIT_DATA       = 2;
+      public static final int RLIMIT_STACK      = 3;
+      public static final int RLIMIT_CORE       = 4;
+      public static final int RLIMIT_RSS        = 5;
+      public static final int RLIMIT_NPROC      = 6;
+      public static final int RLIMIT_NOFILE     = 7;
+      public static final int RLIMIT_MEMLOCK    = 8;
+      public static final int RLIMIT_AS         = 9;
+      public static final int RLIMIT_LOCKS      = 10;
+      public static final int RLIMIT_SIGPENDING = 11;
+      public static final int RLIMIT_MSGQUEUE   = 12;
+      public static final int RLIMIT_NICE       = 13;
+      public static final int RLIMIT_RTPRIO     = 14;
+      public static final int RLIMIT_RTTIME     = 15;
+      public static final int RLIMIT_NLIMITS    = 16;
+    }
+
+    static native String getrlimit(int limit) throws NativeIOException;
+    /**
+     * Returns the soft limit on the number of bytes that may be locked by
+     * the process (RLIMIT_MEMLOCK).
+     *
+     * See the getrlimit(2) man page for more information.
+     *
+     * @return maximum amount of locked memory in bytes
+     */
+    public static long getMemlockLimit() throws IOException {
+      assertCodeLoaded();
+      String strLimit = getrlimit(ResourceLimit.RLIMIT_MEMLOCK);
+      return Long.parseLong(strLimit);
+    }
+
     /** Linux only methods used for getOwner() implementation */
     private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
     private static native String getUserName(long uid) throws IOException;
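
A minimal usage sketch of the new POSIX wrappers (assuming the native hadoop library is loaded; the class and variable names below are illustrative only, not part of the patch): lock a direct buffer, touch it, then unlock it.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.nativeio.NativeIO;

public class MlockSketch {
  public static void main(String[] args) throws IOException {
    if (!NativeIO.isAvailable()) {
      return; // native code not loaded; mlock/munlock would throw IOException
    }
    // Only direct buffers have a stable native address for mlock(2) to pin.
    ByteBuffer buf = ByteBuffer.allocateDirect(4096);
    NativeIO.POSIX.mlock(buf, buf.capacity());
    try {
      buf.putInt(0, 42); // once locked, accesses do not incur page faults
    } finally {
      NativeIO.POSIX.munlock(buf, buf.capacity());
    }
  }
}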

+ 73 - 0
hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/nativeio/NativeIO.c

@@ -31,8 +31,11 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/mman.h>
+#include <sys/resource.h>
 #include <sys/stat.h>
 #include <sys/syscall.h>
+#include <sys/time.h>
 #include <sys/types.h>
 #include <unistd.h>
 #include "config.h"
@@ -360,6 +363,76 @@ Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_sync_1file_1range(
 #endif
 }
 
+/**
+ * public static native void mlock_native(
+ *   ByteBuffer buffer, long len);
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_mlock_1native(
+  JNIEnv *env, jclass clazz,
+  jobject buffer, jlong len)
+{
+  void* buf = (void*)(*env)->GetDirectBufferAddress(env, buffer);
+  PASS_EXCEPTIONS(env);
+
+  if (mlock(buf, len)) {
+    throw_ioe(env, errno);
+  }
+}
+
+/**
+ * public static native void munlock_native(
+ *   ByteBuffer buffer, long len);
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_munlock_1native(
+  JNIEnv *env, jclass clazz,
+  jobject buffer, jlong len)
+{
+  void* buf = (void*)(*env)->GetDirectBufferAddress(env, buffer);
+  PASS_EXCEPTIONS(env);
+
+  if (munlock(buf, len)) {
+    throw_ioe(env, errno);
+  }
+}
+
+/**
+ * public static native String getrlimit(
+ *   int resource);
+ *
+ * The "00024" in the function name is an artifact of how JNI encodes
+ * special characters. U+0024 is '$'.
+ */
+JNIEXPORT jstring JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_00024POSIX_getrlimit(
+  JNIEnv *env, jclass clazz,
+  jint resource)
+{
+  jstring ret = NULL;
+
+  struct rlimit rlim;
+  int rc = getrlimit((int)resource, &rlim);
+  if (rc != 0) {
+    throw_ioe(env, errno);
+    goto cleanup;
+  }
+
+  // Convert the soft limit into a decimal string
+  char limit[32];
+  snprintf(limit, sizeof(limit), "%lld", (long long)rlim.rlim_cur);
+  ret = (*env)->NewStringUTF(env, limit);
+
+cleanup:
+  return ret;
+}
+
 #ifdef __FreeBSD__
 static int toFreeBSDFlags(int flags)
 {
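
Because mlock(2) typically fails with ENOMEM (or EPERM) once the caller exceeds its RLIMIT_MEMLOCK soft limit, a caller can consult the limit exposed above before locking. A hedged sketch under that assumption (the helper class and method names are illustrative, not part of this patch):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.nativeio.NativeIO;

class MemlockBudget {
  /** Lock buf only if it fits within the process's RLIMIT_MEMLOCK soft limit. */
  static boolean tryLock(ByteBuffer buf) throws IOException {
    long limit = NativeIO.POSIX.getMemlockLimit();
    // A negative value (e.g. an unlimited ulimit) is treated here as "no cap".
    if (limit >= 0 && buf.capacity() > limit) {
      return false; // would exceed the memlock ulimit; skip rather than fail
    }
    NativeIO.POSIX.mlock(buf, buf.capacity());
    return true;
  }
}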

+ 58 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/nativeio/TestNativeIO.java

@@ -24,6 +24,9 @@ import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +35,7 @@ import java.util.List;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+
 import static org.junit.Assume.*;
 import static org.junit.Assert.*;
 
@@ -45,6 +49,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Time;
 
 public class TestNativeIO {
@@ -524,4 +529,57 @@ public class TestNativeIO {
 
     FileUtils.deleteQuietly(TEST_DIR);
   }
+
+  @Test(timeout=10000)
+  public void testMlock() throws Exception {
+    assumeTrue(NativeIO.isAvailable());
+    assumeTrue(Shell.LINUX);
+    final File TEST_FILE = new File(new File(
+        System.getProperty("test.build.data","build/test/data")),
+        "testMlockFile");
+    final int BUF_LEN = 12289;
+    byte buf[] = new byte[BUF_LEN];
+    int bufSum = 0;
+    for (int i = 0; i < buf.length; i++) {
+      buf[i] = (byte)(i % 60);
+      bufSum += buf[i];
+    }
+    FileOutputStream fos = new FileOutputStream(TEST_FILE);
+    fos.write(buf);
+    fos.getChannel().force(true);
+    fos.close();
+    
+    FileInputStream fis = null;
+    FileChannel channel = null;
+    try {
+      // Map file into memory
+      fis = new FileInputStream(TEST_FILE);
+      channel = fis.getChannel();
+      long fileSize = channel.size();
+      MappedByteBuffer mapbuf = channel.map(MapMode.READ_ONLY, 0, fileSize);
+      // mlock the buffer
+      NativeIO.POSIX.mlock(mapbuf, fileSize);
+      // Read the buffer
+      int sum = 0;
+      for (int i=0; i<fileSize; i++) {
+        sum += mapbuf.get(i);
+      }
+      assertEquals("Expected sums to be equal", bufSum, sum);
+      // munlock the buffer
+      NativeIO.POSIX.munlock(mapbuf, fileSize);
+    } finally {
+      if (channel != null) {
+        channel.close();
+      }
+      if (fis != null) {
+        fis.close();
+      }
+    }
+  }
+
+  @Test(timeout=10000)
+  public void testGetMemlockLimit() throws Exception {
+    assumeTrue(NativeIO.isAvailable());
+    NativeIO.POSIX.getMemlockLimit();
+  }
 }
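
The mapped buffer in testMlock is direct, so it passes the isDirect() precondition; a heap-backed buffer would be rejected by the Java wrapper before reaching JNI. A hypothetical companion test (not part of this patch) sketching that behaviour:

import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.nativeio.NativeIO;
import org.junit.Test;

public class TestMlockPrecondition {
  @Test(timeout=10000)
  public void testMlockRejectsHeapBuffer() throws Exception {
    assumeTrue(NativeIO.isAvailable());
    ByteBuffer heap = ByteBuffer.allocate(4096); // heap-backed, not direct
    try {
      NativeIO.POSIX.mlock(heap, heap.capacity());
      fail("Expected mlock to reject a non-direct ByteBuffer");
    } catch (IOException e) {
      // expected: "Cannot mlock a non-direct ByteBuffer"
    }
  }
}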

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt

@@ -0,0 +1,14 @@
+Hadoop HDFS Change Log for HDFS-4949
+
+HDFS-4949 (Unreleased)
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+    HDFS-5049.  Add JNI mlock support.  (Andrew Wang via Colin Patrick McCabe)
+
+  OPTIMIZATIONS
+
+  BUG FIXES

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -98,6 +98,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
   public static final String  DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
   public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
+  public static final String  DFS_DATANODE_MAX_LOCKED_MEMORY_KEY = "dfs.datanode.max.locked.memory";
+  public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
 
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
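
A hedged sketch of setting the new key programmatically (the class name and the value passed in are purely illustrative; the default of 0 leaves memory locking disabled):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class LockedMemoryConfigSketch {
  /** Build a Configuration that caps the DataNode's locked memory at the given byte count. */
  public static Configuration withMaxLockedMemory(long bytes) {
    Configuration conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, bytes);
    return conf;
  }
}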

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
@@ -25,6 +26,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
@@ -39,6 +42,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NA
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -73,6 +77,8 @@ public class DNConf {
   
   final long xceiverStopTimeout;
 
+  final long maxLockedMemory;
+
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsServerConstants.READ_TIMEOUT);
@@ -137,6 +143,10 @@ public class DNConf {
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
+
+    this.maxLockedMemory = conf.getLong(
+        DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -40,6 +40,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEF
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@@ -149,6 +150,7 @@ import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -736,6 +738,25 @@ public class DataNode extends Configured
     this.conf = conf;
     this.dnConf = new DNConf(conf);
 
+    if (dnConf.maxLockedMemory > 0) {
+      if (!NativeIO.isAvailable()) {
+        throw new RuntimeException(String.format(
+            "Cannot start datanode because the configured max locked memory" +
+            " size (%s) is greater than zero and native code is not available.",
+            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
+      }
+      long ulimit = NativeIO.POSIX.getMemlockLimit();
+      if (dnConf.maxLockedMemory > ulimit) {
+        throw new RuntimeException(String.format(
+            "Cannot start datanode because the configured max locked memory" +
+            " size (%s) of %d bytes is more than the datanode's available" +
+            " RLIMIT_MEMLOCK ulimit of %d bytes.",
+            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+            dnConf.maxLockedMemory,
+            ulimit));
+      }
+    }
+
     storage = new DataStorage();
     
     // global DN settings
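
To make the new startup check concrete: with the common default memlock ulimit of 64 KB, getMemlockLimit() returns 65536, so any positive dfs.datanode.max.locked.memory larger than that refuses to start. A hedged standalone illustration (not the DataNode code itself; values are examples):

import org.apache.hadoop.io.nativeio.NativeIO;

public class MemlockCheckSketch {
  public static void main(String[] args) throws Exception {
    long configured = 128L * 1024 * 1024;           // e.g. 128 MB requested via the new key
    long ulimit = NativeIO.POSIX.getMemlockLimit(); // e.g. 65536 under "ulimit -l 64"
    if (configured > 0 && configured > ulimit) {
      System.err.println("DataNode would refuse to start: raise the memlock ulimit"
          + " or lower dfs.datanode.max.locked.memory");
    }
  }
}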

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -106,4 +109,26 @@ public class TestDatanodeConfig {
       throw new IOException("Bad URI", e);
     }
   }
+
+  @Test(timeout=60000)
+  public void testMemlockLimit() throws Exception {
+    assumeTrue(NativeIO.isAvailable());
+    final long memlockLimit = NativeIO.POSIX.getMemlockLimit();
+    Configuration conf = cluster.getConfiguration(0);
+    // Try starting the DN with limit configured to the ulimit
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        memlockLimit);
+    DataNode dn = null;
+    dn = DataNode.createDataNode(new String[]{}, conf);
+    dn.shutdown();
+    // Try starting the DN with a limit > ulimit
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        memlockLimit+1);
+    try {
+      dn = DataNode.createDataNode(new String[]{}, conf);
+    } catch (RuntimeException e) {
+      GenericTestUtils.assertExceptionContains(
+          "more than the datanode's available RLIMIT_MEMLOCK", e);
+    }
+  }
 }