Browse Source

HDFS-12986. Ozone: Update ozone to latest ratis snapshot build (0.1.1-alpha-0f7169d-SNAPSHOT). Contributed by Lokesh Jain

Tsz-Wo Nicholas Sze 7 years ago
parent
commit
ba4d5a52a8

+ 1 - 0
hadoop-client-modules/hadoop-client-runtime/pom.xml

@@ -157,6 +157,7 @@
                       <!-- Leave javax APIs that are stable -->
                       <!-- the jdk ships part of the javax.annotation namespace, so if we want to relocate this we'll have to care it out by class :( -->
                       <exclude>com.google.code.findbugs:jsr305</exclude>
+                      <exclude>io.dropwizard.metrics:metrics-core</exclude>
                     </excludes>
                   </artifactSet>
                   <filters>

+ 31 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.scm;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -68,7 +70,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   private final AtomicReference<RaftClient> client = new AtomicReference<>();
   private final int maxOutstandingRequests;
 
-  /** Constructs a client. */
+  /**
+   * Constructs a client.
+   */
   private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
       int maxOutStandingChunks) {
     super();
@@ -78,7 +82,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   }
 
   /**
-   *  {@inheritDoc}
+   * {@inheritDoc}
    */
   public void createPipeline(String clusterId, List<DatanodeID> datanodes)
       throws IOException {
@@ -90,6 +94,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
 
   /**
    * Returns Ratis as pipeline Type.
+   *
    * @return - Ratis
    */
   @Override
@@ -97,8 +102,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return OzoneProtos.ReplicationType.RATIS;
   }
 
-  private void reinitialize(
-      List<DatanodeID> datanodes, RaftGroup group)
+  private void reinitialize(List<DatanodeID> datanodes, RaftGroup group)
       throws IOException {
     if (datanodes.isEmpty()) {
       return;
@@ -124,8 +128,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
 
   /**
    * Adds a new peers to the Ratis Ring.
+   *
    * @param datanode - new datanode
-   * @param group - Raft group
+   * @param group    - Raft group
    * @throws IOException - on Failure.
    */
   private void reinitialize(DatanodeID datanode, RaftGroup group)
@@ -141,8 +146,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     }
   }
 
-
-
   @Override
   public Pipeline getPipeline() {
     return pipeline;
@@ -216,6 +219,16 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return reply;
   }
 
+  private CompletableFuture<RaftClientReply> sendRequestAsync(
+      ContainerCommandRequestProto request) throws IOException {
+    boolean isReadOnlyRequest = isReadOnly(request);
+    ByteString byteString =
+        ShadedProtoUtil.asShadedByteString(request.toByteArray());
+    LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request);
+    return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
+        getClient().sendAsync(() -> byteString);
+  }
+
   @Override
   public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException {
@@ -236,6 +249,16 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
       ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    throw new IOException("Not implemented");
+    return sendRequestAsync(request).whenComplete((reply, e) ->
+          LOG.debug("received reply {} for request: {} exception: {}", request,
+              reply, e))
+        .thenApply(reply -> {
+          try {
+            return ContainerCommandResponseProto.parseFrom(
+                ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
+          } catch (InvalidProtocolBufferException e) {
+            throw new CompletionException(e);
+          }
+        });
   }
 }

+ 10 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -38,6 +38,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,18 +116,17 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
     RaftConfigKeys.Rpc.setType(properties, rpc);
 
-    //TODO: change these configs to setter after RATIS-154
-    properties.setInt("raft.server.log.segment.cache.num.max", 2);
-    properties.setInt("raft.grpc.message.size.max",
-        scmChunkSize + raftSegmentPreallocatedSize);
-    properties.setInt("raft.server.rpc.timeout.min", 800);
-    properties.setInt("raft.server.rpc.timeout.max", 1000);
+    RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+        TimeDuration.valueOf(800, TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+        TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS));
     if (rpc == SupportedRpcType.GRPC) {
       GrpcConfigKeys.Server.setPort(properties, port);
-    } else {
-      if (rpc == SupportedRpcType.NETTY) {
-        NettyConfigKeys.Server.setPort(properties, port);
-      }
+    } else if (rpc == SupportedRpcType.NETTY) {
+      NettyConfigKeys.Server.setPort(properties, port);
     }
     return properties;
   }