|
@@ -26,17 +26,17 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkReq
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
|
import org.apache.ratis.conf.RaftProperties;
|
|
import org.apache.ratis.conf.RaftProperties;
|
|
import org.apache.ratis.protocol.Message;
|
|
import org.apache.ratis.protocol.Message;
|
|
-import org.apache.ratis.protocol.RaftClientReply;
|
|
|
|
import org.apache.ratis.protocol.RaftClientRequest;
|
|
import org.apache.ratis.protocol.RaftClientRequest;
|
|
import org.apache.ratis.protocol.RaftPeerId;
|
|
import org.apache.ratis.protocol.RaftPeerId;
|
|
import org.apache.ratis.server.storage.RaftStorage;
|
|
import org.apache.ratis.server.storage.RaftStorage;
|
|
import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
|
|
import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
|
|
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
|
|
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
|
|
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
|
|
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
|
|
-import org.apache.ratis.statemachine.BaseStateMachine;
|
|
|
|
-import org.apache.ratis.statemachine.SimpleStateMachineStorage;
|
|
|
|
|
|
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
|
|
|
|
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
|
|
import org.apache.ratis.statemachine.StateMachineStorage;
|
|
import org.apache.ratis.statemachine.StateMachineStorage;
|
|
import org.apache.ratis.statemachine.TransactionContext;
|
|
import org.apache.ratis.statemachine.TransactionContext;
|
|
|
|
+import org.apache.ratis.statemachine.impl.TransactionContextImpl;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
|
|
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
|
|
@@ -55,8 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
*
|
|
*
|
|
* Read only requests are classified in
|
|
* Read only requests are classified in
|
|
* {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
|
|
* {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
|
|
- * and these readonly requests are replied from the
|
|
|
|
- * {@link #query(RaftClientRequest)}
|
|
|
|
|
|
+ * and these readonly requests are replied from the {@link #query(Message)}.
|
|
*
|
|
*
|
|
* The write requests can be divided into requests with user data
|
|
* The write requests can be divided into requests with user data
|
|
* (WriteChunkRequest) and other request without user data.
|
|
* (WriteChunkRequest) and other request without user data.
|
|
@@ -90,7 +89,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
= new SimpleStateMachineStorage();
|
|
= new SimpleStateMachineStorage();
|
|
private final ContainerDispatcher dispatcher;
|
|
private final ContainerDispatcher dispatcher;
|
|
private ThreadPoolExecutor writeChunkExecutor;
|
|
private ThreadPoolExecutor writeChunkExecutor;
|
|
- private final ConcurrentHashMap<String, CompletableFuture<Message>>
|
|
|
|
|
|
+ private final ConcurrentHashMap<Long, CompletableFuture<Message>>
|
|
writeChunkFutureMap;
|
|
writeChunkFutureMap;
|
|
private final ConcurrentHashMap<String, CompletableFuture<Message>>
|
|
private final ConcurrentHashMap<String, CompletableFuture<Message>>
|
|
createContainerFutureMap;
|
|
createContainerFutureMap;
|
|
@@ -171,7 +170,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
.setData(request.getMessage().getContent())
|
|
.setData(request.getMessage().getContent())
|
|
.build();
|
|
.build();
|
|
}
|
|
}
|
|
- return new TransactionContext(this, request, log);
|
|
|
|
|
|
+ return new TransactionContextImpl(this, request, log);
|
|
}
|
|
}
|
|
|
|
|
|
private ByteString getShadedByteString(ContainerCommandRequestProto proto) {
|
|
private ByteString getShadedByteString(ContainerCommandRequestProto proto) {
|
|
@@ -191,34 +190,47 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray());
|
|
return () -> ShadedProtoUtil.asShadedByteString(response.toByteArray());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private CompletableFuture<Message> handleWriteChunk(
|
|
|
|
+ ContainerCommandRequestProto requestProto, long entryIndex) {
|
|
|
|
+ final WriteChunkRequestProto write = requestProto.getWriteChunk();
|
|
|
|
+ String containerName = write.getPipeline().getContainerName();
|
|
|
|
+ CompletableFuture<Message> future =
|
|
|
|
+ createContainerFutureMap.get(containerName);
|
|
|
|
+ CompletableFuture<Message> writeChunkFuture;
|
|
|
|
+ if (future != null) {
|
|
|
|
+ writeChunkFuture = future.thenApplyAsync(
|
|
|
|
+ v -> runCommand(requestProto), writeChunkExecutor);
|
|
|
|
+ } else {
|
|
|
|
+ writeChunkFuture = CompletableFuture.supplyAsync(
|
|
|
|
+ () -> runCommand(requestProto), writeChunkExecutor);
|
|
|
|
+ }
|
|
|
|
+ writeChunkFutureMap.put(entryIndex, writeChunkFuture);
|
|
|
|
+ return writeChunkFuture;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private CompletableFuture<Message> handleCreateContainer(
|
|
|
|
+ ContainerCommandRequestProto requestProto) {
|
|
|
|
+ String containerName =
|
|
|
|
+ requestProto.getCreateContainer().getContainerData().getName();
|
|
|
|
+ createContainerFutureMap.
|
|
|
|
+ computeIfAbsent(containerName, k -> new CompletableFuture<>());
|
|
|
|
+ return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
|
|
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
|
|
try {
|
|
try {
|
|
final ContainerCommandRequestProto requestProto =
|
|
final ContainerCommandRequestProto requestProto =
|
|
getRequestProto(entry.getSmLogEntry().getStateMachineData());
|
|
getRequestProto(entry.getSmLogEntry().getStateMachineData());
|
|
- if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
|
|
|
|
- String containerName =
|
|
|
|
- requestProto.getCreateContainer().getContainerData().getName();
|
|
|
|
- createContainerFutureMap.
|
|
|
|
- computeIfAbsent(containerName, k -> new CompletableFuture<>());
|
|
|
|
- return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
|
|
|
|
- } else {
|
|
|
|
- final WriteChunkRequestProto write = requestProto.getWriteChunk();
|
|
|
|
- String containerName = write.getPipeline().getContainerName();
|
|
|
|
- CompletableFuture<Message> future =
|
|
|
|
- createContainerFutureMap.get(containerName);
|
|
|
|
-
|
|
|
|
- CompletableFuture<Message> writeChunkFuture;
|
|
|
|
- if (future != null) {
|
|
|
|
- writeChunkFuture = future.thenApplyAsync(
|
|
|
|
- v -> runCommand(requestProto), writeChunkExecutor);
|
|
|
|
- } else {
|
|
|
|
- writeChunkFuture = CompletableFuture.supplyAsync(
|
|
|
|
- () -> runCommand(requestProto), writeChunkExecutor);
|
|
|
|
- }
|
|
|
|
- writeChunkFutureMap
|
|
|
|
- .put(write.getChunkData().getChunkName(), writeChunkFuture);
|
|
|
|
- return writeChunkFuture;
|
|
|
|
|
|
+ ContainerProtos.Type cmdType = requestProto.getCmdType();
|
|
|
|
+ switch (cmdType) {
|
|
|
|
+ case CreateContainer:
|
|
|
|
+ return handleCreateContainer(requestProto);
|
|
|
|
+ case WriteChunk:
|
|
|
|
+ return handleWriteChunk(requestProto, entry.getIndex());
|
|
|
|
+ default:
|
|
|
|
+ throw new IllegalStateException("Cmd Type:" + cmdType
|
|
|
|
+ + " should not have state machine data");
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
return completeExceptionally(e);
|
|
return completeExceptionally(e);
|
|
@@ -226,13 +238,11 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
|
|
|
|
|
|
+ public CompletableFuture<Message> query(Message request) {
|
|
try {
|
|
try {
|
|
final ContainerCommandRequestProto requestProto =
|
|
final ContainerCommandRequestProto requestProto =
|
|
- getRequestProto(request.getMessage().getContent());
|
|
|
|
- RaftClientReply raftClientReply =
|
|
|
|
- new RaftClientReply(request, runCommand(requestProto));
|
|
|
|
- return CompletableFuture.completedFuture(raftClientReply);
|
|
|
|
|
|
+ getRequestProto(request.getContent());
|
|
|
|
+ return CompletableFuture.completedFuture(runCommand(requestProto));
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
return completeExceptionally(e);
|
|
return completeExceptionally(e);
|
|
}
|
|
}
|
|
@@ -243,19 +253,20 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
try {
|
|
try {
|
|
ContainerCommandRequestProto requestProto =
|
|
ContainerCommandRequestProto requestProto =
|
|
getRequestProto(trx.getSMLogEntry().getData());
|
|
getRequestProto(trx.getSMLogEntry().getData());
|
|
|
|
+ ContainerProtos.Type cmdType = requestProto.getCmdType();
|
|
|
|
|
|
- if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
|
|
|
|
|
|
+ if (cmdType == ContainerProtos.Type.WriteChunk) {
|
|
WriteChunkRequestProto write = requestProto.getWriteChunk();
|
|
WriteChunkRequestProto write = requestProto.getWriteChunk();
|
|
// the data field has already been removed in start Transaction
|
|
// the data field has already been removed in start Transaction
|
|
Preconditions.checkArgument(!write.hasData());
|
|
Preconditions.checkArgument(!write.hasData());
|
|
CompletableFuture<Message> stateMachineFuture =
|
|
CompletableFuture<Message> stateMachineFuture =
|
|
- writeChunkFutureMap.remove(write.getChunkData().getChunkName());
|
|
|
|
|
|
+ writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
|
|
return stateMachineFuture
|
|
return stateMachineFuture
|
|
.thenComposeAsync(v ->
|
|
.thenComposeAsync(v ->
|
|
CompletableFuture.completedFuture(runCommand(requestProto)));
|
|
CompletableFuture.completedFuture(runCommand(requestProto)));
|
|
} else {
|
|
} else {
|
|
Message message = runCommand(requestProto);
|
|
Message message = runCommand(requestProto);
|
|
- if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
|
|
|
|
|
|
+ if (cmdType == ContainerProtos.Type.CreateContainer) {
|
|
String containerName =
|
|
String containerName =
|
|
requestProto.getCreateContainer().getContainerData().getName();
|
|
requestProto.getCreateContainer().getContainerData().getName();
|
|
createContainerFutureMap.remove(containerName).complete(message);
|
|
createContainerFutureMap.remove(containerName).complete(message);
|