Jelajahi Sumber

HDDS-554. In XceiverClientSpi, implement sendCommand(..) using sendCommandAsync(..). Contributed by Tsz Wo Nicholas Sze.

Nanda kumar 6 tahun lalu
induk
melakukan
3f6195045e

+ 0 - 23
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -120,29 +120,6 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     return pipeline;
   }
 
-  @Override
-  public ContainerCommandResponseProto sendCommand(
-      ContainerCommandRequestProto request) throws IOException {
-    try {
-      return sendCommandAsync(request).get();
-    } catch (ExecutionException | InterruptedException e) {
-      /**
-       * In case the grpc channel handler throws an exception,
-       * the exception thrown will be wrapped within {@link ExecutionException}.
-       * Unwarpping here so that original exception gets passed
-       * to to the client.
-       */
-      if (e instanceof ExecutionException) {
-        Throwable cause = e.getCause();
-        if (cause instanceof IOException) {
-          throw (IOException) cause;
-        }
-      }
-      throw new IOException(
-          "Unexpected exception during execution:" + e.getMessage());
-    }
-  }
-
   /**
    * Sends a given command to server gets a waitable future back.
    *

+ 2 - 30
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hdds.scm;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.ratis.retry.RetryPolicy;
@@ -52,7 +51,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -183,20 +181,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return Objects.requireNonNull(client.get(), "client is null");
   }
 
-  private RaftClientReply sendRequest(ContainerCommandRequestProto request)
-      throws IOException {
-    boolean isReadOnlyRequest = HddsUtils.isReadOnly(request);
-    ByteString byteString = request.toByteString();
-    LOG.debug("sendCommand {} {}", isReadOnlyRequest, request);
-    final RaftClientReply reply =  isReadOnlyRequest ?
-        getClient().sendReadOnly(() -> byteString) :
-        getClient().send(() -> byteString);
-    LOG.debug("reply {} {}", isReadOnlyRequest, reply);
-    return reply;
-  }
-
   private CompletableFuture<RaftClientReply> sendRequestAsync(
-      ContainerCommandRequestProto request) throws IOException {
+      ContainerCommandRequestProto request) {
     boolean isReadOnlyRequest = HddsUtils.isReadOnly(request);
     ByteString byteString = request.toByteString();
     LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request);
@@ -204,19 +190,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
         getClient().sendAsync(() -> byteString);
   }
 
-  @Override
-  public ContainerCommandResponseProto sendCommand(
-      ContainerCommandRequestProto request) throws IOException {
-    final RaftClientReply reply = sendRequest(request);
-    if (reply == null) {
-      throw new IOException(
-          String.format("Could not execute the request %s", request));
-    }
-    Preconditions.checkState(reply.isSuccess());
-    return ContainerCommandResponseProto.parseFrom(
-        reply.getMessage().getContent());
-  }
-
   /**
    * Sends a given command to server gets a waitable future back.
    *
@@ -226,8 +199,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    */
   @Override
   public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerCommandRequestProto request)
-      throws IOException, ExecutionException, InterruptedException {
+      ContainerCommandRequestProto request) {
     return sendRequestAsync(request).whenComplete((reply, e) ->
           LOG.debug("received reply {} for request: {} exception: {}", request,
               reply, e))

+ 8 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java

@@ -95,8 +95,14 @@ public abstract class XceiverClientSpi implements Closeable {
    * @return Response to the command
    * @throws IOException
    */
-  public abstract ContainerCommandResponseProto sendCommand(
-      ContainerCommandRequestProto request) throws IOException;
+  public ContainerCommandResponseProto sendCommand(
+      ContainerCommandRequestProto request) throws IOException {
+    try {
+      return sendCommandAsync(request).get();
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException("Failed to command " + request, e);
+    }
+  }
 
   /**
    * Sends a given command to server gets a waitable future back.