소스 검색

HDFS-17265. RBF: Throwing an exception prevents the permit from being released when using FairnessPolicyController (#6298)

Jian Zhang 1 년 전
부모
커밋
29b0a6c28d

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java

@@ -89,7 +89,8 @@ public class AbstractRouterRpcFairnessPolicyController
     this.permits.put(nsId, new Semaphore(maxPermits));
   }
 
-  protected int getAvailablePermits(String nsId) {
+  @Override
+  public int getAvailablePermits(String nsId) {
     return this.permits.get(nsId).availablePermits();
   }
 

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java

@@ -51,4 +51,9 @@ public class NoRouterRpcFairnessPolicyController implements
   public String getAvailableHandlerOnPerNs(){
     return "N/A";
   }
+
+  @Override
+  public int getAvailablePermits(String nsId) {
+    return 0;
+  }
 }

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java

@@ -69,4 +69,12 @@ public interface RouterRpcFairnessPolicyController {
    * @return the JSON string of the available handler for each name service.
    */
   String getAvailableHandlerOnPerNs();
+
+  /**
+   * Returns the available handler for each name service.
+   *
+   * @param nsId name service id.
+   * @return the available handler for each name service.
+   */
+  int getAvailablePermits(String nsId);
 }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -1119,10 +1119,10 @@ public class RouterRpcClient {
     // Invoke in priority order
     for (final RemoteLocationContext loc : locations) {
       String ns = loc.getNameserviceId();
-      acquirePermit(ns, ugi, remoteMethod, controller);
       boolean isObserverRead = isObserverReadEligible(ns, m);
       List<? extends FederationNamenodeContext> namenodes =
           getOrderedNamenodes(ns, isObserverRead);
+      acquirePermit(ns, ugi, remoteMethod, controller);
       try {
         Class<?> proto = remoteMethod.getProtocol();
         Object[] params = remoteMethod.getParams(loc);
@@ -1482,11 +1482,11 @@ public class RouterRpcClient {
       // Shortcut, just one call
       T location = locations.iterator().next();
       String ns = location.getNameserviceId();
-      RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
-      acquirePermit(ns, ugi, method, controller);
       boolean isObserverRead = isObserverReadEligible(ns, m);
       final List<? extends FederationNamenodeContext> namenodes =
           getOrderedNamenodes(ns, isObserverRead);
+      RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
+      acquirePermit(ns, ugi, method, controller);
       try {
         Class<?> proto = method.getProtocol();
         Object[] paramList = method.getParams(location);

+ 56 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java

@@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.server.federation.fairness;
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,9 +36,14 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -100,6 +109,53 @@ public class TestRouterHandlersFairness {
     startLoadTest(true);
   }
 
+  /**
+   * Ensure that the semaphore is not acquired,
+   * when invokeSequential or invokeConcurrent throws any exception.
+   */
+  @Test
+  public void testReleasedWhenExceptionOccurs() throws Exception{
+    setupCluster(true, false);
+    RouterContext routerContext = cluster.getRandomRouter();
+    RouterRpcClient rpcClient =
+        routerContext.getRouter().getRpcServer().getRPCClient();
+    // Mock an ActiveNamenodeResolver and inject it into RouterRpcClient,
+    // so RouterRpcClient's getOrderedNamenodes method will report an exception.
+    ActiveNamenodeResolver mockNamenodeResolver = mock(ActiveNamenodeResolver.class);
+    Field field = rpcClient.getClass().getDeclaredField("namenodeResolver");
+    field.setAccessible(true);
+    field.set(rpcClient, mockNamenodeResolver);
+
+    // Use getFileInfo test invokeSequential.
+    DFSClient client = routerContext.getClient();
+    int availablePermits =
+        rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0");
+    LambdaTestUtils.intercept(IOException.class,  () -> {
+      LOG.info("Use getFileInfo test invokeSequential.");
+      client.getFileInfo("/test.txt");
+    });
+    // Ensure that the semaphore is not acquired.
+    assertEquals(availablePermits,
+        rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"));
+
+    // Use renewLease test invokeConcurrent.
+    Collection<RemoteLocation> locations = new ArrayList<>();
+    locations.add(new RemoteLocation("ns0", "/", "/"));
+    RemoteMethod renewLease = new RemoteMethod(
+        "renewLease",
+        new Class[]{java.lang.String.class, java.util.List.class},
+        new Object[]{null, null});
+    availablePermits =
+        rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0");
+    LambdaTestUtils.intercept(IOException.class,  () -> {
+      LOG.info("Use renewLease test invokeConcurrent.");
+      rpcClient.invokeConcurrent(locations, renewLease);
+    });
+    // Ensure that the semaphore is not acquired.
+    assertEquals(availablePermits,
+        rpcClient.getRouterRpcFairnessPolicyController().getAvailablePermits("ns0"));
+  }
+
   /**
    * Start a generic load test as a client against a cluster which has either
    * fairness configured or not configured. Test will spawn a set of 100