|
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.cache.Cache;
|
|
|
import com.google.common.cache.CacheBuilder;
|
|
|
+import org.apache.commons.io.IOUtils;
|
|
|
import org.apache.hadoop.hdds.HddsUtils;
|
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
|
|
|
|
@@ -36,6 +37,8 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
|
|
|
import org.apache.ratis.thirdparty.com.google.protobuf
|
|
|
.InvalidProtocolBufferException;
|
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
|
|
|
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
|
|
+ ContainerIdSetProto;
|
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
.ContainerCommandRequestProto;
|
|
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
@@ -77,6 +80,12 @@ import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentSkipListSet;
|
|
|
+import java.io.FileOutputStream;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.OutputStream;
|
|
|
+
|
|
|
|
|
|
/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
|
|
|
*
|
|
@@ -126,6 +135,9 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
private final XceiverServerRatis ratisServer;
|
|
|
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
|
|
|
writeChunkFutureMap;
|
|
|
+
|
|
|
+ // keeps track of the containers created per pipeline
|
|
|
+ private final Set<Long> createContainerSet;
|
|
|
private ExecutorService[] executors;
|
|
|
private final int numExecutors;
|
|
|
private final Map<Long, Long> applyTransactionCompletionMap;
|
|
@@ -160,6 +172,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
.maximumSize(chunkExecutor.getCorePoolSize()).build();
|
|
|
this.isBlockTokenEnabled = isBlockTokenEnabled;
|
|
|
this.tokenVerifier = tokenVerifier;
|
|
|
+ this.createContainerSet = new ConcurrentSkipListSet<>();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -181,26 +194,56 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
loadSnapshot(storage.getLatestSnapshot());
|
|
|
}
|
|
|
|
|
|
- private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
|
|
|
+ private long loadSnapshot(SingleFileSnapshotInfo snapshot)
|
|
|
+ throws IOException {
|
|
|
if (snapshot == null) {
|
|
|
- TermIndex empty = TermIndex.newTermIndex(0,
|
|
|
- RaftServerConstants.INVALID_LOG_INDEX);
|
|
|
- LOG.info("The snapshot info is null." +
|
|
|
- "Setting the last applied index to:" + empty);
|
|
|
+ TermIndex empty =
|
|
|
+ TermIndex.newTermIndex(0, RaftServerConstants.INVALID_LOG_INDEX);
|
|
|
+ LOG.info(
|
|
|
+ "The snapshot info is null." + "Setting the last applied index to:"
|
|
|
+ + empty);
|
|
|
setLastAppliedTermIndex(empty);
|
|
|
lastIndex = RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
return RaftServerConstants.INVALID_LOG_INDEX;
|
|
|
}
|
|
|
|
|
|
+ final File snapshotFile = snapshot.getFile().getPath().toFile();
|
|
|
final TermIndex last =
|
|
|
- SimpleStateMachineStorage.getTermIndexFromSnapshotFile(
|
|
|
- snapshot.getFile().getPath().toFile());
|
|
|
+ SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
|
|
|
LOG.info("Setting the last applied index to " + last);
|
|
|
setLastAppliedTermIndex(last);
|
|
|
lastIndex = last.getIndex();
|
|
|
+
|
|
|
+ // initialize the dispatcher with snapshot so that it build the missing
|
|
|
+ // container list
|
|
|
+ try (FileInputStream fin = new FileInputStream(snapshotFile)) {
|
|
|
+ byte[] containerIds = IOUtils.toByteArray(fin);
|
|
|
+ ContainerProtos.ContainerIdSetProto proto =
|
|
|
+ ContainerProtos.ContainerIdSetProto.parseFrom(containerIds);
|
|
|
+ // read the created containers list from the snapshot file and add it to
|
|
|
+ // the createContainerSet here.
|
|
|
+ // createContainerSet will further grow as and when containers get created
|
|
|
+ createContainerSet.addAll(proto.getContainerIdList());
|
|
|
+ dispatcher.buildMissingContainerSet(createContainerSet);
|
|
|
+ }
|
|
|
return last.getIndex();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * As a part of taking snapshot with Ratis StateMachine, it will persist
|
|
|
+ * the existing container set in the snapshotFile.
|
|
|
+ * @param out OutputStream mapped to the Ratis snapshot file
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void persistContainerSet(OutputStream out) throws IOException {
|
|
|
+ ContainerIdSetProto.Builder builder = ContainerIdSetProto.newBuilder();
|
|
|
+ builder.addAllContainerId(createContainerSet);
|
|
|
+ // TODO : while snapshot is being taken, deleteContainer call should not
|
|
|
+ // should not happen. Lock protection will be required if delete
|
|
|
+ // container happens outside of Ratis.
|
|
|
+ IOUtils.write(builder.build().toByteArray(), out);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public long takeSnapshot() throws IOException {
|
|
|
TermIndex ti = getLastAppliedTermIndex();
|
|
@@ -211,8 +254,13 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
LOG.info("Taking a snapshot to file {}", snapshotFile);
|
|
|
try {
|
|
|
//TODO: For now, just create the file to save the term index,
|
|
|
- //persist open container info to snapshot later.
|
|
|
- snapshotFile.createNewFile();
|
|
|
+ boolean created = snapshotFile.createNewFile();
|
|
|
+ if (!created) {
|
|
|
+ throw new IOException("Failed to create ratis snapshot file");
|
|
|
+ }
|
|
|
+ try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
|
|
|
+ persistContainerSet(fos);
|
|
|
+ }
|
|
|
} catch(IOException ioe) {
|
|
|
LOG.warn("Failed to write snapshot file \"" + snapshotFile
|
|
|
+ "\", last applied index=" + ti);
|
|
@@ -344,6 +392,7 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
.setTerm(term)
|
|
|
.setLogIndex(entryIndex)
|
|
|
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
|
|
|
+ .setCreateContainerSet(createContainerSet)
|
|
|
.build();
|
|
|
CompletableFuture<Message> writeChunkFuture;
|
|
|
try {
|
|
@@ -586,6 +635,9 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|
|
builder
|
|
|
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
|
|
|
}
|
|
|
+ if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
|
|
|
+ builder.setCreateContainerSet(createContainerSet);
|
|
|
+ }
|
|
|
try {
|
|
|
Message msg = runCommand(requestProto, builder.build());
|
|
|
future = CompletableFuture.supplyAsync(() -> msg,
|