HDDS-844. Add logic for pipeline teardown after timeout. Contributed by Lokesh Jain.

Mukul Kumar Singh 6 years ago
commit cfb915f3df
18 changed files with 464 additions and 175 deletions
  1. +0 -13   hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
  2. +0 -48   hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
  3. +2 -1    hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
  4. +4 -4    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
  5. +0 -11   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
  6. +6 -7    hadoop-hdds/common/src/main/resources/ozone-default.xml
  7. +11 -3   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
  8. +16 -10  hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
  9. +3 -9    hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
  10. +0 -6   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
  11. +2 -8   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
  12. +176 -0 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
  13. +2 -2   hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
  14. +25 -5  hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
  15. +106 -32 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
  16. +0 -14  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
  17. +2 -2   hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
  18. +109 -0 hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java

+ 0 - 13
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -288,19 +288,6 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     }
   }
 
-  /**
-   * Create a pipeline.
-   */
-  @Override
-  public void createPipeline() {
-    // For stand alone pipeline, there is no notion called setup pipeline.
-  }
-
-  @Override
-  public void destroyPipeline() {
-    // For stand alone pipeline, there is no notion called destroy pipeline.
-  }
-
   @Override
   public void watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException {

+ 0 - 48
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.io.MultipleIOException;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.protobuf
@@ -27,7 +26,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -36,19 +34,14 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.util.CheckedBiConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Objects;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -97,22 +90,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     this.retryPolicy = retryPolicy;
   }
 
-  @Override
-  public void createPipeline() throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
-    callRatisRpc(pipeline.getNodes(),
-        (raftClient, peer) -> raftClient.groupAdd(group, peer.getId()));
-  }
-
-  @Override
-  public void destroyPipeline() throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
-    callRatisRpc(pipeline.getNodes(), (raftClient, peer) -> raftClient
-        .groupRemove(group.getGroupId(), true, peer.getId()));
-  }
-
   /**
    * Returns Ratis as pipeline Type.
    *
@@ -123,31 +100,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return HddsProtos.ReplicationType.RATIS;
   }
 
-  private void callRatisRpc(List<DatanodeDetails> datanodes,
-      CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
-      throws IOException {
-    if (datanodes.isEmpty()) {
-      return;
-    }
-
-    final List<IOException> exceptions =
-        Collections.synchronizedList(new ArrayList<>());
-    datanodes.parallelStream().forEach(d -> {
-      final RaftPeer p = RatisHelper.toRaftPeer(d);
-      try (RaftClient client = RatisHelper
-          .newRaftClient(rpcType, p, retryPolicy)) {
-        rpc.accept(client, p);
-      } catch (IOException ioe) {
-        exceptions.add(
-            new IOException("Failed invoke Ratis rpc " + rpc + " for " + d,
-                ioe));
-      }
-    });
-    if (!exceptions.isEmpty()) {
-      throw MultipleIOException.createIOException(exceptions);
-    }
-  }
-
   @Override
   public Pipeline getPipeline() {
     return pipeline;

+ 2 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java

@@ -157,7 +157,8 @@ public class ContainerOperationClient implements ScmClient {
     //    ObjectStageChangeRequestProto.Op.create,
     //    ObjectStageChangeRequestProto.Stage.begin);
 
-    client.createPipeline();
+    // client.createPipeline();
+    // TODO: Use PipelineManager to createPipeline
 
     //storageContainerLocationClient.notifyObjectStageChange(
     //    ObjectStageChangeRequestProto.Type.pipeline,

+ 4 - 4
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -289,11 +289,11 @@ public final class ScmConfigKeys {
   public static final String
       OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
-  public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
-      "ozone.scm.pipeline.creation.lease.timeout";
+  public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT =
+      "ozone.scm.pipeline.destroy.timeout";
 
-  public static final String
-      OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
+  public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT =
+      "300s";
 
   public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
       "ozone.scm.block.deletion.max.retry";

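The rename above drops the 60s pipeline-creation lease in favor of a destroy timeout: how long SCM waits after closing a pipeline before tearing it down, defaulting to 300s. A minimal sketch of resolving the value, mirroring the getTimeDuration call in RatisPipelineUtils later in this commit (the class and variable names here are illustrative only):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public class DestroyTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = new OzoneConfiguration();
        // Resolve the destroy timeout in ms; falls back to the "300s" default.
        long destroyTimeoutMs = conf.getTimeDuration(
            ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
            ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
            TimeUnit.MILLISECONDS);
        System.out.println("pipeline destroy timeout = " + destroyTimeoutMs);
      }
    }
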
+ 0 - 11
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java

@@ -118,17 +118,6 @@ public abstract class XceiverClientSpi implements Closeable {
       sendCommandAsync(ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException;
 
-  /**
-   * Create a pipeline.
-   */
-  public abstract void createPipeline() throws IOException;
-
-  /**
-   * Destroy a pipeline.
-   * @throws IOException
-   */
-  public abstract void destroyPipeline() throws IOException;
-
   /**
    * Returns pipeline Type.
    *

+ 6 - 7
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -821,6 +821,8 @@
       OM/SCM eventually. So a 30 second HB seems to work. This assumes that
       replication strategy used is Ratis if not, this value should be set to
       something smaller like 3 seconds.
+      ozone.scm.pipeline.destroy.timeout should also be adjusted accordingly,
+      if the default value for this config is not used.
     </description>
   </property>
   <property>
@@ -1183,15 +1185,12 @@
       postfix (ns,ms,s,m,h,d)</description>
   </property>
   <property>
-    <name>ozone.scm.pipeline.creation.lease.timeout</name>
-    <value>60s</value>
+    <name>ozone.scm.pipeline.destroy.timeout</name>
+    <value>300s</value>
     <tag>OZONE, SCM, PIPELINE</tag>
     <description>
-      Pipeline creation timeout in milliseconds to be used by SCM. When
-      BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to
-      CREATING state, SCM will now wait for the configured amount of time
-      to get COMPLETE_CREATE event if it doesn't receive it will move the
-      pipeline to DELETING.
+      Once a pipeline is closed, SCM waits for the time configured here
+      before destroying the pipeline.
     </description>
   </property>
 

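The heartbeat description above says the destroy timeout should be adjusted together with the heartbeat interval. A hedged sketch of tuning the two programmatically, modeled on the TestPipelineClose setup later in this commit (the 1s/5s values are illustrative, not recommendations):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.HddsConfigKeys;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public class PipelineTimeoutTuning {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Faster heartbeats deliver pipeline actions/reports to SCM sooner...
        conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL,
            1000, TimeUnit.MILLISECONDS);
        // ...so the wait before destroying a closed pipeline can shrink too.
        conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
            5000, TimeUnit.MILLISECONDS);
      }
    }
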
+ 11 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java

@@ -18,9 +18,13 @@
 
 package org.apache.hadoop.hdds.scm.node;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.slf4j.Logger;
@@ -38,21 +42,25 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
 
   private final NodeManager nodeManager;
   private final PipelineManager pipelineManager;
+  private final Configuration conf;
 
   public StaleNodeHandler(NodeManager nodeManager,
-      PipelineManager pipelineManager) {
+      PipelineManager pipelineManager, OzoneConfiguration conf) {
     this.nodeManager = nodeManager;
     this.pipelineManager = pipelineManager;
+    this.conf = conf;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
-                        EventPublisher publisher) {
+      EventPublisher publisher) {
     Set<PipelineID> pipelineIds =
         nodeManager.getPipelines(datanodeDetails);
     for (PipelineID pipelineID : pipelineIds) {
       try {
-        pipelineManager.finalizePipeline(pipelineID);
+        Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
+        RatisPipelineUtils
+            .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, true);
       } catch (IOException e) {
         LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
             datanodeDetails);

+ 16 - 10
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java

@@ -17,10 +17,10 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineAction;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .PipelineActionsFromDatanode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -32,16 +32,19 @@ import java.io.IOException;
 /**
  * Handles pipeline actions from datanode.
  */
-public class PipelineActionHandler implements
-    EventHandler<PipelineActionsFromDatanode> {
+public class PipelineActionHandler
+    implements EventHandler<PipelineActionsFromDatanode> {
 
-  public static final Logger LOG = LoggerFactory.getLogger(
-      PipelineActionHandler.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(PipelineActionHandler.class);
 
   private final PipelineManager pipelineManager;
+  private final Configuration ozoneConf;
 
-  public PipelineActionHandler(PipelineManager pipelineManager) {
+  public PipelineActionHandler(PipelineManager pipelineManager,
+      OzoneConfiguration conf) {
     this.pipelineManager = pipelineManager;
+    this.ozoneConf = conf;
   }
 
   @Override
@@ -53,7 +56,10 @@ public class PipelineActionHandler implements
         try {
           pipelineID = PipelineID.
               getFromProtobuf(action.getClosePipeline().getPipelineID());
-          pipelineManager.finalizePipeline(pipelineID);
+          Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
+          RatisPipelineUtils
+              .finalizeAndDestroyPipeline(pipelineManager, pipeline, ozoneConf,
+                  true);
         } catch (IOException ioe) {
           LOG.error("Could not execute pipeline action={} pipeline={} {}",
               action, pipelineID, ioe);

+ 3 - 9
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java

@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -76,11 +75,11 @@ public class PipelineReportHandler implements
   private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
       throws IOException {
     PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
-    Pipeline pipeline = null;
+    Pipeline pipeline;
     try {
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
-      //TODO: introduce per datanode command for pipeline destroy
+      RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf);
       return;
     }
 
@@ -93,14 +92,9 @@ public class PipelineReportHandler implements
     } else if (pipeline.isClosed()) {
       int numContainers = pipelineManager.getNumberOfContainers(pipelineID);
       if (numContainers == 0) {
-        // remove the pipeline from the pipeline manager
-        pipelineManager.removePipeline(pipelineID);
         // since all the containers have been closed the pipeline can be
         // destroyed
-        try (XceiverClientRatis client =
-            XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-          client.destroyPipeline();
-        }
+        RatisPipelineUtils.destroyPipeline(pipelineManager, pipeline, conf);
       }
     } else {
       // In OPEN state case just report the datanode

+ 0 - 6
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java

@@ -242,12 +242,6 @@ class PipelineStateMap {
           String.format("Pipeline with %s is not yet closed", pipelineID));
     }
 
-    Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
-    if (containerIDs.size() != 0) {
-      throw new IOException(
-          String.format("Pipeline with %s is not empty", pipelineID));
-    }
-
     pipelineMap.remove(pipelineID);
     pipeline2container.remove(pipelineID);
     return pipeline;

+ 2 - 8
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -133,12 +132,7 @@ public class RatisPipelineProvider implements PipelineProvider {
         .build();
   }
 
-  private void initializePipeline(Pipeline pipeline)
-      throws IOException {
-    // TODO: remove old code in XceiverClientRatis#newXceiverClientRatis
-    try (XceiverClientRatis client =
-        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-      client.createPipeline();
-    }
+  private void initializePipeline(Pipeline pipeline) throws IOException {
+    RatisPipelineUtils.createPipeline(pipeline, conf);
   }
 }

+ 176 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java

@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.CheckedBiConsumer;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class for Ratis pipelines. Contains methods to create and destroy
+ * ratis pipelines.
+ */
+public final class RatisPipelineUtils {
+
+  private static TimeoutScheduler timeoutScheduler =
+      TimeoutScheduler.newInstance(1);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RatisPipelineUtils.class);
+
+  private RatisPipelineUtils() {
+  }
+
+  /**
+   * Sends ratis command to create pipeline on all the datanodes.
+   * @param pipeline - Pipeline to be created
+   * @param ozoneConf - Ozone Configuration
+   * @throws IOException if creation fails
+   */
+  public static void createPipeline(Pipeline pipeline, Configuration ozoneConf)
+      throws IOException {
+    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
+    LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
+    callRatisRpc(pipeline.getNodes(), ozoneConf,
+        (raftClient, peer) -> raftClient.groupAdd(group, peer.getId()));
+  }
+
+  /**
+   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
+   * the datanodes.
+   * @param pipelineManager - SCM pipeline manager
+   * @param pipeline - Pipeline to be destroyed
+   * @param ozoneConf - Ozone configuration
+   * @throws IOException
+   */
+  static void destroyPipeline(PipelineManager pipelineManager,
+      Pipeline pipeline, Configuration ozoneConf) throws IOException {
+    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
+    LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
+    // remove the pipeline from the pipeline manager
+    pipelineManager.removePipeline(pipeline.getId());
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      destroyPipeline(dn, pipeline.getId(), ozoneConf);
+    }
+  }
+
+  /**
+   * Finalizes pipeline in the SCM. Removes pipeline and sends ratis command to
+   * destroy pipeline on the datanodes immediately or after timeout based on the
+   * value of the onTimeout parameter.
+   * @param pipelineManager - SCM pipeline manager
+   * @param pipeline - Pipeline to be destroyed
+   * @param ozoneConf - Ozone Configuration
+   * @param onTimeout - if true pipeline is removed and destroyed on datanodes
+   *                  after timeout
+   * @throws IOException
+   */
+  public static void finalizeAndDestroyPipeline(PipelineManager pipelineManager,
+      Pipeline pipeline, Configuration ozoneConf, boolean onTimeout)
+      throws IOException {
+    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
+    LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
+    pipelineManager.finalizePipeline(pipeline.getId());
+    if (onTimeout) {
+      long pipelineDestroyTimeoutInMillis = ozoneConf
+          .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
+              TimeUnit.MILLISECONDS);
+      TimeDuration timeoutDuration = TimeDuration
+          .valueOf(pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
+      timeoutScheduler.onTimeout(timeoutDuration,
+          () -> destroyPipeline(pipelineManager, pipeline, ozoneConf), LOG,
+          () -> String.format("Destroy pipeline failed for pipeline:%s with %s",
+              pipeline.getId(), group));
+    } else {
+      destroyPipeline(pipelineManager, pipeline, ozoneConf);
+    }
+  }
+
+  /**
+   * Sends ratis command to destroy pipeline on the given datanode.
+   * @param dn - Datanode on which pipeline needs to be destroyed
+   * @param pipelineID - ID of pipeline to be destroyed
+   * @param ozoneConf - Ozone configuration
+   * @throws IOException
+   */
+  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
+      Configuration ozoneConf) throws IOException {
+    final String rpcType = ozoneConf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+    final RaftPeer p = RatisHelper.toRaftPeer(dn);
+    // close the client after the RPC so it is not leaked
+    try (RaftClient client = RatisHelper
+        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
+            retryPolicy)) {
+      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true,
+          p.getId());
+    }
+  }
+
+  private static void callRatisRpc(List<DatanodeDetails> datanodes,
+      Configuration ozoneConf,
+      CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
+      throws IOException {
+    if (datanodes.isEmpty()) {
+      return;
+    }
+
+    final String rpcType = ozoneConf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+    final List<IOException> exceptions =
+        Collections.synchronizedList(new ArrayList<>());
+
+    datanodes.parallelStream().forEach(d -> {
+      final RaftPeer p = RatisHelper.toRaftPeer(d);
+      try (RaftClient client = RatisHelper
+          .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
+              retryPolicy)) {
+        rpc.accept(client, p);
+      } catch (IOException ioe) {
+        exceptions.add(
+            new IOException("Failed to invoke Ratis rpc " + rpc + " for " + d,
+                ioe));
+      }
+    });
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
+  }
+}
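 
For orientation, a minimal sketch of the call pattern the SCM handlers in this commit adopt (StaleNodeHandler and PipelineActionHandler both do exactly this; the wrapper method below is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
    import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;

    public class PipelineTeardownExample {
      // Hypothetical helper: finalize now, destroy on the datanodes later.
      static void teardown(PipelineManager pipelineManager, PipelineID id,
          Configuration conf) throws IOException {
        Pipeline pipeline = pipelineManager.getPipeline(id);
        // finalizePipeline moves the pipeline to CLOSED immediately; with
        // onTimeout=true the Ratis groupRemove on the datanodes runs only
        // after ozone.scm.pipeline.destroy.timeout has elapsed.
        RatisPipelineUtils
            .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, true);
      }
    }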

+ 2 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java

@@ -223,7 +223,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
     NewNodeHandler newNodeHandler = new NewNodeHandler();
     StaleNodeHandler staleNodeHandler =
-        new StaleNodeHandler(scmNodeManager, pipelineManager);
+        new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
         containerManager);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -239,7 +239,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
             pipelineManager, containerManager);
 
     PipelineActionHandler pipelineActionHandler =
-        new PipelineActionHandler(pipelineManager);
+        new PipelineActionHandler(pipelineManager, conf);
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,

+ 25 - 5
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java

@@ -17,12 +17,17 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -305,19 +310,34 @@ public final class TestUtils {
     return PipelineReportsProto.newBuilder().build();
   }
 
-  public static PipelineReportFromDatanode getRandomPipelineReportFromDatanode(
-      DatanodeDetails dn,
-      org.apache.hadoop.hdds.scm.pipeline.PipelineID... pipelineIDs) {
+  public static PipelineReportFromDatanode getPipelineReportFromDatanode(
+      DatanodeDetails dn, PipelineID... pipelineIDs) {
     PipelineReportsProto.Builder reportBuilder =
         PipelineReportsProto.newBuilder();
-    for (org.apache.hadoop.hdds.scm.pipeline.PipelineID pipelineID :
-        pipelineIDs) {
+    for (PipelineID pipelineID : pipelineIDs) {
       reportBuilder.addPipelineReport(
           PipelineReport.newBuilder().setPipelineID(pipelineID.getProtobuf()));
     }
     return new PipelineReportFromDatanode(dn, reportBuilder.build());
   }
 
+  public static PipelineActionsFromDatanode getPipelineActionFromDatanode(
+      DatanodeDetails dn, PipelineID... pipelineIDs) {
+    PipelineActionsProto.Builder actionsProtoBuilder =
+        PipelineActionsProto.newBuilder();
+    for (PipelineID pipelineID : pipelineIDs) {
+      ClosePipelineInfo closePipelineInfo =
+          ClosePipelineInfo.newBuilder().setPipelineID(pipelineID.getProtobuf())
+              .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
+              .setDetailedReason("").build();
+      actionsProtoBuilder.addPipelineActions(PipelineAction.newBuilder()
+          .setClosePipeline(closePipelineInfo)
+          .setAction(PipelineAction.Action.CLOSE)
+          .build());
+    }
+    return new PipelineActionsFromDatanode(dn, actionsProtoBuilder.build());
+  }
+
   /**
    * Creates container report with the given ContainerInfo(s).
    *
+ 106 - 32
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java

@@ -17,22 +17,32 @@
  */
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.junit.AfterClass;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
@@ -43,35 +53,36 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.R
  */
 public class TestPipelineClose {
 
-  private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf;
-  private static StorageContainerManager scm;
-  private static ContainerWithPipeline ratisContainer1;
-  private static ContainerWithPipeline ratisContainer2;
-  private static ContainerManager containerManager;
-  private static PipelineManager pipelineManager;
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration conf;
+  private StorageContainerManager scm;
+  private ContainerWithPipeline ratisContainer;
+  private ContainerManager containerManager;
+  private PipelineManager pipelineManager;
 
+  private long pipelineDestroyTimeoutInMillis;
   /**
    * Create a MiniDFSCluster for testing.
    *
    * @throws IOException
    */
-  @BeforeClass
-  public static void init() throws Exception {
+  @Before
+  public void init() throws Exception {
     conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(6).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
+        TimeUnit.MILLISECONDS);
+    pipelineDestroyTimeoutInMillis = 5000;
+    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+        pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
     cluster.waitForClusterToBeReady();
     scm = cluster.getStorageContainerManager();
     containerManager = scm.getContainerManager();
     pipelineManager = scm.getPipelineManager();
-    ContainerInfo containerInfo1 = containerManager
+    ContainerInfo containerInfo = containerManager
         .allocateContainer(RATIS, THREE, "testOwner");
-    ratisContainer1 = new ContainerWithPipeline(containerInfo1,
-        pipelineManager.getPipeline(containerInfo1.getPipelineID()));
-    ContainerInfo containerInfo2 = containerManager
-        .allocateContainer(RATIS, THREE, "testOwner");
-    ratisContainer2 = new ContainerWithPipeline(containerInfo2,
-        pipelineManager.getPipeline(containerInfo2.getPipelineID()));
+    ratisContainer = new ContainerWithPipeline(containerInfo,
+        pipelineManager.getPipeline(containerInfo.getPipelineID()));
     pipelineManager = scm.getPipelineManager();
     // At this stage, there should be 2 pipeline one with 1 open container each.
     // Try closing the both the pipelines, one with a closed container and
@@ -81,8 +92,8 @@ public class TestPipelineClose {
   /**
    * Shutdown MiniDFSCluster.
    */
-  @AfterClass
-  public static void shutdown() {
+  @After
+  public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -91,9 +102,9 @@ public class TestPipelineClose {
   @Test
   public void testPipelineCloseWithClosedContainer() throws IOException {
     Set<ContainerID> set = pipelineManager
-        .getContainersInPipeline(ratisContainer1.getPipeline().getId());
+        .getContainersInPipeline(ratisContainer.getPipeline().getId());
 
-    ContainerID cId = ratisContainer1.getContainerInfo().containerID();
+    ContainerID cId = ratisContainer.getContainerInfo().containerID();
     Assert.assertEquals(1, set.size());
     set.forEach(containerID -> Assert.assertEquals(containerID, cId));
 
@@ -105,16 +116,16 @@ public class TestPipelineClose {
         .updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
 
     Set<ContainerID> setClosed = pipelineManager
-        .getContainersInPipeline(ratisContainer1.getPipeline().getId());
+        .getContainersInPipeline(ratisContainer.getPipeline().getId());
     Assert.assertEquals(0, setClosed.size());
 
-    pipelineManager.finalizePipeline(ratisContainer1.getPipeline().getId());
+    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
     Pipeline pipeline1 = pipelineManager
-        .getPipeline(ratisContainer1.getPipeline().getId());
+        .getPipeline(ratisContainer.getPipeline().getId());
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
         pipeline1.getPipelineState());
     pipelineManager.removePipeline(pipeline1.getId());
-    for (DatanodeDetails dn : ratisContainer1.getPipeline().getNodes()) {
+    for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
       Assert.assertEquals(scm.getScmNodeManager().getPipelines(
           dn).size(), 0);
@@ -125,17 +136,80 @@ public class TestPipelineClose {
   public void testPipelineCloseWithOpenContainer() throws IOException,
       TimeoutException, InterruptedException {
     Set<ContainerID> setOpen = pipelineManager.getContainersInPipeline(
-        ratisContainer2.getPipeline().getId());
+        ratisContainer.getPipeline().getId());
     Assert.assertEquals(1, setOpen.size());
 
-    ContainerID cId2 = ratisContainer2.getContainerInfo().containerID();
-    pipelineManager.finalizePipeline(ratisContainer2.getPipeline().getId());
+    ContainerID cId2 = ratisContainer.getContainerInfo().containerID();
+    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
         pipelineManager.getPipeline(
-            ratisContainer2.getPipeline().getId()).getPipelineState());
+            ratisContainer.getPipeline().getId()).getPipelineState());
     Pipeline pipeline2 = pipelineManager
-        .getPipeline(ratisContainer2.getPipeline().getId());
+        .getPipeline(ratisContainer.getPipeline().getId());
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
         pipeline2.getPipelineState());
   }
+
+  @Test
+  public void testPipelineCloseWithPipelineAction() throws Exception {
+    List<DatanodeDetails> dns = ratisContainer.getPipeline().getNodes();
+    PipelineActionsFromDatanode
+        pipelineActionsFromDatanode = TestUtils
+        .getPipelineActionFromDatanode(dns.get(0),
+            ratisContainer.getPipeline().getId());
+    // send closing action for pipeline
+    PipelineActionHandler pipelineActionHandler =
+        new PipelineActionHandler(pipelineManager, conf);
+    pipelineActionHandler
+        .onMessage(pipelineActionsFromDatanode, new EventQueue());
+    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+    OzoneContainer ozoneContainer =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer();
+    List<PipelineReport> pipelineReports =
+        ozoneContainer.getPipelineReport().getPipelineReportList();
+    for (PipelineReport pipelineReport : pipelineReports) {
+      // ensure the pipeline is not reported by any dn
+      Assert.assertNotEquals(
+          PipelineID.getFromProtobuf(pipelineReport.getPipelineID()),
+          ratisContainer.getPipeline().getId());
+    }
+
+    try {
+      pipelineManager.getPipeline(ratisContainer.getPipeline().getId());
+      Assert.fail("Pipeline should not exist in SCM");
+    } catch (PipelineNotFoundException e) {
+      // expected: the pipeline has been destroyed and removed from SCM
+    }
+  }
+
+  @Test
+  public void testPipelineCloseWithPipelineReport() throws IOException {
+    Pipeline pipeline = ratisContainer.getPipeline();
+    pipelineManager.finalizePipeline(pipeline.getId());
+    // remove pipeline from SCM
+    pipelineManager.removePipeline(pipeline.getId());
+
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      PipelineReportFromDatanode pipelineReport =
+          TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
+      PipelineReportHandler pipelineReportHandler =
+          new PipelineReportHandler(pipelineManager, conf);
+      // on receiving pipeline report for the pipeline, pipeline report handler
+      // should destroy the pipeline for the dn
+      pipelineReportHandler.onMessage(pipelineReport, new EventQueue());
+    }
+
+    OzoneContainer ozoneContainer =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer();
+    List<PipelineReport> pipelineReports =
+        ozoneContainer.getPipelineReport().getPipelineReportList();
+    for (PipelineReport pipelineReport : pipelineReports) {
+      // pipeline should not be reported by any dn
+      Assert.assertNotEquals(
+          PipelineID.getFromProtobuf(pipelineReport.getPipelineID()),
+          ratisContainer.getPipeline().getId());
+    }
+  }
+
 }

+ 0 - 14
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java

@@ -323,15 +323,6 @@ public class TestPipelineStateManager {
 
     // close the pipeline
     stateManager.finalizePipeline(pipeline.getId());
-
-    try {
-      stateManager.removePipeline(pipeline.getId());
-      Assert.fail("Pipeline should not have been removed");
-    } catch (IOException e) {
-      // can not remove a pipeline which already has containers
-      Assert.assertTrue(e.getMessage().contains("not empty"));
-    }
-
     // remove containers and then remove the pipeline
     removePipeline(pipeline);
   }
@@ -423,11 +414,6 @@ public class TestPipelineStateManager {
 
   private void removePipeline(Pipeline pipeline) throws IOException {
     stateManager.finalizePipeline(pipeline.getId());
-    Set<ContainerID> containerIDs =
-        stateManager.getContainers(pipeline.getId());
-    for (ContainerID containerID : containerIDs) {
-      stateManager.removeContainerFromPipeline(pipeline.getId(), containerID);
-    }
     stateManager.removePipeline(pipeline.getId());
   }
 }

+ 2 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java

@@ -148,7 +148,7 @@ public class TestSCMPipelineManager {
         new PipelineReportHandler(pipelineManager, conf);
     for (DatanodeDetails dn: pipeline.getNodes()) {
       PipelineReportFromDatanode pipelineReportFromDatanode =
-          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getId());
+          TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
       // pipeline is not healthy until all dns report
       Assert.assertFalse(
           pipelineManager.getPipeline(pipeline.getId()).isHealthy());
@@ -168,7 +168,7 @@ public class TestSCMPipelineManager {
 
     for (DatanodeDetails dn: pipeline.getNodes()) {
       PipelineReportFromDatanode pipelineReportFromDatanode =
-          TestUtils.getRandomPipelineReportFromDatanode(dn, pipeline.getId());
+          TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
       // pipeline report for a closed pipeline should destroy the pipeline
       // and remove it from the pipeline manager
       pipelineReportHandler

+ 109 - 0
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests Freon with Pipeline destroy.
+ */
+public class TestFreonWithPipelineDestroy {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf)
+      .setHbProcessorInterval(1000)
+      .setHbInterval(1000)
+      .setNumDatanodes(3)
+      .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testRestart() throws Exception {
+    startFreon();
+    destroyPipeline();
+    startFreon();
+  }
+
+  private void startFreon() throws Exception {
+    RandomKeyGenerator randomKeyGenerator =
+        new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
+    randomKeyGenerator.setNumOfVolumes(1);
+    randomKeyGenerator.setNumOfBuckets(1);
+    randomKeyGenerator.setNumOfKeys(1);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+    randomKeyGenerator.setKeySize(20971520);
+    randomKeyGenerator.setValidateWrites(true);
+    randomKeyGenerator.call();
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
+    Assert.assertEquals(0,
+        randomKeyGenerator.getUnsuccessfulValidationCount());
+  }
+
+  private void destroyPipeline() throws Exception {
+    XceiverServerSpi server =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
+            getContainer().getWriteChannel();
+    StorageContainerDatanodeProtocolProtos.PipelineReport report =
+        server.getPipelineReport().get(0);
+    PipelineID id = PipelineID.getFromProtobuf(report.getPipelineID());
+    PipelineManager pipelineManager =
+        cluster.getStorageContainerManager().getPipelineManager();
+    Pipeline pipeline = pipelineManager.getPipeline(id);
+    RatisPipelineUtils
+        .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+  }
+}