Explorar o código

HDDS-845. Create a new raftClient instance for every watch request for Ratis. Contributed by Shashikant Banerjee.

Shashikant Banerjee %!s(int64=6) %!d(string=hai) anos
pai
achega
10cf5773ba

+ 2 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -290,7 +290,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 
   @Override
   public void watchForCommit(long index, long timeout)
-      throws InterruptedException, ExecutionException, TimeoutException {
+      throws InterruptedException, ExecutionException, TimeoutException,
+      IOException {
     // there is no notion of watch for commit index in standalone pipeline
   };
 

+ 29 - 9
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -122,11 +122,15 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   public void close() {
     final RaftClient c = client.getAndSet(null);
     if (c != null) {
-      try {
-        c.close();
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
+      closeRaftClient(c);
+    }
+  }
+
+  private void closeRaftClient(RaftClient raftClient) {
+    try {
+      raftClient.close();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
     }
   }
 
@@ -145,19 +149,35 @@ public final class XceiverClientRatis extends XceiverClientSpi {
 
   @Override
   public void watchForCommit(long index, long timeout)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    // TODO: Create a new Raft client instance to watch
-    CompletableFuture<RaftClientReply> replyFuture = getClient()
+      throws InterruptedException, ExecutionException, TimeoutException,
+      IOException {
+    LOG.debug("commit index : {} watch timeout : {}", index, timeout);
+    // create a new RaftClient instance for watch request
+    RaftClient raftClient =
+        RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
+    CompletableFuture<RaftClientReply> replyFuture = raftClient
         .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
     try {
       replyFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch (TimeoutException toe) {
       LOG.warn("3 way commit failed ", toe);
-      getClient()
+
+      closeRaftClient(raftClient);
+      // generate a new raft client instance again so that next watch request
+      // does not get blocked for the previous one
+
+      // TODO : need to remove the code to create the new RaftClient instance
+      // here once the watch request bypassing sliding window in Raft Client
+      // gets fixed.
+      raftClient =
+          RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
+      raftClient
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get(timeout, TimeUnit.MILLISECONDS);
       LOG.info("Could not commit " + index + " to all the nodes."
           + "Committed by majority.");
+    } finally {
+      closeRaftClient(raftClient);
     }
   }
   /**

+ 2 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java

@@ -126,5 +126,6 @@ public abstract class XceiverClientSpi implements Closeable {
   public abstract HddsProtos.ReplicationType getPipelineType();
 
   public abstract void watchForCommit(long index, long timeout)
-      throws InterruptedException, ExecutionException, TimeoutException;
+      throws InterruptedException, ExecutionException, TimeoutException,
+      IOException;
 }