Browse Source

HADOOP-18446. [SBN read] Add a re-queue metric to RpcMetrics to quantify the number of re-queued RPCs (#4871)

Signed-off-by: Erik Krogen <xkrogen@apache.org>
Co-authored-by: zengqiang.xu <zengqiang.xu@shopee.com>
ZanderXu 2 years ago
parent
commit
a73c4804d8

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -3156,6 +3156,7 @@ public abstract class Server {
         throws IOException, InterruptedException {
       try {
         internalQueueCall(call, false);
+        rpcMetrics.incrRequeueCalls();
       } catch (RpcServerException rse) {
         call.doResponse(rse.getCause(), rse.getRpcStatusProto());
       }

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

@@ -128,6 +128,8 @@ public class RpcMetrics {
   MutableCounterLong rpcClientBackoff;
   @Metric("Number of Slow RPC calls")
   MutableCounterLong rpcSlowCalls;
+  @Metric("Number of requeue calls")
+  MutableCounterLong rpcRequeueCalls;
 
   @Metric("Number of open connections") public int numOpenConnections() {
     return server.getNumOpenConnections();
@@ -304,6 +306,13 @@ public class RpcMetrics {
     rpcSlowCalls.incr();
   }
 
+  /**
+   * Increments the Requeue Calls counter.
+   */
+  public void incrRequeueCalls() {
+    rpcRequeueCalls.incr();
+  }
+
   /**
    * Returns a MutableRate Counter.
    * @return Mutable Rate
@@ -344,6 +353,15 @@ public class RpcMetrics {
     return rpcSlowCalls.value();
   }
 
+  /**
+   * Returns the number of requeue calls.
+   * @return long
+   */
+  @VisibleForTesting
+  public long getRpcRequeueCalls() {
+    return rpcRequeueCalls.value();
+  }
+
   public MutableRate getDeferredRpcProcessingTime() {
     return deferredRpcProcessingTime;
   }

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_EN
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
 import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -36,7 +37,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -61,9 +65,12 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
@@ -124,6 +131,43 @@ public class TestObserverNode {
     }
   }
 
+  @Test
+  public void testObserverRequeue() throws Exception {
+    ScheduledExecutorService interruptor =
+        Executors.newScheduledThreadPool(1);
+
+    FSNamesystem observerFsNS = dfsCluster.getNamesystem(2);
+    RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster
+        .getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics();
+    try {
+      // Stop EditlogTailer of Observer NameNode.
+      observerFsNS.getEditLogTailer().stop();
+      long oldRequeueNum = obRpcMetrics.getRpcRequeueCalls();
+      ScheduledFuture<FileStatus> scheduledFuture = interruptor.schedule(
+          () -> {
+            Path tmpTestPath = new Path("/TestObserverRequeue");
+            dfs.create(tmpTestPath, (short)1).close();
+            assertSentTo(0);
+            // This operation will be blocked in ObserverNameNode
+            // until EditlogTailer tailed edits from journalNode.
+            FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
+            assertSentTo(2);
+            return fileStatus;
+          }, 0, TimeUnit.SECONDS);
+
+      GenericTestUtils.waitFor(() -> obRpcMetrics.getRpcRequeueCalls() > oldRequeueNum,
+          50, 10000);
+
+      observerFsNS.getEditLogTailer().doTailEdits();
+      FileStatus fileStatus = scheduledFuture.get(10000, TimeUnit.MILLISECONDS);
+      assertNotNull(fileStatus);
+    } finally {
+      EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf);
+      observerFsNS.setEditLogTailerForTests(editLogTailer);
+      editLogTailer.start();
+    }
+  }
+
   @Test
   public void testNoActiveToObserver() throws Exception {
     try {