
HDFS-12593. Ozone: update Ratis to the latest snapshot. Contributed by Tsz Wo Nicholas Sze.

Mukul Kumar Singh, 7 years ago
parent
commit
fae6528c7b

+ 12 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java

@@ -20,12 +20,12 @@ package org.apache.hadoop.scm;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.ratis.RatisHelper;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeer;
@@ -37,11 +37,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -75,8 +77,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    */
   public void createPipeline(String clusterId, List<DatanodeID> datanodes)
       throws IOException {
-    final RaftPeer[] newPeers = datanodes.stream().map(RatisHelper::toRaftPeer)
-        .toArray(RaftPeer[]::new);
+    final List<RaftPeer> newPeers = datanodes.stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
     LOG.debug("initializing pipeline:{} with nodes:{}", clusterId, newPeers);
     reinitialize(datanodes, newPeers);
   }
@@ -90,7 +93,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return OzoneProtos.ReplicationType.RATIS;
   }
 
-  private void reinitialize(List<DatanodeID> datanodes, RaftPeer[] newPeers)
+  private void reinitialize(
+      List<DatanodeID> datanodes, Collection<RaftPeer> newPeers)
       throws IOException {
     if (datanodes.isEmpty()) {
       return;
@@ -120,11 +124,11 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * @param newPeers - Raft machines
    * @throws IOException - on Failure.
    */
-  private void reinitialize(DatanodeID datanode, RaftPeer[] newPeers)
+  private void reinitialize(DatanodeID datanode, Collection<RaftPeer> newPeers)
       throws IOException {
     final RaftPeer p = RatisHelper.toRaftPeer(datanode);
     try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
-      client.reinitialize(newPeers, p.getId());
+      client.reinitialize(RatisHelper.newRaftGroup(newPeers), p.getId());
     } catch (IOException ioe) {
       LOG.error("Failed to reinitialize RaftPeer:{} datanode: {}  ",
           p, datanode, ioe);
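
For orientation only, here is a compact sketch (not part of the commit) of the call shape this change introduces: peers are collected into a List<RaftPeer> and wrapped into a RaftGroup via the new RatisHelper.newRaftGroup(...) at the reinitialize call site, instead of being passed as a RaftPeer[]. The wrapper class and method below are illustrative; only the RatisHelper and RaftClient calls are taken from the hunks above.

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;

// Illustrative sketch mirroring the updated XceiverClientRatis code path.
final class ReinitializeSketch {
  static void reinitialize(RpcType rpcType, DatanodeID datanode,
      List<DatanodeID> datanodes) throws IOException {
    // Peers are now kept as a Collection instead of a RaftPeer[].
    final List<RaftPeer> newPeers = datanodes.stream()
        .map(RatisHelper::toRaftPeer)
        .collect(Collectors.toList());
    final RaftPeer p = RatisHelper.toRaftPeer(datanode);
    try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
      // The peer collection is wrapped into a RaftGroup at the call site.
      client.reinitialize(RatisHelper.newRaftGroup(newPeers), p.getId());
    }
  }
}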

+ 38 - 19
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java

@@ -23,15 +23,15 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
 import java.util.stream.Collectors;
 
 /**
@@ -41,6 +41,10 @@ public interface RatisHelper {
   Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
 
   static String toRaftPeerIdString(DatanodeID id) {
+    return id.getIpAddr() + "_" + id.getRatisPort();
+  }
+
+  static String toRaftPeerAddressString(DatanodeID id) {
     return id.getIpAddr() + ":" + id.getRatisPort();
   }
 
@@ -48,44 +52,59 @@ public interface RatisHelper {
     return RaftPeerId.valueOf(toRaftPeerIdString(id));
   }
 
-  static RaftPeer toRaftPeer(String id) {
-    return new RaftPeer(RaftPeerId.valueOf(id), id);
-  }
-
   static RaftPeer toRaftPeer(DatanodeID id) {
-    return toRaftPeer(toRaftPeerIdString(id));
+    return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id));
   }
 
   static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
-    return pipeline.getMachines().stream()
-        .map(RatisHelper::toRaftPeer)
+    return toRaftPeers(pipeline.getMachines());
+  }
+
+  static <E extends DatanodeID> List<RaftPeer> toRaftPeers(List<E> datanodes) {
+    return datanodes.stream().map(RatisHelper::toRaftPeer)
        .collect(Collectors.toList());
   }
 
-  static RaftPeer[] toRaftPeerArray(Pipeline pipeline) {
-    return toRaftPeers(pipeline).toArray(RaftPeer.EMPTY_PEERS);
+  /* TODO: use a dummy id for all groups for the moment.
+   *       It should be changed to a unique id for each group.
+   */
+  RaftGroupId DUMMY_GROUP_ID = RaftGroupId.randomId();
+
+  RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID,
+      Collections.emptyList());
+
+  static RaftGroup emptyRaftGroup() {
+    return EMPTY_GROUP;
+  }
+
+  static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
+    return peers.isEmpty()? emptyRaftGroup()
+        : new RaftGroup(DUMMY_GROUP_ID, peers);
+  }
+
+  static RaftGroup newRaftGroup(Pipeline pipeline) {
+    return newRaftGroup(toRaftPeers(pipeline));
   }
 
   static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
     return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
-        toRaftPeers(pipeline));
+        newRaftGroup(pipeline));
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
     return newRaftClient(rpcType, leader.getId(),
-        new ArrayList<>(Arrays.asList(leader)));
+        newRaftGroup(new ArrayList<>(Arrays.asList(leader))));
   }
 
   static RaftClient newRaftClient(
-      RpcType rpcType, RaftPeerId leader, List<RaftPeer> peers) {
-    LOG.trace("newRaftClient: {}, leader={}, peers={}", rpcType, leader, peers);
+      RpcType rpcType, RaftPeerId leader, RaftGroup group) {
+    LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
     final RaftProperties properties = new RaftProperties();
-    final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
-        properties, null));
+    final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(null));
 
     return RaftClient.newBuilder()
         .setClientRpc(factory.newRaftClientRpc())
-        .setServers(peers)
+        .setRaftGroup(group)
         .setLeaderId(leader)
         .setProperties(properties)
         .build();
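
As a usage note (illustrative, not part of the commit): with the helpers above, callers now hand Ratis a RaftGroup, currently keyed by the shared DUMMY_GROUP_ID, instead of a bare peer array or list. A minimal sketch of building a client for a pipeline through the new group-based overload, using only methods shown in the hunks above:

import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.rpc.RpcType;

// Illustrative sketch: the same calls the new RatisHelper overloads perform internally.
final class RatisHelperUsageSketch {
  static RaftClient clientFor(RpcType rpcType, Pipeline pipeline) {
    // Peers are derived from pipeline.getMachines(); every group reuses
    // DUMMY_GROUP_ID for now (see the TODO in the hunk above).
    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
    return RatisHelper.newRaftClient(
        rpcType, RatisHelper.toRaftPeerId(pipeline.getLeader()), group);
  }
}

Note also that the peer id string now uses an underscore (toRaftPeerIdString) while the network address keeps the host:port form (toRaftPeerAddressString), so a RaftPeer's id and its address are no longer the same string.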

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -80,7 +80,7 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    final SMLogEntryProto logEntry = trx.getSMLogEntry().get();
+    final SMLogEntryProto logEntry = trx.getSMLogEntry();
     return dispatch(ShadedProtoUtil.asByteString(logEntry.getData()),
         response ->
             () -> ShadedProtoUtil.asShadedByteString(response.toByteArray())

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -45,7 +45,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
-import java.util.Collections;
 import java.util.Objects;
 
 /**
@@ -64,7 +63,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(id))
-        .setPeers(Collections.emptyList())
+        .setGroup(RatisHelper.emptyRaftGroup())
         .setProperties(newRaftProperties(rpcType, port, storageDir))
         .setStateMachine(new ContainerStateMachine(dispatcher))
         .build();
@@ -73,7 +72,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   static RaftProperties newRaftProperties(
       RpcType rpc, int port, String storageDir) {
     final RaftProperties properties = new RaftProperties();
-    RaftServerConfigKeys.setStorageDir(properties, storageDir);
+    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
     RaftConfigKeys.Rpc.setType(properties, rpc);
     if (rpc == SupportedRpcType.GRPC) {
       GrpcConfigKeys.Server.setPort(properties, port);
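
For orientation only, a hedged sketch of the server-side construction after this change: the server is now built against RatisHelper.emptyRaftGroup() instead of setPeers(Collections.emptyList()), and the Ratis storage directory is passed as a java.io.File rather than a String. Only the builder calls and the setStorageDir call come from the hunks above; the wrapper method and the Ratis import paths (org.apache.ratis.server.*, org.apache.ratis.statemachine.StateMachine) are assumptions for illustration.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;

// Illustrative sketch of the updated builder calls in XceiverServerRatis.
final class RaftServerBuildSketch {
  static RaftServer build(DatanodeID id, String storageDir,
      StateMachine stateMachine) throws IOException {
    final RaftProperties properties = new RaftProperties();
    // The storage directory is now a File rather than a String.
    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
    // Rpc-type and port settings (RaftConfigKeys.Rpc.setType,
    // GrpcConfigKeys.Server.setPort) are omitted here; see newRaftProperties above.
    return RaftServer.newBuilder()
        .setServerId(RatisHelper.toRaftPeerId(id))
        // Replaces setPeers(Collections.emptyList()) from the older snapshot.
        .setGroup(RatisHelper.emptyRaftGroup())
        .setProperties(properties)
        .setStateMachine(stateMachine)
        .build();
  }
}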

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java

@@ -134,9 +134,8 @@ public class TestContainerServer {
   static void initXceiverServerRatis(
       RpcType rpc, DatanodeID id, Pipeline pipeline) throws IOException {
     final RaftPeer p = RatisHelper.toRaftPeer(id);
-    final RaftPeer[] peers = RatisHelper.toRaftPeerArray(pipeline);
     final RaftClient client = RatisHelper.newRaftClient(rpc, p);
-    client.reinitialize(peers, p.getId());
+    client.reinitialize(RatisHelper.newRaftGroup(pipeline), p.getId());
   }
 

+ 1 - 1
hadoop-project/pom.xml

@@ -99,7 +99,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.1.1-alpha-SNAPSHOT</ratis.version>
+    <ratis.version>0.1.1-alpha-7a5c3ea-SNAPSHOT</ratis.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>
     <hikari.version>2.4.12</hikari.version>