|
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.HddsUtils;
|
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
|
|
|
|
|
import io.opentracing.Scope;
|
|
|
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
|
|
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
|
|
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
|
|
|
import org.apache.ratis.protocol.RaftGroup;
|
|
|
import org.apache.ratis.protocol.RaftGroupId;
|
|
@@ -350,13 +352,20 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
}
|
|
|
|
|
|
private ContainerCommandResponseProto dispatchCommand(
|
|
|
- ContainerCommandRequestProto requestProto,
|
|
|
- DispatcherContext context) throws IOException {
|
|
|
+ ContainerCommandRequestProto requestProto, DispatcherContext context) {
|
|
|
LOG.trace("dispatch {}", requestProto);
|
|
|
- if(isBlockTokenEnabled) {
|
|
|
- // ServerInterceptors intercepts incoming request and creates ugi.
|
|
|
- tokenVerifier.verify(UserGroupInformation.getCurrentUser()
|
|
|
- .getShortUserName(), requestProto.getEncodedToken());
|
|
|
+ if (isBlockTokenEnabled) {
|
|
|
+ try {
|
|
|
+ // ServerInterceptors intercepts incoming request and creates ugi.
|
|
|
+ tokenVerifier
|
|
|
+ .verify(UserGroupInformation.getCurrentUser().getShortUserName(),
|
|
|
+ requestProto.getEncodedToken());
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ StorageContainerException sce = new StorageContainerException(
|
|
|
+ "Block token verification failed. " + ioe.getMessage(), ioe,
|
|
|
+ ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, sce, requestProto);
|
|
|
+ }
|
|
|
}
|
|
|
ContainerCommandResponseProto response =
|
|
|
dispatcher.dispatch(requestProto, context);
|
|
@@ -365,7 +374,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
}
|
|
|
|
|
|
private Message runCommand(ContainerCommandRequestProto requestProto,
|
|
|
- DispatcherContext context) throws IOException {
|
|
|
+ DispatcherContext context) {
|
|
|
return dispatchCommand(requestProto, context)::toByteString;
|
|
|
}
|
|
|
|
|
@@ -394,14 +403,10 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
|
|
|
.setCreateContainerSet(createContainerSet)
|
|
|
.build();
|
|
|
- CompletableFuture<Message> writeChunkFuture;
|
|
|
- try {
|
|
|
- Message msg = runCommand(requestProto, context);
|
|
|
- writeChunkFuture = CompletableFuture
|
|
|
- .supplyAsync(() -> msg, chunkExecutor);
|
|
|
- }catch(IOException ie) {
|
|
|
- writeChunkFuture = completeExceptionally(ie);
|
|
|
- }
|
|
|
+ // ensure the write chunk happens asynchronously in writeChunkExecutor pool
|
|
|
+ // thread.
|
|
|
+ CompletableFuture<Message> writeChunkFuture = CompletableFuture
|
|
|
+ .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
|
|
|
|
|
|
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
|
|
|
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
|
|
@@ -567,16 +572,18 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
// readStateMachineData should only be called for "write" to Ratis.
|
|
|
Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
|
|
|
if (requestProto.getCmdType() == Type.WriteChunk) {
|
|
|
- CompletableFuture<ByteString> future = new CompletableFuture<>();
|
|
|
- return future.supplyAsync(() -> {
|
|
|
+ final CompletableFuture<ByteString> future = new CompletableFuture<>();
|
|
|
+ CompletableFuture.supplyAsync(() -> {
|
|
|
try {
|
|
|
- return getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
|
|
|
- requestProto);
|
|
|
+ future.complete(
|
|
|
+ getCachedStateMachineData(entry.getIndex(), entry.getTerm(),
|
|
|
+ requestProto));
|
|
|
} catch (ExecutionException e) {
|
|
|
future.completeExceptionally(e);
|
|
|
- return null;
|
|
|
}
|
|
|
+ return future;
|
|
|
}, chunkExecutor);
|
|
|
+ return future;
|
|
|
} else {
|
|
|
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
|
|
|
+ " cannot have state machine data");
|
|
@@ -627,7 +634,6 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
ContainerCommandRequestProto requestProto =
|
|
|
getRequestProto(trx.getStateMachineLogEntry().getLogData());
|
|
|
Type cmdType = requestProto.getCmdType();
|
|
|
- CompletableFuture<Message> future;
|
|
|
// Make sure that in write chunk, the user data is not set
|
|
|
if (cmdType == Type.WriteChunk) {
|
|
|
Preconditions
|
|
@@ -638,13 +644,11 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
|
|
|
builder.setCreateContainerSet(createContainerSet);
|
|
|
}
|
|
|
- try {
|
|
|
- Message msg = runCommand(requestProto, builder.build());
|
|
|
- future = CompletableFuture.supplyAsync(() -> msg,
|
|
|
- getCommandExecutor(requestProto));
|
|
|
- } catch (IOException ie) {
|
|
|
- future = completeExceptionally(ie);
|
|
|
- }
|
|
|
+ // Ensure the command gets executed in a separate thread than
|
|
|
+ // stateMachineUpdater thread which is calling applyTransaction here.
|
|
|
+ CompletableFuture<Message> future = CompletableFuture
|
|
|
+ .supplyAsync(() -> runCommand(requestProto, builder.build()),
|
|
|
+ getCommandExecutor(requestProto));
|
|
|
|
|
|
lastIndex = index;
|
|
|
future.thenAccept(m -> {
|