Browse Source

HDFS-11754. Make FsServerDefaults cache configurable. Contributed by Mikhail Erofeev.

(cherry picked from commit 53509f295b5274059541565d7216bf98aa35347d)
Kihwal Lee 7 years ago
parent
commit
2e1532a712

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -24,6 +24,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACH
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCAL_INTERFACES;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
 
 
@@ -205,8 +207,6 @@ import com.google.common.net.InetAddresses;
 public class DFSClient implements java.io.Closeable, RemotePeerFactory,
 public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     DataEncryptionKeyFactory {
     DataEncryptionKeyFactory {
   public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
   public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
-  // 1 hour
-  public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L;
   private static final String DFS_KMS_PREFIX = "dfs-kms-";
   private static final String DFS_KMS_PREFIX = "dfs-kms-";
 
 
   private final Configuration conf;
   private final Configuration conf;
@@ -240,6 +240,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final int smallBufferSize;
   private final int smallBufferSize;
+  private final long serverDefaultsValidityPeriod;
 
 
   public DfsClientConf getConf() {
   public DfsClientConf getConf() {
     return dfsClientConf;
     return dfsClientConf;
@@ -371,6 +372,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
             null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
             null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
     Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
     Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
         null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
         null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
+    this.serverDefaultsValidityPeriod =
+            conf.getLong(DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY,
+      DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT);
     Boolean writeDropBehind =
     Boolean writeDropBehind =
         (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
         (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
             null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
             null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
@@ -663,7 +667,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     checkOpen();
     checkOpen();
     long now = Time.monotonicNow();
     long now = Time.monotonicNow();
     if ((serverDefaults == null) ||
     if ((serverDefaults == null) ||
-        (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD)) {
+        (now - serverDefaultsLastUpdate > serverDefaultsValidityPeriod)) {
       serverDefaults = namenode.getServerDefaults();
       serverDefaults = namenode.getServerDefaults();
       serverDefaultsLastUpdate = now;
       serverDefaultsLastUpdate = now;
     }
     }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

@@ -112,6 +112,10 @@ public interface HdfsClientConfigKeys {
   String  DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY =
   String  DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY =
       "dfs.client.max.block.acquire.failures";
       "dfs.client.max.block.acquire.failures";
   int     DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
   int     DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
+  String  DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY =
+      "dfs.client.server-defaults.validity.period.ms";
+  long    DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_DEFAULT =
+      TimeUnit.HOURS.toMillis(1);
   String  DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
   String  DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
   String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2282,6 +2282,16 @@
   </description>
   </description>
 </property>
 </property>
 
 
+<property>
+  <name>dfs.client.server-defaults.validity.period.ms</name>
+  <value>3600000</value>
+  <description>
+    The amount of milliseconds after which cached server defaults are updated.
+
+    By default this parameter is set to 1 hour.
+  </description>
+</property>
+
 <property>
 <property>
   <name>dfs.namenode.enable.retrycache</name>
   <name>dfs.namenode.enable.retrycache</name>
   <value>true</value>
   <value>true</value>

+ 103 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
@@ -38,6 +39,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.doReturn;
 
 
 import java.io.BufferedReader;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayOutputStream;
@@ -51,6 +53,7 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.net.UnknownHostException;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
 import java.util.EnumSet;
+import java.util.concurrent.TimeUnit;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -179,6 +182,106 @@ public class TestFileCreation {
     }
     }
   }
   }
 
 
+  /**
+   * Test that server default values are cached on the client size
+   * and are stale after namenode update.
+   */
+  @Test
+  public void testServerDefaultsWithCaching()
+      throws IOException, InterruptedException {
+    // Create cluster with an explicit block size param
+    Configuration clusterConf = new HdfsConfiguration();
+    long originalBlockSize = DFS_BLOCK_SIZE_DEFAULT * 2;
+    clusterConf.setLong(DFS_BLOCK_SIZE_KEY, originalBlockSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(clusterConf)
+        .numDataNodes(0)
+        .build();
+    cluster.waitActive();
+    // Set a spy namesystem inside the namenode and return it
+    FSNamesystem spyNamesystem =
+        NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
+    InetSocketAddress nameNodeAddr = cluster.getNameNode().getNameNodeAddress();
+    try {
+      // Create a dfs client and set a long enough validity interval
+      Configuration clientConf = new HdfsConfiguration();
+      clientConf.setLong(DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY,
+          TimeUnit.MINUTES.toMillis(1));
+      DFSClient dfsClient = new DFSClient(nameNodeAddr, clientConf);
+      FsServerDefaults defaults = dfsClient.getServerDefaults();
+      assertEquals(originalBlockSize, defaults.getBlockSize());
+
+      // Update the namenode with a new parameter
+      long updatedDefaultBlockSize = DFS_BLOCK_SIZE_DEFAULT * 3;
+      FsServerDefaults newDefaults =
+          new FsServerDefaults(updatedDefaultBlockSize,
+              defaults.getBytesPerChecksum(), defaults.getWritePacketSize(),
+              defaults.getReplication(), defaults.getFileBufferSize(),
+              defaults.getEncryptDataTransfer(), defaults.getTrashInterval(),
+              defaults.getChecksumType(), defaults.getKeyProviderUri(),
+              defaults.getDefaultStoragePolicyId());
+      doReturn(newDefaults).when(spyNamesystem).getServerDefaults();
+
+      // The value is stale
+      Thread.sleep(1);
+      defaults = dfsClient.getServerDefaults();
+      assertEquals(originalBlockSize, defaults.getBlockSize());
+
+      // Another client reads the updated value correctly
+      DFSClient newDfsClient = new DFSClient(nameNodeAddr, clientConf);
+      defaults = newDfsClient.getServerDefaults();
+      assertEquals(updatedDefaultBlockSize, defaults.getBlockSize());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test that server defaults are updated on the client after cache expiration.
+   */
+  @Test
+  public void testServerDefaultsWithMinimalCaching()
+      throws IOException, InterruptedException {
+    // Create cluster with an explicit block size param
+    Configuration clusterConf = new HdfsConfiguration();
+    long originalBlockSize = DFS_BLOCK_SIZE_DEFAULT * 2;
+    clusterConf.setLong(DFS_BLOCK_SIZE_KEY, originalBlockSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(clusterConf)
+        .numDataNodes(0)
+        .build();
+    cluster.waitActive();
+    // Set a spy namesystem inside the namenode and return it
+    FSNamesystem spyNamesystem =
+        NameNodeAdapter.spyOnNamesystem(cluster.getNameNode());
+    InetSocketAddress nameNodeAddr = cluster.getNameNode().getNameNodeAddress();
+    try {
+      // Create a dfs client and set a minimal validity interval
+      Configuration clientConf = new HdfsConfiguration();
+      // Invalidate cache in at most 1 ms, see DfsClient#getServerDefaults
+      clientConf.setLong(DFS_CLIENT_SERVER_DEFAULTS_VALIDITY_PERIOD_MS_KEY, 0L);
+      DFSClient dfsClient = new DFSClient(nameNodeAddr, clientConf);
+      FsServerDefaults defaults = dfsClient.getServerDefaults();
+      assertEquals(originalBlockSize, defaults.getBlockSize());
+
+      // Update the namenode with a new FsServerDefaults
+      long updatedDefaultBlockSize = DFS_BLOCK_SIZE_DEFAULT * 3;
+      FsServerDefaults newDefaults =
+          new FsServerDefaults(updatedDefaultBlockSize,
+              defaults.getBytesPerChecksum(), defaults.getWritePacketSize(),
+              defaults.getReplication(), defaults.getFileBufferSize(),
+              defaults.getEncryptDataTransfer(), defaults.getTrashInterval(),
+              defaults.getChecksumType(), defaults.getKeyProviderUri(),
+              defaults.getDefaultStoragePolicyId());
+      doReturn(newDefaults).when(spyNamesystem).getServerDefaults();
+
+      Thread.sleep(1);
+      defaults = dfsClient.getServerDefaults();
+      // Value is updated correctly
+      assertEquals(updatedDefaultBlockSize, defaults.getBlockSize());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   @Test
   public void testFileCreation() throws IOException {
   public void testFileCreation() throws IOException {
     checkFileCreation(null, false);
     checkFileCreation(null, false);