Преглед изворни кода

HDDS-1128. Create stateful manager class for the pipeline creation scheduling.

Signed-off-by: Nanda kumar <nanda@apache.org>
Lokesh Jain пре 6 година
родитељ
комит
0d62753da9
23 измењених фајлова са 290 додато и 293 уклоњено
  1. 10 7
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java
  2. 1 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java
  3. 1 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
  4. 1 2
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
  5. 1 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
  6. 110 0
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
  7. 1 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
  8. 0 6
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
  9. 6 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
  10. 0 1
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
  11. 0 9
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
  12. 0 17
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
  13. 6 117
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
  14. 109 27
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
  15. 0 5
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
  16. 1 3
      hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
  17. 1 3
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
  18. 6 8
      hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java
  19. 2 2
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
  20. 18 56
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
  21. 11 3
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java
  22. 4 9
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
  23. 1 3
      hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java

+ 10 - 7
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java

@@ -87,14 +87,17 @@ public class Scheduler {
    * yet executed are also cancelled. For the executing tasks the scheduler
    * waits 60 seconds for completion.
    */
-  public void close() {
+  public synchronized void close() {
     isClosed = true;
-    scheduler.shutdownNow();
-    try {
-      scheduler.awaitTermination(60, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOG.info(threadName + " interrupted while waiting for task completion {}",
-          e);
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+      try {
+        scheduler.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOG.info(
+            threadName + " interrupted while waiting for task completion {}",
+            e);
+      }
     }
     scheduler = null;
   }

+ 1 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/SCMChillModeManager.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.slf4j.Logger;
@@ -147,8 +146,7 @@ public class SCMChillModeManager {
     emitChillModeStatus();
     // TODO: #CLUTIL if we reenter chill mode the fixed interval pipeline
     // creation job needs to stop
-    RatisPipelineUtils
-        .scheduleFixedIntervalPipelineCreator(pipelineManager, config);
+    pipelineManager.startPipelineCreator();
   }
 
   public boolean getInChillMode() {

+ 1 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.node;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -41,7 +40,6 @@ public class NewNodeHandler implements EventHandler<DatanodeDetails> {
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    RatisPipelineUtils
-        .triggerPipelineCreation(pipelineManager, conf, 0);
+    pipelineManager.triggerPipelineCreation();
   }
 }

+ 1 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java

@@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
@@ -43,6 +42,6 @@ public class NonHealthyToHealthyNodeHandler
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    RatisPipelineUtils.triggerPipelineCreation(pipelineManager, conf, 0);
+    pipelineManager.triggerPipelineCreation();
   }
 }

+ 1 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.slf4j.Logger;
@@ -61,8 +60,7 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
     for (PipelineID pipelineID : pipelineIds) {
       try {
         Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
-        RatisPipelineUtils
-            .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, true);
+        pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
       } catch (IOException e) {
         LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
             datanodeDetails);

+ 110 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java

@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.utils.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Implements api for running background pipeline creation jobs.
+ */
+class BackgroundPipelineCreator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BackgroundPipelineCreator.class);
+
+  private final Scheduler scheduler;
+  private final AtomicBoolean isPipelineCreatorRunning;
+  private final PipelineManager pipelineManager;
+  private final Configuration conf;
+
+  BackgroundPipelineCreator(PipelineManager pipelineManager,
+      Scheduler scheduler, Configuration conf) {
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+    this.scheduler = scheduler;
+    isPipelineCreatorRunning = new AtomicBoolean(false);
+  }
+
+  private boolean shouldSchedulePipelineCreator() {
+    return isPipelineCreatorRunning.compareAndSet(false, true);
+  }
+
+  /**
+   * Schedules a fixed interval job to create pipelines.
+   */
+  void startFixedIntervalPipelineCreator() {
+    long intervalInMillis = conf
+        .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
+            ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    // TODO: #CLUTIL We can start the job asap
+    scheduler.scheduleWithFixedDelay(() -> {
+      if (!shouldSchedulePipelineCreator()) {
+        return;
+      }
+      createPipelines();
+    }, 0, intervalInMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Triggers pipeline creation via background thread.
+   */
+  void triggerPipelineCreation() {
+    // TODO: #CLUTIL introduce a better mechanism to not have more than one
+    // job of a particular type running, probably via ratis.
+    if (!shouldSchedulePipelineCreator()) {
+      return;
+    }
+    scheduler.schedule(this::createPipelines, 0, TimeUnit.MILLISECONDS);
+  }
+
+  private void createPipelines() {
+    // TODO: #CLUTIL Different replication factor may need to be supported
+    HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
+        conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+            OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
+
+    for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+        .values()) {
+      while (true) {
+        try {
+          if (scheduler.isClosed()) {
+            break;
+          }
+          pipelineManager.createPipeline(type, factor);
+        } catch (IOException ioe) {
+          break;
+        } catch (Throwable t) {
+          LOG.error("Error while creating pipelines {}", t);
+          break;
+        }
+      }
+    }
+    isPipelineCreatorRunning.set(false);
+  }
+}

+ 1 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java

@@ -59,9 +59,7 @@ public class PipelineActionHandler
           Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
           LOG.info("Received pipeline action {} for {} from datanode [}",
               action.getAction(), pipeline, report.getDatanodeDetails());
-          RatisPipelineUtils
-              .finalizeAndDestroyPipeline(pipelineManager, pipeline, ozoneConf,
-                  true);
+          pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
         } catch (IOException ioe) {
           LOG.error("Could not execute pipeline action={} pipeline={} {}",
               action, pipelineID, ioe);

+ 0 - 6
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java

@@ -61,10 +61,4 @@ public final class PipelineFactory {
       List<DatanodeDetails> nodes) {
     return providers.get(type).create(factor, nodes);
   }
-
-  public void close() {
-    for (PipelineProvider p : providers.values()) {
-      p.close();
-    }
-  }
 }

+ 6 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java

@@ -67,9 +67,12 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
 
   int getNumberOfContainers(PipelineID pipelineID) throws IOException;
 
-  void finalizePipeline(PipelineID pipelineID) throws IOException;
-
   void openPipeline(PipelineID pipelineId) throws IOException;
 
-  void removePipeline(PipelineID pipelineID) throws IOException;
+  void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
+      throws IOException;
+
+  void startPipelineCreator();
+
+  void triggerPipelineCreation();
 }

+ 0 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java

@@ -33,5 +33,4 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 
-  void close();
 }

+ 0 - 9
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java

@@ -108,15 +108,6 @@ public class PipelineReportHandler implements
         // if all the dns have reported, pipeline can be moved to OPEN state
         pipelineManager.openPipeline(pipelineID);
       }
-    } else if (pipeline.isClosed()) {
-      int numContainers = pipelineManager.getNumberOfContainers(pipelineID);
-      if (numContainers == 0) {
-        // since all the containers have been closed the pipeline can be
-        // destroyed
-        LOGGER.info("Destroying pipeline {} as all containers are closed",
-            pipeline);
-        RatisPipelineUtils.destroyPipeline(pipelineManager, pipeline, conf);
-      }
     } else {
       // In OPEN state case just report the datanode
       pipeline.reportDatanode(dn);

+ 0 - 17
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.utils.Scheduler;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -38,8 +37,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
 /**
  * Implements Api for creating ratis pipelines.
  */
@@ -48,20 +45,12 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
-  private static Scheduler scheduler;
 
-  //TODO static Scheduler should be removed!!!! HDDS-1128
-  @SuppressFBWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
-    scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
-  }
-
-  static Scheduler getScheduler() {
-    return scheduler;
   }
 
   /**
@@ -98,7 +87,6 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
   }
 
-
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
     // Get set of datanodes already used for ratis pipeline
@@ -146,9 +134,4 @@ public class RatisPipelineProvider implements PipelineProvider {
   protected void initializePipeline(Pipeline pipeline) throws IOException {
     RatisPipelineUtils.createPipeline(pipeline, conf);
   }
-
-  @Override
-  public void close() {
-    scheduler.close();
-  }
 }

+ 6 - 117
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java

@@ -19,12 +19,10 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.grpc.GrpcTlsConfig;
@@ -42,17 +40,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Utility class for Ratis pipelines. Contains methods to create and destroy
  * ratis pipelines.
  */
-public final class RatisPipelineUtils {
-
-  private static AtomicBoolean isPipelineCreatorRunning =
-      new AtomicBoolean(false);
+final class RatisPipelineUtils {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RatisPipelineUtils.class);
@@ -87,13 +80,11 @@ public final class RatisPipelineUtils {
    * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
    * the datanodes.
    *
-   * @param pipelineManager - SCM pipeline manager
    * @param pipeline        - Pipeline to be destroyed
    * @param ozoneConf       - Ozone configuration
    * @throws IOException
    */
-  public static void destroyPipeline(PipelineManager pipelineManager,
-      Pipeline pipeline, Configuration ozoneConf) throws IOException {
+  static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf) {
     final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
     LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
     for (DatanodeDetails dn : pipeline.getNodes()) {
@@ -104,42 +95,6 @@ public final class RatisPipelineUtils {
             pipeline.getId(), dn);
       }
     }
-    // remove the pipeline from the pipeline manager
-    pipelineManager.removePipeline(pipeline.getId());
-    triggerPipelineCreation(pipelineManager, ozoneConf, 0);
-  }
-
-  /**
-   * Finalizes pipeline in the SCM. Removes pipeline and sends ratis command to
-   * destroy pipeline on the datanodes immediately or after timeout based on the
-   * value of onTimeout parameter.
-   *
-   * @param pipelineManager - SCM pipeline manager
-   * @param pipeline        - Pipeline to be destroyed
-   * @param ozoneConf       - Ozone Configuration
-   * @param onTimeout       - if true pipeline is removed and destroyed on
-   *                        datanodes after timeout
-   * @throws IOException
-   */
-  public static void finalizeAndDestroyPipeline(PipelineManager pipelineManager,
-      Pipeline pipeline, Configuration ozoneConf, boolean onTimeout)
-      throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    LOG.info("destroying pipeline:{} with {}", pipeline.getId(), group);
-    pipelineManager.finalizePipeline(pipeline.getId());
-    if (onTimeout) {
-      long pipelineDestroyTimeoutInMillis = ozoneConf
-          .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
-              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
-              TimeUnit.MILLISECONDS);
-      RatisPipelineProvider.getScheduler()
-          .schedule(() -> destroyPipeline(pipelineManager, pipeline, ozoneConf),
-              pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG, String
-                  .format("Destroy pipeline failed for pipeline:%s with %s",
-                      pipeline.getId(), group));
-    } else {
-      destroyPipeline(pipelineManager, pipeline, ozoneConf);
-    }
   }
 
   /**
@@ -194,80 +149,14 @@ public final class RatisPipelineUtils {
               retryPolicy, maxOutstandingRequests, tlsConfig)) {
         rpc.accept(client, p);
       } catch (IOException ioe) {
-        exceptions.add(
-            new IOException("Failed invoke Ratis rpc " + rpc + " for " +
-                d.getUuid(), ioe));
+        String errMsg =
+            "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
+        LOG.error(errMsg, ioe);
+        exceptions.add(new IOException(errMsg, ioe));
       }
     });
     if (!exceptions.isEmpty()) {
       throw MultipleIOException.createIOException(exceptions);
     }
   }
-
-  /**
-   * Schedules a fixed interval job to create pipelines.
-   *
-   * @param pipelineManager - Pipeline manager
-   * @param conf            - Configuration
-   */
-  public static void scheduleFixedIntervalPipelineCreator(
-      PipelineManager pipelineManager, Configuration conf) {
-    long intervalInMillis = conf
-        .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
-            ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
-            TimeUnit.MILLISECONDS);
-    // TODO: #CLUTIL We can start the job asap
-    RatisPipelineProvider.getScheduler().scheduleWithFixedDelay(() -> {
-      if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
-        return;
-      }
-      createPipelines(pipelineManager, conf);
-    }, intervalInMillis, intervalInMillis, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Triggers pipeline creation after the specified time.
-   *
-   * @param pipelineManager - Pipeline manager
-   * @param conf            - Configuration
-   * @param afterMillis     - Time after which pipeline creation needs to be
-   *                        triggered
-   */
-  public static void triggerPipelineCreation(PipelineManager pipelineManager,
-      Configuration conf, long afterMillis) {
-    // TODO: #CLUTIL introduce a better mechanism to not have more than one
-    // job of a particular type running, probably via ratis.
-    if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
-      return;
-    }
-    RatisPipelineProvider.getScheduler()
-        .schedule(() -> createPipelines(pipelineManager, conf), afterMillis,
-            TimeUnit.MILLISECONDS);
-  }
-
-  private static void createPipelines(PipelineManager pipelineManager,
-      Configuration conf) {
-    // TODO: #CLUTIL Different replication factor may need to be supported
-    HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
-        conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
-            OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
-
-    for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
-        .values()) {
-      while (true) {
-        try {
-          if (RatisPipelineProvider.getScheduler().isClosed()) {
-            break;
-          }
-          pipelineManager.createPipeline(type, factor);
-        } catch (IOException ioe) {
-          break;
-        } catch (Throwable t) {
-          LOG.error("Error while creating pipelines {}", t);
-          break;
-        }
-      }
-    }
-    isPipelineCreatorRunning.set(false);
-  }
 }

+ 109 - 27
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -34,6 +35,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.apache.hadoop.utils.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +48,7 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -68,19 +71,27 @@ public class SCMPipelineManager implements PipelineManager {
   private final ReadWriteLock lock;
   private final PipelineFactory pipelineFactory;
   private final PipelineStateManager stateManager;
-  private final MetadataStore pipelineStore;
+  private final BackgroundPipelineCreator backgroundPipelineCreator;
+  private Scheduler scheduler;
+  private MetadataStore pipelineStore;
 
   private final EventPublisher eventPublisher;
   private final NodeManager nodeManager;
   private final SCMPipelineMetrics metrics;
+  private final Configuration conf;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
       EventPublisher eventPublisher) throws IOException {
     this.lock = new ReentrantReadWriteLock();
+    this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
     this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf);
+    // TODO: See if thread priority needs to be set for these threads
+    scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
+    this.backgroundPipelineCreator =
+        new BackgroundPipelineCreator(this, scheduler, conf);
     int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     final File metaDir = ServerUtils.getScmDbDir(conf);
@@ -268,35 +279,116 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public void finalizePipeline(PipelineID pipelineId) throws IOException {
+  public void openPipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      stateManager.finalizePipeline(pipelineId);
-      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
-      for (ContainerID containerID : containerIDs) {
-        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
-      }
+      stateManager.openPipeline(pipelineId);
     } finally {
       lock.writeLock().unlock();
     }
   }
 
+  /**
+   * Finalizes pipeline in the SCM. Removes pipeline and makes rpc call to
+   * destroy pipeline on the datanodes immediately or after timeout based on the
+   * value of onTimeout parameter.
+   *
+   * @param pipeline        - Pipeline to be destroyed
+   * @param onTimeout       - if true pipeline is removed and destroyed on
+   *                        datanodes after timeout
+   * @throws IOException
+   */
   @Override
-  public void openPipeline(PipelineID pipelineId) throws IOException {
+  public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
+      throws IOException {
+    LOG.info("destroying pipeline:{}", pipeline);
+    finalizePipeline(pipeline.getId());
+    if (onTimeout) {
+      long pipelineDestroyTimeoutInMillis =
+          conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
+              TimeUnit.MILLISECONDS);
+      scheduler.schedule(() -> destroyPipeline(pipeline),
+          pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
+          String.format("Destroy pipeline failed for pipeline:%s", pipeline));
+    } else {
+      destroyPipeline(pipeline);
+    }
+  }
+
+  @Override
+  public Map<String, Integer> getPipelineInfo() {
+    final Map<String, Integer> pipelineInfo = new HashMap<>();
+    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
+      pipelineInfo.put(state.toString(), 0);
+    }
+    stateManager.getPipelines().forEach(pipeline ->
+        pipelineInfo.computeIfPresent(
+            pipeline.getPipelineState().toString(), (k, v) -> v + 1));
+    return pipelineInfo;
+  }
+
+  /**
+   * Schedules a fixed interval job to create pipelines.
+   */
+  @Override
+  public void startPipelineCreator() {
+    backgroundPipelineCreator.startFixedIntervalPipelineCreator();
+  }
+
+  /**
+   * Triggers pipeline creation after the specified time.
+   */
+  @Override
+  public void triggerPipelineCreation() {
+    backgroundPipelineCreator.triggerPipelineCreation();
+  }
+
+  /**
+   * Moves the pipeline to CLOSED state and sends close container command for
+   * all the containers in the pipeline.
+   *
+   * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
+   * @throws IOException
+   */
+  private void finalizePipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      stateManager.openPipeline(pipelineId);
+      stateManager.finalizePipeline(pipelineId);
+      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+      for (ContainerID containerID : containerIDs) {
+        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+      }
     } finally {
       lock.writeLock().unlock();
     }
   }
 
-  @Override
-  public void removePipeline(PipelineID pipelineID) throws IOException {
+  /**
+   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
+   * the datanodes for ratis pipelines.
+   *
+   * @param pipeline        - Pipeline to be destroyed
+   * @throws IOException
+   */
+  private void destroyPipeline(Pipeline pipeline) throws IOException {
+    RatisPipelineUtils.destroyPipeline(pipeline, conf);
+    // remove the pipeline from the pipeline manager
+    removePipeline(pipeline.getId());
+    triggerPipelineCreation();
+  }
+
+  /**
+   * Removes the pipeline from the db and pipeline state map.
+   *
+   * @param pipelineId - ID of the pipeline to be removed
+   * @throws IOException
+   */
+  private void removePipeline(PipelineID pipelineId) throws IOException {
     lock.writeLock().lock();
     try {
-      pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
-      Pipeline pipeline = stateManager.removePipeline(pipelineID);
+      pipelineStore.delete(pipelineId.getProtobuf().toByteArray());
+      Pipeline pipeline = stateManager.removePipeline(pipelineId);
       nodeManager.removePipeline(pipeline);
       metrics.incNumPipelineDestroyed();
     } catch (IOException ex) {
@@ -307,26 +399,16 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
-  @Override
-  public Map<String, Integer> getPipelineInfo() {
-    final Map<String, Integer> pipelineInfo = new HashMap<>();
-    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
-      pipelineInfo.put(state.toString(), 0);
-    }
-    stateManager.getPipelines().forEach(pipeline ->
-        pipelineInfo.computeIfPresent(
-            pipeline.getPipelineState().toString(), (k, v) -> v + 1));
-    return pipelineInfo;
-  }
-
   @Override
   public void close() throws IOException {
-    if (pipelineFactory != null) {
-      pipelineFactory.close();
+    if (scheduler != null) {
+      scheduler.close();
+      scheduler = null;
     }
 
     if (pipelineStore != null) {
       pipelineStore.close();
+      pipelineStore = null;
     }
     if(pmInfoBean != null) {
       MBeans.unregister(this.pmInfoBean);

+ 0 - 5
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java

@@ -72,9 +72,4 @@ public class SimplePipelineProvider implements PipelineProvider {
         .setNodes(nodes)
         .build();
   }
-
-  @Override
-  public void close() {
-    // Nothing to do in here.
-  }
 }

+ 1 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java

@@ -46,7 +46,6 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.io.IOUtils;
@@ -411,8 +410,7 @@ public class SCMClientProtocolServer implements
     PipelineManager pipelineManager = scm.getPipelineManager();
     Pipeline pipeline =
         pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
-    RatisPipelineUtils
-        .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     AUDIT.logWriteSuccess(
         buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, null)
     );

+ 1 - 3
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -274,8 +273,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
         .waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5);
 
     for (Pipeline pipeline : pipelineManager.getPipelines()) {
-      RatisPipelineUtils
-          .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
     Assert.assertEquals(0, pipelineManager.getPipelines(type, factor).size());
     Assert.assertNotNull(blockManager

+ 6 - 8
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestSCMChillModeManager.java

@@ -35,19 +35,16 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.*;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.mockito.Mockito;
 
 /** Test class for SCMChillModeManager.
  */
@@ -128,12 +125,13 @@ public class TestSCMChillModeManager {
   }
 
   @Test
-  @Ignore("TODO:HDDS-1140")
   public void testDisableChillMode() {
     OzoneConfiguration conf = new OzoneConfiguration(config);
     conf.setBoolean(HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED, false);
-    scmChillModeManager = new SCMChillModeManager(
-        conf, containers, null, queue);
+    PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
+    Mockito.doNothing().when(pipelineManager).startPipelineCreator();
+    scmChillModeManager =
+        new SCMChillModeManager(conf, containers, pipelineManager, queue);
     assertFalse(scmChillModeManager.getInChillMode());
   }
 

+ 2 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java

@@ -112,8 +112,8 @@ public class TestNode2PipelineMap {
         ratisContainer.getPipeline().getId());
     Assert.assertEquals(0, set2.size());
 
-    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
-    pipelineManager.removePipeline(ratisContainer.getPipeline().getId());
+    pipelineManager
+        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
     pipelines = scm.getScmNodeManager()
         .getPipelines(dns.get(0));
     Assert

+ 18 - 56
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java

@@ -24,24 +24,23 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -121,12 +120,8 @@ public class TestPipelineClose {
         .getContainersInPipeline(ratisContainer.getPipeline().getId());
     Assert.assertEquals(0, setClosed.size());
 
-    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
-    Pipeline pipeline1 = pipelineManager
-        .getPipeline(ratisContainer.getPipeline().getId());
-    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        pipeline1.getPipelineState());
-    pipelineManager.removePipeline(pipeline1.getId());
+    pipelineManager
+        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
     for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
       Assert.assertFalse(scm.getScmNodeManager().getPipelines(dn)
@@ -135,21 +130,23 @@ public class TestPipelineClose {
   }
 
   @Test
-  public void testPipelineCloseWithOpenContainer() throws IOException,
-      TimeoutException, InterruptedException {
+  public void testPipelineCloseWithOpenContainer()
+      throws IOException, TimeoutException, InterruptedException {
     Set<ContainerID> setOpen = pipelineManager.getContainersInPipeline(
         ratisContainer.getPipeline().getId());
     Assert.assertEquals(1, setOpen.size());
 
-    ContainerID cId2 = ratisContainer.getContainerInfo().containerID();
-    pipelineManager.finalizePipeline(ratisContainer.getPipeline().getId());
-    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        pipelineManager.getPipeline(
-            ratisContainer.getPipeline().getId()).getPipelineState());
-    Pipeline pipeline2 = pipelineManager
-        .getPipeline(ratisContainer.getPipeline().getId());
-    Assert.assertEquals(Pipeline.PipelineState.CLOSED,
-        pipeline2.getPipelineState());
+    pipelineManager
+        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return containerManager
+            .getContainer(ratisContainer.getContainerInfo().containerID())
+            .getState() == HddsProtos.LifeCycleState.CLOSING;
+      } catch (ContainerNotFoundException e) {
+        return false;
+      }
+    }, 100, 10000);
   }
 
   @Test
@@ -183,39 +180,4 @@ public class TestPipelineClose {
     } catch (PipelineNotFoundException e) {
     }
   }
-
-  @Test
-  public void testPipelineCloseWithPipelineReport() throws IOException {
-    Pipeline pipeline = ratisContainer.getPipeline();
-    pipelineManager.finalizePipeline(pipeline.getId());
-    // remove pipeline from SCM
-    pipelineManager.removePipeline(pipeline.getId());
-
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      PipelineReportFromDatanode pipelineReport =
-          TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
-      EventQueue eventQueue = new EventQueue();
-      SCMChillModeManager scmChillModeManager =
-          new SCMChillModeManager(new OzoneConfiguration(),
-              new ArrayList<>(), pipelineManager, eventQueue);
-      PipelineReportHandler pipelineReportHandler =
-          new PipelineReportHandler(scmChillModeManager, pipelineManager, conf);
-      // on receiving pipeline report for the pipeline, pipeline report handler
-      // should destroy the pipeline for the dn
-      pipelineReportHandler.onMessage(pipelineReport, eventQueue);
-    }
-
-    OzoneContainer ozoneContainer =
-        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
-            .getContainer();
-    List<PipelineReport> pipelineReports =
-        ozoneContainer.getPipelineReport().getPipelineReportList();
-    for (PipelineReport pipelineReport : pipelineReports) {
-      // pipeline should not be reported by any dn
-      Assert.assertNotEquals(
-          PipelineID.getFromProtobuf(pipelineReport.getPipelineID()),
-          ratisContainer.getPipeline().getId());
-    }
-  }
-
-}
+}

+ 11 - 3
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -46,6 +47,8 @@ public class TestRatisPipelineUtils {
   private static PipelineManager pipelineManager;
 
   public void init(int numDatanodes) throws Exception {
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        GenericTestUtils.getRandomizedTempPath());
     cluster = MiniOzoneCluster.newBuilder(conf)
             .setNumDatanodes(numDatanodes)
             .setHbInterval(1000)
@@ -71,8 +74,7 @@ public class TestRatisPipelineUtils {
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
     for (Pipeline pipeline : pipelines) {
-      RatisPipelineUtils
-          .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
     // make sure two pipelines are created
     waitForPipelines(2);
@@ -108,7 +110,13 @@ public class TestRatisPipelineUtils {
     for (HddsDatanodeService dn : dns) {
       cluster.restartHddsDatanode(dn.getDatanodeDetails(), false);
     }
+
+    // destroy the existing pipelines
+    for (Pipeline pipeline : pipelines) {
+      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+    }
     // make sure pipelines is created after node start
+    pipelineManager.triggerPipelineCreation();
     waitForPipelines(1);
   }
 
@@ -117,6 +125,6 @@ public class TestRatisPipelineUtils {
     GenericTestUtils.waitFor(() -> pipelineManager
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
-        .size() == numPipelines, 100, 20000);
+        .size() == numPipelines, 100, 40000);
   }
 }

+ 4 - 9
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java

@@ -104,8 +104,7 @@ public class TestSCMPipelineManager {
 
     // clean up
     for (Pipeline pipeline : pipelines) {
-      pipelineManager.finalizePipeline(pipeline.getId());
-      pipelineManager.removePipeline(pipeline.getId());
+      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
     pipelineManager.close();
   }
@@ -126,10 +125,7 @@ public class TestSCMPipelineManager {
     pipelineManager.openPipeline(pipeline.getId());
     pipelineManager
         .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1));
-    pipelineManager.finalizePipeline(pipeline.getId());
-    pipelineManager
-        .removeContainerFromPipeline(pipeline.getId(), ContainerID.valueof(1));
-    pipelineManager.removePipeline(pipeline.getId());
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     pipelineManager.close();
 
     // new pipeline manager should not be able to load removed pipelines
@@ -192,13 +188,12 @@ public class TestSCMPipelineManager {
         .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
     // close the pipeline
-    pipelineManager.finalizePipeline(pipeline.getId());
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
 
     for (DatanodeDetails dn: pipeline.getNodes()) {
       PipelineReportFromDatanode pipelineReportFromDatanode =
           TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
-      // pipeline report for a closed pipeline should destroy the pipeline
-      // and remove it from the pipeline manager
+      // pipeline report for destroyed pipeline should be ignored
       pipelineReportHandler
           .onMessage(pipelineReportFromDatanode, new EventQueue());
     }

+ 1 - 3
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java

@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.junit.AfterClass;
@@ -103,7 +102,6 @@ public class TestFreonWithPipelineDestroy {
     PipelineManager pipelineManager =
         cluster.getStorageContainerManager().getPipelineManager();
     Pipeline pipeline = pipelineManager.getPipeline(id);
-    RatisPipelineUtils
-        .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
   }
 }