|
@@ -120,7 +120,8 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
createContainerFutureMap;
|
|
|
private ExecutorService[] executors;
|
|
|
private final int numExecutors;
|
|
|
- private final Map<Long, Long> containerCommandCompletionMap;
|
|
|
+ private final Map<Long, Long> applyTransactionCompletionMap;
|
|
|
+ private long lastIndex;
|
|
|
/**
|
|
|
* CSM metrics.
|
|
|
*/
|
|
@@ -138,7 +139,8 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
this.executors = executors.toArray(new ExecutorService[numExecutors]);
|
|
|
this.writeChunkFutureMap = new ConcurrentHashMap<>();
|
|
|
this.createContainerFutureMap = new ConcurrentHashMap<>();
|
|
|
- containerCommandCompletionMap = new ConcurrentHashMap<>();
|
|
|
+ applyTransactionCompletionMap = new ConcurrentHashMap<>();
|
|
|
+ this.lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -162,10 +164,12 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
|
|
|
private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
|
|
|
if (snapshot == null) {
|
|
|
- TermIndex empty = TermIndex.newTermIndex(0, 0);
|
|
|
+ TermIndex empty = TermIndex.newTermIndex(0,
|
|
|
+ RaftServerConstants.INVALID_LOG_INDEX);
|
|
|
LOG.info("The snapshot info is null." +
|
|
|
"Setting the last applied index to:" + empty);
|
|
|
setLastAppliedTermIndex(empty);
|
|
|
+ lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
return RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
}
|
|
|
|
|
@@ -174,6 +178,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
snapshot.getFile().getPath().toFile());
|
|
|
LOG.info("Setting the last applied index to " + last);
|
|
|
setLastAppliedTermIndex(last);
|
|
|
+ lastIndex = last.getIndex();
|
|
|
return last.getIndex();
|
|
|
}
|
|
|
|
|
@@ -471,7 +476,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
Long appliedTerm = null;
|
|
|
long appliedIndex = -1;
|
|
|
for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
|
|
|
- final Long removed = containerCommandCompletionMap.remove(i);
|
|
|
+ final Long removed = applyTransactionCompletionMap.remove(i);
|
|
|
if (removed == null) {
|
|
|
break;
|
|
|
}
|
|
@@ -479,7 +484,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
appliedIndex = i;
|
|
|
}
|
|
|
if (appliedTerm != null) {
|
|
|
- updateLastAppliedTermIndex(appliedIndex, appliedTerm);
|
|
|
+ updateLastAppliedTermIndex(appliedTerm, appliedIndex);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -489,6 +494,15 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
@Override
|
|
|
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
|
|
|
long index = trx.getLogEntry().getIndex();
|
|
|
+
|
|
|
+ // ApplyTransaction call can come with an entryIndex much greater than
|
|
|
+ // lastIndex updated because in between entries in the raft log can be
|
|
|
+ // appended because raft config persistence. Just add a dummy entry
|
|
|
+ // for those.
|
|
|
+ for (long i = lastIndex + 1; i < index; i++) {
|
|
|
+ LOG.info("Gap in indexes at:{} detected, adding dummy entries ", i);
|
|
|
+ applyTransactionCompletionMap.put(i, trx.getLogEntry().getTerm());
|
|
|
+ }
|
|
|
try {
|
|
|
metrics.incNumApplyTransactionsOps();
|
|
|
ContainerCommandRequestProto requestProto =
|
|
@@ -553,9 +567,10 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ lastIndex = index;
|
|
|
future.thenAccept(m -> {
|
|
|
final Long previous =
|
|
|
- containerCommandCompletionMap
|
|
|
+ applyTransactionCompletionMap
|
|
|
.put(index, trx.getLogEntry().getTerm());
|
|
|
Preconditions.checkState(previous == null);
|
|
|
updateLastApplied();
|