|
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
|
|
|
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
|
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
|
|
|
import org.apache.ratis.protocol.RaftGroupId;
|
|
|
import org.apache.ratis.server.RaftServer;
|
|
@@ -195,17 +196,16 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
if (snapshot == null) {
|
|
|
TermIndex empty =
|
|
|
TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
|
|
|
- LOG.info(
|
|
|
- "The snapshot info is null." + "Setting the last applied index to:"
|
|
|
- + empty);
|
|
|
+ LOG.info("{}: The snapshot info is null. Setting the last applied index" +
|
|
|
+ "to:{}", gid, empty);
|
|
|
setLastAppliedTermIndex(empty);
|
|
|
- return RaftLog.INVALID_LOG_INDEX;
|
|
|
+ return empty.getIndex();
|
|
|
}
|
|
|
|
|
|
final File snapshotFile = snapshot.getFile().getPath().toFile();
|
|
|
final TermIndex last =
|
|
|
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
|
|
|
- LOG.info("Setting the last applied index to " + last);
|
|
|
+ LOG.info("{}: Setting the last applied index to {}", gid, last);
|
|
|
setLastAppliedTermIndex(last);
|
|
|
|
|
|
// initialize the dispatcher with snapshot so that it build the missing
|
|
@@ -241,18 +241,20 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
@Override
|
|
|
public long takeSnapshot() throws IOException {
|
|
|
TermIndex ti = getLastAppliedTermIndex();
|
|
|
- LOG.info("Taking snapshot at termIndex:" + ti);
|
|
|
+ long startTime = Time.monotonicNow();
|
|
|
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
|
|
|
final File snapshotFile =
|
|
|
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
|
|
|
- LOG.info("Taking a snapshot to file {}", snapshotFile);
|
|
|
+ LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
|
|
|
try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
|
|
|
persistContainerSet(fos);
|
|
|
} catch (IOException ioe) {
|
|
|
- LOG.warn("Failed to write snapshot file \"" + snapshotFile
|
|
|
- + "\", last applied index=" + ti);
|
|
|
+ LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti,
|
|
|
+ snapshotFile);
|
|
|
throw ioe;
|
|
|
}
|
|
|
+ LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}",
|
|
|
+ gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
|
|
|
return ti.getIndex();
|
|
|
}
|
|
|
return -1;
|
|
@@ -326,7 +328,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
|
|
|
private ContainerCommandResponseProto dispatchCommand(
|
|
|
ContainerCommandRequestProto requestProto, DispatcherContext context) {
|
|
|
- LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
|
|
|
+ LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
|
|
|
requestProto.getCmdType(), requestProto.getContainerID(),
|
|
|
requestProto.getPipelineID(), requestProto.getTraceID());
|
|
|
if (isBlockTokenEnabled) {
|
|
@@ -344,7 +346,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
}
|
|
|
ContainerCommandResponseProto response =
|
|
|
dispatcher.dispatch(requestProto, context);
|
|
|
- LOG.trace("response {}", response);
|
|
|
+ LOG.trace("{}: response {}", gid, response);
|
|
|
return response;
|
|
|
}
|
|
|
|
|
@@ -384,18 +386,18 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
.supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
|
|
|
|
|
|
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
|
|
|
- LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
|
|
|
- + " logIndex " + entryIndex + " chunkName " + write.getChunkData()
|
|
|
- .getChunkName());
|
|
|
+ LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
|
|
|
+ write.getBlockID() + " logIndex " + entryIndex + " chunkName "
|
|
|
+ + write.getChunkData().getChunkName());
|
|
|
// Remove the future once it finishes execution from the
|
|
|
// writeChunkFutureMap.
|
|
|
writeChunkFuture.thenApply(r -> {
|
|
|
metrics.incNumBytesWrittenCount(
|
|
|
requestProto.getWriteChunk().getChunkData().getLen());
|
|
|
writeChunkFutureMap.remove(entryIndex);
|
|
|
- LOG.debug("writeChunk writeStateMachineData completed: blockId " + write
|
|
|
- .getBlockID() + " logIndex " + entryIndex + " chunkName " + write
|
|
|
- .getChunkData().getChunkName());
|
|
|
+ LOG.debug(gid + ": writeChunk writeStateMachineData completed: blockId" +
|
|
|
+ write.getBlockID() + " logIndex " + entryIndex + " chunkName "
|
|
|
+ + write.getChunkData().getChunkName());
|
|
|
return r;
|
|
|
});
|
|
|
return writeChunkFuture;
|
|
@@ -554,12 +556,12 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
metrics.incNumReadStateMachineFails();
|
|
|
- LOG.error("unable to read stateMachineData:" + e);
|
|
|
+ LOG.error("{} unable to read stateMachineData:", gid, e);
|
|
|
return completeExceptionally(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void updateLastApplied() {
|
|
|
+ private synchronized void updateLastApplied() {
|
|
|
Long appliedTerm = null;
|
|
|
long appliedIndex = -1;
|
|
|
for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
|