Browse Source

HDFS-16175.Improve the configurable value of Server #PURGE_INTERVAL_NANOS. (#3307)

Co-authored-by: zhujianghua <zhujianghua@zhujianghuadeMacBook-Pro.local>
Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
(cherry picked from commit ad54f5195c8c01f333703c55cd70703109d75f29)
(cherry picked from commit 2b2f8f575bffb18df53a3cf70cc45ba6384798e2)
jianghuazhu 3 years ago
parent
commit
3b38d011a8

+ 4 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -459,6 +459,10 @@ public class CommonConfigurationKeysPublic {
                                                 "ipc.server.log.slow.rpc";
                                                 "ipc.server.log.slow.rpc";
   public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;
   public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;
 
 
+  public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY =
+    "ipc.server.purge.interval";
+  public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15;
+
   /**
   /**
    * @see
    * @see
    * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
    * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">

+ 23 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -480,6 +480,8 @@ public abstract class Server {
   volatile private boolean running = true;         // true while server runs
   volatile private boolean running = true;         // true while server runs
   private CallQueueManager<Call> callQueue;
   private CallQueueManager<Call> callQueue;
 
 
+  private long purgeIntervalNanos;
+
   // maintains the set of client connections and handles idle timeouts
   // maintains the set of client connections and handles idle timeouts
   private ConnectionManager connectionManager;
   private ConnectionManager connectionManager;
   private Listener listener = null;
   private Listener listener = null;
@@ -509,6 +511,20 @@ public abstract class Server {
     this.logSlowRPC = logSlowRPCFlag;
     this.logSlowRPC = logSlowRPCFlag;
   }
   }
 
 
+  private void setPurgeIntervalNanos(int purgeInterval) {
+    int tmpPurgeInterval = CommonConfigurationKeysPublic.
+        IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT;
+    if (purgeInterval > 0) {
+      tmpPurgeInterval = purgeInterval;
+    }
+    this.purgeIntervalNanos = TimeUnit.NANOSECONDS.convert(
+            tmpPurgeInterval, TimeUnit.MINUTES);
+  }
+
+  @VisibleForTesting
+  public long getPurgeIntervalNanos() {
+    return this.purgeIntervalNanos;
+  }
 
 
   /**
   /**
    * Logs a Slow RPC Request.
    * Logs a Slow RPC Request.
@@ -1477,9 +1493,6 @@ public abstract class Server {
     }
     }
   }
   }
 
 
-  private final static long PURGE_INTERVAL_NANOS = TimeUnit.NANOSECONDS.convert(
-      15, TimeUnit.MINUTES);
-
   // Sends responses of RPC back to clients.
   // Sends responses of RPC back to clients.
   private class Responder extends Thread {
   private class Responder extends Thread {
     private final Selector writeSelector;
     private final Selector writeSelector;
@@ -1515,7 +1528,7 @@ public abstract class Server {
         try {
         try {
           waitPending();     // If a channel is being registered, wait.
           waitPending();     // If a channel is being registered, wait.
           writeSelector.select(
           writeSelector.select(
-              TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
+              TimeUnit.NANOSECONDS.toMillis(purgeIntervalNanos));
           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
           while (iter.hasNext()) {
           while (iter.hasNext()) {
             SelectionKey key = iter.next();
             SelectionKey key = iter.next();
@@ -1538,7 +1551,7 @@ public abstract class Server {
             }
             }
           }
           }
           long nowNanos = Time.monotonicNowNanos();
           long nowNanos = Time.monotonicNowNanos();
-          if (nowNanos < lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) {
+          if (nowNanos < lastPurgeTimeNanos + purgeIntervalNanos) {
             continue;
             continue;
           }
           }
           lastPurgeTimeNanos = nowNanos;
           lastPurgeTimeNanos = nowNanos;
@@ -1616,7 +1629,7 @@ public abstract class Server {
         Iterator<RpcCall> iter = responseQueue.listIterator(0);
         Iterator<RpcCall> iter = responseQueue.listIterator(0);
         while (iter.hasNext()) {
         while (iter.hasNext()) {
           call = iter.next();
           call = iter.next();
-          if (now > call.responseTimestampNanos + PURGE_INTERVAL_NANOS) {
+          if (now > call.responseTimestampNanos + purgeIntervalNanos) {
             closeConnection(call.connection);
             closeConnection(call.connection);
             break;
             break;
           }
           }
@@ -3119,6 +3132,10 @@ public abstract class Server {
         CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
         CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
         CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
         CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
 
 
+    this.setPurgeIntervalNanos(conf.getInt(
+        CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT));
+
     // Create the responder here
     // Create the responder here
     responder = new Responder();
     responder = new Responder();
     
     

+ 8 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2107,6 +2107,14 @@
     </description>
     </description>
 </property>
 </property>
 
 
+<property>
+  <name>ipc.server.purge.interval</name>
+  <value>15</value>
+  <description>Define how often calls are cleaned up in the server.
+    The default is 15 minutes. The unit is minutes.
+  </description>
+</property>
+
 <property>
 <property>
   <name>ipc.maximum.data.length</name>
   <name>ipc.maximum.data.length</name>
   <value>67108864</value>
   <value>67108864</value>

+ 20 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java

@@ -26,8 +26,10 @@ import java.io.IOException;
 import java.net.BindException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.Server.Call;
@@ -185,4 +187,22 @@ public class TestServer {
     assertTrue(handler.isSuppressedLog(IpcException.class));
     assertTrue(handler.isSuppressedLog(IpcException.class));
     assertFalse(handler.isSuppressedLog(RpcClientException.class));
     assertFalse(handler.isSuppressedLog(RpcClientException.class));
   }
   }
+
+  @Test (timeout=300000)
+  public void testPurgeIntervalNanosConf() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeysPublic.
+        IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY, 3);
+    Server server = new Server("0.0.0.0", 0, LongWritable.class,
+            1, conf) {
+      @Override
+      public Writable call(
+              RPC.RpcKind rpcKind, String protocol, Writable param,
+              long receiveTime) throws Exception {
+        return null;
+      }
+    };
+    long purgeInterval = TimeUnit.NANOSECONDS.convert(3, TimeUnit.MINUTES);
+    assertEquals(server.getPurgeIntervalNanos(), purgeInterval);
+  }
 }
 }