|
@@ -119,7 +119,8 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
private final ConcurrentHashMap<Long, Message> createContainerResponseMap;
|
|
|
private ExecutorService[] executors;
|
|
|
private final int numExecutors;
|
|
|
- private final Map<Long, Long> containerCommandCompletionMap;
|
|
|
+ private final Map<Long, Long> applyTransactionCompletionMap;
|
|
|
+ private long lastIndex;
|
|
|
/**
|
|
|
* CSM metrics.
|
|
|
*/
|
|
@@ -137,7 +138,8 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
this.executors = executors.toArray(new ExecutorService[numExecutors]);
|
|
|
this.writeChunkFutureMap = new ConcurrentHashMap<>();
|
|
|
this.createContainerResponseMap = new ConcurrentHashMap<>();
|
|
|
- containerCommandCompletionMap = new ConcurrentHashMap<>();
|
|
|
+ applyTransactionCompletionMap = new ConcurrentHashMap<>();
|
|
|
+ this.lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -161,10 +163,12 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
|
|
|
private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
|
|
|
if (snapshot == null) {
|
|
|
- TermIndex empty = TermIndex.newTermIndex(0, 0);
|
|
|
+ TermIndex empty = TermIndex.newTermIndex(0,
|
|
|
+ RaftServerConstants.INVALID_LOG_INDEX);
|
|
|
LOG.info("The snapshot info is null." +
|
|
|
"Setting the last applied index to:" + empty);
|
|
|
setLastAppliedTermIndex(empty);
|
|
|
+ lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
return RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
}
|
|
|
|
|
@@ -173,6 +177,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
snapshot.getFile().getPath().toFile());
|
|
|
LOG.info("Setting the last applied index to " + last);
|
|
|
setLastAppliedTermIndex(last);
|
|
|
+ lastIndex = last.getIndex();
|
|
|
return last.getIndex();
|
|
|
}
|
|
|
|
|
@@ -450,7 +455,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
Long appliedTerm = null;
|
|
|
long appliedIndex = -1;
|
|
|
for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
|
|
|
- final Long removed = containerCommandCompletionMap.remove(i);
|
|
|
+ final Long removed = applyTransactionCompletionMap.remove(i);
|
|
|
if (removed == null) {
|
|
|
break;
|
|
|
}
|
|
@@ -458,7 +463,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
appliedIndex = i;
|
|
|
}
|
|
|
if (appliedTerm != null) {
|
|
|
- updateLastAppliedTermIndex(appliedIndex, appliedTerm);
|
|
|
+ updateLastAppliedTermIndex(appliedTerm, appliedIndex);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -467,7 +472,15 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
*/
|
|
|
@Override
|
|
|
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
|
|
|
+ // ApplyTransaction call can come with an entryIndex much greater than
|
|
|
+ // lastIndex updated because in between entries in the raft log can be
|
|
|
+ // appended because raft config persistence. Just add a dummy entry
|
|
|
+ // for those.
|
|
|
long index = trx.getLogEntry().getIndex();
|
|
|
+ for (long i = lastIndex + 1; i < index; i++) {
|
|
|
+ LOG.info("Gap in indexes at:{} detected, adding dummy entries ", i);
|
|
|
+ applyTransactionCompletionMap.put(i, trx.getLogEntry().getTerm());
|
|
|
+ }
|
|
|
try {
|
|
|
metrics.incNumApplyTransactionsOps();
|
|
|
ContainerCommandRequestProto requestProto =
|
|
@@ -500,8 +513,8 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
getCommandExecutor(requestProto));
|
|
|
} else if (cmdType == Type.CreateContainer) {
|
|
|
long containerID = requestProto.getContainerID();
|
|
|
- return CompletableFuture.completedFuture(
|
|
|
- createContainerResponseMap.get(containerID));
|
|
|
+ future = CompletableFuture.completedFuture(
|
|
|
+ createContainerResponseMap.remove(containerID));
|
|
|
} else {
|
|
|
// Make sure that in write chunk, the user data is not set
|
|
|
if (cmdType == Type.WriteChunk) {
|
|
@@ -512,9 +525,10 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
getCommandExecutor(requestProto));
|
|
|
}
|
|
|
|
|
|
+ lastIndex = index;
|
|
|
future.thenAccept(m -> {
|
|
|
final Long previous =
|
|
|
- containerCommandCompletionMap
|
|
|
+ applyTransactionCompletionMap
|
|
|
.put(index, trx.getLogEntry().getTerm());
|
|
|
Preconditions.checkState(previous == null);
|
|
|
updateLastApplied();
|