Bladeren bron

HADOOP-19509. Add a config entry to make IPC.Client checkAsyncCall off by default. (#7521)

hfutatzhanghb 2 maanden geleden
bovenliggende
commit
d26f4fdf3a

+ 21 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -202,9 +202,24 @@ public class Client implements AutoCloseable {
   private final boolean fallbackAllowed;
   private final boolean bindToWildCardAddress;
   private final byte[] clientId;
-  private final int maxAsyncCalls;
+  private int maxAsyncCalls;
   private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
 
+  @VisibleForTesting
+  public int getAsyncCallCounter() {
+    return asyncCallCounter.get();
+  }
+
+  @VisibleForTesting
+  public void setMaxAsyncCalls(int limits) {
+    this.maxAsyncCalls = limits;
+  }
+
+  @VisibleForTesting
+  public boolean isAsyncCallCheckEabled() {
+    return maxAsyncCalls >= 0;
+  }
+
   /**
    * set the ping interval value in configuration
    * 
@@ -1460,7 +1475,7 @@ public class Client implements AutoCloseable {
   }
 
   private void checkAsyncCall() throws IOException {
-    if (isAsynchronousMode()) {
+    if (isAsynchronousMode() && isAsyncCallCheckEabled()) {
       if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
         String errMsg = String.format(
             "Exceeded limit of max asynchronous calls: %d, " +
@@ -1518,7 +1533,7 @@ public class Client implements AutoCloseable {
         throw ioe;
       }
     } catch (Exception e) {
-      if (isAsynchronousMode()) {
+      if (isAsynchronousMode() && isAsyncCallCheckEabled()) {
         releaseAsyncCall();
       }
       throw e;
@@ -1527,7 +1542,9 @@ public class Client implements AutoCloseable {
     if (isAsynchronousMode()) {
       CompletableFuture<Writable> result = call.rpcResponseFuture.handle(
           (rpcResponse, e) -> {
-            releaseAsyncCall();
+            if (isAsyncCallCheckEabled()) {
+              releaseAsyncCall();
+            }
             if (e != null) {
               IOException ioe = (IOException) e;
               throw new CompletionException(warpIOException(ioe, connection));

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2409,6 +2409,7 @@ The switch to turn S3A auditing on or off.
   <value>100</value>
   <description>
     Define the maximum number of outstanding async calls.
+    If negative, there is no limit on the number of outstanding async calls.
   </description>
 </property>
 

+ 37 - 16
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java

@@ -38,7 +38,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -65,7 +69,7 @@ public class TestAsyncIPC {
     conf = new Configuration();
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
     Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
-    // set asynchronous mode for main thread
+    // Set asynchronous mode for main thread.
     Client.setAsynchronousMode(true);
   }
 
@@ -78,17 +82,22 @@ public class TestAsyncIPC {
         new HashMap<Integer, Future<LongWritable>>();
     Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
 
-    public AsyncCaller(Client client, InetSocketAddress server, int count) {
+    AsyncCaller(Client client, InetSocketAddress server, int count,
+        boolean checkAsyncCallEnabled) {
       this.client = client;
+      // Disable checkAsyncCall.
+      if (!checkAsyncCallEnabled) {
+        this.client.setMaxAsyncCalls(-1);
+      }
       this.server = server;
       this.count = count;
-      // set asynchronous mode, since AsyncCaller extends Thread
+      // Set asynchronous mode, since AsyncCaller extends Thread.
       Client.setAsynchronousMode(true);
     }
 
     @Override
     public void run() {
-      // in case Thread#Start is called, which will spawn new thread
+      // In case Thread#Start is called, which will spawn new thread.
       Client.setAsynchronousMode(true);
       for (int i = 0; i < count; i++) {
         try {
@@ -227,7 +236,7 @@ public class TestAsyncIPC {
       this.client = client;
       this.server = server;
       this.count = count;
-      // set asynchronous mode, since AsyncLimitlCaller extends Thread
+      // Set asynchronous mode, since AsyncLimitlCaller extends Thread.
       Client.setAsynchronousMode(true);
       this.callerId = callerId;
     }
@@ -285,12 +294,19 @@ public class TestAsyncIPC {
     }
   }
 
+  @Test
+  @Timeout(value = 60)
+  public void testAsyncCallCheckDisabled() throws IOException, InterruptedException,
+      ExecutionException {
+    internalTestAsyncCall(3, true, 2, 5, 10, false);
+  }
+
   @Test
   @Timeout(value = 60)
   public void testAsyncCall() throws IOException, InterruptedException,
       ExecutionException {
-    internalTestAsyncCall(3, false, 2, 5, 100);
-    internalTestAsyncCall(3, true, 2, 5, 10);
+    internalTestAsyncCall(3, false, 2, 5, 100, true);
+    internalTestAsyncCall(3, true, 2, 5, 10, true);
   }
 
   @Test
@@ -301,7 +317,8 @@ public class TestAsyncIPC {
   }
 
   public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
-      int clientCount, int callerCount, int callCount) throws IOException,
+      int clientCount, int callerCount, int callCount,
+      boolean checkAsyncCallEnabled) throws IOException,
       InterruptedException, ExecutionException {
     Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
     InetSocketAddress addr = NetUtils.getConnectAddress(server);
@@ -314,10 +331,14 @@ public class TestAsyncIPC {
 
     AsyncCaller[] callers = new AsyncCaller[callerCount];
     for (int i = 0; i < callerCount; i++) {
-      callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
+      callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount,
+          checkAsyncCallEnabled);
       callers[i].start();
     }
     for (int i = 0; i < callerCount; i++) {
+      if (!checkAsyncCallEnabled) {
+        assertEquals(0, clients[i % clientCount].getAsyncCallCounter());
+      }
       callers[i].join();
       callers[i].assertReturnValues();
     }
@@ -340,7 +361,7 @@ public class TestAsyncIPC {
     int asyncCallCount = client.getAsyncCallCount();
 
     try {
-      AsyncCaller caller = new AsyncCaller(client, addr, callCount);
+      AsyncCaller caller = new AsyncCaller(client, addr, callCount, true);
       caller.run();
       caller.assertReturnValues();
       caller.assertReturnValues();
@@ -364,7 +385,7 @@ public class TestAsyncIPC {
     final Client client = new Client(LongWritable.class, conf);
 
     try {
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10, true);
       caller.run();
       caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
     } finally {
@@ -463,7 +484,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 4);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 4, true);
       caller.run();
       caller.assertReturnValues();
     } finally {
@@ -501,7 +522,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10, true);
       caller.run();
       caller.assertReturnValues();
     } finally {
@@ -538,7 +559,7 @@ public class TestAsyncIPC {
     try {
       InetSocketAddress addr = NetUtils.getConnectAddress(server);
       server.start();
-      final AsyncCaller caller = new AsyncCaller(client, addr, 10);
+      final AsyncCaller caller = new AsyncCaller(client, addr, 10, true);
       caller.run();
       caller.assertReturnValues();
     } finally {
@@ -580,7 +601,7 @@ public class TestAsyncIPC {
       server.start();
       AsyncCaller[] callers = new AsyncCaller[callerCount];
       for (int i = 0; i < callerCount; ++i) {
-        callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
+        callers[i] = new AsyncCaller(client, addr, perCallerCallCount, true);
         callers[i].start();
       }
       for (int i = 0; i < callerCount; ++i) {