瀏覽代碼

HDDS-587. Add new classes for pipeline management. Contributed by Lokesh Jain.

Nandakumar 6 年之前
父節點
當前提交
5c8e023ba3
共有 15 個文件被更改,包括 1772 次插入0 次删除
  1. 211 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
  2. 80 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineID.java
  3. 24 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java
  4. 56 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
  5. 58 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
  6. 35 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
  7. 179 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
  8. 212 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
  9. 135 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
  10. 226 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
  11. 80 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
  12. 24 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java
  13. 246 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
  14. 104 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
  15. 102 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java

+ 211 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java

@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a group of datanodes which store a container.
+ */
+public final class Pipeline {
+
+  private final PipelineID id;
+  private final ReplicationType type;
+  private final ReplicationFactor factor;
+
+  private LifeCycleState state;
+  private List<DatanodeDetails> nodes;
+
+  private Pipeline(PipelineID id, ReplicationType type,
+      ReplicationFactor factor, LifeCycleState state,
+      List<DatanodeDetails> nodes) {
+    this.id = id;
+    this.type = type;
+    this.factor = factor;
+    this.state = state;
+    this.nodes = nodes;
+  }
+
+  /**
+   * Returns the ID of this pipeline.
+   *
+   * @return PipelineID
+   */
+  public PipelineID getID() {
+    return id;
+  }
+
+  /**
+   * Returns the type.
+   *
+   * @return type - Simple or Ratis.
+   */
+  public ReplicationType getType() {
+    return type;
+  }
+
+  /**
+   * Returns the factor.
+   *
+   * @return type - Simple or Ratis.
+   */
+  public ReplicationFactor getFactor() {
+    return factor;
+  }
+
+  /**
+   * Returns the State of the pipeline.
+   *
+   * @return - LifeCycleStates.
+   */
+  public LifeCycleState getLifeCycleState() {
+    return state;
+  }
+
+  /**
+   * Returns the list of nodes which form this pipeline.
+   *
+   * @return List of DatanodeDetails
+   */
+  public List<DatanodeDetails> getNodes() {
+    return new ArrayList<>(nodes);
+  }
+
+  public HddsProtos.Pipeline getProtobufMessage() {
+    HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder();
+    builder.setId(id.getProtobuf());
+    builder.setType(type);
+    builder.setState(state);
+    builder.addAllMembers(nodes.stream().map(
+        DatanodeDetails::getProtoBufMessage).collect(Collectors.toList()));
+    return builder.build();
+  }
+
+  public static Pipeline fromProtobuf(HddsProtos.Pipeline pipeline) {
+    return new Pipeline(PipelineID.getFromProtobuf(pipeline.getId()),
+        pipeline.getType(), pipeline.getFactor(), pipeline.getState(),
+        pipeline.getMembersList().stream().map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList()));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    Pipeline that = (Pipeline) o;
+
+    return new EqualsBuilder()
+        .append(id, that.id)
+        .append(type, that.type)
+        .append(factor, that.factor)
+        .append(state, that.state)
+        .append(nodes, that.nodes)
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder()
+        .append(id)
+        .append(type)
+        .append(factor)
+        .append(state)
+        .append(nodes)
+        .toHashCode();
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static Builder newBuilder(Pipeline pipeline) {
+    return new Builder(pipeline);
+  }
+
+  /**
+   * Builder class for Pipeline.
+   */
+  public static class Builder {
+    private PipelineID id = null;
+    private ReplicationType type = null;
+    private ReplicationFactor factor = null;
+    private LifeCycleState state = null;
+    private List<DatanodeDetails> nodes = null;
+
+    public Builder() {}
+
+    public Builder(Pipeline pipeline) {
+      this.id = pipeline.getID();
+      this.type = pipeline.getType();
+      this.factor = pipeline.getFactor();
+      this.state = pipeline.getLifeCycleState();
+      this.nodes = pipeline.getNodes();
+    }
+
+    public Builder setId(PipelineID id1) {
+      this.id = id1;
+      return this;
+    }
+
+    public Builder setType(ReplicationType type1) {
+      this.type = type1;
+      return this;
+    }
+
+    public Builder setFactor(ReplicationFactor factor1) {
+      this.factor = factor1;
+      return this;
+    }
+
+    public Builder setState(LifeCycleState state1) {
+      this.state = state1;
+      return this;
+    }
+
+    public Builder setNodes(List<DatanodeDetails> nodes1) {
+      this.nodes = nodes1;
+      return this;
+    }
+
+    public Pipeline build() {
+      Preconditions.checkNotNull(id);
+      Preconditions.checkNotNull(type);
+      Preconditions.checkNotNull(factor);
+      Preconditions.checkNotNull(state);
+      Preconditions.checkNotNull(nodes);
+      return new Pipeline(id, type, factor, state, nodes);
+    }
+  }
+}

+ 80 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineID.java

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+import java.util.UUID;
+
+/**
+ * ID for the pipeline, the ID is based on UUID.
+ */
+public final class PipelineID {
+
+  private UUID id;
+
+  private PipelineID(UUID id) {
+    this.id = id;
+  }
+
+  public static PipelineID randomId() {
+    return new PipelineID(UUID.randomUUID());
+  }
+
+  public static PipelineID valueOf(UUID id) {
+    return new PipelineID(id);
+  }
+
+  public UUID getId() {
+    return id;
+  }
+
+  public HddsProtos.PipelineID getProtobuf() {
+    return HddsProtos.PipelineID.newBuilder().setId(id.toString()).build();
+  }
+
+  public static PipelineID getFromProtobuf(HddsProtos.PipelineID protos) {
+    return new PipelineID(UUID.fromString(protos.getId()));
+  }
+
+  @Override
+  public String toString() {
+    return "PipelineID=" + id;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    PipelineID that = (PipelineID) o;
+
+    return id.equals(that.id);
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+}

+ 24 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+/**
+ Ozone supports the notion of different kind of pipelines.
+ That means that we can have a replication pipeline build on
+ Ratis, Simple or some other protocol. All Pipeline managers
+ the entities in charge of pipelines reside in the package.
+ */

+ 56 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Creates pipeline based on replication type.
+ */
+public final class PipelineFactory {
+
+  private Map<ReplicationType, PipelineProvider> providers;
+
+  PipelineFactory(NodeManager nodeManager,
+      PipelineStateManager stateManager) {
+    providers = new HashMap<>();
+    providers.put(ReplicationType.STAND_ALONE,
+        new SimplePipelineProvider(nodeManager));
+    providers.put(ReplicationType.RATIS,
+        new RatisPipelineProvider(nodeManager, stateManager));
+  }
+
+  public Pipeline create(ReplicationType type, ReplicationFactor factor)
+      throws IOException {
+    return providers.get(type).create(factor);
+  }
+
+  public Pipeline create(ReplicationType type, List<DatanodeDetails> nodes)
+      throws IOException {
+    return providers.get(type).create(nodes);
+  }
+}

+ 58 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Interface which exposes the api for pipeline management.
+ */
+public interface PipelineManager extends Closeable {
+
+  Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
+      throws IOException;
+
+  Pipeline createPipeline(ReplicationType type, List<DatanodeDetails> nodes)
+      throws IOException;
+
+  Pipeline getPipeline(PipelineID pipelineID) throws IOException;
+
+  void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
+      throws IOException;
+
+  void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID)
+      throws IOException;
+
+  Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
+      throws IOException;
+
+  void finalizePipeline(PipelineID pipelineID) throws IOException;
+
+  void closePipeline(PipelineID pipelineId) throws IOException;
+
+  void removePipeline(PipelineID pipelineID) throws IOException;
+}

+ 35 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for creating pipelines.
+ */
+public interface PipelineProvider {
+
+  Pipeline create(ReplicationFactor factor) throws IOException;
+
+  Pipeline create(List<DatanodeDetails> nodes) throws IOException;
+}

+ 179 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java

@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE;
+
+/**
+ * Manages the state of pipelines in SCM. All write operations like pipeline
+ * creation, removal and updates should come via SCMPipelineManager.
+ * PipelineStateMap class holds the data structures related to pipeline and its
+ * state. All the read and write operations in PipelineStateMap are protected
+ * by a read write lock.
+ */
+class PipelineStateManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      org.apache.hadoop.hdds.scm.pipelines.PipelineStateManager.class);
+
+  private final PipelineStateMap pipelineStateMap;
+  private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
+  private final LeaseManager<Pipeline> pipelineLeaseManager;
+
+  PipelineStateManager(Configuration conf) {
+    this.pipelineStateMap = new PipelineStateMap();
+    Set<LifeCycleState> finalStates = new HashSet<>();
+    long pipelineCreationLeaseTimeout = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    // TODO: Use LeaseManager for creation of pipelines.
+    // Add pipeline initialization logic.
+    this.pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
+        pipelineCreationLeaseTimeout);
+    this.pipelineLeaseManager.start();
+
+    finalStates.add(LifeCycleState.CLOSED);
+    this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
+        finalStates);
+    initializeStateMachine();
+  }
+
+
+  /*
+   * Event and State Transition Mapping.
+   *
+   * State: ALLOCATED ---------------> CREATING
+   * Event:                CREATE
+   *
+   * State: CREATING  ---------------> OPEN
+   * Event:               CREATED
+   *
+   * State: OPEN      ---------------> CLOSING
+   * Event:               FINALIZE
+   *
+   * State: CLOSING   ---------------> CLOSED
+   * Event:                CLOSE
+   *
+   * State: CREATING  ---------------> CLOSED
+   * Event:               TIMEOUT
+   *
+   *
+   * Container State Flow:
+   *
+   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
+   *            (CREATE)     | (CREATED)     (FINALIZE)   |
+   *                         |                            |
+   *                         |                            |
+   *                         |(TIMEOUT)                   |(CLOSE)
+   *                         |                            |
+   *                         +--------> [CLOSED] <--------+
+   */
+
+  /**
+   * Add javadoc.
+   */
+  private void initializeStateMachine() {
+    stateMachine.addTransition(LifeCycleState.ALLOCATED,
+        LifeCycleState.CREATING, LifeCycleEvent.CREATE);
+
+    stateMachine.addTransition(LifeCycleState.CREATING,
+        LifeCycleState.OPEN, LifeCycleEvent.CREATED);
+
+    stateMachine.addTransition(LifeCycleState.OPEN,
+        LifeCycleState.CLOSING, LifeCycleEvent.FINALIZE);
+
+    stateMachine.addTransition(LifeCycleState.CLOSING,
+        LifeCycleState.CLOSED, LifeCycleEvent.CLOSE);
+
+    stateMachine.addTransition(LifeCycleState.CREATING,
+        LifeCycleState.CLOSED, LifeCycleEvent.TIMEOUT);
+  }
+
+  Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleEvent event)
+      throws IOException {
+    Pipeline pipeline = null;
+    try {
+      pipeline = pipelineStateMap.getPipeline(pipelineID);
+      LifeCycleState newState =
+          stateMachine.getNextState(pipeline.getLifeCycleState(), event);
+      return pipelineStateMap.updatePipelineState(pipeline.getID(), newState);
+    } catch (InvalidStateTransitionException ex) {
+      String error = String.format("Failed to update pipeline state %s, "
+              + "reason: invalid state transition from state: %s upon "
+              + "event: %s.", pipeline.getID(), pipeline.getLifeCycleState(),
+          event);
+      LOG.error(error);
+      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
+    }
+  }
+
+  void addPipeline(Pipeline pipeline) throws IOException {
+    pipelineStateMap.addPipeline(pipeline);
+  }
+
+  void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID)
+      throws IOException {
+    pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
+  }
+
+  Pipeline getPipeline(PipelineID pipelineID) throws IOException {
+    return pipelineStateMap.getPipeline(pipelineID);
+  }
+
+  List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
+    return pipelineStateMap.getPipelines(type);
+  }
+
+  Set<ContainerID> getContainers(PipelineID pipelineID) throws IOException {
+    return pipelineStateMap.getContainers(pipelineID);
+  }
+
+  void removePipeline(PipelineID pipelineID) throws IOException {
+    pipelineStateMap.removePipeline(pipelineID);
+  }
+
+  void removeContainerFromPipeline(PipelineID pipelineID,
+      ContainerID containerID) throws IOException {
+    pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
+  }
+
+  void close() {
+    pipelineLeaseManager.shutdown();
+  }
+}

+ 212 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java

@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Holds the data structures which maintain the information about pipeline and
+ * its state. All the read write operations in this class are protected by a
+ * lock.
+ * Invariant: If a pipeline exists in PipelineStateMap, both pipelineMap and
+ * pipeline2container would have a non-null mapping for it.
+ */
+class PipelineStateMap {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      PipelineStateMap.class);
+
+  private final Map<PipelineID, Pipeline> pipelineMap;
+  private final Map<PipelineID, Set<ContainerID>> pipeline2container;
+
+  PipelineStateMap() {
+
+    this.pipelineMap = new HashMap<>();
+    this.pipeline2container = new HashMap<>();
+
+  }
+
+  /**
+   * Adds provided pipeline in the data structures.
+   *
+   * @param pipeline - Pipeline to add
+   * @throws IOException if pipeline with provided pipelineID already exists
+   */
+  void addPipeline(Pipeline pipeline) throws IOException {
+    Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
+    Preconditions.checkArgument(
+        pipeline.getNodes().size() == pipeline.getFactor().getNumber(),
+        String.format("Nodes size=%d, replication factor=%d do not match ",
+                pipeline.getNodes().size(), pipeline.getFactor().getNumber()));
+
+    if (pipelineMap.putIfAbsent(pipeline.getID(), pipeline) != null) {
+      LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getID());
+      throw new IOException(String
+          .format("Duplicate pipeline ID %s detected.", pipeline.getID()));
+    }
+    pipeline2container.put(pipeline.getID(), new TreeSet<>());
+  }
+
+  /**
+   * Add container to an existing pipeline.
+   *
+   * @param pipelineID - PipelineID of the pipeline to which container is added
+   * @param containerID - ContainerID of the container to add
+   * @throws IOException if pipeline is not in open state or does not exist
+   */
+  void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
+      throws IOException {
+    Preconditions.checkNotNull(pipelineID,
+        "Pipeline Id cannot be null");
+    Preconditions.checkNotNull(containerID,
+        "container Id cannot be null");
+
+    Pipeline pipeline = getPipeline(pipelineID);
+    // TODO: verify the state we need the pipeline to be in
+    if (!isOpen(pipeline)) {
+      throw new IOException(
+          String.format("%s is not in open state", pipelineID));
+    }
+    pipeline2container.get(pipelineID).add(containerID);
+  }
+
+  /**
+   * Get pipeline corresponding to specified pipelineID.
+   *
+   * @param pipelineID - PipelineID of the pipeline to be retrieved
+   * @return Pipeline
+   * @throws IOException if pipeline is not found
+   */
+  Pipeline getPipeline(PipelineID pipelineID) throws IOException {
+    Pipeline pipeline = pipelineMap.get(pipelineID);
+    if (pipeline == null) {
+      throw new IOException(String.format("%s not found", pipelineID));
+    }
+    return pipeline;
+  }
+
+  /**
+   * Get pipeline corresponding to specified replication type.
+   *
+   * @param type - ReplicationType
+   * @return List of pipelines which have the specified replication type
+   */
+  List<Pipeline> getPipelines(ReplicationType type) {
+    Preconditions.checkNotNull(type, "Replication type cannot be null");
+
+    return pipelineMap.values().stream().filter(p -> p.getType().equals(type))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get set of containers corresponding to a pipeline.
+   *
+   * @param pipelineID - PipelineID
+   * @return Set of Containers belonging to the pipeline
+   * @throws IOException if pipeline is not found
+   */
+  Set<ContainerID> getContainers(PipelineID pipelineID)
+      throws IOException {
+    Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
+    if (containerIDs == null) {
+      throw new IOException(String.format("%s not found", pipelineID));
+    }
+    return new HashSet<>(containerIDs);
+  }
+
+  /**
+   * Remove pipeline from the data structures.
+   *
+   * @param pipelineID - PipelineID of the pipeline to be removed
+   * @throws IOException if the pipeline is not empty or does not exist
+   */
+  void removePipeline(PipelineID pipelineID) throws IOException {
+    Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
+
+    //TODO: Add a flag which suppresses exception if pipeline does not exist?
+    Set<ContainerID> containerIDs = getContainers(pipelineID);
+    if (containerIDs.size() != 0) {
+      throw new IOException(
+          String.format("Pipeline with %s is not empty", pipelineID));
+    }
+    pipelineMap.remove(pipelineID);
+    pipeline2container.remove(pipelineID);
+  }
+
+  /**
+   * Remove container from a pipeline.
+   *
+   * @param pipelineID - PipelineID of the pipeline from which container needs
+   *                   to be removed
+   * @param containerID - ContainerID of the container to remove
+   * @throws IOException if pipeline does not exist
+   */
+  void removeContainerFromPipeline(PipelineID pipelineID,
+      ContainerID containerID) throws IOException {
+    Preconditions.checkNotNull(pipelineID,
+        "Pipeline Id cannot be null");
+    Preconditions.checkNotNull(containerID,
+        "container Id cannot be null");
+
+    Pipeline pipeline = getPipeline(pipelineID);
+    Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
+    containerIDs.remove(containerID);
+    if (containerIDs.size() == 0 && isClosingOrClosed(pipeline)) {
+      removePipeline(pipelineID);
+    }
+  }
+
+  /**
+   * Updates the state of pipeline.
+   *
+   * @param pipelineID - PipelineID of the pipeline whose state needs
+   *                   to be updated
+   * @param state - new state of the pipeline
+   * @return Pipeline with the updated state
+   * @throws IOException if pipeline does not exist
+   */
+  Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleState state)
+      throws IOException {
+    Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
+    Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null");
+
+    Pipeline pipeline = getPipeline(pipelineID);
+    pipeline = pipelineMap
+        .put(pipelineID, Pipeline.newBuilder(pipeline).setState(state).build());
+    // TODO: Verify if need to throw exception for non-existent pipeline
+    return pipeline;
+  }
+
+  private boolean isClosingOrClosed(Pipeline pipeline) {
+    LifeCycleState state = pipeline.getLifeCycleState();
+    return state == LifeCycleState.CLOSING || state == LifeCycleState.CLOSED;
+  }
+
+  private boolean isOpen(Pipeline pipeline) {
+    return pipeline.getLifeCycleState() == LifeCycleState.OPEN;
+  }
+}

+ 135 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java

@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Implements Api for creating ratis pipelines.
+ */
+public class RatisPipelineProvider implements PipelineProvider {
+
+  private final NodeManager nodeManager;
+  private final PipelineStateManager stateManager;
+
+  RatisPipelineProvider(NodeManager nodeManager,
+      PipelineStateManager stateManager) {
+    this.nodeManager = nodeManager;
+    this.stateManager = stateManager;
+  }
+
+  /**
+   * Create pluggable container placement policy implementation instance.
+   *
+   * @param nodeManager - SCM node manager.
+   * @param conf - configuration.
+   * @return SCM container placement policy implementation instance.
+   */
+  @SuppressWarnings("unchecked")
+  // TODO: should we rename ContainerPlacementPolicy to PipelinePlacementPolicy?
+  private static ContainerPlacementPolicy createContainerPlacementPolicy(
+      final NodeManager nodeManager, final Configuration conf) {
+    Class<? extends ContainerPlacementPolicy> implClass =
+        (Class<? extends ContainerPlacementPolicy>) conf.getClass(
+            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+            SCMContainerPlacementRandom.class);
+
+    try {
+      Constructor<? extends ContainerPlacementPolicy> ctor =
+          implClass.getDeclaredConstructor(NodeManager.class,
+              Configuration.class);
+      return ctor.newInstance(nodeManager, conf);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(implClass.getName()
+          + " could not be constructed.", e.getCause());
+    } catch (Exception e) {
+//      LOG.error("Unhandled exception occurred, Placement policy will not " +
+//          "be functional.");
+      throw new IllegalArgumentException("Unable to load " +
+          "ContainerPlacementPolicy", e);
+    }
+  }
+
+
+  @Override
+  public Pipeline create(ReplicationFactor factor) throws IOException {
+    // Get set of datanodes already used for ratis pipeline
+    Set<DatanodeDetails> dnsUsed = new HashSet<>();
+    stateManager.getPipelines(ReplicationType.RATIS)
+        .forEach(p -> dnsUsed.addAll(p.getNodes()));
+
+    // Get list of healthy nodes
+    List<DatanodeDetails> dns =
+        nodeManager.getNodes(NodeState.HEALTHY)
+            .parallelStream()
+            .filter(dn -> !dnsUsed.contains(dn))
+            .limit(factor.getNumber())
+            .collect(Collectors.toList());
+    if (dns.size() < factor.getNumber()) {
+      String e = String
+          .format("Cannot create pipeline of factor %d using %d nodes.",
+              factor.getNumber(), dns.size());
+      throw new IOException(e);
+    }
+
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(LifeCycleState.ALLOCATED)
+        .setType(ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(dns)
+        .build();
+  }
+
+  @Override
+  public Pipeline create(List<DatanodeDetails> nodes) throws IOException {
+    ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size());
+    if (factor == null) {
+      throw new IOException(String
+          .format("Nodes size=%d does not match any replication factor",
+              nodes.size()));
+    }
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(LifeCycleState.ALLOCATED)
+        .setType(ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(nodes)
+        .build();
+  }
+}

+ 226 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java

@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.hdds.scm
+    .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm
+    .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
+
+/**
+ * Implements api needed for management of pipelines. All the write operations
+ * for pipelines must come via PipelineManager. It synchronises all write
+ * and read operations via a ReadWriteLock.
+ */
+public class SCMPipelineManager implements PipelineManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMPipelineManager.class);
+
+  private final ReadWriteLock lock;
+  private final PipelineFactory pipelineFactory;
+  private final PipelineStateManager stateManager;
+  private final MetadataStore pipelineStore;
+
+  public SCMPipelineManager(Configuration conf, NodeManager nodeManager)
+      throws IOException {
+    this.lock = new ReentrantReadWriteLock();
+    this.stateManager = new PipelineStateManager(conf);
+    this.pipelineFactory = new PipelineFactory(nodeManager, stateManager);
+    int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    File metaDir = getOzoneMetaDirPath(conf);
+    File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
+    this.pipelineStore =
+        MetadataStoreBuilder.newBuilder()
+            .setConf(conf)
+            .setDbFile(pipelineDBPath)
+            .setCacheSize(cacheSize * OzoneConsts.MB)
+            .build();
+
+    initializePipelineState();
+  }
+
+  private void initializePipelineState() throws IOException {
+    if (pipelineStore.isEmpty()) {
+      LOG.info("No pipeline exists in current db");
+      return;
+    }
+    List<Map.Entry<byte[], byte[]>> pipelines =
+        pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
+
+    for (Map.Entry<byte[], byte[]> entry : pipelines) {
+      Pipeline pipeline = Pipeline
+          .fromProtobuf(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
+      Preconditions.checkNotNull(pipeline);
+      stateManager.addPipeline(pipeline);
+    }
+  }
+
+  @Override
+  public synchronized Pipeline createPipeline(
+      ReplicationType type, ReplicationFactor factor) throws IOException {
+    lock.writeLock().lock();
+    try {
+      Pipeline pipeline =  pipelineFactory.create(type, factor);
+      stateManager.addPipeline(pipeline);
+      try {
+        pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
+            pipeline.getProtobufMessage().toByteArray());
+      } catch (IOException ioe) {
+        // if db operation fails we need to revert the pipeline creation in
+        // state manager.
+        stateManager.removePipeline(pipeline.getID());
+        throw ioe;
+      }
+      return pipeline;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public Pipeline createPipeline(ReplicationType type,
+                                 List<DatanodeDetails> nodes)
+      throws IOException {
+    // This will mostly be used to create dummy pipeline for SimplePipelines.
+    lock.writeLock().lock();
+    try {
+      return pipelineFactory.create(type, nodes);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public Pipeline getPipeline(PipelineID pipelineID) throws IOException {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipeline(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void addContainerToPipeline(PipelineID pipelineID,
+      ContainerID containerID) throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.addContainerToPipeline(pipelineID, containerID);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void removeContainerFromPipeline(PipelineID pipelineID,
+      ContainerID containerID) throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.removeContainerFromPipeline(pipelineID, containerID);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
+      throws IOException {
+    lock.readLock().lock();
+    try {
+      return stateManager.getContainers(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void finalizePipeline(PipelineID pipelineId) throws IOException {
+    lock.writeLock().lock();
+    try {
+      //TODO: close all containers in this pipeline
+      Pipeline pipeline =
+          stateManager.updatePipelineState(pipelineId, LifeCycleEvent.FINALIZE);
+      pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
+          pipeline.getProtobufMessage().toByteArray());
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void closePipeline(PipelineID pipelineId) throws IOException {
+    lock.writeLock().lock();
+    try {
+      Pipeline pipeline =
+          stateManager.updatePipelineState(pipelineId, LifeCycleEvent.CLOSE);
+      pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
+          pipeline.getProtobufMessage().toByteArray());
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void removePipeline(PipelineID pipelineID) throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.removePipeline(pipelineID);
+      pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.close();
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+}

+ 80 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Implements Api for creating stand alone pipelines.
+ */
+public class SimplePipelineProvider implements PipelineProvider {
+
+  private final NodeManager nodeManager;
+
+  public SimplePipelineProvider(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+  }
+
+  @Override
+  public Pipeline create(ReplicationFactor factor) throws IOException {
+    List<DatanodeDetails> dns =
+        nodeManager.getNodes(NodeState.HEALTHY);
+    if (dns.size() < factor.getNumber()) {
+      String e = String
+          .format("Cannot create pipeline of factor %d using %d nodes.",
+              factor.getNumber(), dns.size());
+      throw new IOException(e);
+    }
+
+    Collections.shuffle(dns);
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(LifeCycleState.ALLOCATED)
+        .setType(ReplicationType.STAND_ALONE)
+        .setFactor(factor)
+        .setNodes(dns.subList(0, factor.getNumber()))
+        .build();
+  }
+
+  @Override
+  public Pipeline create(List<DatanodeDetails> nodes) throws IOException {
+    ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size());
+    if (factor == null) {
+      throw new IOException(String
+          .format("Nodes size=%d does not match any replication factor",
+              nodes.size()));
+    }
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(LifeCycleState.ALLOCATED)
+        .setType(ReplicationType.STAND_ALONE)
+        .setFactor(factor)
+        .setNodes(nodes)
+        .build();
+  }
+}

+ 24 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+/**
+ Ozone supports the notion of different kind of pipelines.
+ That means that we can have a replication pipeline build on
+ Ratis, Simple or some other protocol. All Pipeline managers
+ the entities in charge of pipelines reside in the package.
+ */

+ 246 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java

@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test for PipelineStateManager.
+ */
+public class TestPipelineStateManager {
+
+  private PipelineStateManager stateManager;
+
+  @Before
+  public void init() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    stateManager = new PipelineStateManager(conf);
+  }
+
+  private Pipeline createDummyPipeline(int numNodes) {
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (int i = 0; i < numNodes; i++) {
+      nodes.add(TestUtils.randomDatanodeDetails());
+    }
+    return Pipeline.newBuilder()
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE)
+        .setNodes(nodes)
+        .setState(HddsProtos.LifeCycleState.ALLOCATED)
+        .setId(PipelineID.randomId())
+        .build();
+  }
+
+  @Test
+  public void testAddAndGetPipeline() throws IOException {
+    Pipeline pipeline = createDummyPipeline(0);
+    try {
+      stateManager.addPipeline(pipeline);
+      Assert.fail("Pipeline should not have been added");
+    } catch (IllegalArgumentException e) {
+      // replication factor and number of nodes in the pipeline do not match
+      Assert.assertTrue(e.getMessage().contains("do not match"));
+    }
+
+    // add a pipeline
+    pipeline = createDummyPipeline(1);
+    stateManager.addPipeline(pipeline);
+
+    try {
+      stateManager.addPipeline(pipeline);
+      Assert.fail("Pipeline should not have been added");
+    } catch (IOException e) {
+      // Can not add a pipeline twice
+      Assert.assertTrue(e.getMessage().contains("Duplicate pipeline ID"));
+    }
+
+    // verify pipeline returned is same
+    Pipeline pipeline1 = stateManager.getPipeline(pipeline.getID());
+    Assert.assertTrue(pipeline == pipeline1);
+
+    // clean up
+    stateManager.removePipeline(pipeline1.getID());
+  }
+
+  @Test
+  public void testGetPipelines() throws IOException {
+    Set<Pipeline> pipelines = new HashSet<>();
+    Pipeline pipeline = createDummyPipeline(1);
+    pipelines.add(pipeline);
+    stateManager.addPipeline(pipeline);
+    pipeline = createDummyPipeline(1);
+    pipelines.add(pipeline);
+    stateManager.addPipeline(pipeline);
+
+    Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS));
+    Assert.assertEquals(pipelines, pipelines1);
+  }
+
+  @Test
+  public void testAddAndGetContainer() throws IOException {
+    long containerID = 0;
+    Pipeline pipeline = createDummyPipeline(1);
+    stateManager.addPipeline(pipeline);
+    pipeline = stateManager.getPipeline(pipeline.getID());
+
+    try {
+      stateManager
+          .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+      Assert.fail("Container should not have been added");
+    } catch (IOException e) {
+      // add container possible only in container with open state
+      Assert.assertTrue(e.getMessage().contains("is not in open state"));
+    }
+
+    // move pipeline to open state
+    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
+        HddsProtos.LifeCycleEvent.CREATED);
+
+    // add three containers
+    stateManager
+        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID));
+    stateManager
+        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+    stateManager
+        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+
+    //verify the number of containers returned
+    Set<ContainerID> containerIDs =
+        stateManager.getContainers(pipeline.getID());
+    Assert.assertEquals(containerIDs.size(), containerID);
+
+    removePipeline(pipeline);
+    try {
+      stateManager
+          .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+      Assert.fail("Container should not have been added");
+    } catch (IOException e) {
+      // Can not add a container to removed pipeline
+      Assert.assertTrue(e.getMessage().contains("not found"));
+    }
+  }
+
+  @Test
+  public void testRemovePipeline() throws IOException {
+    Pipeline pipeline = createDummyPipeline(1);
+    stateManager.addPipeline(pipeline);
+    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
+        HddsProtos.LifeCycleEvent.CREATED);
+    stateManager
+        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
+
+    try {
+      stateManager.removePipeline(pipeline.getID());
+      Assert.fail("Pipeline should not have been removed");
+    } catch (IOException e) {
+      // can not remove a pipeline which already has containers
+      Assert.assertTrue(e.getMessage().contains("not empty"));
+    }
+
+    // remove containers and then remove the pipeline
+    removePipeline(pipeline);
+  }
+
+  @Test
+  public void testRemoveContainer() throws IOException {
+    long containerID = 1;
+    Pipeline pipeline = createDummyPipeline(1);
+    // create an open pipeline in stateMap
+    stateManager.addPipeline(pipeline);
+    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
+        HddsProtos.LifeCycleEvent.CREATED);
+
+    stateManager
+        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID));
+    stateManager
+        .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID));
+    // removeContainerFromPipeline in open pipeline does not lead to removal of pipeline
+    Assert.assertNotNull(stateManager.getPipeline(pipeline.getID()));
+
+    // add two containers in the pipeline
+    stateManager
+        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+    stateManager
+        .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
+
+    // move pipeline to closing state
+    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.FINALIZE);
+
+    stateManager
+        .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID));
+    // removal of second last container in closing or closed pipeline should
+    // not lead to removal of pipeline
+    Assert.assertNotNull(stateManager.getPipeline(pipeline.getID()));
+    stateManager
+        .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(--containerID));
+    // removal of last container in closing or closed pipeline should lead to
+    // removal of pipeline
+    try {
+      stateManager.getPipeline(pipeline.getID());
+      Assert.fail("getPipeline should have failed.");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains(" not found"));
+    }
+  }
+
+  @Test
+  public void testUpdatePipelineState() throws IOException {
+    Pipeline pipeline = createDummyPipeline(1);
+    stateManager.addPipeline(pipeline);
+    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
+        HddsProtos.LifeCycleEvent.CREATED, HddsProtos.LifeCycleEvent.FINALIZE,
+        HddsProtos.LifeCycleEvent.CLOSE);
+
+    pipeline = createDummyPipeline(1);
+    stateManager.addPipeline(pipeline);
+    updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
+        HddsProtos.LifeCycleEvent.TIMEOUT);
+  }
+
+  private void updateEvents(PipelineID pipelineID,
+      HddsProtos.LifeCycleEvent... events) throws IOException {
+    for (HddsProtos.LifeCycleEvent event : events) {
+      stateManager.updatePipelineState(pipelineID, event);
+    }
+  }
+
+  private void removePipeline(Pipeline pipeline) throws IOException {
+    Set<ContainerID> containerIDs =
+        stateManager.getContainers(pipeline.getID());
+    for (ContainerID containerID : containerIDs) {
+      stateManager.removeContainerFromPipeline(pipeline.getID(), containerID);
+    }
+    stateManager.removePipeline(pipeline.getID());
+  }
+}

+ 104 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for RatisPipelineProvider.
+ */
+public class TestRatisPipelineProvider {
+
+  private NodeManager nodeManager;
+  private PipelineProvider provider;
+  private PipelineStateManager stateManager;
+
+  @Before
+  public void init() throws Exception {
+    nodeManager = new MockNodeManager(true, 10);
+    stateManager = new PipelineStateManager(new OzoneConfiguration());
+    provider = new RatisPipelineProvider(nodeManager,
+        stateManager);
+  }
+
+  @Test
+  public void testCreatePipelineWithFactor() throws IOException {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline = provider.create(factor);
+    stateManager.addPipeline(pipeline);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getLifeCycleState(),
+        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    Pipeline pipeline1 = provider.create(factor);
+    stateManager.addPipeline(pipeline1);
+    // New pipeline should not overlap with the previous created pipeline
+    Assert.assertTrue(
+        CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
+            .isEmpty());
+    Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline1.getFactor(), factor);
+    Assert.assertEquals(pipeline1.getLifeCycleState(),
+        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+  }
+
+  private List<DatanodeDetails> createListOfNodes(int nodeCount) {
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (int i = 0; i < nodeCount; i++) {
+      nodes.add(TestUtils.randomDatanodeDetails());
+    }
+    return nodes;
+  }
+
+  @Test
+  public void testCreatePipelineWithNodes() throws IOException {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getLifeCycleState(),
+        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    pipeline = provider.create(createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getLifeCycleState(),
+        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+  }
+}

+ 102 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for SimplePipelineProvider.
+ */
+public class TestSimplePipelineProvider {
+
+  private NodeManager nodeManager;
+  private PipelineProvider provider;
+  private PipelineStateManager stateManager;
+
+  @Before
+  public void init() throws Exception {
+    nodeManager = new MockNodeManager(true, 10);
+    stateManager = new PipelineStateManager(new OzoneConfiguration());
+    provider = new SimplePipelineProvider(nodeManager);
+  }
+
+  @Test
+  public void testCreatePipelineWithFactor() throws IOException {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline = provider.create(factor);
+    stateManager.addPipeline(pipeline);
+    Assert.assertEquals(pipeline.getType(),
+        HddsProtos.ReplicationType.STAND_ALONE);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getLifeCycleState(),
+        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    Pipeline pipeline1 = provider.create(factor);
+    stateManager.addPipeline(pipeline1);
+    Assert.assertEquals(pipeline1.getType(),
+        HddsProtos.ReplicationType.STAND_ALONE);
+    Assert.assertEquals(pipeline1.getFactor(), factor);
+    Assert.assertEquals(pipeline1.getLifeCycleState(),
+        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+  }
+
+  private List<DatanodeDetails> createListOfNodes(int nodeCount) {
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (int i = 0; i < nodeCount; i++) {
+      nodes.add(TestUtils.randomDatanodeDetails());
+    }
+    return nodes;
+  }
+
+  @Test
+  public void testCreatePipelineWithNodes() throws IOException {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(),
+        HddsProtos.ReplicationType.STAND_ALONE);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getLifeCycleState(),
+        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    pipeline = provider.create(createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(),
+        HddsProtos.ReplicationType.STAND_ALONE);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getLifeCycleState(),
+        HddsProtos.LifeCycleState.ALLOCATED);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+  }
+}