Просмотр исходного кода

HDFS-11580. Ozone: Support asynchronus client API for SCM and containers. Contributed by Anu Engineer.

Anu Engineer 8 лет назад
Родитель
Сommit
92945d01b2

+ 30 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java

@@ -35,6 +35,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
 /**
 /**
@@ -51,6 +53,7 @@ public class XceiverClient extends XceiverClientSpi {
   /**
   /**
    * Constructs a client that can communicate with the Container framework on
    * Constructs a client that can communicate with the Container framework on
    * data nodes.
    * data nodes.
+   *
    * @param pipeline - Pipeline that defines the machines.
    * @param pipeline - Pipeline that defines the machines.
    * @param config -- Ozone Config
    * @param config -- Ozone Config
    */
    */
@@ -91,6 +94,7 @@ public class XceiverClient extends XceiverClientSpi {
 
 
   /**
   /**
    * Returns if the exceiver client connects to a server.
    * Returns if the exceiver client connects to a server.
+   *
    * @return True if the connection is alive, false otherwise.
    * @return True if the connection is alive, false otherwise.
    */
    */
   @VisibleForTesting
   @VisibleForTesting
@@ -100,7 +104,7 @@ public class XceiverClient extends XceiverClientSpi {
 
 
   @Override
   @Override
   public void close() {
   public void close() {
-    if(group != null) {
+    if (group != null) {
       group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
       group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
     }
     }
 
 
@@ -118,12 +122,35 @@ public class XceiverClient extends XceiverClientSpi {
   public ContainerProtos.ContainerCommandResponseProto sendCommand(
   public ContainerProtos.ContainerCommandResponseProto sendCommand(
       ContainerProtos.ContainerCommandRequestProto request)
       ContainerProtos.ContainerCommandRequestProto request)
       throws IOException {
       throws IOException {
-    if((channelFuture == null) || (!channelFuture.channel().isActive())) {
+    try {
+    if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
       throw new IOException("This channel is not connected.");
       throw new IOException("This channel is not connected.");
     }
     }
     XceiverClientHandler handler =
     XceiverClientHandler handler =
         channelFuture.channel().pipeline().get(XceiverClientHandler.class);
         channelFuture.channel().pipeline().get(XceiverClientHandler.class);
 
 
-    return handler.sendCommand(request);
+      return handler.sendCommand(request);
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException("Unexpected exception during execution", e);
+    }
+  }
+
+  /**
+   * Sends a given command to server gets a waitable future back.
+   *
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException
+   */
+  @Override
+  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+    sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
+      throws IOException, ExecutionException, InterruptedException {
+      if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
+        throw new IOException("This channel is not connected.");
+      }
+      XceiverClientHandler handler =
+          channelFuture.channel().pipeline().get(XceiverClientHandler.class);
+      return handler.sendCommandAsync(request);
   }
   }
 }
 }

+ 57 - 24
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java

@@ -17,26 +17,36 @@
  */
  */
 package org.apache.hadoop.scm;
 package org.apache.hadoop.scm;
 
 
+import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 
 
 /**
 /**
  * Netty client handler.
  * Netty client handler.
  */
  */
 public class XceiverClientHandler extends
 public class XceiverClientHandler extends
-    SimpleChannelInboundHandler<ContainerProtos.ContainerCommandResponseProto> {
+    SimpleChannelInboundHandler<ContainerCommandResponseProto> {
 
 
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
-  private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto>
-      responses = new LinkedBlockingQueue<>();
+  private final ConcurrentMap<String,
+      CompletableFuture<ContainerCommandResponseProto>> responses =
+      new ConcurrentHashMap<>();
+
   private final Pipeline pipeline;
   private final Pipeline pipeline;
   private volatile Channel channel;
   private volatile Channel channel;
 
 
@@ -56,15 +66,24 @@ public class XceiverClientHandler extends
    * .ContainerCommandResponseProto}.
    * .ContainerCommandResponseProto}.
    *
    *
    * @param ctx the {@link ChannelHandlerContext} which this {@link
    * @param ctx the {@link ChannelHandlerContext} which this {@link
-   *            SimpleChannelInboundHandler} belongs to
+   * SimpleChannelInboundHandler} belongs to
    * @param msg the message to handle
    * @param msg the message to handle
    * @throws Exception is thrown if an error occurred
    * @throws Exception is thrown if an error occurred
    */
    */
   @Override
   @Override
   public void channelRead0(ChannelHandlerContext ctx,
   public void channelRead0(ChannelHandlerContext ctx,
-                           ContainerProtos.ContainerCommandResponseProto msg)
+      ContainerProtos.ContainerCommandResponseProto msg)
       throws Exception {
       throws Exception {
-    responses.add(msg);
+    Preconditions.checkNotNull(msg);
+    String key = msg.getTraceID();
+    CompletableFuture<ContainerCommandResponseProto> future =
+        responses.remove(key);
+    if (future != null) {
+      future.complete(msg);
+    } else {
+      LOG.error("A reply received for message that was not queued. trace " +
+          "ID: {}", msg.getTraceID());
+    }
   }
   }
 
 
   @Override
   @Override
@@ -88,25 +107,39 @@ public class XceiverClientHandler extends
    * @param request - request.
    * @param request - request.
    * @return -- response
    * @return -- response
    */
    */
-  public ContainerProtos.ContainerCommandResponseProto
-      sendCommand(ContainerProtos.ContainerCommandRequestProto request) {
 
 
-    ContainerProtos.ContainerCommandResponseProto response;
-    channel.writeAndFlush(request);
-    boolean interrupted = false;
-    for (;;) {
-      try {
-        response = responses.take();
-        break;
-      } catch (InterruptedException ignore) {
-        interrupted = true;
-      }
-    }
+  public ContainerCommandResponseProto
+    sendCommand(ContainerProtos.ContainerCommandRequestProto request)
+      throws ExecutionException, InterruptedException {
+    Future<ContainerCommandResponseProto> future = sendCommandAsync(request);
+    return future.get();
+  }
 
 
-    if (interrupted) {
-      Thread.currentThread().interrupt();
+  /**
+   * SendCommandAsyc queues a command to the Netty Subsystem and returns a
+   * CompletableFuture. This Future is marked compeleted in the channelRead0
+   * when the call comes back.
+   * @param request - Request to execute
+   * @return CompletableFuture
+   */
+  public CompletableFuture<ContainerCommandResponseProto>
+    sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) {
+    CompletableFuture<ContainerCommandResponseProto> response =
+        new CompletableFuture<>();
+
+    CompletableFuture<ContainerCommandResponseProto> previous =
+        responses.putIfAbsent(request.getTraceID(), response);
+
+    if (previous != null) {
+      LOG.error("Command with Trace already exists. Ignoring this command. " +
+              "{}. Previous Command: {}", request.getTraceID(),
+          previous.toString());
+      throw new IllegalStateException("Duplicate trace ID. Command with this " +
+          "trace ID is already executing. Please ensure that " +
+          "trace IDs are not reused. ID: " + request.getTraceID());
     }
     }
+
+    channel.writeAndFlush(request);
     return response;
     return response;
   }
   }
-
 }
 }

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.scm;
 
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.RatisHelper;
@@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReference;
 
 
 /**
 /**
@@ -103,4 +106,18 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return ContainerCommandResponseProto.parseFrom(
     return ContainerCommandResponseProto.parseFrom(
         ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
         ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
   }
   }
+
+  /**
+   * Sends a given command to server gets a waitable future back.
+   *
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException
+   */
+  @Override
+  public CompletableFuture<ContainerCommandResponseProto>
+    sendCommandAsync(ContainerCommandRequestProto request)
+      throws IOException, ExecutionException, InterruptedException {
+    throw new IOException("Not implemented");
+  }
 }
 }

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 
 import java.io.Closeable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
 /**
@@ -94,4 +96,15 @@ public abstract class XceiverClientSpi implements Closeable {
    */
    */
   public abstract ContainerCommandResponseProto sendCommand(
   public abstract ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException;
       ContainerCommandRequestProto request) throws IOException;
+
+  /**
+   * Sends a given command to server gets a waitable future back.
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException
+   */
+  public abstract CompletableFuture<ContainerCommandResponseProto>
+    sendCommandAsync(ContainerCommandRequestProto request) throws IOException,
+      ExecutionException, InterruptedException;
+
 }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java

@@ -59,7 +59,7 @@ import org.apache.hadoop.scm.XceiverClientSpi;
  * Implementation of all container protocol calls performed by Container
  * Implementation of all container protocol calls performed by Container
  * clients.
  * clients.
  */
  */
-public final class ContainerProtocolCalls {
+public final class ContainerProtocolCalls  {
 
 
   /**
   /**
    * There is no need to instantiate this class.
    * There is no need to instantiate this class.

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -248,6 +248,7 @@ public final class ContainerTestHelper {
         ContainerCommandRequestProto.newBuilder();
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.PutSmallFile);
     request.setCmdType(ContainerProtos.Type.PutSmallFile);
     request.setPutSmallFile(smallFileRequest);
     request.setPutSmallFile(smallFileRequest);
+    request.setTraceID(UUID.randomUUID().toString());
     return request.build();
     return request.build();
   }
   }
 
 
@@ -517,7 +518,8 @@ public final class ContainerTestHelper {
             pipeline.getProtobufMessage()).build();
             pipeline.getProtobufMessage()).build();
     ContainerProtos.ContainerCommandRequestProto cmd =
     ContainerProtos.ContainerCommandRequestProto cmd =
         ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
         ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
-            .Type.CloseContainer).setCloseContainer(closeReqeuest).build();
+            .Type.CloseContainer).setCloseContainer(closeReqeuest)
+            .build();
     return cmd;
     return cmd;
   }
   }
 
 
@@ -533,7 +535,9 @@ public final class ContainerTestHelper {
         ContainerProtos.DeleteContainerRequestProto.newBuilder().setName(
         ContainerProtos.DeleteContainerRequestProto.newBuilder().setName(
             pipeline.getContainerName()).setPipeline(
             pipeline.getContainerName()).setPipeline(
             pipeline.getProtobufMessage()).setForceDelete(forceDelete).build();
             pipeline.getProtobufMessage()).setForceDelete(forceDelete).build();
-    return ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
-        .Type.DeleteContainer).setDeleteContainer(deleteRequest).build();
+    return ContainerCommandRequestProto.newBuilder()
+        .setCmdType(ContainerProtos.Type.DeleteContainer)
+        .setDeleteContainer(deleteRequest)
+        .build();
   }
   }
 }
 }

+ 66 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

@@ -28,14 +28,17 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.XceiverClient;
 import org.apache.hadoop.scm.XceiverClient;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.junit.rules.Timeout;
 
 
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
-
+import java.util.concurrent.CompletableFuture;
 
 
 /**
 /**
  * Tests ozone containers.
  * Tests ozone containers.
@@ -226,7 +229,6 @@ public class TestOzoneContainer {
       final ContainerProtos.ContainerCommandRequestProto smallFileRequest
       final ContainerProtos.ContainerCommandRequestProto smallFileRequest
           = ContainerTestHelper.getWriteSmallFileRequest(
           = ContainerTestHelper.getWriteSmallFileRequest(
           client.getPipeline(), containerName, keyName, 1024);
           client.getPipeline(), containerName, keyName, 1024);
-
       ContainerProtos.ContainerCommandResponseProto response
       ContainerProtos.ContainerCommandResponseProto response
           = client.sendCommand(smallFileRequest);
           = client.sendCommand(smallFileRequest);
       Assert.assertNotNull(response);
       Assert.assertNotNull(response);
@@ -247,6 +249,8 @@ public class TestOzoneContainer {
     }
     }
   }
   }
 
 
+
+
   @Test
   @Test
   public void testCloseContainer() throws Exception {
   public void testCloseContainer() throws Exception {
     MiniOzoneCluster cluster = null;
     MiniOzoneCluster cluster = null;
@@ -415,6 +419,66 @@ public class TestOzoneContainer {
     }
     }
   }
   }
 
 
+
+  // Runs a set of commands as Async calls and verifies that calls indeed worked
+  // as expected.
+  static void runAsyncTests(
+      String containerName, XceiverClientSpi client) throws Exception {
+    try {
+      client.connect();
+
+      createContainerForTesting(client, containerName);
+      final List<CompletableFuture> computeResults = new LinkedList<>();
+      int requestCount = 1000;
+      // Create a bunch of Async calls from this test.
+      for(int x = 0; x <requestCount; x++) {
+        String keyName = OzoneUtils.getRequestID();
+        final ContainerProtos.ContainerCommandRequestProto smallFileRequest
+            = ContainerTestHelper.getWriteSmallFileRequest(
+            client.getPipeline(), containerName, keyName, 1024);
+
+        CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+            response = client.sendCommandAsync(smallFileRequest);
+        computeResults.add(response);
+      }
+
+      CompletableFuture<Void> combinedFuture =
+          CompletableFuture.allOf(computeResults.toArray(
+              new CompletableFuture[computeResults.size()]));
+      // Wait for all futures to complete.
+      combinedFuture.get();
+      // Assert that all futures are indeed done.
+      for (CompletableFuture future : computeResults) {
+        Assert.assertTrue(future.isDone());
+      }
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  @Test
+  public void testXcieverClientAsync() throws Exception {
+    MiniOzoneCluster cluster = null;
+    XceiverClient client = null;
+    try {
+      OzoneConfiguration conf = newOzoneConfiguration();
+
+      client = createClientForTesting(conf);
+      cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
+          .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+      String containerName = client.getPipeline().getContainerName();
+      runAsyncTests(containerName, client);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+
   private static XceiverClient createClientForTesting(OzoneConfiguration conf)
   private static XceiverClient createClientForTesting(OzoneConfiguration conf)
       throws Exception {
       throws Exception {
     String containerName = OzoneUtils.getRequestID();
     String containerName = OzoneUtils.getRequestID();