Browse Source

HADOOP-10279. Create multiplexer, a requirement for the fair queue. (Contributed by Chris Li)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1604090 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 11 years ago
parent
commit
e74d99b81e

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -442,6 +442,9 @@ Release 2.5.0 - UNRELEASED
     HADOOP-10557. FsShell -cp -pa option for preserving extended ACLs.
     (Akira Ajisaka via cnauroth)
 
+    HADOOP-10279. Create multiplexer, a requirement for the fair queue.
+    (Chris Li via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES 

+ 148 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java

@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Determines which queue to start reading from, occasionally drawing from
+ * low-priority queues in order to prevent starvation. Given the pull pattern
+ * [9, 4, 1] for 3 queues:
+ *
+ * The cycle is (a minimum of) 9+4+1=14 reads.
+ * Queue 0 is read (at least) 9 times
+ * Queue 1 is read (at least) 4 times
+ * Queue 2 is read (at least) 1 time
+ * Repeat
+ *
+ * There may be more reads than the minimum due to race conditions. This is
+ * allowed by design for performance reasons.
+ */
+public class WeightedRoundRobinMultiplexer {
+  // Config keys
+  public static final String IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY =
+    "faircallqueue.multiplexer.weights";
+
+  public static final Log LOG =
+    LogFactory.getLog(WeightedRoundRobinMultiplexer.class);
+
+  private final int numQueues; // The number of queues under our provisioning
+
+  private final AtomicInteger currentQueueIndex; // Current queue we're serving
+  private final AtomicInteger requestsLeft; // Number of requests left for this queue
+
+  private int[] queueWeights; // The weights for each queue
+
+  public WeightedRoundRobinMultiplexer(int aNumQueues, String ns,
+    Configuration conf) {
+    if (aNumQueues <= 0) {
+      throw new IllegalArgumentException("Requested queues (" + aNumQueues +
+        ") must be greater than zero.");
+    }
+
+    this.numQueues = aNumQueues;
+    this.queueWeights = conf.getInts(ns + "." +
+      IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY);
+
+    if (this.queueWeights.length == 0) {
+      this.queueWeights = getDefaultQueueWeights(this.numQueues);
+    } else if (this.queueWeights.length != this.numQueues) {
+      throw new IllegalArgumentException(ns + "." +
+        IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY + " must specify exactly " +
+        this.numQueues + " weights: one for each priority level.");
+    }
+
+    this.currentQueueIndex = new AtomicInteger(0);
+    this.requestsLeft = new AtomicInteger(this.queueWeights[0]);
+
+    LOG.info("WeightedRoundRobinMultiplexer is being used.");
+  }
+
+  /**
+   * Creates default weights for each queue. The weights are 2^N.
+   */
+  private int[] getDefaultQueueWeights(int aNumQueues) {
+    int[] weights = new int[aNumQueues];
+
+    int weight = 1; // Start low
+    for(int i = aNumQueues - 1; i >= 0; i--) { // Start at lowest queue
+      weights[i] = weight;
+      weight *= 2; // Double every iteration
+    }
+    return weights;
+  }
+
+  /**
+   * Move to the next queue.
+   */
+  private void moveToNextQueue() {
+    int thisIdx = this.currentQueueIndex.get();
+
+    // Wrap to fit in our bounds
+    int nextIdx = (thisIdx + 1) % this.numQueues;
+
+    // Set to next index: once this is called, requests will start being
+    // drawn from nextIdx, but requestsLeft will continue to decrement into
+    // the negatives
+    this.currentQueueIndex.set(nextIdx);
+
+    // Finally, reset requestsLeft. This will enable moveToNextQueue to be
+    // called again, for the new currentQueueIndex
+    this.requestsLeft.set(this.queueWeights[nextIdx]);
+  }
+
+  /**
+   * Advances the index, which will change the current index
+   * if called enough times.
+   */
+  private void advanceIndex() {
+    // Since we did read, we should decrement
+    int requestsLeftVal = this.requestsLeft.decrementAndGet();
+
+    // Strict compare with zero (instead of inequality) so that if another
+    // thread decrements requestsLeft, only one thread will be responsible
+    // for advancing currentQueueIndex
+    if (requestsLeftVal == 0) {
+      // This is guaranteed to be called exactly once per currentQueueIndex
+      this.moveToNextQueue();
+    }
+  }
+
+  /**
+   * Gets the current index. Should be accompanied by a call to
+   * advanceIndex at some point.
+   */
+  private int getCurrentIndex() {
+    return this.currentQueueIndex.get();
+  }
+
+  /**
+   * Use the mux by getting and advancing index.
+   */
+  public int getAndAdvanceCurrentIndex() {
+    int idx = this.getCurrentIndex();
+    this.advanceIndex();
+    return idx;
+  }
+
+}

+ 142 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedRoundRobinMultiplexer.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.ipc.WeightedRoundRobinMultiplexer.IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY;
+
+public class TestWeightedRoundRobinMultiplexer {
+  public static final Log LOG = LogFactory.getLog(TestWeightedRoundRobinMultiplexer.class);
+
+  private WeightedRoundRobinMultiplexer mux;
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testInstantiateNegativeMux() {
+    mux = new WeightedRoundRobinMultiplexer(-1, "", new Configuration());
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testInstantiateZeroMux() {
+    mux = new WeightedRoundRobinMultiplexer(0, "", new Configuration());
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testInstantiateIllegalMux() {
+    Configuration conf = new Configuration();
+    conf.setStrings("namespace." + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY,
+      "1", "2", "3");
+
+    // ask for 3 weights with 2 queues
+    mux = new WeightedRoundRobinMultiplexer(2, "namespace", conf);
+  }
+
+  @Test
+  public void testLegalInstantiation() {
+    Configuration conf = new Configuration();
+    conf.setStrings("namespace." + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY,
+      "1", "2", "3");
+
+    // ask for 3 weights with 3 queues
+    mux = new WeightedRoundRobinMultiplexer(3, "namespace.", conf);
+  }
+
+  @Test
+  public void testDefaultPattern() {
+    // Mux of size 1: 0 0 0 0 0, etc
+    mux = new WeightedRoundRobinMultiplexer(1, "", new Configuration());
+    for(int i = 0; i < 10; i++) {
+      assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    }
+
+    // Mux of size 2: 0 0 1 0 0 1 0 0 1, etc
+    mux = new WeightedRoundRobinMultiplexer(2, "", new Configuration());
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+
+    // Size 3: 4x0 2x1 1x2, etc
+    mux = new WeightedRoundRobinMultiplexer(3, "", new Configuration());
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 2);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+
+    // Size 4: 8x0 4x1 2x2 1x3
+    mux = new WeightedRoundRobinMultiplexer(4, "", new Configuration());
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 2);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 2);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 3);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+  }
+
+  @Test
+  public void testCustomPattern() {
+    // 1x0 1x1
+    Configuration conf = new Configuration();
+    conf.setStrings("test.custom." + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY,
+      "1", "1");
+
+    mux = new WeightedRoundRobinMultiplexer(2, "test.custom", conf);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+    assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+
+    // 1x0 3x1 2x2
+    conf.setStrings("test.custom." + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY,
+      "1", "3", "2");
+
+    mux = new WeightedRoundRobinMultiplexer(3, "test.custom", conf);
+
+    for(int i = 0; i < 5; i++) {
+      assertEquals(mux.getAndAdvanceCurrentIndex(), 0);
+      assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+      assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+      assertEquals(mux.getAndAdvanceCurrentIndex(), 1);
+      assertEquals(mux.getAndAdvanceCurrentIndex(), 2);
+      assertEquals(mux.getAndAdvanceCurrentIndex(), 2);
+    } // Ensure pattern repeats
+
+  }
+}