Browse Source

HADOOP-13189. FairCallQueue makes callQueue larger than the configured capacity. Contributed by Vinitha Gankidi.

Konstantin V Shvachko 9 years ago
parent
commit
10574bce81

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java

@@ -51,7 +51,8 @@ public class CallQueueManager<E> {
       maxQueueSize, namespace, conf);
     this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
     this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
-    LOG.info("Using callQueue " + backingClass);
+    LOG.info("Using callQueue: " + backingClass + " queueCapacity: " +
+        maxQueueSize);
   }
 
   private <T extends BlockingQueue<E>> T createCallQueueInstance(

+ 13 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java

@@ -77,21 +77,28 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
 
   /**
    * Create a FairCallQueue.
-   * @param capacity the maximum size of each sub-queue
+   * @param capacity the total size of all sub-queues
    * @param ns the prefix to use for configuration
    * @param conf the configuration to read from
-   * Notes: the FairCallQueue has no fixed capacity. Rather, it has a minimum
-   * capacity of `capacity` and a maximum capacity of `capacity * number_queues`
+   * Notes: Each sub-queue has a capacity of `capacity / numSubqueues`.
+   * The first or the highest priority sub-queue has an excess capacity
+   * of `capacity % numSubqueues`
    */
   public FairCallQueue(int capacity, String ns, Configuration conf) {
     int numQueues = parseNumQueues(ns, conf);
-    LOG.info("FairCallQueue is in use with " + numQueues + " queues.");
+    LOG.info("FairCallQueue is in use with " + numQueues +
+        " queues with total capacity of " + capacity);
 
     this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
     this.overflowedCalls = new ArrayList<AtomicLong>(numQueues);
-
+    int queueCapacity = capacity / numQueues;
+    int capacityForFirstQueue = queueCapacity + (capacity % numQueues);
     for(int i=0; i < numQueues; i++) {
-      this.queues.add(new LinkedBlockingQueue<E>(capacity));
+      if (i == 0) {
+        this.queues.add(new LinkedBlockingQueue<E>(capacityForFirstQueue));
+      } else {
+        this.queues.add(new LinkedBlockingQueue<E>(queueCapacity));
+      }
       this.overflowedCalls.add(new AtomicLong(0));
     }
 

+ 1 - 7
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java

@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -64,7 +58,7 @@ public class TestFairCallQueue extends TestCase {
     Configuration conf = new Configuration();
     conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
 
-    fcq = new FairCallQueue<Schedulable>(5, "ns", conf);
+    fcq = new FairCallQueue<Schedulable>(10, "ns", conf);
   }
 
   //