|
@@ -0,0 +1,243 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs.server.federation.router;
|
|
|
+
|
|
|
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
|
|
|
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.net.URI;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.hdfs.DFSClient;
|
|
|
+import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
+import org.apache.hadoop.ipc.RemoteException;
|
|
|
+import org.apache.hadoop.ipc.StandbyException;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Test;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * Test the Router overload control which rejects requests when the RPC client
|
|
|
+ * is overloaded. This feature is managed by
|
|
|
+ * {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
|
|
|
+ */
|
|
|
+public class TestRouterClientRejectOverload {
|
|
|
+
|
|
|
+ private static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
|
|
|
+
|
|
|
+ private StateStoreDFSCluster cluster;
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void cleanup() {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ cluster = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setupCluster(boolean overloadControl) throws Exception {
|
|
|
+ // Build and start a federated cluster
|
|
|
+ cluster = new StateStoreDFSCluster(false, 2);
|
|
|
+ Configuration routerConf = new RouterConfigBuilder()
|
|
|
+ .stateStore()
|
|
|
+ .metrics()
|
|
|
+ .admin()
|
|
|
+ .rpc()
|
|
|
+ .build();
|
|
|
+
|
|
|
+ // Reduce the number of RPC clients threads to overload the Router easy
|
|
|
+ routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
|
|
|
+ // Overload control
|
|
|
+ routerConf.setBoolean(
|
|
|
+ RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, overloadControl);
|
|
|
+
|
|
|
+ // No need for datanodes as we use renewLease() for testing
|
|
|
+ cluster.setNumDatanodesPerNameservice(0);
|
|
|
+
|
|
|
+ cluster.addRouterOverrides(routerConf);
|
|
|
+ cluster.startCluster();
|
|
|
+ cluster.startRouters();
|
|
|
+ cluster.waitClusterUp();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testWithoutOverloadControl() throws Exception {
|
|
|
+ setupCluster(false);
|
|
|
+
|
|
|
+ // Nobody should get overloaded
|
|
|
+ testOverloaded(0);
|
|
|
+
|
|
|
+ // Set subcluster 0 as slow
|
|
|
+ MiniDFSCluster dfsCluster = cluster.getCluster();
|
|
|
+ NameNode nn0 = dfsCluster.getNameNode(0);
|
|
|
+ simulateSlowNamenode(nn0, 1);
|
|
|
+
|
|
|
+ // Nobody should get overloaded, but it will be really slow
|
|
|
+ testOverloaded(0);
|
|
|
+
|
|
|
+ // No rejected requests expected
|
|
|
+ for (RouterContext router : cluster.getRouters()) {
|
|
|
+ FederationRPCMetrics rpcMetrics =
|
|
|
+ router.getRouter().getRpcServer().getRPCMetrics();
|
|
|
+ assertEquals(0, rpcMetrics.getProxyOpFailureClientOverloaded());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testOverloadControl() throws Exception {
|
|
|
+ setupCluster(true);
|
|
|
+
|
|
|
+ List<RouterContext> routers = cluster.getRouters();
|
|
|
+ FederationRPCMetrics rpcMetrics0 =
|
|
|
+ routers.get(0).getRouter().getRpcServer().getRPCMetrics();
|
|
|
+ FederationRPCMetrics rpcMetrics1 =
|
|
|
+ routers.get(1).getRouter().getRpcServer().getRPCMetrics();
|
|
|
+
|
|
|
+ // Nobody should get overloaded
|
|
|
+ testOverloaded(0);
|
|
|
+ assertEquals(0, rpcMetrics0.getProxyOpFailureClientOverloaded());
|
|
|
+ assertEquals(0, rpcMetrics1.getProxyOpFailureClientOverloaded());
|
|
|
+
|
|
|
+ // Set subcluster 0 as slow
|
|
|
+ MiniDFSCluster dfsCluster = cluster.getCluster();
|
|
|
+ NameNode nn0 = dfsCluster.getNameNode(0);
|
|
|
+ simulateSlowNamenode(nn0, 1);
|
|
|
+
|
|
|
+ // The subcluster should be overloaded now and reject 4-5 requests
|
|
|
+ testOverloaded(4, 6);
|
|
|
+ assertTrue(rpcMetrics0.getProxyOpFailureClientOverloaded()
|
|
|
+ + rpcMetrics1.getProxyOpFailureClientOverloaded() >= 4);
|
|
|
+
|
|
|
+ // Client using HA with 2 Routers
|
|
|
+ // A single Router gets overloaded, but 2 will handle it
|
|
|
+ Configuration clientConf = cluster.getRouterClientConf();
|
|
|
+
|
|
|
+ // Each Router should get a similar number of ops (>=8) out of 2*10
|
|
|
+ long iniProxyOps0 = rpcMetrics0.getProxyOps();
|
|
|
+ long iniProxyOps1 = rpcMetrics1.getProxyOps();
|
|
|
+ testOverloaded(0, 0, new URI("hdfs://fed/"), clientConf, 10);
|
|
|
+ long proxyOps0 = rpcMetrics0.getProxyOps() - iniProxyOps0;
|
|
|
+ long proxyOps1 = rpcMetrics1.getProxyOps() - iniProxyOps1;
|
|
|
+ assertEquals(2 * 10, proxyOps0 + proxyOps1);
|
|
|
+ assertTrue(proxyOps0 + " operations: not distributed", proxyOps0 >= 8);
|
|
|
+ assertTrue(proxyOps1 + " operations: not distributed", proxyOps1 >= 8);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testOverloaded(int expOverload) throws Exception {
|
|
|
+ testOverloaded(expOverload, expOverload);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testOverloaded(int expOverloadMin, int expOverloadMax)
|
|
|
+ throws Exception {
|
|
|
+ RouterContext routerContext = cluster.getRandomRouter();
|
|
|
+ URI address = routerContext.getFileSystemURI();
|
|
|
+ Configuration conf = new HdfsConfiguration();
|
|
|
+ testOverloaded(expOverloadMin, expOverloadMax, address, conf, 10);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test if the Router gets overloaded by submitting requests in parallel.
|
|
|
+ * We check how many requests got rejected at the end.
|
|
|
+ * @param expOverloadMin Min number of requests expected as overloaded.
|
|
|
+ * @param expOverloadMax Max number of requests expected as overloaded.
|
|
|
+ * @param address Destination address.
|
|
|
+ * @param conf Configuration of the client.
|
|
|
+ * @param numOps Number of operations to submit.
|
|
|
+ * @throws Exception If it cannot perform the test.
|
|
|
+ */
|
|
|
+ private void testOverloaded(int expOverloadMin, int expOverloadMax,
|
|
|
+ final URI address, final Configuration conf, final int numOps)
|
|
|
+ throws Exception {
|
|
|
+
|
|
|
+ // Submit renewLease() ops which go to all subclusters
|
|
|
+ final AtomicInteger overloadException = new AtomicInteger();
|
|
|
+ ExecutorService exec = Executors.newFixedThreadPool(numOps);
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
+ for (int i = 0; i < numOps; i++) {
|
|
|
+ // Stagger the operations a little (50ms)
|
|
|
+ final int sleepTime = i * 50;
|
|
|
+ Future<?> future = exec.submit(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ DFSClient routerClient = null;
|
|
|
+ try {
|
|
|
+ Thread.sleep(sleepTime);
|
|
|
+ routerClient = new DFSClient(address, conf);
|
|
|
+ String clientName = routerClient.getClientName();
|
|
|
+ ClientProtocol routerProto = routerClient.getNamenode();
|
|
|
+ routerProto.renewLease(clientName);
|
|
|
+ } catch (RemoteException re) {
|
|
|
+ IOException ioe = re.unwrapRemoteException();
|
|
|
+ assertTrue("Wrong exception: " + ioe,
|
|
|
+ ioe instanceof StandbyException);
|
|
|
+ assertExceptionContains("is overloaded", ioe);
|
|
|
+ overloadException.incrementAndGet();
|
|
|
+ } catch (IOException e) {
|
|
|
+ fail("Unexpected exception: " + e);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ fail("Cannot sleep: " + e);
|
|
|
+ } finally {
|
|
|
+ if (routerClient != null) {
|
|
|
+ try {
|
|
|
+ routerClient.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Cannot close the client");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ futures.add(future);
|
|
|
+ }
|
|
|
+ // Wait until all the requests are done
|
|
|
+ while (!futures.isEmpty()) {
|
|
|
+ futures.remove(0).get();
|
|
|
+ }
|
|
|
+ exec.shutdown();
|
|
|
+
|
|
|
+ int num = overloadException.get();
|
|
|
+ if (expOverloadMin == expOverloadMax) {
|
|
|
+ assertEquals(expOverloadMin, num);
|
|
|
+ } else {
|
|
|
+ assertTrue("Expected >=" + expOverloadMin + " but was " + num,
|
|
|
+ num >= expOverloadMin);
|
|
|
+ assertTrue("Expected <=" + expOverloadMax + " but was " + num,
|
|
|
+ num <= expOverloadMax);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|