|
@@ -142,7 +142,6 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
private ExecutorService[] executors;
|
|
|
private final int numExecutors;
|
|
|
private final Map<Long, Long> applyTransactionCompletionMap;
|
|
|
- private long lastIndex;
|
|
|
private final Cache<Long, ByteString> stateMachineDataCache;
|
|
|
private final boolean isBlockTokenEnabled;
|
|
|
private final TokenVerifier tokenVerifier;
|
|
@@ -165,7 +164,6 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
this.executors = executors.toArray(new ExecutorService[numExecutors]);
|
|
|
this.writeChunkFutureMap = new ConcurrentHashMap<>();
|
|
|
applyTransactionCompletionMap = new ConcurrentHashMap<>();
|
|
|
- this.lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
stateMachineDataCache = CacheBuilder.newBuilder()
|
|
|
.expireAfterAccess(expiryInterval, TimeUnit.MILLISECONDS)
|
|
|
// set the limit on no of cached entries equal to no of max threads
|
|
@@ -204,7 +202,6 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
"The snapshot info is null." + "Setting the last applied index to:"
|
|
|
+ empty);
|
|
|
setLastAppliedTermIndex(empty);
|
|
|
- lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
return RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
}
|
|
|
|
|
@@ -213,7 +210,6 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
|
|
|
LOG.info("Setting the last applied index to " + last);
|
|
|
setLastAppliedTermIndex(last);
|
|
|
- lastIndex = last.getIndex();
|
|
|
|
|
|
// initialize the dispatcher with snapshot so that it build the missing
|
|
|
// container list
|
|
@@ -575,6 +571,18 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Notifies the state machine about index updates because of entries
|
|
|
+ * which do not cause state machine update, i.e. conf entries, metadata
|
|
|
+ * entries
|
|
|
+ * @param term term of the log entry
|
|
|
+ * @param index index of the log entry
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void notifyIndexUpdate(long term, long index) {
|
|
|
+ applyTransactionCompletionMap.put(index, term);
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* ApplyTransaction calls in Ratis are sequential.
|
|
|
*/
|
|
@@ -586,14 +594,6 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
.setTerm(trx.getLogEntry().getTerm())
|
|
|
.setLogIndex(index);
|
|
|
|
|
|
- // ApplyTransaction call can come with an entryIndex much greater than
|
|
|
- // lastIndex updated because in between entries in the raft log can be
|
|
|
- // appended because raft config persistence. Just add a dummy entry
|
|
|
- // for those.
|
|
|
- for (long i = lastIndex + 1; i < index; i++) {
|
|
|
- LOG.info("Gap in indexes at:{} detected, adding dummy entries ", i);
|
|
|
- applyTransactionCompletionMap.put(i, trx.getLogEntry().getTerm());
|
|
|
- }
|
|
|
try {
|
|
|
metrics.incNumApplyTransactionsOps();
|
|
|
ContainerCommandRequestProto requestProto =
|
|
@@ -616,7 +616,6 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
.supplyAsync(() -> runCommand(requestProto, builder.build()),
|
|
|
getCommandExecutor(requestProto));
|
|
|
|
|
|
- lastIndex = index;
|
|
|
future.thenAccept(m -> {
|
|
|
final Long previous =
|
|
|
applyTransactionCompletionMap
|