浏览代码

YARN-11493. [Federation] ConfiguredRMFailoverProxyProvider Supports Randomly Select an Router. (#5651)

slfan1989 2 年之前
父节点
当前提交
5ddaf2e133

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -3966,6 +3966,11 @@ public class YarnConfiguration extends Configuration {
   public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + "enabled";
   public static final boolean DEFAULT_FEDERATION_ENABLED = false;
 
+  public static final String FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER =
+      FEDERATION_PREFIX + "failover.random.order";
+
+  public static final boolean DEFAULT_FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER = false;
+
   public static final String FEDERATION_FAILOVER_ENABLED =
       FEDERATION_PREFIX + "failover.enabled";
   public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;

+ 74 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java

@@ -31,8 +31,11 @@ import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.times;
@@ -51,6 +54,8 @@ public class TestRMFailoverProxyProvider {
   private static final int RM2_PORT = 8031;
   private static final int RM3_PORT = 8033;
 
+  private static final int NUM_ITERATIONS = 50;
+
   private Configuration conf;
 
   private class TestProxy extends Proxy implements Closeable {
@@ -303,5 +308,73 @@ public class TestRMFailoverProxyProvider {
         .getProxy(any(YarnConfiguration.class), any(Class.class),
         eq(mockAdd3));
   }
+
+  @Test
+  public void testRandomSelectRouter() throws Exception {
+
+    // We design a test case like this:
+    // We have three routers (router1, router2, and router3),
+    // we enable Federation mode and random selection mode.
+    // After iterating 50 times, since the selection is random,
+    // each router should be selected more than 0 times,
+    // and the sum of the number of times each router is selected should be equal to 50.
+
+    final AtomicInteger router1Count = new AtomicInteger(0);
+    final AtomicInteger router2Count = new AtomicInteger(0);
+    final AtomicInteger router3Count = new AtomicInteger(0);
+
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER, true);
+    conf.set(YarnConfiguration.RM_HA_IDS, "router0,router1,router2");
+
+    // Create two proxies and mock a RMProxy
+    Proxy mockRouterProxy = new TestProxy((proxy, method, args) -> null);
+
+    Class protocol = ApplicationClientProtocol.class;
+    RMProxy<Proxy> mockRMProxy = mock(RMProxy.class);
+    ConfiguredRMFailoverProxyProvider<Proxy> fpp = new ConfiguredRMFailoverProxyProvider<>();
+
+    // generate two address with different ports.
+    // Default port of yarn RM
+    InetSocketAddress mockRouterAdd = new InetSocketAddress(RM1_PORT);
+
+    // Mock RMProxy methods
+    when(mockRMProxy.getRMAddress(any(YarnConfiguration.class),
+        any(Class.class))).thenReturn(mockRouterAdd);
+    when(mockRMProxy.getProxy(any(YarnConfiguration.class),
+        any(Class.class), eq(mockRouterAdd))).thenReturn(mockRouterProxy);
+
+    // Initialize failover proxy provider and get proxy from it.
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      fpp.init(conf, mockRMProxy, protocol);
+      FailoverProxyProvider.ProxyInfo<Proxy> proxy = fpp.getProxy();
+      if ("router0".equals(proxy.proxyInfo)) {
+        router1Count.incrementAndGet();
+      }
+      if ("router1".equals(proxy.proxyInfo)) {
+        router2Count.incrementAndGet();
+      }
+      if ("router2".equals(proxy.proxyInfo)) {
+        router3Count.incrementAndGet();
+      }
+    }
+
+    // router1Count、router2Count、router3Count are
+    // less than NUM_ITERATIONS
+    assertTrue(router1Count.get() < NUM_ITERATIONS);
+    assertTrue(router2Count.get() < NUM_ITERATIONS);
+    assertTrue(router3Count.get() < NUM_ITERATIONS);
+
+    // router1Count、router2Count、router3Count are
+    // more than NUM_ITERATIONS
+    assertTrue(router1Count.get() > 0);
+    assertTrue(router2Count.get() > 0);
+    assertTrue(router3Count.get() > 0);
+
+    // totals(router1Count+router2Count+router3Count ) should be equal NUM_ITERATIONS
+    int totalCount = router1Count.get() + router2Count.get() + router3Count.get();
+    assertEquals(NUM_ITERATIONS, totalCount);
+  }
 }
 

+ 46 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java

@@ -21,9 +21,12 @@ package org.apache.hadoop.yarn.client;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +60,7 @@ public class ConfiguredRMFailoverProxyProvider<T>
     this.protocol = protocol;
     this.rmProxy.checkAllowedProtocols(this.protocol);
     this.conf = new YarnConfiguration(configuration);
-    Collection<String> rmIds = HAUtil.getRMHAIds(conf);
+    Collection<String> rmIds = getRMIds(conf);
     this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]);
     conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);
 
@@ -119,4 +122,46 @@ public class ConfiguredRMFailoverProxyProvider<T>
       }
     }
   }
+
+  /**
+   * Get the list of RM IDs.
+   *
+   * @param pConfiguration Configuration.
+   * @return rmId.
+   */
+  private Collection<String> getRMIds(Configuration pConfiguration) {
+    boolean isFederationEnabled = HAUtil.isFederationEnabled(pConfiguration);
+    if (!isFederationEnabled) {
+      return HAUtil.getRMHAIds(pConfiguration);
+    }
+    return getRandomOrderByRandomFlag(pConfiguration);
+  }
+
+  /**
+   * YARN Federation mode, the Router is considered as an RM for the client.
+   * We want the client to be able to randomly
+   * Select a Router and support failover when selecting a Router.
+   * The original code always started trying from the first
+   * Router when the client selected a Router,
+   * but this method will support random Router selection.
+   *
+   * For clusters that have not enabled Federation mode, the behavior remains unchanged.
+   *
+   * @param pConfiguration Configuration.
+   * @return rmIds
+   */
+  private Collection<String> getRandomOrderByRandomFlag(Configuration pConfiguration) {
+    Collection<String> rmIds = HAUtil.getRMHAIds(pConfiguration);
+    boolean isRandomOrder = pConfiguration.getBoolean(
+        YarnConfiguration.FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER,
+        YarnConfiguration.DEFAULT_FEDERATION_YARN_CLIENT_FAILOVER_RANDOM_ORDER);
+    // If the Random option is not enabled, returns the configured array.
+    if (!isRandomOrder) {
+      return rmIds;
+    }
+    // If the Random option is enabled, returns an array of Random.
+    List<String> rmIdList = new ArrayList<>(rmIds);
+    Collections.shuffle(rmIdList);
+    return rmIdList;
+  }
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5250,4 +5250,15 @@
     <value>org.apache.hadoop.yarn.server.federation.cache.FederationJCache</value>
   </property>
 
+  <property>
+    <description>
+      After enabling YARN Federation mode,
+      clients can be allowed to randomly choose a Router and support FailOver.
+      By default, the configuration is set to false,
+      and clients start attempting from the first Router.
+    </description>
+    <name>yarn.federation.failover.random.order</name>
+    <value>false</value>
+  </property>
+
 </configuration>