Explorar el Código

HADOOP-18868. Optimize the configuration and use of callqueue overflow trigger failover (#5998)

huhaiyang hace 1 año
padre
commit
9d48af8d70

+ 62 - 10
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT;
+
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -63,7 +66,7 @@ public class CallQueueManager<E extends Schedulable>
   }
 
   private volatile boolean clientBackOffEnabled;
-  private boolean serverFailOverEnabled;
+  private volatile boolean serverFailOverEnabled;
 
   // Atomic refs point to active callQueue
   // We have two so we can better control swapping
@@ -81,18 +84,15 @@ public class CallQueueManager<E extends Schedulable>
         namespace, conf);
     int[] capacityWeights = parseCapacityWeights(priorityLevels,
         namespace, conf);
+    this.serverFailOverEnabled = getServerFailOverEnable(namespace, conf);
     BlockingQueue<E> bq = createCallQueueInstance(backingClass,
         priorityLevels, maxQueueSize, namespace, capacityWeights, conf);
     this.clientBackOffEnabled = clientBackOffEnabled;
-    this.serverFailOverEnabled = conf.getBoolean(
-        namespace + "." +
-        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
-        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
     this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
     this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
     LOG.info("Using callQueue: {}, queueCapacity: {}, " +
-        "scheduler: {}, ipcBackoff: {}.",
-        backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled);
+        "scheduler: {}, ipcBackoff: {}, ipcFailOver: {}.",
+        backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled, serverFailOverEnabled);
   }
 
   @VisibleForTesting // only!
@@ -105,6 +105,41 @@ public class CallQueueManager<E extends Schedulable>
     this.serverFailOverEnabled = serverFailOverEnabled;
   }
 
+  /**
+   * Return boolean value configured by property 'ipc.<port>.callqueue.overflow.trigger.failover'
+   * if it is present. If the config is not present, default config
+   * (without port) is used to derive class i.e 'ipc.callqueue.overflow.trigger.failover',
+   * and derived value is returned if configured. Otherwise, default value
+   * {@link CommonConfigurationKeys#IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT} is returned.
+   *
+   * @param namespace Namespace "ipc" + "." + Server's listener port.
+   * @param conf Configuration properties.
+   * @return Value returned based on configuration.
+   */
+  private boolean getServerFailOverEnable(String namespace, Configuration conf) {
+    String propertyKey = namespace + "." +
+        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE;
+
+    if (conf.get(propertyKey) != null) {
+      return conf.getBoolean(propertyKey,
+          CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
+    }
+
+    String[] nsPort = namespace.split("\\.");
+    if (nsPort.length == 2) {
+      // Only if ns is split with ".", we can separate namespace and port.
+      // In the absence of "ipc.<port>.callqueue.overflow.trigger.failover" property,
+      // we look up "ipc.callqueue.overflow.trigger.failover" property.
+      return conf.getBoolean(nsPort[0] + "."
+          + IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
+    }
+
+    // Otherwise return default value.
+    LOG.info("{} not specified set default value is {}",
+        IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
+    return CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT;
+  }
+
   private static <T extends RpcScheduler> T createScheduler(
       Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
     // Used for custom, configurable scheduler
@@ -155,9 +190,9 @@ public class CallQueueManager<E extends Schedulable>
     // Used for custom, configurable callqueues
     try {
       Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
-          int.class, String.class, int[].class, Configuration.class);
-      return ctor.newInstance(priorityLevels, maxLen, ns,
-          capacityWeights, conf);
+          int.class, String.class, int[].class, boolean.class, Configuration.class);
+      return ctor.newInstance(priorityLevels, maxLen, ns, capacityWeights,
+          this.serverFailOverEnabled, conf);
     } catch (RuntimeException e) {
       throw e;
     } catch (InvocationTargetException e) {
@@ -199,6 +234,20 @@ public class CallQueueManager<E extends Schedulable>
     return clientBackOffEnabled;
   }
 
+  @VisibleForTesting
+  public boolean isServerFailOverEnabled() {
+    return serverFailOverEnabled;
+  }
+
+  @VisibleForTesting
+  public boolean isServerFailOverEnabledByQueue() {
+    BlockingQueue<E> bq = putRef.get();
+    if (bq instanceof FairCallQueue) {
+      return ((FairCallQueue<E>) bq).isServerFailOverEnabled();
+    }
+    return false;
+  }
+
   // Based on policy to determine back off current call
   boolean shouldBackOff(Schedulable e) {
     return scheduler.shouldBackOff(e);
@@ -421,6 +470,9 @@ public class CallQueueManager<E extends Schedulable>
     RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
         ns, conf);
     int[] capacityWeights = parseCapacityWeights(priorityLevels, ns, conf);
+
+    // Update serverFailOverEnabled.
+    this.serverFailOverEnabled = getServerFailOverEnable(ns, conf);
     BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
         priorityLevels, maxSize, ns, capacityWeights, conf);
 

+ 19 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java

@@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -90,7 +89,16 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   public FairCallQueue(int priorityLevels, int capacity, String ns,
       Configuration conf) {
     this(priorityLevels, capacity, ns,
-        CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), conf);
+        CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels),
+        false, conf);
+  }
+
+  @VisibleForTesting
+  public FairCallQueue(int priorityLevels, int capacity, String ns, boolean serverFailOverEnabled,
+      Configuration conf) {
+    this(priorityLevels, capacity, ns,
+        CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels),
+        serverFailOverEnabled, conf);
   }
 
   /**
@@ -101,18 +109,21 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    * @param ns the prefix to use for configuration
    * @param capacityWeights the weights array for capacity allocation
    *                        among subqueues
+   * @param serverFailOverEnabled whether or not to enable callqueue overflow trigger failover
+   *                              for stateless servers when RPC call queue is filled
    * @param conf the configuration to read from
    * Notes: Each sub-queue has a capacity of `capacity / numSubqueues`.
    * The first or the highest priority sub-queue has an excess capacity
    * of `capacity % numSubqueues`
    */
   public FairCallQueue(int priorityLevels, int capacity, String ns,
-      int[] capacityWeights, Configuration conf) {
+      int[] capacityWeights, boolean serverFailOverEnabled, Configuration conf) {
     if(priorityLevels < 1) {
       throw new IllegalArgumentException("Number of Priority Levels must be " +
           "at least 1");
     }
     int numQueues = priorityLevels;
+    this.serverFailOverEnabled = serverFailOverEnabled;
     LOG.info("FairCallQueue is in use with " + numQueues +
         " queues with total capacity of " + capacity);
 
@@ -135,10 +146,6 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
       }
       this.overflowedCalls.add(new AtomicLong(0));
     }
-    this.serverFailOverEnabled = conf.getBoolean(
-        ns + "." +
-        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
-        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
 
     this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf);
     // Make this the active source of metrics
@@ -493,4 +500,9 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   public void setMultiplexer(RpcMultiplexer newMux) {
     this.multiplexer = newMux;
   }
+
+  @VisibleForTesting
+  public boolean isServerFailOverEnabled() {
+    return serverFailOverEnabled;
+  }
 }

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -3857,6 +3857,16 @@ public abstract class Server {
     callQueue.setClientBackoffEnabled(value);
   }
 
+  @VisibleForTesting
+  public boolean isServerFailOverEnabled() {
+    return callQueue.isServerFailOverEnabled();
+  }
+
+  @VisibleForTesting
+  public boolean isServerFailOverEnabledByQueue() {
+    return callQueue.isServerFailOverEnabledByQueue();
+  }
+
   /**
    * The maximum size of the rpc call queue of this server.
    * @return The maximum size of the rpc call queue.

+ 11 - 1
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2580,13 +2580,23 @@ The switch to turn S3A auditing on or off.
 </property>
 
 <property>
-  <name>callqueue.overflow.trigger.failover</name>
+  <name>ipc.[port_number].callqueue.overflow.trigger.failover</name>
   <value>false</value>
   <description>
     Enable callqueue overflow trigger failover for stateless servers.
   </description>
 </property>
 
+<property>
+  <name>ipc.callqueue.overflow.trigger.failover</name>
+  <value>false</value>
+  <description>
+    This property is used as fallback property in case
+    "ipc.[port_number].callqueue.overflow.trigger.failover" is not defined.
+    It determines whether or not to enable callqueue overflow trigger failover for stateless servers.
+  </description>
+</property>
+
 <!-- FairCallQueue properties -->
 <!-- See FairCallQueue documentation for a table of all properties -->
 

+ 4 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java

@@ -149,6 +149,10 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
     xmlPropsToSkipCompare.add("fs.azure.saskey.usecontainersaskeyforallaccess");
     xmlPropsToSkipCompare.add("fs.azure.user.agent.prefix");
 
+    // Properties in enable callqueue overflow trigger failover for stateless servers.
+    xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.overflow.trigger.failover");
+    xmlPropsToSkipCompare.add("ipc.callqueue.overflow.trigger.failover");
+
     // FairCallQueue configs that includes dynamic ports in its keys
     xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable");
     xmlPropsToSkipCompare.add("ipc.backoff.enable");

+ 26 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -517,4 +518,29 @@ public class TestCallQueueManager {
     verify(queue, times(0)).put(call);
     verify(queue, times(0)).add(call);
   }
+
+  @Test
+  public void testCallQueueOverEnabled() {
+    // default ipc.callqueue.overflow.trigger.failover' configure false.
+    String ns = "ipc.8888";
+    conf.setBoolean("ipc.callqueue.overflow.trigger.failover", false);
+    manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false,
+        10, ns, conf);
+    assertFalse(manager.isServerFailOverEnabled());
+    assertFalse(manager.isServerFailOverEnabledByQueue());
+
+    // set ipc.8888.callqueue.overflow.trigger.failover configure true.
+    conf.setBoolean("ipc.8888.callqueue.overflow.trigger.failover", true);
+    manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false,
+        10, ns, conf);
+    assertTrue(manager.isServerFailOverEnabled());
+    assertTrue(manager.isServerFailOverEnabledByQueue());
+
+    // set ipc.callqueue.overflow.trigger.failover' configure true.
+    conf.setBoolean("ipc.callqueue.overflow.trigger.failover", true);
+    manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false,
+        10, ns, conf);
+    assertTrue(manager.isServerFailOverEnabled());
+    assertTrue(manager.isServerFailOverEnabledByQueue());
+  }
 }

+ 5 - 7
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java

@@ -28,7 +28,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.junit.Before;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
@@ -105,7 +104,7 @@ public class TestFairCallQueue {
     fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns", conf);
     assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
     fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns",
-        new int[]{7, 6, 5, 4, 3, 2, 1}, conf);
+        new int[]{7, 6, 5, 4, 3, 2, 1}, false, conf);
     assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
   }
 
@@ -170,7 +169,7 @@ public class TestFairCallQueue {
     // default weights i.e. all queues share capacity
     fcq = new FairCallQueue<Schedulable>(numQueues, 4, "ns", conf);
     FairCallQueue<Schedulable> fcq1 = new FairCallQueue<Schedulable>(
-        numQueues, capacity, "ns", new int[]{1, 3}, conf);
+        numQueues, capacity, "ns", new int[]{1, 3}, false, conf);
 
     for (int i=0; i < capacity; i++) {
       Schedulable call = mockCall("u", i%2);
@@ -221,11 +220,10 @@ public class TestFairCallQueue {
     Configuration conf = new Configuration();
     // Config for server to throw StandbyException instead of the
     // regular RetriableException if call queue is full.
-    conf.setBoolean(
-        "ns." + CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
-        true);
+
     // 3 queues, 2 slots each.
-    fcq = Mockito.spy(new FairCallQueue<>(3, 6, "ns", conf));
+    fcq = Mockito.spy(new FairCallQueue<>(3, 6, "ns",
+        true, conf));
 
     Schedulable p0 = mockCall("a", 0);
     Schedulable p1 = mockCall("b", 1);

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java

@@ -88,7 +88,7 @@ public class TestRefreshCallQueue {
   @SuppressWarnings("serial")
   public static class MockCallQueue<E> extends LinkedBlockingQueue<E> {
     public MockCallQueue(int levels, int cap, String ns, int[] capacityWeights,
-        Configuration conf) {
+        boolean serverFailOverEnabled, Configuration conf) {
       super(cap);
       mockQueueConstructions++;
     }
@@ -172,6 +172,6 @@ public class TestRefreshCallQueue {
     // check callQueueSize has changed
     assertEquals(150 * serviceHandlerCount, rpcServer.getClientRpcServer()
         .getMaxQueueSize());
- }
+  }
 
 }