|
@@ -150,7 +150,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
private final Cache<Long, ByteString> stateMachineDataCache;
|
|
private final Cache<Long, ByteString> stateMachineDataCache;
|
|
private final boolean isBlockTokenEnabled;
|
|
private final boolean isBlockTokenEnabled;
|
|
private final TokenVerifier tokenVerifier;
|
|
private final TokenVerifier tokenVerifier;
|
|
- private final AtomicBoolean isStateMachineHealthy;
|
|
|
|
|
|
+ private final AtomicBoolean stateMachineHealthy;
|
|
|
|
|
|
private final Semaphore applyTransactionSemaphore;
|
|
private final Semaphore applyTransactionSemaphore;
|
|
/**
|
|
/**
|
|
@@ -190,7 +190,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
ScmConfigKeys.
|
|
ScmConfigKeys.
|
|
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
|
|
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
|
|
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
|
|
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
|
|
- isStateMachineHealthy = new AtomicBoolean(true);
|
|
|
|
|
|
+ stateMachineHealthy = new AtomicBoolean(true);
|
|
this.executors = new ExecutorService[numContainerOpExecutors];
|
|
this.executors = new ExecutorService[numContainerOpExecutors];
|
|
for (int i = 0; i < numContainerOpExecutors; i++) {
|
|
for (int i = 0; i < numContainerOpExecutors; i++) {
|
|
final int index = i;
|
|
final int index = i;
|
|
@@ -271,11 +271,15 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
IOUtils.write(builder.build().toByteArray(), out);
|
|
IOUtils.write(builder.build().toByteArray(), out);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public boolean isStateMachineHealthy() {
|
|
|
|
+ return stateMachineHealthy.get();
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public long takeSnapshot() throws IOException {
|
|
public long takeSnapshot() throws IOException {
|
|
TermIndex ti = getLastAppliedTermIndex();
|
|
TermIndex ti = getLastAppliedTermIndex();
|
|
long startTime = Time.monotonicNow();
|
|
long startTime = Time.monotonicNow();
|
|
- if (!isStateMachineHealthy.get()) {
|
|
|
|
|
|
+ if (!isStateMachineHealthy()) {
|
|
String msg =
|
|
String msg =
|
|
"Failed to take snapshot " + " for " + gid + " as the stateMachine"
|
|
"Failed to take snapshot " + " for " + gid + " as the stateMachine"
|
|
+ " is unhealthy. The last applied index is at " + ti;
|
|
+ " is unhealthy. The last applied index is at " + ti;
|
|
@@ -731,7 +735,11 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
metrics.incPipelineLatency(cmdType,
|
|
metrics.incPipelineLatency(cmdType,
|
|
Time.monotonicNowNanos() - startTime);
|
|
Time.monotonicNowNanos() - startTime);
|
|
}
|
|
}
|
|
- if (r.getResult() != ContainerProtos.Result.SUCCESS) {
|
|
|
|
|
|
+ // ignore close container exception while marking the stateMachine
|
|
|
|
+ // unhealthy
|
|
|
|
+ if (r.getResult() != ContainerProtos.Result.SUCCESS
|
|
|
|
+ && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
|
|
|
|
+ && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
|
|
StorageContainerException sce =
|
|
StorageContainerException sce =
|
|
new StorageContainerException(r.getMessage(), r.getResult());
|
|
new StorageContainerException(r.getMessage(), r.getResult());
|
|
LOG.error(
|
|
LOG.error(
|
|
@@ -744,7 +752,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
// caught in stateMachineUpdater in Ratis and ratis server will
|
|
// caught in stateMachineUpdater in Ratis and ratis server will
|
|
// shutdown.
|
|
// shutdown.
|
|
applyTransactionFuture.completeExceptionally(sce);
|
|
applyTransactionFuture.completeExceptionally(sce);
|
|
- isStateMachineHealthy.compareAndSet(true, false);
|
|
|
|
|
|
+ stateMachineHealthy.compareAndSet(true, false);
|
|
ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
|
|
ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
|
|
} else {
|
|
} else {
|
|
LOG.debug(
|
|
LOG.debug(
|
|
@@ -759,7 +767,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
// add the entry to the applyTransactionCompletionMap only if the
|
|
// add the entry to the applyTransactionCompletionMap only if the
|
|
// stateMachine is healthy i.e, there has been no applyTransaction
|
|
// stateMachine is healthy i.e, there has been no applyTransaction
|
|
// failures before.
|
|
// failures before.
|
|
- if (isStateMachineHealthy.get()) {
|
|
|
|
|
|
+ if (isStateMachineHealthy()) {
|
|
final Long previous = applyTransactionCompletionMap
|
|
final Long previous = applyTransactionCompletionMap
|
|
.put(index, trx.getLogEntry().getTerm());
|
|
.put(index, trx.getLogEntry().getTerm());
|
|
Preconditions.checkState(previous == null);
|
|
Preconditions.checkState(previous == null);
|