|
@@ -17,12 +17,15 @@
|
|
|
package org.apache.hadoop.ozone.scm.pipelines.ratis;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
|
|
-import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
|
|
|
+import org.apache.hadoop.ozone.scm.container.placement.algorithms
|
|
|
+ .ContainerPlacementPolicy;
|
|
|
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
|
|
import org.apache.hadoop.ozone.scm.pipelines.PipelineManager;
|
|
|
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
|
|
|
+import org.apache.hadoop.scm.XceiverClientRatis;
|
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -35,8 +38,10 @@ import java.util.Set;
|
|
|
import java.util.UUID;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
-import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.ALLOCATED;
|
|
|
-import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.OPEN;
|
|
|
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
|
|
|
+ .LifeCycleState.ALLOCATED;
|
|
|
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos
|
|
|
+ .LifeCycleState.OPEN;
|
|
|
|
|
|
|
|
|
/**
|
|
@@ -54,6 +59,7 @@ public class RatisManagerImpl implements PipelineManager {
|
|
|
private final List<Pipeline> activePipelines;
|
|
|
private final AtomicInteger pipelineIndex;
|
|
|
private static final String PREFIX = "Ratis-";
|
|
|
+ private final Configuration conf;
|
|
|
|
|
|
/**
|
|
|
* Constructs a Ratis Pipeline Manager.
|
|
@@ -61,13 +67,14 @@ public class RatisManagerImpl implements PipelineManager {
|
|
|
* @param nodeManager
|
|
|
*/
|
|
|
public RatisManagerImpl(NodeManager nodeManager,
|
|
|
- ContainerPlacementPolicy placementPolicy, long size) {
|
|
|
+ ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
|
|
|
this.nodeManager = nodeManager;
|
|
|
this.placementPolicy = placementPolicy;
|
|
|
this.containerSize = size;
|
|
|
ratisMembers = new HashSet<>();
|
|
|
activePipelines = new LinkedList<>();
|
|
|
pipelineIndex = new AtomicInteger(0);
|
|
|
+ this.conf = conf;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -85,7 +92,7 @@ public class RatisManagerImpl implements PipelineManager {
|
|
|
*/
|
|
|
@Override
|
|
|
public synchronized Pipeline getPipeline(String containerName,
|
|
|
- OzoneProtos.ReplicationFactor replicationFactor) {
|
|
|
+ OzoneProtos.ReplicationFactor replicationFactor) throws IOException {
|
|
|
/**
|
|
|
* In the ratis world, we have a very simple policy.
|
|
|
*
|
|
@@ -106,7 +113,13 @@ public class RatisManagerImpl implements PipelineManager {
|
|
|
Preconditions.checkState(newNodes.size() ==
|
|
|
getReplicationCount(replicationFactor), "Replication factor " +
|
|
|
"does not match the expected node count.");
|
|
|
- pipeline = allocateRatisPipeline(newNodes, containerName);
|
|
|
+ pipeline =
|
|
|
+ allocateRatisPipeline(newNodes, containerName, replicationFactor);
|
|
|
+ try (XceiverClientRatis client =
|
|
|
+ XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
|
|
|
+ client
|
|
|
+ .createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
|
|
|
+ }
|
|
|
} else {
|
|
|
pipeline = findOpenPipeline();
|
|
|
}
|
|
@@ -151,7 +164,8 @@ public class RatisManagerImpl implements PipelineManager {
|
|
|
* @param containerName - container Name
|
|
|
* @return - Pipeline.
|
|
|
*/
|
|
|
- Pipeline allocateRatisPipeline(List<DatanodeID> nodes, String containerName) {
|
|
|
+ Pipeline allocateRatisPipeline(List<DatanodeID> nodes, String containerName,
|
|
|
+ OzoneProtos.ReplicationFactor factor) {
|
|
|
Preconditions.checkNotNull(nodes);
|
|
|
Pipeline pipeline = PipelineSelector.newPipelineFromNodes(nodes);
|
|
|
if (pipeline != null) {
|
|
@@ -160,6 +174,7 @@ public class RatisManagerImpl implements PipelineManager {
|
|
|
UUID.randomUUID().toString().substring(PREFIX.length());
|
|
|
pipeline.setType(OzoneProtos.ReplicationType.RATIS);
|
|
|
pipeline.setLifeCycleState(ALLOCATED);
|
|
|
+ pipeline.setFactor(factor);
|
|
|
pipeline.setPipelineName(pipelineName);
|
|
|
pipeline.setContainerName(containerName);
|
|
|
LOG.info("Creating new ratis pipeline: {}", pipeline.toString());
|
|
@@ -192,8 +207,12 @@ public class RatisManagerImpl implements PipelineManager {
|
|
|
//TODO: Add Raft State to the Nodes, so we can query and skip nodes from
|
|
|
// data from datanode instead of maintaining a set.
|
|
|
for (DatanodeID datanode : datanodes) {
|
|
|
+ Preconditions.checkNotNull(datanode);
|
|
|
if (!ratisMembers.contains(datanode)) {
|
|
|
newNodesList.add(datanode);
|
|
|
+ // once a datanode has been added to a pipeline, exclude it from
|
|
|
+ // further allocations
|
|
|
+ ratisMembers.add(datanode);
|
|
|
if (newNodesList.size() == count) {
|
|
|
LOG.info("Allocating a new pipeline of size: {}", count);
|
|
|
return newNodesList;
|