|
@@ -16,33 +16,17 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
|
|
|
|
|
|
-import java.io.FileInputStream;
|
|
|
-import java.nio.file.Files;
|
|
|
-import java.nio.file.Path;
|
|
|
import java.util.List;
|
|
|
-import java.util.concurrent.CompletableFuture;
|
|
|
-import java.util.concurrent.locks.Lock;
|
|
|
-import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto
|
|
|
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto
|
|
|
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
|
|
|
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
|
|
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
|
|
|
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
|
|
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
|
|
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
|
|
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
|
|
|
-import org.apache.hadoop.ozone.container.common.statemachine
|
|
|
- .SCMConnectionManager;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
|
|
|
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
|
|
|
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
|
|
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
|
|
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
|
|
-import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
|
|
|
-import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
|
|
|
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
|
|
|
+import org.apache.hadoop.ozone.container.replication.ReplicationTask;
|
|
|
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
|
|
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
|
|
|
|
@@ -58,39 +42,19 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
|
|
|
static final Logger LOG =
|
|
|
LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
|
|
|
|
|
|
- private ContainerDispatcher containerDispatcher;
|
|
|
-
|
|
|
private int invocationCount;
|
|
|
|
|
|
private long totalTime;
|
|
|
|
|
|
- private ContainerDownloader downloader;
|
|
|
-
|
|
|
private Configuration conf;
|
|
|
|
|
|
- private TarContainerPacker packer = new TarContainerPacker();
|
|
|
-
|
|
|
- private ContainerSet containerSet;
|
|
|
-
|
|
|
- private Lock lock = new ReentrantLock();
|
|
|
+ private ReplicationSupervisor supervisor;
|
|
|
|
|
|
public ReplicateContainerCommandHandler(
|
|
|
Configuration conf,
|
|
|
- ContainerSet containerSet,
|
|
|
- ContainerDispatcher containerDispatcher,
|
|
|
- ContainerDownloader downloader) {
|
|
|
+ ReplicationSupervisor supervisor) {
|
|
|
this.conf = conf;
|
|
|
- this.containerSet = containerSet;
|
|
|
- this.downloader = downloader;
|
|
|
- this.containerDispatcher = containerDispatcher;
|
|
|
- }
|
|
|
-
|
|
|
- public ReplicateContainerCommandHandler(
|
|
|
- Configuration conf,
|
|
|
- ContainerSet containerSet,
|
|
|
- ContainerDispatcher containerDispatcher) {
|
|
|
- this(conf, containerSet, containerDispatcher,
|
|
|
- new SimpleContainerDownloader(conf));
|
|
|
+ this.supervisor = supervisor;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -108,72 +72,12 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
|
|
|
String.format("Replication command is received for container %d "
|
|
|
+ "but the size of source datanodes was 0.", containerID));
|
|
|
|
|
|
- LOG.info("Starting replication of container {} from {}", containerID,
|
|
|
- sourceDatanodes);
|
|
|
- CompletableFuture<Path> tempTarFile = downloader
|
|
|
- .getContainerDataFromReplicas(containerID,
|
|
|
- sourceDatanodes);
|
|
|
-
|
|
|
- CompletableFuture<Void> result =
|
|
|
- tempTarFile.thenAccept(path -> {
|
|
|
- LOG.info("Container {} is downloaded, starting to import.",
|
|
|
- containerID);
|
|
|
- importContainer(containerID, path);
|
|
|
- });
|
|
|
-
|
|
|
- result.whenComplete((aVoid, throwable) -> {
|
|
|
- if (throwable != null) {
|
|
|
- LOG.error("Container replication was unsuccessful .", throwable);
|
|
|
- } else {
|
|
|
- LOG.info("Container {} is replicated successfully", containerID);
|
|
|
- }
|
|
|
- });
|
|
|
- } finally {
|
|
|
- updateCommandStatus(context, command, true, LOG);
|
|
|
+ ReplicationTask replicationTask =
|
|
|
+ new ReplicationTask(containerID, sourceDatanodes);
|
|
|
+ supervisor.addTask(replicationTask);
|
|
|
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected void importContainer(long containerID, Path tarFilePath) {
|
|
|
- lock.lock();
|
|
|
- try {
|
|
|
- ContainerData originalContainerData;
|
|
|
- try (FileInputStream tempContainerTarStream = new FileInputStream(
|
|
|
- tarFilePath.toFile())) {
|
|
|
- byte[] containerDescriptorYaml =
|
|
|
- packer.unpackContainerDescriptor(tempContainerTarStream);
|
|
|
- originalContainerData = ContainerDataYaml.readContainer(
|
|
|
- containerDescriptorYaml);
|
|
|
- }
|
|
|
-
|
|
|
- try (FileInputStream tempContainerTarStream = new FileInputStream(
|
|
|
- tarFilePath.toFile())) {
|
|
|
-
|
|
|
- Handler handler = containerDispatcher.getHandler(
|
|
|
- originalContainerData.getContainerType());
|
|
|
-
|
|
|
- Container container = handler.importContainer(containerID,
|
|
|
- originalContainerData.getMaxSize(),
|
|
|
- tempContainerTarStream,
|
|
|
- packer);
|
|
|
-
|
|
|
- containerSet.addContainer(container);
|
|
|
- }
|
|
|
-
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error(
|
|
|
- "Can't import the downloaded container data id=" + containerID,
|
|
|
- e);
|
|
|
- try {
|
|
|
- Files.delete(tarFilePath);
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.error(
|
|
|
- "Container import is failed and the downloaded file can't be "
|
|
|
- + "deleted: "
|
|
|
- + tarFilePath.toAbsolutePath().toString());
|
|
|
- }
|
|
|
} finally {
|
|
|
- lock.unlock();
|
|
|
+ updateCommandStatus(context, command, true, LOG);
|
|
|
}
|
|
|
}
|
|
|
|