Browse Source

HDDS-716. Update ozone to latest ratis snapshot build(0.3.0-aa38160-SNAPSHOT). Contributed by Mukul Kumar Singh.

Shashikant Banerjee 6 years ago
parent
commit
0891cdda79

+ 47 - 25
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
@@ -49,7 +50,7 @@ import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -207,7 +208,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     final ContainerCommandRequestProto proto =
         getRequestProto(request.getMessage().getContent());
 
-    final SMLogEntryProto log;
+    final StateMachineLogEntryProto log;
     if (proto.getCmdType() == Type.WriteChunk) {
       final WriteChunkRequestProto write = proto.getWriteChunk();
       // create the state machine data proto
@@ -237,23 +238,39 @@ public class ContainerStateMachine extends BaseStateMachine {
               .setWriteChunk(commitWriteChunkProto)
               .build();
 
-      log = SMLogEntryProto.newBuilder()
-          .setData(commitContainerCommandProto.toByteString())
-          .setStateMachineData(dataContainerCommandProto.toByteString())
-          .build();
+      log = createSMLogEntryProto(request,
+          commitContainerCommandProto.toByteString(),
+          dataContainerCommandProto.toByteString());
     } else if (proto.getCmdType() == Type.CreateContainer) {
-      log = SMLogEntryProto.newBuilder()
-          .setData(request.getMessage().getContent())
-          .setStateMachineData(request.getMessage().getContent())
-          .build();
+      log = createSMLogEntryProto(request,
+          request.getMessage().getContent(), request.getMessage().getContent());
     } else {
-      log = SMLogEntryProto.newBuilder()
-          .setData(request.getMessage().getContent())
-          .build();
+      log = createSMLogEntryProto(request, request.getMessage().getContent(),
+          null);
     }
     return new TransactionContextImpl(this, request, log);
   }
 
+  private StateMachineLogEntryProto createSMLogEntryProto(RaftClientRequest r,
+      ByteString logData, ByteString smData) {
+    StateMachineLogEntryProto.Builder builder =
+        StateMachineLogEntryProto.newBuilder();
+
+    builder.setCallId(r.getCallId())
+        .setClientId(r.getClientId().toByteString())
+        .setLogData(logData);
+
+    if (smData != null) {
+      builder.setStateMachineEntry(StateMachineEntryProto.newBuilder()
+          .setStateMachineData(smData).build());
+    }
+    return builder.build();
+  }
+
+  private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) {
+    return entryProto.getStateMachineEntry().getStateMachineData();
+  }
+
   private ContainerCommandRequestProto getRequestProto(ByteString request)
       throws InvalidProtocolBufferException {
     return ContainerCommandRequestProto.parseFrom(request);
@@ -315,7 +332,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       metrics.incNumWriteStateMachineOps();
       final ContainerCommandRequestProto requestProto =
-          getRequestProto(entry.getSmLogEntry().getStateMachineData());
+          getRequestProto(getStateMachineData(entry.getStateMachineLogEntry()));
       Type cmdType = requestProto.getCmdType();
       switch (cmdType) {
       case CreateContainer:
@@ -345,8 +362,8 @@ public class ContainerStateMachine extends BaseStateMachine {
     }
   }
 
-  private ByteString readStateMachineData(LogEntryProto entry,
-      ContainerCommandRequestProto requestProto) {
+  private ByteString readStateMachineData(ContainerCommandRequestProto
+                                              requestProto) {
     WriteChunkRequestProto writeChunkRequestProto =
         requestProto.getWriteChunk();
     // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is
@@ -361,7 +378,8 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setChunkData(writeChunkRequestProto.getChunkData());
     ContainerCommandRequestProto dataContainerCommandProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
-            .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
+            .setCmdType(Type.ReadChunk)
+            .setReadChunk(readChunkRequestProto)
             .build();
 
     // read the chunk
@@ -376,7 +394,8 @@ public class ContainerStateMachine extends BaseStateMachine {
     final WriteChunkRequestProto.Builder dataWriteChunkProto =
         WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
             // adding the state machine data
-            .setData(responseProto.getData()).setStage(Stage.WRITE_DATA);
+            .setData(responseProto.getData())
+            .setStage(Stage.WRITE_DATA);
 
     ContainerCommandRequestProto.Builder newStateMachineProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
@@ -410,21 +429,20 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<ByteString> readStateMachineData(
       LogEntryProto entry) {
-    SMLogEntryProto smLogEntryProto = entry.getSmLogEntry();
-    if (!smLogEntryProto.getStateMachineData().isEmpty()) {
+    StateMachineLogEntryProto smLogEntryProto = entry.getStateMachineLogEntry();
+    if (!getStateMachineData(smLogEntryProto).isEmpty()) {
       return CompletableFuture.completedFuture(ByteString.EMPTY);
     }
 
     try {
       final ContainerCommandRequestProto requestProto =
-          getRequestProto(entry.getSmLogEntry().getData());
+          getRequestProto(entry.getStateMachineLogEntry().getLogData());
       // readStateMachineData should only be called for "write" to Ratis.
       Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
 
       if (requestProto.getCmdType() == Type.WriteChunk) {
         return CompletableFuture.supplyAsync(() ->
-                readStateMachineData(entry, requestProto),
-            chunkExecutor);
+                readStateMachineData(requestProto), chunkExecutor);
       } else if (requestProto.getCmdType() == Type.CreateContainer) {
         return CompletableFuture.completedFuture(requestProto.toByteString());
       } else {
@@ -462,7 +480,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     try {
       metrics.incNumApplyTransactionsOps();
       ContainerCommandRequestProto requestProto =
-          getRequestProto(trx.getSMLogEntry().getData());
+          getRequestProto(trx.getStateMachineLogEntry().getLogData());
       Type cmdType = requestProto.getCmdType();
       CompletableFuture<Message> future;
       if (cmdType == Type.PutBlock) {
@@ -490,6 +508,11 @@ public class ContainerStateMachine extends BaseStateMachine {
             .supplyAsync(() -> runCommand(containerCommandRequestProto),
                 getCommandExecutor(requestProto));
       } else {
+        // Make sure that in write chunk, the user data is not set
+        if (cmdType == Type.WriteChunk) {
+          Preconditions.checkArgument(requestProto
+              .getWriteChunk().getData().isEmpty());
+        }
         future = CompletableFuture.supplyAsync(() -> runCommand(requestProto),
             getCommandExecutor(requestProto));
       }
@@ -534,7 +557,6 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   @Override
   public void close() throws IOException {
-    takeSnapshot();
     for (int i = 0; i < numExecutors; i++) {
       executors[i].shutdown();
     }

+ 1 - 1
hadoop-project/pom.xml

@@ -103,7 +103,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.3.0-9b2d7b6-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0-aa38160-SNAPSHOT</ratis.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>