|
@@ -411,7 +411,8 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
}
|
|
}
|
|
|
|
|
|
private CompletableFuture<Message> handleWriteChunk(
|
|
private CompletableFuture<Message> handleWriteChunk(
|
|
- ContainerCommandRequestProto requestProto, long entryIndex, long term) {
|
|
|
|
|
|
+ ContainerCommandRequestProto requestProto, long entryIndex, long term,
|
|
|
|
+ long startTime) {
|
|
final WriteChunkRequestProto write = requestProto.getWriteChunk();
|
|
final WriteChunkRequestProto write = requestProto.getWriteChunk();
|
|
RaftServer server = ratisServer.getServer();
|
|
RaftServer server = ratisServer.getServer();
|
|
Preconditions.checkState(server instanceof RaftServerProxy);
|
|
Preconditions.checkState(server instanceof RaftServerProxy);
|
|
@@ -461,6 +462,8 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
|
|
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
|
|
write.getChunkData().getChunkName());
|
|
write.getChunkData().getChunkName());
|
|
raftFuture.complete(r::toByteString);
|
|
raftFuture.complete(r::toByteString);
|
|
|
|
+ metrics.recordWriteStateMachineCompletion(
|
|
|
|
+ Time.monotonicNowNanos() - startTime);
|
|
}
|
|
}
|
|
|
|
|
|
writeChunkFutureMap.remove(entryIndex);
|
|
writeChunkFutureMap.remove(entryIndex);
|
|
@@ -477,6 +480,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
|
|
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
|
|
try {
|
|
try {
|
|
metrics.incNumWriteStateMachineOps();
|
|
metrics.incNumWriteStateMachineOps();
|
|
|
|
+ long writeStateMachineStartTime = Time.monotonicNowNanos();
|
|
ContainerCommandRequestProto requestProto =
|
|
ContainerCommandRequestProto requestProto =
|
|
getContainerCommandRequestProto(
|
|
getContainerCommandRequestProto(
|
|
entry.getStateMachineLogEntry().getLogData());
|
|
entry.getStateMachineLogEntry().getLogData());
|
|
@@ -493,7 +497,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
switch (cmdType) {
|
|
switch (cmdType) {
|
|
case WriteChunk:
|
|
case WriteChunk:
|
|
return handleWriteChunk(requestProto, entry.getIndex(),
|
|
return handleWriteChunk(requestProto, entry.getIndex(),
|
|
- entry.getTerm());
|
|
|
|
|
|
+ entry.getTerm(), writeStateMachineStartTime);
|
|
default:
|
|
default:
|
|
throw new IllegalStateException("Cmd Type:" + cmdType
|
|
throw new IllegalStateException("Cmd Type:" + cmdType
|
|
+ " should not have state machine data");
|
|
+ " should not have state machine data");
|
|
@@ -673,6 +677,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
.setTerm(trx.getLogEntry().getTerm())
|
|
.setTerm(trx.getLogEntry().getTerm())
|
|
.setLogIndex(index);
|
|
.setLogIndex(index);
|
|
|
|
|
|
|
|
+ long applyTxnStartTime = Time.monotonicNowNanos();
|
|
try {
|
|
try {
|
|
applyTransactionSemaphore.acquire();
|
|
applyTransactionSemaphore.acquire();
|
|
metrics.incNumApplyTransactionsOps();
|
|
metrics.incNumApplyTransactionsOps();
|
|
@@ -740,7 +745,11 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return applyTransactionFuture;
|
|
return applyTransactionFuture;
|
|
- }).whenComplete((r, t) -> applyTransactionSemaphore.release());
|
|
|
|
|
|
+ }).whenComplete((r, t) -> {
|
|
|
|
+ applyTransactionSemaphore.release();
|
|
|
|
+ metrics.recordApplyTransactionCompletion(
|
|
|
|
+ Time.monotonicNowNanos() - applyTxnStartTime);
|
|
|
|
+ });
|
|
return applyTransactionFuture;
|
|
return applyTransactionFuture;
|
|
} catch (IOException | InterruptedException e) {
|
|
} catch (IOException | InterruptedException e) {
|
|
metrics.incNumApplyTransactionsFails();
|
|
metrics.incNumApplyTransactionsFails();
|