|
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
|
.ContainerPlacementPolicy;
|
|
.ContainerPlacementPolicy;
|
|
import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
|
import org.apache.hadoop.hdds.scm.container.placement.algorithms
|
|
.SCMContainerPlacementRandom;
|
|
.SCMContainerPlacementRandom;
|
|
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
|
|
|
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
|
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
|
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
|
|
import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
|
|
import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
|
|
import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
|
|
@@ -34,28 +33,17 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
-import org.apache.hadoop.ozone.common.statemachine
|
|
|
|
- .InvalidStateTransitionException;
|
|
|
|
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
|
|
|
|
-import org.apache.hadoop.ozone.lease.Lease;
|
|
|
|
-import org.apache.hadoop.ozone.lease.LeaseException;
|
|
|
|
-import org.apache.hadoop.ozone.lease.LeaseManager;
|
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.lang.reflect.Constructor;
|
|
import java.lang.reflect.Constructor;
|
|
import java.lang.reflect.InvocationTargetException;
|
|
import java.lang.reflect.InvocationTargetException;
|
|
-import java.util.HashSet;
|
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
|
|
|
|
- .FAILED_TO_CHANGE_PIPELINE_STATE;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Sends the request to the right pipeline manager.
|
|
* Sends the request to the right pipeline manager.
|
|
*/
|
|
*/
|
|
@@ -69,10 +57,6 @@ public class PipelineSelector {
|
|
private final StandaloneManagerImpl standaloneManager;
|
|
private final StandaloneManagerImpl standaloneManager;
|
|
private final long containerSize;
|
|
private final long containerSize;
|
|
private final Node2PipelineMap node2PipelineMap;
|
|
private final Node2PipelineMap node2PipelineMap;
|
|
- private final LeaseManager<Pipeline> pipelineLeaseManager;
|
|
|
|
- private final StateMachine<LifeCycleState,
|
|
|
|
- HddsProtos.LifeCycleEvent> stateMachine;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Constructs a pipeline Selector.
|
|
* Constructs a pipeline Selector.
|
|
*
|
|
*
|
|
@@ -93,74 +77,6 @@ public class PipelineSelector {
|
|
this.ratisManager =
|
|
this.ratisManager =
|
|
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
|
|
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
|
|
conf, node2PipelineMap);
|
|
conf, node2PipelineMap);
|
|
- // Initialize the container state machine.
|
|
|
|
- Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
|
|
|
|
- long pipelineCreationLeaseTimeout = conf.getTimeDuration(
|
|
|
|
- ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
|
|
|
|
- ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
|
|
|
|
- TimeUnit.MILLISECONDS);
|
|
|
|
- LOG.trace("Starting Pipeline Lease Manager.");
|
|
|
|
- pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
|
|
|
|
- pipelineLeaseManager.start();
|
|
|
|
-
|
|
|
|
- // These are the steady states of a container.
|
|
|
|
- finalStates.add(HddsProtos.LifeCycleState.OPEN);
|
|
|
|
- finalStates.add(HddsProtos.LifeCycleState.CLOSED);
|
|
|
|
-
|
|
|
|
- this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
|
|
|
|
- finalStates);
|
|
|
|
- initializeStateMachine();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Event and State Transition Mapping:
|
|
|
|
- *
|
|
|
|
- * State: ALLOCATED ---------------> CREATING
|
|
|
|
- * Event: CREATE
|
|
|
|
- *
|
|
|
|
- * State: CREATING ---------------> OPEN
|
|
|
|
- * Event: CREATED
|
|
|
|
- *
|
|
|
|
- * State: OPEN ---------------> CLOSING
|
|
|
|
- * Event: FINALIZE
|
|
|
|
- *
|
|
|
|
- * State: CLOSING ---------------> CLOSED
|
|
|
|
- * Event: CLOSE
|
|
|
|
- *
|
|
|
|
- * State: CREATING ---------------> CLOSED
|
|
|
|
- * Event: TIMEOUT
|
|
|
|
- *
|
|
|
|
- *
|
|
|
|
- * Container State Flow:
|
|
|
|
- *
|
|
|
|
- * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
|
|
|
|
- * (CREATE) | (CREATED) (FINALIZE) |
|
|
|
|
- * | |
|
|
|
|
- * | |
|
|
|
|
- * |(TIMEOUT) |(CLOSE)
|
|
|
|
- * | |
|
|
|
|
- * +--------> [CLOSED] <--------+
|
|
|
|
- */
|
|
|
|
- private void initializeStateMachine() {
|
|
|
|
- stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
|
|
|
|
- HddsProtos.LifeCycleState.CREATING,
|
|
|
|
- HddsProtos.LifeCycleEvent.CREATE);
|
|
|
|
-
|
|
|
|
- stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
|
|
|
|
- HddsProtos.LifeCycleState.OPEN,
|
|
|
|
- HddsProtos.LifeCycleEvent.CREATED);
|
|
|
|
-
|
|
|
|
- stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
|
|
|
|
- HddsProtos.LifeCycleState.CLOSING,
|
|
|
|
- HddsProtos.LifeCycleEvent.FINALIZE);
|
|
|
|
-
|
|
|
|
- stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
|
|
|
|
- HddsProtos.LifeCycleState.CLOSED,
|
|
|
|
- HddsProtos.LifeCycleEvent.CLOSE);
|
|
|
|
-
|
|
|
|
- stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
|
|
|
|
- HddsProtos.LifeCycleState.CLOSED,
|
|
|
|
- HddsProtos.LifeCycleEvent.TIMEOUT);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -172,14 +88,15 @@ public class PipelineSelector {
|
|
* @return pipeline corresponding to nodes
|
|
* @return pipeline corresponding to nodes
|
|
*/
|
|
*/
|
|
public static Pipeline newPipelineFromNodes(
|
|
public static Pipeline newPipelineFromNodes(
|
|
- List<DatanodeDetails> nodes, ReplicationType replicationType,
|
|
|
|
- ReplicationFactor replicationFactor, String name) {
|
|
|
|
|
|
+ List<DatanodeDetails> nodes, LifeCycleState state,
|
|
|
|
+ ReplicationType replicationType, ReplicationFactor replicationFactor,
|
|
|
|
+ String name) {
|
|
Preconditions.checkNotNull(nodes);
|
|
Preconditions.checkNotNull(nodes);
|
|
Preconditions.checkArgument(nodes.size() > 0);
|
|
Preconditions.checkArgument(nodes.size() > 0);
|
|
String leaderId = nodes.get(0).getUuidString();
|
|
String leaderId = nodes.get(0).getUuidString();
|
|
- // A new pipeline always starts in allocated state
|
|
|
|
- Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
|
|
|
|
- replicationType, replicationFactor, name);
|
|
|
|
|
|
+ Pipeline
|
|
|
|
+ pipeline = new Pipeline(leaderId, state, replicationType,
|
|
|
|
+ replicationFactor, name);
|
|
for (DatanodeDetails node : nodes) {
|
|
for (DatanodeDetails node : nodes) {
|
|
pipeline.addMember(node);
|
|
pipeline.addMember(node);
|
|
}
|
|
}
|
|
@@ -258,35 +175,8 @@ public class PipelineSelector {
|
|
LOG.debug("Getting replication pipeline forReplicationType {} :" +
|
|
LOG.debug("Getting replication pipeline forReplicationType {} :" +
|
|
" ReplicationFactor {}", replicationType.toString(),
|
|
" ReplicationFactor {}", replicationType.toString(),
|
|
replicationFactor.toString());
|
|
replicationFactor.toString());
|
|
-
|
|
|
|
- /**
|
|
|
|
- * In the Ozone world, we have a very simple policy.
|
|
|
|
- *
|
|
|
|
- * 1. Try to create a pipeline if there are enough free nodes.
|
|
|
|
- *
|
|
|
|
- * 2. This allows all nodes to part of a pipeline quickly.
|
|
|
|
- *
|
|
|
|
- * 3. if there are not enough free nodes, return already allocated pipeline
|
|
|
|
- * in a round-robin fashion.
|
|
|
|
- *
|
|
|
|
- * TODO: Might have to come up with a better algorithm than this.
|
|
|
|
- * Create a new placement policy that returns pipelines in round robin
|
|
|
|
- * fashion.
|
|
|
|
- */
|
|
|
|
- Pipeline pipeline =
|
|
|
|
- manager.createPipeline(replicationFactor, replicationType);
|
|
|
|
- if (pipeline == null) {
|
|
|
|
- // try to return a pipeline from already allocated pipelines
|
|
|
|
- pipeline = manager.getPipeline(replicationFactor, replicationType);
|
|
|
|
- } else {
|
|
|
|
- // if a new pipeline is created, initialize its state machine
|
|
|
|
- updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
|
|
|
|
-
|
|
|
|
- //TODO: move the initialization of pipeline to Ozone Client
|
|
|
|
- manager.initializePipeline(pipeline);
|
|
|
|
- updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
|
|
|
|
- }
|
|
|
|
- return pipeline;
|
|
|
|
|
|
+ return manager.
|
|
|
|
+ getPipeline(replicationFactor, replicationType);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -304,6 +194,19 @@ public class PipelineSelector {
|
|
" pipelineName:{}", replicationType, pipelineName);
|
|
" pipelineName:{}", replicationType, pipelineName);
|
|
return manager.getPipeline(pipelineName);
|
|
return manager.getPipeline(pipelineName);
|
|
}
|
|
}
|
|
|
|
+ /**
|
|
|
|
+ * Creates a pipeline from a specified set of Nodes.
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+ public void createPipeline(ReplicationType replicationType, String
|
|
|
|
+ pipelineID, List<DatanodeDetails> datanodes) throws IOException {
|
|
|
|
+ PipelineManager manager = getPipelineManager(replicationType);
|
|
|
|
+ Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
|
|
|
|
+ LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
|
|
|
|
+ datanodes.stream().map(DatanodeDetails::toString)
|
|
|
|
+ .collect(Collectors.joining(",")));
|
|
|
|
+ manager.createPipeline(pipelineID, datanodes);
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Close the pipeline with the given clusterId.
|
|
* Close the pipeline with the given clusterId.
|
|
@@ -348,77 +251,12 @@ public class PipelineSelector {
|
|
}
|
|
}
|
|
|
|
|
|
public void removePipeline(UUID dnId) {
|
|
public void removePipeline(UUID dnId) {
|
|
- Set<Pipeline> pipelineSet =
|
|
|
|
|
|
+ Set<Pipeline> pipelineChannelSet =
|
|
node2PipelineMap.getPipelines(dnId);
|
|
node2PipelineMap.getPipelines(dnId);
|
|
- for (Pipeline pipeline : pipelineSet) {
|
|
|
|
- getPipelineManager(pipeline.getType())
|
|
|
|
- .removePipeline(pipeline);
|
|
|
|
|
|
+ for (Pipeline pipelineChannel : pipelineChannelSet) {
|
|
|
|
+ getPipelineManager(pipelineChannel.getType())
|
|
|
|
+ .removePipeline(pipelineChannel);
|
|
}
|
|
}
|
|
node2PipelineMap.removeDatanode(dnId);
|
|
node2PipelineMap.removeDatanode(dnId);
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Update the Pipeline State to the next state.
|
|
|
|
- *
|
|
|
|
- * @param pipeline - Pipeline
|
|
|
|
- * @param event - LifeCycle Event
|
|
|
|
- * @throws SCMException on Failure.
|
|
|
|
- */
|
|
|
|
- public void updatePipelineState(Pipeline pipeline,
|
|
|
|
- HddsProtos.LifeCycleEvent event) throws IOException {
|
|
|
|
- HddsProtos.LifeCycleState newState;
|
|
|
|
- try {
|
|
|
|
- newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
|
|
|
|
- } catch (InvalidStateTransitionException ex) {
|
|
|
|
- String error = String.format("Failed to update pipeline state %s, " +
|
|
|
|
- "reason: invalid state transition from state: %s upon " +
|
|
|
|
- "event: %s.",
|
|
|
|
- pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
|
|
|
|
- LOG.error(error);
|
|
|
|
- throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // This is a post condition after executing getNextState.
|
|
|
|
- Preconditions.checkNotNull(newState);
|
|
|
|
- Preconditions.checkNotNull(pipeline);
|
|
|
|
- try {
|
|
|
|
- switch (event) {
|
|
|
|
- case CREATE:
|
|
|
|
- // Acquire lease on pipeline
|
|
|
|
- Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
|
|
|
|
- // Register callback to be executed in case of timeout
|
|
|
|
- pipelineLease.registerCallBack(() -> {
|
|
|
|
- updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
|
|
|
|
- return null;
|
|
|
|
- });
|
|
|
|
- break;
|
|
|
|
- case CREATED:
|
|
|
|
- // Release the lease on pipeline
|
|
|
|
- pipelineLeaseManager.release(pipeline);
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case FINALIZE:
|
|
|
|
- //TODO: cleanup pipeline by closing all the containers on the pipeline
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLOSE:
|
|
|
|
- case TIMEOUT:
|
|
|
|
- // TODO: Release the nodes here when pipelines are destroyed
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- throw new SCMException("Unsupported pipeline LifeCycleEvent.",
|
|
|
|
- FAILED_TO_CHANGE_PIPELINE_STATE);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- pipeline.setLifeCycleState(newState);
|
|
|
|
- } catch (LeaseException e) {
|
|
|
|
- throw new IOException("Lease Exception.", e);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void shutdown() {
|
|
|
|
- if (pipelineLeaseManager != null) {
|
|
|
|
- pipelineLeaseManager.shutdown();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
}
|
|
}
|