浏览代码

HDFS-14146. [SBN read] Handle exceptions from and prevent handler threads from blocking within internalQueueCall. Contributed by Chao Sun.

Erik Krogen 6 年之前
父节点
当前提交
ef3e1929dd

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java

@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.ipc.Server.Call;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.security.UserGroupInformation;
 
 public abstract class ExternalCall<T> extends Call {
@@ -78,7 +79,7 @@ public abstract class ExternalCall<T> extends Call {
   }
 
   @Override
-  final void doResponse(Throwable t) {
+  final void doResponse(Throwable t, RpcStatusProto status) {
     synchronized(done) {
       error = t;
       done.set(true);

+ 32 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -820,7 +820,11 @@ public abstract class Server {
       }
     }
 
-    void doResponse(Throwable t) throws IOException {}
+    void doResponse(Throwable t) throws IOException {
+      doResponse(t, RpcStatusProto.FATAL);
+    }
+
+    void doResponse(Throwable t, RpcStatusProto proto) throws IOException {}
 
     // For Schedulable
     @Override
@@ -987,15 +991,17 @@ public abstract class Server {
     }
 
     @Override
-    void doResponse(Throwable t) throws IOException {
+    void doResponse(Throwable t, RpcStatusProto status) throws IOException {
       RpcCall call = this;
       if (t != null) {
+        if (status == null) {
+          status = RpcStatusProto.FATAL;
+        }
         // clone the call to prevent a race with another thread stomping
         // on the response while being sent.  the original call is
         // effectively discarded since the wait count won't hit zero
         call = new RpcCall(this);
-        setupResponse(call,
-            RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
+        setupResponse(call, status, RpcErrorCodeProto.ERROR_RPC_SERVER,
             null, t.getClass().getName(), StringUtils.stringifyException(t));
       } else {
         setupResponse(call, call.responseParams.returnStatus,
@@ -2749,8 +2755,18 @@ public abstract class Server {
 
   private void internalQueueCall(Call call)
       throws IOException, InterruptedException {
+    internalQueueCall(call, true);
+  }
+
+  private void internalQueueCall(Call call, boolean blocking)
+      throws IOException, InterruptedException {
     try {
-      callQueue.put(call); // queue the call; maybe blocked here
+      // queue the call, may be blocked if blocking is true.
+      if (blocking) {
+        callQueue.put(call);
+      } else {
+        callQueue.add(call);
+      }
     } catch (CallQueueOverflowException cqe) {
       // If rpc scheduler indicates back off based on performance degradation
       // such as response time or rpc queue is full, we will ask the client
@@ -2794,8 +2810,8 @@ public abstract class Server {
              * In case of Observer, it handles only reads, which are
              * commutative.
              */
-            //Re-queue the call and continue
-            internalQueueCall(call);
+            // Re-queue the call and continue
+            requeueCall(call);
             continue;
           }
           if (LOG.isDebugEnabled()) {
@@ -2837,6 +2853,15 @@ public abstract class Server {
       LOG.debug(Thread.currentThread().getName() + ": exiting");
     }
 
+    private void requeueCall(Call call)
+        throws IOException, InterruptedException {
+      try {
+        internalQueueCall(call, false);
+      } catch (RpcServerException rse) {
+        call.doResponse(rse.getCause(), rse.getRpcStatusProto());
+      }
+    }
+
   }
 
   @VisibleForTesting

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java

@@ -25,12 +25,16 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RpcScheduler;
+import org.apache.hadoop.ipc.Schedulable;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -84,6 +88,36 @@ public class TestConsistentReadsObserver {
     }
   }
 
+  @Test
+  public void testRequeueCall() throws Exception {
+    setObserverRead(true);
+
+    // Update the configuration just for the observer, by enabling
+    // IPC backoff and using the test scheduler class, which starts to backoff
+    // after certain number of calls.
+    final int observerIdx = 2;
+    NameNode nn = dfsCluster.getNameNode(observerIdx);
+    int port = nn.getNameNodeAddress().getPort();
+    Configuration configuration = dfsCluster.getConfiguration(observerIdx);
+    String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
+    configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
+        TestRpcScheduler.class.getName());
+    configuration.setBoolean(prefix
+        + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+
+    dfsCluster.restartNameNode(observerIdx);
+    dfsCluster.transitionToObserver(observerIdx);
+
+    dfs.create(testPath, (short)1).close();
+    assertSentTo(0);
+
+    // Since we haven't tailed edit logs on the observer, it will fall behind
+    // and keep re-queueing the incoming request. Eventually, RPC backoff will
+    // be triggered and client should retry active NN.
+    dfs.getFileStatus(testPath);
+    assertSentTo(0);
+  }
+
   @Test
   public void testMsyncSimple() throws Exception {
     // 0 == not completed, 1 == succeeded, -1 == failed
@@ -169,4 +203,33 @@ public class TestConsistentReadsObserver {
     dfs = HATestUtil.configureObserverReadFs(
         dfsCluster, conf, ObserverReadProxyProvider.class, flag);
   }
+
+  /**
+   * A dummy test scheduler that starts backoff after a fixed number
+   * of requests.
+   */
+  public static class TestRpcScheduler implements RpcScheduler {
+    // Allow a number of RPCs to pass in order for the NN restart to succeed.
+    private int allowed = 10;
+    public TestRpcScheduler() {}
+
+    @Override
+    public int getPriorityLevel(Schedulable obj) {
+      return 0;
+    }
+
+    @Override
+    public boolean shouldBackOff(Schedulable obj) {
+      return --allowed < 0;
+    }
+
+    @Override
+    public void addResponseTime(String name, int priorityLevel, int queueTime,
+        int processingTime) {
+    }
+
+    @Override
+    public void stop() {
+    }
+  }
 }