瀏覽代碼

HDDS-365. Implement flushStateMachineData for containerStateMachine. Contributed by Shashikant Banerjee.

Mukul Kumar Singh 6 年之前
父節點
當前提交
2651e2c43d

+ 1 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -87,7 +87,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     }
     }
     LOG.debug("Connecting to server Port : " + leader.getIpAddress());
     LOG.debug("Connecting to server Port : " + leader.getIpAddress());
     channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
     channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
-        .usePlaintext(true)
+        .usePlaintext()
         .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
         .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
         .build();
         .build();
     asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
     asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java

@@ -77,7 +77,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
     datanodeDetails.setPort(
     datanodeDetails.setPort(
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
     server = ((NettyServerBuilder) ServerBuilder.forPort(port))
     server = ((NettyServerBuilder) ServerBuilder.forPort(port))
-        .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
         .addService(new GrpcXceiverService(dispatcher))
         .addService(new GrpcXceiverService(dispatcher))
         .build();
         .build();
     storageContainer = dispatcher;
     storageContainer = dispatcher;

+ 18 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
 
 
 /** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
 /** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
  *
  *
@@ -316,6 +317,23 @@ public class ContainerStateMachine extends BaseStateMachine {
     return LogEntryProto.newBuilder().setSmLogEntry(log).build();
     return LogEntryProto.newBuilder().setSmLogEntry(log).build();
   }
   }
 
 
+  /**
+   * Returns the combined future of all the writeChunks till the given log
+   * index. The Raft log worker will wait for the stateMachineData to complete
+   * flush as well.
+   *
+   * @param index log index till which the stateMachine data needs to be flushed
+   * @return Combined future of all writeChunks till the log index given.
+   */
+  @Override
+  public CompletableFuture<Void> flushStateMachineData(long index) {
+    List<CompletableFuture<Message>> futureList =
+        writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
+            .map(x -> x.getValue()).collect(Collectors.toList());
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+        futureList.toArray(new CompletableFuture[futureList.size()]));
+    return combinedFuture;
+  }
   /*
   /*
    * This api is used by the leader while appending logs to the follower
    * This api is used by the leader while appending logs to the follower
    * This allows the leader to read the state machine data from the
    * This allows the leader to read the state machine data from the

+ 1 - 1
hadoop-project/pom.xml

@@ -97,7 +97,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
 
     <!-- Apache Ratis version -->
     <!-- Apache Ratis version -->
-    <ratis.version>0.3.0-e4a016f-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0-e6fd494-SNAPSHOT</ratis.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>
     <hikari.version>2.4.12</hikari.version>