|
@@ -19,11 +19,16 @@ package org.apache.hadoop.ozone.om.ratis;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
import com.google.protobuf.ServiceException;
|
|
import com.google.protobuf.ServiceException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
+
|
|
import org.apache.hadoop.ozone.container.common.transport.server.ratis
|
|
import org.apache.hadoop.ozone.container.common.transport.server.ratis
|
|
.ContainerStateMachine;
|
|
.ContainerStateMachine;
|
|
import org.apache.hadoop.ozone.om.OzoneManager;
|
|
import org.apache.hadoop.ozone.om.OzoneManager;
|
|
@@ -38,6 +43,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
|
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
|
|
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
|
|
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
|
|
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
|
import org.apache.ratis.proto.RaftProtos;
|
|
import org.apache.ratis.proto.RaftProtos;
|
|
import org.apache.ratis.protocol.Message;
|
|
import org.apache.ratis.protocol.Message;
|
|
import org.apache.ratis.protocol.RaftClientRequest;
|
|
import org.apache.ratis.protocol.RaftClientRequest;
|
|
@@ -70,6 +76,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
|
|
private RaftGroupId raftGroupId;
|
|
private RaftGroupId raftGroupId;
|
|
private long lastAppliedIndex = 0;
|
|
private long lastAppliedIndex = 0;
|
|
private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
|
|
private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
|
|
|
|
+ private final ExecutorService executorService;
|
|
|
|
|
|
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
|
|
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
|
|
this.omRatisServer = ratisServer;
|
|
this.omRatisServer = ratisServer;
|
|
@@ -79,6 +86,9 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
|
|
this::updateLastAppliedIndex);
|
|
this::updateLastAppliedIndex);
|
|
this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
|
|
this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
|
|
ozoneManagerDoubleBuffer);
|
|
ozoneManagerDoubleBuffer);
|
|
|
|
+ ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
|
|
|
|
+ .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
|
|
|
|
+ this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -132,8 +142,36 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
|
|
OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
|
|
OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
|
|
trx.getStateMachineLogEntry().getLogData());
|
|
trx.getStateMachineLogEntry().getLogData());
|
|
long trxLogIndex = trx.getLogEntry().getIndex();
|
|
long trxLogIndex = trx.getLogEntry().getIndex();
|
|
- CompletableFuture<Message> future = CompletableFuture
|
|
|
|
- .supplyAsync(() -> runCommand(request, trxLogIndex));
|
|
|
|
|
|
+ // In the current approach we have one single global thread executor
|
|
|
|
+ // with single thread. Right now this is being done for correctness, as
|
|
|
|
+ // applyTransaction will be run on multiple OMs, we want to execute the
|
|
|
|
+ // transactions in the same order on all OMs, otherwise there is a
|
|
|
|
+ // chance that OM replicas can be out of sync.
|
|
|
|
+ // TODO: In this way we are making all applyTransactions in
|
|
|
|
+ // OM serial order. Revisit this in future to use multiple executors for
|
|
|
|
+ // volume/bucket.
|
|
|
|
+
|
|
|
|
+ // Reason for not immediately implementing executor per volume is, if
|
|
|
|
+ // one executor operations are slow, we cannot update the
|
|
|
|
+ // lastAppliedIndex in OzoneManager StateMachine, even if other
|
|
|
|
+ // executor has completed transactions with higher ids.
|
|
|
|
+
|
|
|
|
+ // We have 300 transactions, and for each volume we have transactions
|
|
|
|
+ // of 150. Volume1 transactions 0 - 149 and Volume2 transactions 150 -
|
|
|
|
+ // 299.
|
|
|
|
+ // Example: Executor1 - Volume1 - 100 (current completed transaction)
|
|
|
|
+ // Example: Executor2 - Volume2 - 299 (current completed transaction)
|
|
|
|
+
|
|
|
|
+ // Now we have applied transactions of 0 - 100 and 150 - 299. We
|
|
|
|
+ // cannot update lastAppliedIndex to 299. We need to update it to 100,
|
|
|
|
+ // since 101 - 149 are not applied. When OM restarts it will
|
|
|
|
+ // applyTransactions from lastAppliedIndex.
|
|
|
|
+ // We can update the lastAppliedIndex to 100, and update it to 299,
|
|
|
|
+ // only after completing 101 - 149. In initial stage, we are starting
|
|
|
|
+ // with single global executor. Will revisit this when needed.
|
|
|
|
+
|
|
|
|
+ CompletableFuture<Message> future = CompletableFuture.supplyAsync(
|
|
|
|
+ () -> runCommand(request, trxLogIndex), executorService);
|
|
return future;
|
|
return future;
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
return completeExceptionally(e);
|
|
return completeExceptionally(e);
|
|
@@ -301,6 +339,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
|
|
|
|
|
|
public void stop() {
|
|
public void stop() {
|
|
ozoneManagerDoubleBuffer.stop();
|
|
ozoneManagerDoubleBuffer.stop();
|
|
|
|
+ HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|