浏览代码

HADOOP-15253. Should update maxQueueSize when refresh call queue. Contributed by Tao Jie.

(cherry picked from commit acfd764fcc9990e507c0e7cea746652375aaa632)
Konstantin V Shvachko 7 年之前
父节点
当前提交
b0d6021122

+ 3 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -656,6 +656,9 @@ public abstract class Server {
   public synchronized void refreshCallQueue(Configuration conf) {
     // Create the next queue
     String prefix = getQueueClassPrefix();
+    this.maxQueueSize = handlerCount * conf.getInt(
+        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
     callQueue.swapQueue(getSchedulerClass(prefix, conf),
         getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
   }

+ 15 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java

@@ -30,8 +30,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -141,8 +143,16 @@ public class TestRefreshCallQueue {
 
     // throw an error when we double-initialize JvmMetrics
     DefaultMetricsSystem.setMiniClusterMode(false);
-
+    int serviceHandlerCount = config.getInt(
+        DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
+        DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
     NameNodeRpcServer rpcServer = (NameNodeRpcServer) cluster.getNameNodeRpc();
+    // check callqueue size
+    assertEquals(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT
+        * serviceHandlerCount, rpcServer.getClientRpcServer().getMaxQueueSize());
+    // Replace queue and update queue size
+    config.setInt(CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+        150);
     try {
       rpcServer.getClientRpcServer().refreshCallQueue(config);
     } catch (Exception e) {
@@ -158,6 +168,9 @@ public class TestRefreshCallQueue {
     } finally {
       DefaultMetricsSystem.setMiniClusterMode(oldValue);
     }
-  }
+    // check callQueueSize has changed
+    assertEquals(150 * serviceHandlerCount, rpcServer.getClientRpcServer()
+        .getMaxQueueSize());
+ }
 
 }