
HDDS-297. Add pipeline actions in Ozone. Contributed by Mukul Kumar Singh and Shashikant Banerjee

Tsz Wo Nicholas Sze committed 6 years ago
parent commit b3161c4dd9
38 changed files with 815 additions and 163 deletions
  1. 5 3
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
  2. 5 4
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
  3. 27 9
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
  4. 1 1
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
  5. 5 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
  6. 11 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
  7. 7 3
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
  8. 12 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  9. 20 2
      hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
  10. 27 0
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  11. 45 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
  12. 28 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
  13. 16 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
  14. 152 53
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
  15. 1 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  16. 26 0
      hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
  17. 21 6
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
  18. 2 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
  19. 14 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
  20. 22 2
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
  21. 14 2
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
  22. 9 25
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
  23. 60 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
  24. 38 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
  25. 5 5
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
  26. 22 24
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
  27. 9 5
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
  28. 4 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
  29. 23 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
  30. 12 1
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
  31. 3 3
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
  32. 126 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
  33. 2 4
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
  34. 15 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  35. 21 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
  36. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
  37. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
  38. 1 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java

+ 5 - 3
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java

@@ -186,15 +186,17 @@ public class XceiverClient extends XceiverClientSpi {
 
   /**
    * Create a pipeline.
-   *
-   * @param ignored -  pipeline to be created.
    */
   @Override
-  public void createPipeline(Pipeline ignored)
+  public void createPipeline()
       throws IOException {
     // For stand alone pipeline, there is no notion called setup pipeline.
   }
 
+  public void destroyPipeline() {
+    // For stand alone pipeline, there is no notion called destroy pipeline.
+  }
+
   /**
    * Returns pipeline Type.
    *

+ 5 - 4
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -216,15 +216,16 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 
   /**
    * Create a pipeline.
-   *
-   * @param ignored -  pipeline to be created.
    */
   @Override
-  public void createPipeline(Pipeline ignored)
-      throws IOException {
+  public void createPipeline() {
     // For stand alone pipeline, there is no notion called setup pipeline.
   }
 
+  public void destroyPipeline() {
+    // For stand alone pipeline, there is no notion called destroy pipeline.
+  }
+
   /**
    * Returns pipeline Type.
    *

+ 27 - 9
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -88,13 +88,27 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   /**
    * {@inheritDoc}
    */
-  public void createPipeline(Pipeline pipeline)
+  public void createPipeline()
       throws IOException {
     RaftGroupId groupId = pipeline.getId().getRaftGroupID();
     RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
     LOG.debug("initializing pipeline:{} with nodes:{}",
         pipeline.getId(), group.getPeers());
-    reinitialize(pipeline.getMachines(), group);
+    reinitialize(pipeline.getMachines(), RatisHelper.emptyRaftGroup(), group);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void destroyPipeline()
+      throws IOException {
+    RaftGroupId groupId = pipeline.getId().getRaftGroupID();
+    RaftGroup currentGroup =
+        RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
+    LOG.debug("destroying pipeline:{} with nodes:{}",
+        pipeline.getId(), currentGroup.getPeers());
+    reinitialize(pipeline.getMachines(), currentGroup,
+        RatisHelper.emptyRaftGroup());
   }
 
   /**
@@ -107,8 +121,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return HddsProtos.ReplicationType.RATIS;
   }
 
-  private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group)
-      throws IOException {
+  private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup oldGroup,
+      RaftGroup newGroup) throws IOException {
     if (datanodes.isEmpty()) {
       return;
     }
@@ -116,7 +130,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     IOException exception = null;
     for (DatanodeDetails d : datanodes) {
       try {
-        reinitialize(d, group);
+        reinitialize(d, oldGroup, newGroup);
       } catch (IOException ioe) {
         if (exception == null) {
           exception = new IOException(
@@ -135,14 +149,18 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * Adds a new peers to the Ratis Ring.
    *
    * @param datanode - new datanode
-   * @param group    - Raft group
+   * @param oldGroup    - previous Raft group
+   * @param newGroup    - new Raft group
    * @throws IOException - on Failure.
    */
-  private void reinitialize(DatanodeDetails datanode, RaftGroup group)
+  private void reinitialize(DatanodeDetails datanode, RaftGroup oldGroup,
+      RaftGroup newGroup)
       throws IOException {
     final RaftPeer p = RatisHelper.toRaftPeer(datanode);
-    try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
-      client.reinitialize(group, p.getId());
+    try (RaftClient client = oldGroup == RatisHelper.emptyRaftGroup() ?
+        RatisHelper.newRaftClient(rpcType, p) :
+        RatisHelper.newRaftClient(rpcType, p, oldGroup)) {
+      client.reinitialize(newGroup, p.getId());
     } catch (IOException ioe) {
       LOG.error("Failed to reinitialize RaftPeer:{} datanode: {}  ",
           p, datanode, ioe);

+ 1 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java

@@ -180,7 +180,7 @@ public class ContainerOperationClient implements ScmClient {
     //    ObjectStageChangeRequestProto.Op.create,
     //    ObjectStageChangeRequestProto.Stage.begin);
 
-    client.createPipeline(pipeline);
+    client.createPipeline();
 
     //storageContainerLocationClient.notifyObjectStageChange(
     //    ObjectStageChangeRequestProto.Type.pipeline,

+ 5 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java

@@ -56,6 +56,11 @@ public final class HddsConfigKeys {
   public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
       20;
 
+  public static final String HDDS_PIPELINE_ACTION_MAX_LIMIT =
+      "hdds.pipeline.action.max.limit";
+  public static final int HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT =
+      20;
+
   // Configuration to allow volume choosing policy.
   public static final String HDDS_DATANODE_VOLUME_CHOOSING_POLICY =
       "hdds.datanode.volume.choosing.policy";

+ 11 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.TimeDuration;
 
 import java.util.concurrent.TimeUnit;
@@ -57,6 +58,10 @@ public final class ScmConfigKeys {
       = "dfs.container.ratis.num.write.chunk.threads";
   public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
       = 60;
+  public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
+      = "dfs.container.ratis.replication.level";
+  public static final ReplicationLevel
+      DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY;
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
       "dfs.container.ratis.segment.size";
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
@@ -76,6 +81,12 @@ public final class ScmConfigKeys {
       DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
       TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
 
+  public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
+      "dfs.ratis.server.failure.duration";
+  public static final TimeDuration
+      DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT =
+      TimeDuration.valueOf(120, TimeUnit.SECONDS);
+
   // TODO : this is copied from OzoneConsts, may need to move to a better place
   public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
   // 16 MB by default

+ 7 - 3
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java

@@ -111,10 +111,14 @@ public abstract class XceiverClientSpi implements Closeable {
 
   /**
    * Create a pipeline.
-   *
-   * @param pipeline -  pipeline to be created.
    */
-  public abstract void createPipeline(Pipeline pipeline) throws IOException;
+  public abstract void createPipeline() throws IOException;
+
+  /**
+   * Destroy a pipeline.
+   * @throws IOException
+   */
+  public abstract void destroyPipeline() throws IOException;
 
   /**
    * Returns pipeline Type.

+ 12 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.TimeDuration;
 
 /**
@@ -214,6 +215,11 @@ public final class OzoneConfigKeys {
       = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY;
   public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
       = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY;
+  public static final ReplicationLevel
+      DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT;
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY
       = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
@@ -237,6 +243,12 @@ public final class OzoneConfigKeys {
       DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
       ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT;
 
+  public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
+      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY;
+  public static final TimeDuration
+      DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT =
+      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT;
+
   public static final String OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
       "ozone.web.authentication.kerberos.principal";
 

+ 20 - 2
hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java

@@ -30,6 +30,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos;
 import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -48,8 +50,19 @@ public interface RatisHelper {
   Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
 
   static String toRaftPeerIdString(DatanodeDetails id) {
-    return id.getUuidString() + "_" +
-        id.getPort(DatanodeDetails.Port.Name.RATIS).getValue();
+    return id.getUuidString();
+  }
+
+  static UUID toDatanodeId(String peerIdString) {
+    return UUID.fromString(peerIdString);
+  }
+
+  static UUID toDatanodeId(RaftPeerId peerId) {
+    return toDatanodeId(peerId.toString());
+  }
+
+  static UUID toDatanodeId(RaftProtos.RaftPeerProto peerId) {
+    return toDatanodeId(RaftPeerId.valueOf(peerId.getId()));
   }
 
   static String toRaftPeerAddressString(DatanodeDetails id) {
@@ -117,6 +130,11 @@ public interface RatisHelper {
         newRaftGroup(new ArrayList<>(Arrays.asList(leader))));
   }
 
+  static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
+      RaftGroup group) {
+    return newRaftClient(rpcType, leader.getId(), group);
+  }
+
   static RaftClient newRaftClient(
       RpcType rpcType, RaftPeerId leader, RaftGroup group) {
     LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
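
Note (not part of the patch): with the "_&lt;ratis port&gt;" suffix dropped, the Raft peer id is now exactly the datanode UUID string, which is what makes the new toDatanodeId helpers a clean round trip. A minimal sketch, using only the APIs visible above; the example class name is an assumption:

import java.util.UUID;

import org.apache.ratis.RatisHelper;
import org.apache.ratis.protocol.RaftPeerId;

public final class PeerIdRoundTripExample {
  public static void main(String[] args) {
    UUID datanodeId = UUID.randomUUID();
    // The peer id is now the bare datanode UUID string.
    RaftPeerId peerId = RaftPeerId.valueOf(datanodeId.toString());
    UUID recovered = RatisHelper.toDatanodeId(peerId);
    System.out.println(recovered.equals(datanodeId));  // true
  }
}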

+ 27 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -126,6 +126,15 @@
       will use for writing chunks (60 by default).
     </description>
   </property>
+  <property>
+    <name>dfs.container.ratis.replication.level</name>
+    <value>MAJORITY</value>
+    <tag>OZONE, RATIS</tag>
+    <description>Replication level to be used by the datanode for submitting a
+      container command to Ratis. Available replication levels are ALL and
+      MAJORITY; MAJORITY is the default replication level.
+    </description>
+  </property>
   <property>
     <name>dfs.container.ratis.segment.size</name>
     <value>1073741824</value>
@@ -154,6 +163,15 @@
     <tag>OZONE, RATIS, MANAGEMENT</tag>
     <description>The timeout duration for ratis server request.</description>
   </property>
+  <property>
+    <name>dfs.ratis.server.failure.duration</name>
+    <value>120s</value>
+    <tag>OZONE, RATIS, MANAGEMENT</tag>
+    <description>The timeout duration for Ratis server failure detection.
+      Once this threshold has been reached, the Ratis state machine is
+      notified of the failure in the Ratis ring.
+    </description>
+  </property>
   <property>
     <name>hdds.node.report.interval</name>
     <value>60000ms</value>
@@ -1104,6 +1122,15 @@
     </description>
   </property>
 
+  <property>
+    <name>hdds.pipeline.action.max.limit</name>
+    <value>20</value>
+    <tag>DATANODE</tag>
+    <description>
+      Maximum number of Pipeline Actions sent by the datanode to SCM in a
+      single heartbeat.
+    </description>
+  </property>
   <property>
     <name>hdds.scm.watcher.timeout</name>
     <value>10m</value>
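
Note (not part of the patch): a minimal sketch of how the three new settings can be read, mirroring the patterns used in XceiverServerRatis and HeartbeatEndpointTask below; the plain Configuration instance and the example class name are assumptions for illustration:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;

public final class PipelineActionConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Replication level used when submitting container commands to Ratis.
    ReplicationLevel level = conf.getEnum(
        OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
        OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);

    // Node failure timeout, applied to leader election and slowness detection.
    TimeUnit unit = OzoneConfigKeys
        .DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT.getUnit();
    long failureTimeout = conf.getTimeDuration(
        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
            .getDuration(), unit);

    // Upper bound on pipeline actions shipped in a single heartbeat.
    int maxActionsPerHeartbeat = conf.getInt(
        HddsConfigKeys.HDDS_PIPELINE_ACTION_MAX_LIMIT,
        HddsConfigKeys.HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT);

    System.out.println(level + " " + failureTimeout + " "
        + maxActionsPerHeartbeat);
  }
}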

+ 45 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java

@@ -20,6 +20,8 @@ import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.protocol.proto
@@ -66,6 +68,7 @@ public class StateContext {
   private final Configuration conf;
   private final Queue<GeneratedMessage> reports;
   private final Queue<ContainerAction> containerActions;
+  private final Queue<PipelineAction> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
 
   /**
@@ -91,6 +94,7 @@ public class StateContext {
     cmdStatusMap = new ConcurrentHashMap<>();
     reports = new LinkedList<>();
     containerActions = new LinkedList<>();
+    pipelineActions = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
   }
@@ -256,6 +260,47 @@ public class StateContext {
     }
   }
 
+  /**
+   * Add PipelineAction to PipelineAction queue if it's not present.
+   *
+   * @param pipelineAction PipelineAction to be added
+   */
+  public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
+    synchronized (pipelineActions) {
+      /**
+       * If the pipelineAction queue already contains an entry for this
+       * pipeline id with the same action, just return.
+       * Note: pipelineActions.contains(pipelineAction) cannot be used here,
+       * because a PipelineAction carries a message string; two actions with
+       * the same action on the same pipeline but different messages would
+       * otherwise both be queued.
+       */
+      for (PipelineAction pipelineActionIter : pipelineActions) {
+        if (pipelineActionIter.getAction() == pipelineAction.getAction()
+            && pipelineActionIter.hasClosePipeline() && pipelineAction
+            .hasClosePipeline()
+            && pipelineActionIter.getClosePipeline().getPipelineID()
+            == pipelineAction.getClosePipeline().getPipelineID()) {
+          return;
+        }
+      }
+      pipelineActions.add(pipelineAction);
+    }
+  }
+
+  /**
+   * Returns pending PipelineActions from the PipelineAction queue with a
+   * max limit on list size, or empty list if the queue is empty.
+   *
+   * @return List<PipelineAction>
+   */
+  public List<PipelineAction> getPendingPipelineAction(int maxLimit) {
+    synchronized (pipelineActions) {
+      return pipelineActions.parallelStream().limit(maxLimit)
+          .collect(Collectors.toList());
+    }
+  }
+
   /**
    * Returns the next task to get executed by the datanode state machine.
    * @return A callable that will be executed by the
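
Note (not part of the patch): a short sketch of how the new queue is used from the datanode side. The StateContext is assumed to be the one owned by the running DatanodeStateMachine, and the PipelineAction is assumed to have been built elsewhere (in this patch it is built by XceiverServerRatis#handlePipelineFailure):

import java.util.List;

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;

public final class PipelineActionQueueExample {

  /** Queue a close action; addPipelineActionIfAbsent is meant to drop
   * duplicates for the same pipeline and action. */
  static void report(StateContext context, PipelineAction closeAction) {
    context.addPipelineActionIfAbsent(closeAction);
  }

  /** Fetch up to maxLimit pending actions for the next heartbeat; note that
   * the queue is only read here, not drained. */
  static List<PipelineAction> fetchForHeartbeat(StateContext context,
      int maxLimit) {
    return context.getPendingPipelineAction(maxLimit);
  }
}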

+ 28 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java

@@ -24,6 +24,10 @@ import com.google.protobuf.GeneratedMessage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -57,6 +61,10 @@ import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_CONTAINER_ACTION_MAX_LIMIT;
 import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_PIPELINE_ACTION_MAX_LIMIT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT;
 
 /**
  * Heartbeat class for SCMs.
@@ -70,6 +78,7 @@ public class HeartbeatEndpointTask
   private DatanodeDetailsProto datanodeDetailsProto;
   private StateContext context;
   private int maxContainerActionsPerHB;
+  private int maxPipelineActionsPerHB;
 
   /**
    * Constructs a SCM heart beat.
@@ -83,6 +92,8 @@ public class HeartbeatEndpointTask
     this.context = context;
     this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
         HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
+    this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT,
+        HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT);
   }
 
   /**
@@ -121,6 +132,7 @@ public class HeartbeatEndpointTask
               .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
       addContainerActions(requestBuilder);
+      addPipelineActions(requestBuilder);
       SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
           .sendHeartbeat(requestBuilder.build());
       processResponse(reponse, datanodeDetailsProto);
@@ -169,6 +181,22 @@ public class HeartbeatEndpointTask
     }
   }
 
+  /**
+   * Adds all the pending PipelineActions to the heartbeat.
+   *
+   * @param requestBuilder builder to which the report has to be added.
+   */
+  private void addPipelineActions(
+      SCMHeartbeatRequestProto.Builder requestBuilder) {
+    List<PipelineAction> actions = context.getPendingPipelineAction(
+        maxPipelineActionsPerHB);
+    if (!actions.isEmpty()) {
+      PipelineActionsProto pap = PipelineActionsProto.newBuilder()
+          .addAllPipelineActions(actions)
+          .build();
+      requestBuilder.setPipelineActions(pap);
+    }
+  }
 
   /**
    * Returns a builder class for HeartbeatEndpointTask task.

+ 16 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.shaded.com.google.protobuf
@@ -42,6 +43,7 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.statemachine.StateMachineStorage;
@@ -115,6 +117,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       = new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor chunkExecutor;
+  private final XceiverServerRatis ratisServer;
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
   private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
@@ -124,9 +127,10 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final CSMMetrics metrics;
 
   public ContainerStateMachine(ContainerDispatcher dispatcher,
-      ThreadPoolExecutor chunkExecutor) {
+      ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) {
     this.dispatcher = dispatcher;
     this.chunkExecutor = chunkExecutor;
+    this.ratisServer = ratisServer;
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
     this.stateMachineMap = new ConcurrentHashMap<>();
     metrics = CSMMetrics.create();
@@ -400,6 +404,17 @@ public class ContainerStateMachine extends BaseStateMachine {
     return future;
   }
 
+  @Override
+  public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
+    ratisServer.handleNodeSlowness(group, roleInfoProto);
+  }
+
+  @Override
+  public void notifyExtendedNoLeader(RaftGroup group,
+      RoleInfoProto roleInfoProto) {
+    ratisServer.handleNoLeader(group, roleInfoProto);
+  }
+
   @Override
   public void close() throws IOException {
   }

+ 152 - 53
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -26,9 +26,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server
     .XceiverServerSpi;
 import org.apache.ratis.RaftConfigKeys;
@@ -43,10 +48,15 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
@@ -59,6 +69,7 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -81,24 +92,72 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private final RaftServer server;
   private ThreadPoolExecutor chunkExecutor;
   private ClientId clientId = ClientId.randomId();
+  private final StateContext context;
+  private final ReplicationLevel replicationLevel;
+  private long nodeFailureTimeoutMs;
 
   private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
-      ContainerDispatcher dispatcher, Configuration conf) throws IOException {
+      ContainerDispatcher dispatcher, Configuration conf, StateContext context)
+      throws IOException {
+    Objects.requireNonNull(dd, "id == null");
+    this.port = port;
+    RaftProperties serverProperties = newRaftProperties(conf, storageDir);
+    final int numWriteChunkThreads = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
+    chunkExecutor =
+        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+            100, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(1024),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.context = context;
+    this.replicationLevel =
+        conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
+            OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
+    ContainerStateMachine stateMachine =
+        new ContainerStateMachine(dispatcher, chunkExecutor, this);
+    this.server = RaftServer.newBuilder()
+        .setServerId(RatisHelper.toRaftPeerId(dd))
+        .setGroup(RatisHelper.emptyRaftGroup())
+        .setProperties(serverProperties)
+        .setStateMachine(stateMachine)
+        .build();
+  }
+
 
+  private RaftProperties newRaftProperties(Configuration conf,
+      String storageDir) {
+    final RaftProperties properties = new RaftProperties();
+
+    // Set rpc type
     final String rpcType = conf.get(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
     final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+    RaftConfigKeys.Rpc.setType(properties, rpc);
+
+    // set raft segment size
     final int raftSegmentSize = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
+        SizeInBytes.valueOf(raftSegmentSize));
+
+    // set raft segment pre-allocated size
     final int raftSegmentPreallocatedSize = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
+    RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+
+    // Set max write buffer size, which is the scm chunk size
     final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
-    final int numWriteChunkThreads = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
+    RaftServerConfigKeys.Log.setWriteBufferSize(properties,
+        SizeInBytes.valueOf(maxChunkSize));
+
+    // Set the client requestTimeout
     TimeUnit timeUnit =
         OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
             .getUnit();
@@ -108,6 +167,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .getDuration(), timeUnit);
     final TimeDuration clientRequestTimeout =
         TimeDuration.valueOf(duration, timeUnit);
+    RaftClientConfigKeys.Rpc
+        .setRequestTimeout(properties, clientRequestTimeout);
+
+    // Set the server Request timeout
     timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT
         .getUnit();
     duration = conf.getTimeDuration(
@@ -116,61 +179,44 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .getDuration(), timeUnit);
     final TimeDuration serverRequestTimeout =
         TimeDuration.valueOf(duration, timeUnit);
-
-    Objects.requireNonNull(dd, "id == null");
-    this.port = port;
-    RaftProperties serverProperties =
-        newRaftProperties(rpc, port, storageDir, maxChunkSize, raftSegmentSize,
-            raftSegmentPreallocatedSize);
-    setRequestTimeout(serverProperties, clientRequestTimeout,
-        serverRequestTimeout);
-
-    chunkExecutor =
-        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
-            100, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(1024),
-            new ThreadPoolExecutor.CallerRunsPolicy());
-    ContainerStateMachine stateMachine =
-        new ContainerStateMachine(dispatcher, chunkExecutor);
-    this.server = RaftServer.newBuilder()
-        .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setGroup(RatisHelper.emptyRaftGroup())
-        .setProperties(serverProperties)
-        .setStateMachine(stateMachine)
-        .build();
-  }
-
-  private static void setRequestTimeout(RaftProperties serverProperties,
-      TimeDuration clientRequestTimeout, TimeDuration serverRequestTimeout) {
-    RaftClientConfigKeys.Rpc
-        .setRequestTimeout(serverProperties, clientRequestTimeout);
     RaftServerConfigKeys.Rpc
-        .setRequestTimeout(serverProperties, serverRequestTimeout);
-  }
+        .setRequestTimeout(properties, serverRequestTimeout);
 
-  private static RaftProperties newRaftProperties(
-      RpcType rpc, int port, String storageDir, int scmChunkSize,
-      int raftSegmentSize, int raftSegmentPreallocatedSize) {
-    final RaftProperties properties = new RaftProperties();
+    // Enable batch append on raft server
     RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
-    RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
-        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
-    RaftServerConfigKeys.Log.setWriteBufferSize(properties,
-        SizeInBytes.valueOf(scmChunkSize));
-    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
-        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
-    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
-        SizeInBytes.valueOf(raftSegmentSize));
-    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
-    RaftConfigKeys.Rpc.setType(properties, rpc);
 
+    // Set the maximum cache segments
     RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
-    GrpcConfigKeys.setMessageSizeMax(properties,
-        SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize));
+
+    // Set the ratis leader election timeout
     RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
         TimeDuration.valueOf(800, TimeUnit.MILLISECONDS));
     RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
         TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS));
+
+    // set the node failure timeout
+    timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+        .getUnit();
+    duration = conf.getTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+            .getDuration(), timeUnit);
+    final TimeDuration nodeFailureTimeout =
+        TimeDuration.valueOf(duration, timeUnit);
+    RaftServerConfigKeys.setLeaderElectionTimeout(properties,
+        nodeFailureTimeout);
+    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+        nodeFailureTimeout);
+    nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
+
+    // Set the ratis storage directory
+    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
+
+    // For grpc set the maximum message size
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize));
+
+    // Set the ratis port number
     if (rpc == SupportedRpcType.GRPC) {
       GrpcConfigKeys.Server.setPort(properties, port);
     } else if (rpc == SupportedRpcType.NETTY) {
@@ -181,7 +227,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
   public static XceiverServerRatis newXceiverServerRatis(
       DatanodeDetails datanodeDetails, Configuration ozoneConf,
-      ContainerDispatcher dispatcher) throws IOException {
+      ContainerDispatcher dispatcher, StateContext context) throws IOException {
     final String ratisDir = File.separator + "ratis";
     int localPort = ozoneConf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
@@ -226,7 +272,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     datanodeDetails.setPort(
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
     return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
-        dispatcher, ozoneConf);
+        dispatcher, ozoneConf, context);
   }
 
   @Override
@@ -296,7 +342,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     // the request here are applied on all the raft servers.
     RaftClientRequest raftClientRequest =
         createRaftClientRequest(request, pipelineID,
-            RaftClientRequest.writeRequestType(ReplicationLevel.ALL));
+            RaftClientRequest.writeRequestType(replicationLevel));
     CompletableFuture<RaftClientReply> reply =
         server.submitClientRequestAsync(raftClientRequest);
     reply.thenAccept(this::processReply);
@@ -309,4 +355,57 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(),
         nextCallId(), 0, Message.valueOf(request.toByteString()), type);
   }
+
+  private void handlePipelineFailure(RaftGroupId groupId,
+      RoleInfoProto roleInfoProto) {
+    String msg;
+    UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
+    RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+    switch (roleInfoProto.getRole()) {
+    case CANDIDATE:
+      msg = datanode + " is in candidate state for " +
+          roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
+      break;
+    case LEADER:
+      StringBuilder sb = new StringBuilder();
+      sb.append(datanode).append(" has not seen follower/s");
+      for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
+          .getFollowerInfoList()) {
+        if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
+          sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
+              .append(" for ").append(follower.getLastRpcElapsedTimeMs())
+              .append("ms");
+        }
+      }
+      msg = sb.toString();
+      break;
+    default:
+      LOG.error("unknown state:" + roleInfoProto.getRole());
+      throw new IllegalStateException("node" + id + " is in illegal role "
+          + roleInfoProto.getRole());
+    }
+
+    PipelineID pipelineID = PipelineID.valueOf(groupId);
+    ClosePipelineInfo.Builder closePipelineInfo =
+        ClosePipelineInfo.newBuilder()
+            .setPipelineID(pipelineID.getProtobuf())
+            .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
+            .setDetailedReason(msg);
+
+    PipelineAction action = PipelineAction.newBuilder()
+        .setClosePipeline(closePipelineInfo)
+        .setAction(PipelineAction.Action.CLOSE)
+        .build();
+    context.addPipelineActionIfAbsent(action);
+  }
+
+  void handleNodeSlowness(
+      RaftGroup group, RoleInfoProto roleInfoProto) {
+    handlePipelineFailure(group.getGroupId(), roleInfoProto);
+  }
+
+  void handleNoLeader(
+      RaftGroup group, RoleInfoProto roleInfoProto) {
+    handlePipelineFailure(group.getGroupId(), roleInfoProto);
+  }
 }

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -84,7 +84,7 @@ public class OzoneContainer {
         new XceiverServerGrpc(datanodeDetails, this.config, this
             .hddsDispatcher, createReplicationService()),
         XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
-            .config, hddsDispatcher)
+            .config, hddsDispatcher, context)
     };
 
 

+ 26 - 0
hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -81,6 +81,7 @@ message SCMHeartbeatRequestProto {
   optional ContainerReportsProto containerReport = 3;
   optional CommandStatusReportsProto commandStatusReport = 4;
   optional ContainerActionsProto containerActions = 5;
+  optional PipelineActionsProto pipelineActions = 6;
 }
 
 /*
@@ -162,6 +163,31 @@ message ContainerAction {
   optional Reason reason = 3;
 }
 
+message PipelineActionsProto {
+  repeated PipelineAction pipelineActions = 1;
+}
+
+message ClosePipelineInfo {
+  enum Reason {
+    PIPELINE_FAILED = 1;
+  }
+  required PipelineID pipelineID = 1;
+  optional Reason reason = 3;
+  optional string detailedReason = 4;
+}
+
+message PipelineAction {
+  enum Action {
+    CLOSE = 1;
+  }
+
+  /**
+   * Action will be used to identify the correct pipeline action.
+   */
+  required Action action = 1;
+  optional ClosePipelineInfo closePipeline = 2;
+}
+
 /**
 A container report contains the following information.
 */
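
Note (not part of the patch): a minimal sketch of building and re-parsing the new messages, independent of the heartbeat RPC. The pipeline id is fabricated via PipelineID.valueOf(RaftGroupId.randomId()), which is assumed here only for illustration:

import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.ratis.protocol.RaftGroupId;

public final class PipelineActionsProtoExample {
  public static void main(String[] args) throws Exception {
    PipelineAction close = PipelineAction.newBuilder()
        .setAction(PipelineAction.Action.CLOSE)
        .setClosePipeline(ClosePipelineInfo.newBuilder()
            .setPipelineID(
                PipelineID.valueOf(RaftGroupId.randomId()).getProtobuf())
            .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
            .setDetailedReason("illustrative reason"))
        .build();

    PipelineActionsProto actions = PipelineActionsProto.newBuilder()
        .addPipelineActions(close)
        .build();

    // Round trip through bytes, as happens over the heartbeat wire format.
    PipelineActionsProto decoded =
        PipelineActionsProto.parseFrom(actions.toByteArray());
    System.out.println(decoded.getPipelineActions(0).getAction());  // CLOSE
  }
}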

+ 21 - 6
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java

@@ -200,8 +200,7 @@ public class ContainerMapping implements Mapping {
       Pipeline pipeline;
       if (contInfo.isContainerOpen()) {
         // If pipeline with given pipeline Id already exist return it
-        pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID(),
-            contInfo.getReplicationType());
+        pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
         if (pipeline == null) {
           pipeline = pipelineSelector
               .getReplicationPipeline(contInfo.getReplicationType(),
@@ -389,8 +388,7 @@ public class ContainerMapping implements Mapping {
           .updateContainerState(containerInfo, event);
       if (!updatedContainer.isContainerOpen()) {
         Pipeline pipeline = pipelineSelector
-            .getPipeline(containerInfo.getPipelineID(),
-                containerInfo.getReplicationType());
+            .getPipeline(containerInfo.getPipelineID());
         pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
       }
       containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
@@ -470,8 +468,7 @@ public class ContainerMapping implements Mapping {
       return null;
     }
     Pipeline pipeline = pipelineSelector
-        .getPipeline(containerInfo.getPipelineID(),
-            containerInfo.getReplicationType());
+        .getPipeline(containerInfo.getPipelineID());
     if (pipeline == null) {
       pipeline = pipelineSelector
           .getReplicationPipeline(containerInfo.getReplicationType(),
@@ -480,6 +477,24 @@ public class ContainerMapping implements Mapping {
     return new ContainerWithPipeline(containerInfo, pipeline);
   }
 
+  public void handlePipelineClose(PipelineID pipelineID) {
+    try {
+      Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
+      if (pipeline != null) {
+        pipelineSelector.finalizePipeline(pipeline);
+      } else {
+        LOG.debug("pipeline:{} not found", pipelineID);
+      }
+    } catch (Exception e) {
+      LOG.info("failed to close pipeline:{}", pipelineID, e);
+    }
+  }
+
+  public Set<PipelineID> getPipelineOnDatanode(
+      DatanodeDetails datanodeDetails) {
+    return pipelineSelector.getPipelineId(datanodeDetails.getUuid());
+  }
+
   /**
    * Process container report from Datanode.
    * <p>

+ 2 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java

@@ -486,10 +486,9 @@ public class ContainerStateManager implements Closeable {
    * @throws IOException
    */
   public ContainerWithPipeline getContainer(PipelineSelector selector,
-      ContainerID containerID) throws IOException {
+      ContainerID containerID) {
     ContainerInfo info = containers.getContainerInfo(containerID.getId());
-    Pipeline pipeline = selector.getPipeline(info.getPipelineID(),
-        info.getReplicationType());
+    Pipeline pipeline = selector.getPipeline(info.getPipelineID());
     return new ContainerWithPipeline(info, pipeline);
   }
 

+ 14 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java

@@ -25,11 +25,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Mapping class contains the mapping from a name to a pipeline mapping. This is
@@ -135,4 +137,16 @@ public interface Mapping extends Closeable {
   ContainerWithPipeline getMatchingContainerWithPipeline(long size,
       String owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) throws IOException;
+
+  /**
+   * Handle a pipeline close event.
+   * @param pipelineID pipeline id
+   */
+  void handlePipelineClose(PipelineID pipelineID);
+
+  /**
+   * Get the set of pipelines on a specific datanode.
+   * @param datanodeDetails datanode for which pipelines need to be fetched.
+   */
+  Set<PipelineID> getPipelineOnDatanode(DatanodeDetails datanodeDetails);
 }

+ 22 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java

@@ -29,6 +29,9 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -72,6 +75,23 @@ public final class SCMEvents {
   public static final TypedEvent<ContainerActionsFromDatanode>
       CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class,
       "Container_Actions");
+
+  /**
+   * PipelineActions are sent by the datanode. They are received by
+   * SCMDatanodeHeartbeatDispatcher, which generates a PIPELINE_ACTIONS event.
+   */
+  public static final TypedEvent<PipelineActionsFromDatanode>
+      PIPELINE_ACTIONS = new TypedEvent<>(PipelineActionsFromDatanode.class,
+      "Pipeline_Actions");
+
+  /**
+   * A pipeline close event is triggered to close a pipeline because of a
+   * failure, a stale node, decommissioning, etc.
+   */
+  public static final TypedEvent<PipelineID>
+      PIPELINE_CLOSE = new TypedEvent<>(PipelineID.class,
+      "Pipeline_Close");
+
   /**
    * A Command status report will be sent by datanodes. This repoort is received
    * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
@@ -155,7 +175,7 @@ public final class SCMEvents {
    */
   public static final Event<DeleteBlockCommandStatus>
       DELETE_BLOCK_STATUS =
-      new TypedEvent(DeleteBlockCommandStatus.class,
+      new TypedEvent<>(DeleteBlockCommandStatus.class,
           "DeleteBlockCommandStatus");
 
   /**
@@ -164,7 +184,7 @@ public final class SCMEvents {
    * deleteTransactionID on SCM.
    */
   public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
-      new TypedEvent(PendingDeleteStatusList.class, "PendingDeleteStatus");
+      new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus");
 
   /**
    * This is the command for ReplicationManager to handle under/over

+ 14 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java

@@ -19,24 +19,36 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
+import java.util.Set;
+
 /**
  * Handles Stale node event.
  */
 public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
 
   private final Node2ContainerMap node2ContainerMap;
+  private final Mapping containerManager;
 
-  public StaleNodeHandler(Node2ContainerMap node2ContainerMap) {
+  public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
+      Mapping containerManager) {
     this.node2ContainerMap = node2ContainerMap;
+    this.containerManager = containerManager;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    //TODO: logic to handle stale node.
+    Set<PipelineID> pipelineIDs =
+        containerManager.getPipelineOnDatanode(datanodeDetails);
+    for (PipelineID id : pipelineIDs) {
+      publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
+    }
   }
 }

+ 9 - 25
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm.pipelines;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -30,8 +30,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
-
 /**
  * This data structure maintains the list of pipelines which the given datanode is a part of. This
  * information will be added whenever a new pipeline allocation happens.
@@ -39,7 +37,7 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUP
  * <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart
  */
 public class Node2PipelineMap {
-  private final Map<UUID, Set<Pipeline>> dn2PipelineMap;
+  private final Map<UUID, Set<PipelineID>> dn2PipelineMap;
 
   /** Constructs a Node2PipelineMap Object. */
   public Node2PipelineMap() {
@@ -57,20 +55,6 @@ public class Node2PipelineMap {
     return dn2PipelineMap.containsKey(datanodeID);
   }
 
-  /**
-   * Insert a new datanode into Node2Pipeline Map.
-   *
-   * @param datanodeID -- Datanode UUID
-   * @param pipelines - set of pipelines.
-   */
-  private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines) throws SCMException {
-    Preconditions.checkNotNull(pipelines);
-    Preconditions.checkNotNull(datanodeID);
-    if (dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) {
-      throw new SCMException("Node already exists in the map", DUPLICATE_DATANODE);
-    }
-  }
-
   /**
    * Removes datanode Entry from the map.
    *
@@ -87,9 +71,10 @@ public class Node2PipelineMap {
    * @param datanode - UUID
    * @return Set of pipelines or Null.
    */
-  public Set<Pipeline> getPipelines(UUID datanode) {
+  public Set<PipelineID> getPipelines(UUID datanode) {
     Preconditions.checkNotNull(datanode);
-    return dn2PipelineMap.computeIfPresent(datanode, (k, v) -> Collections.unmodifiableSet(v));
+    final Set<PipelineID> s = dn2PipelineMap.get(datanode);
+    return s != null? Collections.unmodifiableSet(s): Collections.emptySet();
   }
 
   /**
@@ -100,9 +85,8 @@ public class Node2PipelineMap {
   public synchronized void addPipeline(Pipeline pipeline) {
     for (DatanodeDetails details : pipeline.getDatanodes().values()) {
       UUID dnId = details.getUuid();
-      dn2PipelineMap
-          .computeIfAbsent(dnId, k -> Collections.synchronizedSet(new HashSet<>()))
-          .add(pipeline);
+      dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>())
+          .add(pipeline.getId());
     }
   }
 
@@ -112,13 +96,13 @@ public class Node2PipelineMap {
       dn2PipelineMap.computeIfPresent(
           dnId,
           (k, v) -> {
-            v.remove(pipeline);
+            v.remove(pipeline.getId());
             return v;
           });
     }
   }
 
-  public Map<UUID, Set<Pipeline>> getDn2PipelineMap() {
+  public Map<UUID, Set<PipelineID>> getDn2PipelineMap() {
     return Collections.unmodifiableMap(dn2PipelineMap);
   }
 }
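
Note (not part of the patch): one behavioural consequence of the getPipelines change above is that an unknown datanode now yields an empty, unmodifiable set rather than null. A tiny sketch of the new contract; the example class name is an assumption:

import java.util.Set;
import java.util.UUID;

import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;

public final class Node2PipelineMapExample {
  public static void main(String[] args) {
    Node2PipelineMap map = new Node2PipelineMap();
    // No pipelines were added for this datanode, so the result is empty,
    // and callers can iterate it without a null check.
    Set<PipelineID> pipelines = map.getPipelines(UUID.randomUUID());
    System.out.println(pipelines.isEmpty());  // true
  }
}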

+ 60 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .PipelineActionsFromDatanode;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles pipeline actions from datanode.
+ */
+public class PipelineActionEventHandler implements
+    EventHandler<PipelineActionsFromDatanode> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      PipelineActionEventHandler.class);
+
+  public PipelineActionEventHandler() {
+
+  }
+
+  @Override
+  public void onMessage(PipelineActionsFromDatanode report,
+      EventPublisher publisher) {
+    for (PipelineAction action : report.getReport().getPipelineActionsList()) {
+      switch (action.getAction()) {
+      case CLOSE:
+        PipelineID pipelineID = PipelineID.
+            getFromProtobuf(action.getClosePipeline().getPipelineID());
+        publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID);
+        break;
+      default:
+        LOG.error("unknown pipeline action:{}", action.getAction());
+      }
+    }
+  }
+}

+ 38 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+/**
+ * Handles pipeline close event.
+ */
+public class PipelineCloseHandler implements EventHandler<PipelineID> {
+  private final Mapping mapping;
+  public PipelineCloseHandler(Mapping mapping) {
+    this.mapping = mapping;
+  }
+
+  @Override
+  public void onMessage(PipelineID pipelineID, EventPublisher publisher) {
+    mapping.handlePipelineClose(pipelineID);
+  }
+}
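
Taken together, the two new handlers form a small pipeline-teardown chain on the SCM event queue: a datanode CLOSE action is translated into a PIPELINE_CLOSE event, which delegates to Mapping#handlePipelineClose. A minimal sketch of that flow, assuming the EventQueue wiring shown later in StorageContainerManager; datanodeDetails, actionsProto and containerMapping are placeholders.

    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
        new PipelineActionEventHandler());
    eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE,
        new PipelineCloseHandler(containerMapping));

    // A heartbeat-derived CLOSE action eventually reaches handlePipelineClose.
    eventQueue.fireEvent(SCMEvents.PIPELINE_ACTIONS,
        new PipelineActionsFromDatanode(datanodeDetails, actionsProto));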

+ 5 - 5
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java

@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.pipelines;
 
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.WeakHashMap;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -43,11 +42,12 @@ public abstract class PipelineManager {
   private final AtomicInteger pipelineIndex;
   private final Node2PipelineMap node2PipelineMap;
 
-  public PipelineManager(Node2PipelineMap map) {
+  public PipelineManager(Node2PipelineMap map,
+      Map<PipelineID, Pipeline> pipelineMap) {
     activePipelines = new LinkedList<>();
     pipelineIndex = new AtomicInteger(0);
-    pipelineMap = new WeakHashMap<>();
-    node2PipelineMap = map;
+    this.pipelineMap = pipelineMap;
+    this.node2PipelineMap = map;
   }
 
   /**
@@ -187,7 +187,7 @@ public abstract class PipelineManager {
    *
    * @param pipeline
    */
-  public void closePipeline(Pipeline pipeline) {
+  public void closePipeline(Pipeline pipeline) throws IOException {
     pipelineMap.remove(pipeline.getId());
     node2PipelineMap.removePipeline(pipeline);
   }

+ 22 - 24
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java

@@ -55,6 +55,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -77,6 +79,7 @@ public class PipelineSelector {
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
   private final Node2PipelineMap node2PipelineMap;
+  private final Map<PipelineID, Pipeline> pipelineMap;
   private final LeaseManager<Pipeline> pipelineLeaseManager;
   private final StateMachine<LifeCycleState,
       HddsProtos.LifeCycleEvent> stateMachine;
@@ -99,12 +102,13 @@ public class PipelineSelector {
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
     node2PipelineMap = new Node2PipelineMap();
+    pipelineMap = new ConcurrentHashMap<>();
     this.standaloneManager =
         new StandaloneManagerImpl(this.nodeManager, placementPolicy,
-            containerSize, node2PipelineMap);
+            containerSize, node2PipelineMap, pipelineMap);
     this.ratisManager =
         new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
-            conf, node2PipelineMap);
+            conf, node2PipelineMap, pipelineMap);
     // Initialize the container state machine.
     Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
     long pipelineCreationLeaseTimeout = conf.getTimeDuration(
@@ -303,19 +307,10 @@ public class PipelineSelector {
   }
 
   /**
-   * This function to return pipeline for given pipeline name and replication
-   * type.
+   * Returns the pipeline for the given pipeline id.
    */
-  public Pipeline getPipeline(PipelineID pipelineID,
-      ReplicationType replicationType) throws IOException {
-    if (pipelineID == null) {
-      return null;
-    }
-    PipelineManager manager = getPipelineManager(replicationType);
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Getting replication pipeline forReplicationType {} :" +
-        " pipelineName:{}", replicationType, pipelineID);
-    return manager.getPipeline(pipelineID);
+  public Pipeline getPipeline(PipelineID pipelineID) {
+    return pipelineMap.get(pipelineID);
   }
 
   /**
@@ -324,9 +319,18 @@ public class PipelineSelector {
   public void finalizePipeline(Pipeline pipeline) throws IOException {
     PipelineManager manager = getPipelineManager(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getId());
+    if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING ||
+        pipeline.getLifeCycleState() == LifeCycleState.CLOSED) {
+      LOG.debug("pipeline:{} already in closing state, skipping",
+          pipeline.getId());
+      // already in closing/closed state
+      return;
+    }
+
     // Remove the pipeline from active allocation
     manager.finalizePipeline(pipeline);
+
+    LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
     updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
     closePipelineIfNoOpenContainers(pipeline);
   }
@@ -350,7 +354,7 @@ public class PipelineSelector {
   /**
    * Close a given pipeline.
    */
-  private void closePipeline(Pipeline pipeline) {
+  private void closePipeline(Pipeline pipeline) throws IOException {
     PipelineManager manager = getPipelineManager(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
@@ -400,14 +404,8 @@ public class PipelineSelector {
     return node2PipelineMap;
   }
 
-  public void removePipeline(UUID dnId) {
-    Set<Pipeline> pipelineSet =
-        node2PipelineMap.getPipelines(dnId);
-    for (Pipeline pipeline : pipelineSet) {
-      getPipelineManager(pipeline.getType())
-          .closePipeline(pipeline);
-    }
-    node2PipelineMap.removeDatanode(dnId);
+  public Set<PipelineID> getPipelineId(UUID dnId) {
+    return node2PipelineMap.getPipelines(dnId);
   }
 
   /**

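With removePipeline gone, callers that react to node failures are expected to resolve the affected pipelines through the new getPipelineId accessor and let the PIPELINE_CLOSE event drive the teardown. A hypothetical caller, with pipelineSelector, dnUuid and publisher as placeholders:

    for (PipelineID id : pipelineSelector.getPipelineId(dnUuid)) {
      publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
    }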
+ 9 - 5
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java

@@ -39,6 +39,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
 
 /**
  * Implementation of {@link PipelineManager}.
@@ -59,8 +60,8 @@ public class RatisManagerImpl extends PipelineManager {
    */
   public RatisManagerImpl(NodeManager nodeManager,
       ContainerPlacementPolicy placementPolicy, long size, Configuration conf,
-      Node2PipelineMap map) {
-    super(map);
+      Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
+    super(map, pipelineMap);
     this.conf = conf;
     this.nodeManager = nodeManager;
     ratisMembers = new HashSet<>();
@@ -101,20 +102,23 @@ public class RatisManagerImpl extends PipelineManager {
     //TODO:move the initialization from SCM to client
     try (XceiverClientRatis client =
         XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-      client.createPipeline(pipeline);
+      client.createPipeline();
     }
   }
 
   /**
    * Close the pipeline.
    */
-  public void closePipeline(Pipeline pipeline) {
+  public void closePipeline(Pipeline pipeline) throws IOException {
     super.closePipeline(pipeline);
     for (DatanodeDetails node : pipeline.getMachines()) {
       // A node should always be in the ratis members list.
       Preconditions.checkArgument(ratisMembers.remove(node));
     }
-    //TODO: should the raft ring also be destroyed as well?
+    try (XceiverClientRatis client =
+        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+      client.destroyPipeline();
+    }
   }
 
   /**

+ 4 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java

@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
 
 /**
  * Standalone Manager Impl to prove that pluggable interface
@@ -58,8 +59,8 @@ public class StandaloneManagerImpl extends PipelineManager {
    */
   public StandaloneManagerImpl(NodeManager nodeManager,
       ContainerPlacementPolicy placementPolicy, long containerSize,
-      Node2PipelineMap map) {
-    super(map);
+      Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
+    super(map, pipelineMap);
     this.nodeManager = nodeManager;
     this.placementPolicy = placementPolicy;
     this.containerSize =  containerSize;
@@ -103,7 +104,7 @@ public class StandaloneManagerImpl extends PipelineManager {
   /**
    * Close the pipeline.
    */
-  public void closePipeline(Pipeline pipeline) {
+  public void closePipeline(Pipeline pipeline) throws IOException {
     super.closePipeline(pipeline);
     for (DatanodeDetails node : pipeline.getMachines()) {
       // A node should always be in the standalone members list.

+ 23 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.server;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
 import org.apache.hadoop.hdds.protocol.proto.
@@ -43,6 +45,8 @@ import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS;
+
 /**
  * This class is responsible for dispatching heartbeat from datanode to
  * appropriate EventHandler at SCM.
@@ -99,6 +103,13 @@ public final class SCMDatanodeHeartbeatDispatcher {
               heartbeat.getContainerActions()));
     }
 
+    if (heartbeat.hasPipelineActions()) {
+      LOG.debug("Dispatching Pipeline Actions.");
+      eventPublisher.fireEvent(PIPELINE_ACTIONS,
+          new PipelineActionsFromDatanode(datanodeDetails,
+              heartbeat.getPipelineActions()));
+    }
+
     if (heartbeat.hasCommandStatusReport()) {
       eventPublisher.fireEvent(CMD_STATUS_REPORT,
           new CommandStatusReportFromDatanode(datanodeDetails,
@@ -167,6 +178,18 @@ public final class SCMDatanodeHeartbeatDispatcher {
     }
   }
 
+  /**
+   * Pipeline action event payload with origin.
+   */
+  public static class PipelineActionsFromDatanode
+      extends ReportFromDatanode<PipelineActionsProto> {
+
+    public PipelineActionsFromDatanode(DatanodeDetails datanodeDetails,
+        PipelineActionsProto actions) {
+      super(datanodeDetails, actions);
+    }
+  }
+
   /**
    * Container report event payload with origin.
    */

+ 12 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -62,6 +62,8 @@ import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -218,7 +220,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new CommandStatusReportHandler();
 
     NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
-    StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
+    StaleNodeHandler staleNodeHandler =
+        new StaleNodeHandler(node2ContainerMap, scmContainerManager);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
         getScmContainerManager().getStateManager());
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -229,6 +232,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new ContainerReportHandler(scmContainerManager, node2ContainerMap,
             replicationStatus);
 
+    PipelineActionEventHandler pipelineActionEventHandler =
+        new PipelineActionEventHandler();
+
+    PipelineCloseHandler pipelineCloseHandler =
+        new PipelineCloseHandler(scmContainerManager);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@@ -242,6 +250,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
     eventQueue
         .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
+    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
+        pipelineActionEventHandler);
+    eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,

+ 3 - 3
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java

@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
@@ -97,10 +97,10 @@ public class TestNode2PipelineMap {
     Assert.assertEquals(3, dns.size());
 
     // get pipeline details by dnid
-    Set<Pipeline> pipelines = mapping.getPipelineSelector()
+    Set<PipelineID> pipelines = mapping.getPipelineSelector()
         .getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
     Assert.assertEquals(1, pipelines.size());
-    pipelines.forEach(p -> Assert.assertEquals(p.getId(),
+    pipelines.forEach(p -> Assert.assertEquals(p,
         ratisContainer.getPipeline().getId()));
 
 

+ 126 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java

@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationType.RATIS;
+
+/**
+ * Test Node failure detection and handling in Ratis.
+ */
+public class TestNodeFailure {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static ContainerWithPipeline ratisContainer1;
+  private static ContainerWithPipeline ratisContainer2;
+  private static ContainerMapping mapping;
+  private static long timeForFailure;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        10, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
+        10, TimeUnit.SECONDS);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(6)
+        .setHbInterval(1000)
+        .setHbProcessorInterval(1000)
+        .build();
+    cluster.waitForClusterToBeReady();
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    mapping = (ContainerMapping)scm.getScmContainerManager();
+    ratisContainer1 = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    ratisContainer2 = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    // At this stage, there should be 2 pipelines, each with 1 open container.
+    // Try closing both the pipelines, one with a closed container and
+    // the other with an open container.
+    timeForFailure = conf.getTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+            .getDuration(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testPipelineFail() throws InterruptedException, IOException,
+      TimeoutException {
+    Assert.assertEquals(HddsProtos.LifeCycleState.OPEN,
+        ratisContainer1.getPipeline().getLifeCycleState());
+    Pipeline pipelineToFail = ratisContainer1.getPipeline();
+    DatanodeDetails dnToFail = pipelineToFail.getMachines().get(0);
+    cluster.shutdownHddsDatanode(dnToFail);
+
+    // wait for sufficient time for the callback to be triggered
+    Thread.sleep(3 * timeForFailure);
+
+    Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
+        ratisContainer1.getPipeline().getLifeCycleState());
+    Assert.assertEquals(HddsProtos.LifeCycleState.OPEN,
+        ratisContainer2.getPipeline().getLifeCycleState());
+    Assert.assertNull(
+        mapping.getPipelineSelector().getPipeline(pipelineToFail.getId()));
+    // Now restart the datanode and make sure that a new pipeline is created.
+    cluster.restartHddsDatanode(dnToFail);
+    ContainerWithPipeline ratisContainer3 =
+        mapping.allocateContainer(RATIS, THREE, "testOwner");
+    // Assert that the new container is not created on ratisContainer2's pipeline.
+    Assert.assertNotEquals(ratisContainer3.getPipeline().getId(),
+        ratisContainer2.getPipeline().getId());
+  }
+}

+ 2 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java

@@ -112,8 +112,7 @@ public class TestPipelineClose {
 
     pipelineSelector.finalizePipeline(ratisContainer1.getPipeline());
     Pipeline pipeline1 = pipelineSelector
-        .getPipeline(ratisContainer1.getPipeline().getId(),
-            ratisContainer1.getContainerInfo().getReplicationType());
+        .getPipeline(ratisContainer1.getPipeline().getId());
     Assert.assertNull(pipeline1);
     Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
         HddsProtos.LifeCycleState.CLOSED);
@@ -140,8 +139,7 @@ public class TestPipelineClose {
     Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(),
         HddsProtos.LifeCycleState.CLOSING);
     Pipeline pipeline2 = pipelineSelector
-        .getPipeline(ratisContainer2.getPipeline().getId(),
-            ratisContainer2.getContainerInfo().getReplicationType());
+        .getPipeline(ratisContainer2.getPipeline().getId());
     Assert.assertEquals(pipeline2.getLifeCycleState(),
         HddsProtos.LifeCycleState.CLOSING);
   }

+ 15 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -155,6 +156,13 @@ public interface MiniOzoneCluster {
   void restartHddsDatanode(int i) throws InterruptedException,
       TimeoutException;
 
+  /**
+   * Restart a particular HddsDatanode.
+   *
+   * @param dn HddsDatanode in the MiniOzoneCluster
+   */
+  void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException,
+      TimeoutException, IOException;
   /**
    * Shutdown a particular HddsDatanode.
    *
@@ -162,6 +170,13 @@ public interface MiniOzoneCluster {
    */
   void shutdownHddsDatanode(int i);
 
+  /**
+   * Shutdown a particular HddsDatanode.
+   *
+   * @param dn HddsDatanode in the MiniOzoneCluster
+   */
+  void shutdownHddsDatanode(DatanodeDetails dn) throws IOException;
+
   /**
    * Shutdown the MiniOzoneCluster.
    */

+ 21 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java

@@ -157,6 +157,16 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     return hddsDatanodes;
   }
 
+  private int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException {
+    for (HddsDatanodeService service : hddsDatanodes) {
+      if (service.getDatanodeDetails().equals(dn)) {
+        return hddsDatanodes.indexOf(service);
+      }
+    }
+    throw new IOException(
+        "Not able to find datanode with datanode Id " + dn.getUuid());
+  }
+
   @Override
   public OzoneClient getClient() throws IOException {
     return OzoneClientFactory.getClient(conf);
@@ -242,11 +252,22 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     waitForClusterToBeReady();
   }
 
+  @Override
+  public void restartHddsDatanode(DatanodeDetails dn)
+      throws InterruptedException, TimeoutException, IOException {
+    restartHddsDatanode(getHddsDatanodeIndex(dn));
+  }
+
   @Override
   public void shutdownHddsDatanode(int i) {
     hddsDatanodes.get(i).stop();
   }
 
+  @Override
+  public void shutdownHddsDatanode(DatanodeDetails dn) throws IOException {
+    shutdownHddsDatanode(getHddsDatanodeIndex(dn));
+  }
+
   @Override
   public void shutdown() {
     try {

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java

@@ -156,7 +156,8 @@ public class TestCSMMetrics {
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
 
     final ContainerDispatcher dispatcher = new TestContainerDispatcher();
-    return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
+    return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
+        null);
   }
 
   static void initXceiverServerRatis(

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java

@@ -138,7 +138,8 @@ public class TestContainerServer {
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
 
     final ContainerDispatcher dispatcher = new TestContainerDispatcher();
-    return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
+    return XceiverServerRatis
+        .newXceiverServerRatis(dn, conf, dispatcher, null);
   }
 
   static void initXceiverServerRatis(

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java

@@ -69,7 +69,7 @@ public class TestContainerStateMachine {
           new ArrayBlockingQueue<>(1024),
           new ThreadPoolExecutor.CallerRunsPolicy());
   private ContainerStateMachine stateMachine =
-      new ContainerStateMachine(new TestContainerDispatcher(), executor);
+      new ContainerStateMachine(new TestContainerDispatcher(), executor, null);
 
 
   @Test