
HDDS-1076. TestSCMNodeManager crashed the jvm. Contributed by Lokesh Jain.

Shashikant Banerjee committed 6 years ago
commit de934ba2dc

+ 101 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/Scheduler.java

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import org.apache.ratis.util.function.CheckedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class encapsulates ScheduledExecutorService.
+ */
+public class Scheduler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(Scheduler.class);
+
+  private ScheduledExecutorService scheduler;
+
+  private volatile boolean isClosed;
+
+  private String threadName;
+
+  /**
+   * Creates a ScheduledExecutorService based on input arguments.
+   * @param threadName - thread name
+   * @param isDaemon - if true the threads in the scheduler are started as
+   *                 daemon
+   * @param numCoreThreads - number of core threads to maintain in the scheduler
+   */
+  public Scheduler(String threadName, boolean isDaemon, int numCoreThreads) {
+    scheduler = Executors.newScheduledThreadPool(numCoreThreads, r -> {
+      Thread t = new Thread(r);
+      t.setName(threadName);
+      t.setDaemon(isDaemon);
+      return t;
+    });
+    this.threadName = threadName;
+    isClosed = false;
+  }
+
+  public void schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
+    scheduler.schedule(runnable, delay, timeUnit);
+  }
+
+  public void schedule(CheckedRunnable runnable, long delay,
+      TimeUnit timeUnit, Logger logger, String errMsg) {
+    scheduler.schedule(() -> {
+      try {
+        runnable.run();
+      } catch (Throwable throwable) {
+        logger.error(errMsg, throwable);
+      }
+    }, delay, timeUnit);
+  }
+
+  public void scheduleWithFixedDelay(Runnable runnable, long initialDelay,
+      long fixedDelay, TimeUnit timeUnit) {
+    scheduler
+        .scheduleWithFixedDelay(runnable, initialDelay, fixedDelay, timeUnit);
+  }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Closes the scheduler so that no further tasks can be submitted. Any
+   * pending tasks not yet started are cancelled. For tasks already
+   * executing, the scheduler waits up to 60 seconds for completion.
+   */
+  public void close() {
+    isClosed = true;
+    scheduler.shutdownNow();
+    try {
+      scheduler.awaitTermination(60, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.info("{} interrupted while waiting for task completion",
+          threadName, e);
+    }
+    scheduler = null;
+  }
+}
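
For context, a minimal usage sketch of the new Scheduler wrapper, assuming the class above is on the classpath; the class name and thread name below are illustrative only, not part of this commit:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.utils.Scheduler;

public class SchedulerExample {
  public static void main(String[] args) throws InterruptedException {
    // One non-daemon worker thread named "ExampleThread".
    Scheduler scheduler = new Scheduler("ExampleThread", false, 1);

    // One-shot task after a 100 ms delay.
    scheduler.schedule(() -> System.out.println("one-shot"), 100,
        TimeUnit.MILLISECONDS);

    // Recurring task: first run after 200 ms, then 200 ms after each
    // completion (fixed delay, not fixed rate).
    scheduler.scheduleWithFixedDelay(() -> System.out.println("recurring"),
        200, 200, TimeUnit.MILLISECONDS);

    // Give the tasks time to fire before shutting down.
    Thread.sleep(1000);

    // Cancels pending tasks and waits up to 60 s for running ones.
    scheduler.close();
  }
}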

+ 6 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java

@@ -54,4 +54,10 @@ public final class PipelineFactory {
       List<DatanodeDetails> nodes) {
     return providers.get(type).create(factor, nodes);
   }
+
+  public void close() {
+    for (PipelineProvider p : providers.values()) {
+      p.close();
+    }
+  }
 }

+ 2 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java

@@ -32,4 +32,6 @@ public interface PipelineProvider {
   Pipeline create(ReplicationFactor factor) throws IOException;
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
+
+  void close();
 }

+ 12 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import org.apache.hadoop.utils.Scheduler;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -45,12 +46,18 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private static Scheduler scheduler;
 
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
+    scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
+  }
+
+  static Scheduler getScheduler() {
+    return scheduler;
   }
 
   /**
@@ -135,4 +142,9 @@ public class RatisPipelineProvider implements PipelineProvider {
   private void initializePipeline(Pipeline pipeline) throws IOException {
     RatisPipelineUtils.createPipeline(pipeline, conf);
   }
+
+  @Override
+  public void close() {
+    scheduler.close();
+  }
 }

+ 26 - 37
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java

@@ -34,8 +34,6 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.util.function.CheckedBiConsumer;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +50,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public final class RatisPipelineUtils {
 
-  private static TimeoutScheduler timeoutScheduler =
-      TimeoutScheduler.newInstance(1);
   private static AtomicBoolean isPipelineCreatorRunning =
       new AtomicBoolean(false);
 
@@ -127,12 +123,11 @@ public final class RatisPipelineUtils {
           .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
               ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
               TimeUnit.MILLISECONDS);
-      TimeDuration timeoutDuration = TimeDuration
-          .valueOf(pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
-      timeoutScheduler.onTimeout(timeoutDuration,
-          () -> destroyPipeline(pipelineManager, pipeline, ozoneConf), LOG,
-          () -> String.format("Destroy pipeline failed for pipeline:%s with %s",
-              pipeline.getId(), group));
+      RatisPipelineProvider.getScheduler()
+          .schedule(() -> destroyPipeline(pipelineManager, pipeline, ozoneConf),
+              pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG, String
+                  .format("Destroy pipeline failed for pipeline:%s with %s",
+                      pipeline.getId(), group));
     } else {
       destroyPipeline(pipelineManager, pipeline, ozoneConf);
     }
@@ -213,22 +208,12 @@ public final class RatisPipelineUtils {
             ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
             TimeUnit.MILLISECONDS);
     // TODO: #CLUTIL We can start the job asap
-    TimeDuration timeDuration =
-        TimeDuration.valueOf(intervalInMillis, TimeUnit.MILLISECONDS);
-    timeoutScheduler.onTimeout(timeDuration,
-        () -> fixedIntervalPipelineCreator(pipelineManager, conf,
-            timeDuration), LOG,
-        () -> "FixedIntervalPipelineCreatorJob failed.");
-  }
-
-  private static void fixedIntervalPipelineCreator(
-      PipelineManager pipelineManager, Configuration conf,
-      TimeDuration timeDuration) {
-    timeoutScheduler.onTimeout(timeDuration,
-        () -> fixedIntervalPipelineCreator(pipelineManager, conf,
-            timeDuration), LOG,
-        () -> "FixedIntervalPipelineCreatorJob failed.");
-    triggerPipelineCreation(pipelineManager, conf, 0);
+    RatisPipelineProvider.getScheduler().scheduleWithFixedDelay(() -> {
+      if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
+        return;
+      }
+      createPipelines(pipelineManager, conf);
+    }, intervalInMillis, intervalInMillis, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -246,10 +231,9 @@ public final class RatisPipelineUtils {
     if (!isPipelineCreatorRunning.compareAndSet(false, true)) {
       return;
     }
-    timeoutScheduler
-        .onTimeout(TimeDuration.valueOf(afterMillis, TimeUnit.MILLISECONDS),
-            () -> createPipelines(pipelineManager, conf), LOG,
-            () -> "PipelineCreation failed.");
+    RatisPipelineProvider.getScheduler()
+        .schedule(() -> createPipelines(pipelineManager, conf), afterMillis,
+            TimeUnit.MILLISECONDS);
   }
 
   private static void createPipelines(PipelineManager pipelineManager,
@@ -261,13 +245,18 @@ public final class RatisPipelineUtils {
 
     for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
         .values()) {
-      try {
-        pipelineManager.createPipeline(type, factor);
-      } catch (IOException ioe) {
-        break;
-      } catch (Throwable t) {
-        LOG.error("Error while creating pipelines {}", t);
-        break;
+      while (true) {
+        try {
+          if (RatisPipelineProvider.getScheduler().isClosed()) {
+            break;
+          }
+          pipelineManager.createPipeline(type, factor);
+        } catch (IOException ioe) {
+          break;
+        } catch (Throwable t) {
+          LOG.error("Error while creating pipelines", t);
+          break;
+        }
       }
     }
     isPipelineCreatorRunning.set(false);
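
The fixed-delay creator job above prevents overlapping runs with an AtomicBoolean compare-and-set guard. A standalone sketch of that idiom, with illustrative names that are not part of this commit; note the try/finally release, which keeps the flag from staying set if the job throws:

import java.util.concurrent.atomic.AtomicBoolean;

public final class GuardedJob {
  // True while a run is in progress.
  private static final AtomicBoolean RUNNING = new AtomicBoolean(false);

  static void runExclusively(Runnable job) {
    // Only one caller wins the compare-and-set; the rest return immediately.
    if (!RUNNING.compareAndSet(false, true)) {
      return;
    }
    try {
      job.run();
    } finally {
      // Release the guard even if the job throws.
      RUNNING.set(false);
    }
  }

  public static void main(String[] args) {
    runExclusively(() -> System.out.println("first run executes"));
    runExclusively(() -> System.out.println("second run executes too"));
  }
}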

+ 4 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java

@@ -267,6 +267,10 @@ public class SCMPipelineManager implements PipelineManager {
 
   @Override
   public void close() throws IOException {
+    if (pipelineFactory != null) {
+      pipelineFactory.close();
+    }
+
     if (pipelineStore != null) {
       pipelineStore.close();
     }

+ 5 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java

@@ -72,4 +72,9 @@ public class SimplePipelineProvider implements PipelineProvider {
         .setNodes(nodes)
         .build();
   }
+
+  @Override
+  public void close() {
+    // Nothing to do in here.
+  }
 }

+ 7 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineUtils.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.List;
@@ -51,6 +52,11 @@ public class TestRatisPipelineUtils {
     pipelineManager = scm.getPipelineManager();
   }
 
+  @After
+  public void cleanup() {
+    cluster.shutdown();
+  }
+
   @Test(timeout = 30000)
   public void testAutomaticPipelineCreationOnPipelineDestroy()
       throws Exception {
@@ -90,6 +96,6 @@ public class TestRatisPipelineUtils {
     GenericTestUtils.waitFor(() -> pipelineManager
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
-        .size() == numPipelines, 100, 10000);
+        .size() == numPipelines, 100, 20000);
   }
 }
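
For reference, the GenericTestUtils.waitFor call above polls the given condition every 100 ms for up to 20 s. A self-contained sketch of that polling idiom, not the Hadoop implementation:

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public final class WaitForExample {
  // Poll 'check' every checkEveryMillis until it is true or
  // waitForMillis elapses.
  static void waitFor(BooleanSupplier check, long checkEveryMillis,
      long waitForMillis) throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (!check.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("Condition not met in time");
      }
      Thread.sleep(checkEveryMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // Becomes true after roughly half a second.
    waitFor(() -> System.currentTimeMillis() - start > 500, 100, 20000);
    System.out.println("condition met");
  }
}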