Selaa lähdekoodia

HDDS-802. Container State Manager should get open pipelines for allocating container. Contributed by Lokesh Jain.

Yiqun Lin 6 vuotta sitten
vanhempi
commit
9317a61f3c

+ 2 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java

@@ -248,8 +248,8 @@ public class ContainerStateManager {
     try {
       pipeline = pipelineManager.createPipeline(type, replicationFactor);
     } catch (IOException e) {
-      final List<Pipeline> pipelines =
-          pipelineManager.getPipelines(type, replicationFactor);
+      final List<Pipeline> pipelines = pipelineManager
+          .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
       if (pipelines.isEmpty()) {
         throw new IOException("Could not allocate container");
       }

+ 3 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java

@@ -46,6 +46,9 @@ public interface PipelineManager extends Closeable {
   List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor);
 
+  List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor, Pipeline.PipelineState state);
+
   void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
       throws IOException;
 

+ 5 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java

@@ -64,6 +64,11 @@ class PipelineStateManager {
     return pipelineStateMap.getPipelines(type, factor);
   }
 
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      PipelineState state) {
+    return pipelineStateMap.getPipelines(type, factor, state);
+  }
+
   List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
     return pipelineStateMap.getPipelines(type, states);
   }

+ 22 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java

@@ -165,6 +165,28 @@ class PipelineStateMap {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Get list of pipeline corresponding to specified replication type,
+   * replication factor and pipeline state.
+   *
+   * @param type - ReplicationType
+   * @param state - Required PipelineState
+   * @return List of pipelines with specified replication type,
+   * replication factor and pipeline state
+   */
+  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+      PipelineState state) {
+    Preconditions.checkNotNull(type, "Replication type cannot be null");
+    Preconditions.checkNotNull(factor, "Replication factor cannot be null");
+    Preconditions.checkNotNull(state, "Pipeline state cannot be null");
+
+    return pipelineMap.values().stream().filter(
+        pipeline -> pipeline.getType() == type
+            && pipeline.getPipelineState() == state
+            && pipeline.getFactor() == factor)
+        .collect(Collectors.toList());
+  }
+
   /**
    * Get set of containerIDs corresponding to a pipeline.
    *

+ 11 - 0
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java

@@ -165,6 +165,17 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor, Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, factor, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public void addContainerToPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException {

+ 57 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java

@@ -157,8 +157,8 @@ public class TestPipelineStateManager {
             stateManager.getPipelines(type, factor);
         Assert.assertEquals(15, pipelines1.size());
         pipelines1.stream().forEach(p -> {
-          Assert.assertEquals(p.getType(), type);
-          Assert.assertEquals(p.getFactor(), factor);
+          Assert.assertEquals(type, p.getType());
+          Assert.assertEquals(factor, p.getFactor());
         });
       }
     }
@@ -203,8 +203,8 @@ public class TestPipelineStateManager {
           .getPipelines(type, Pipeline.PipelineState.OPEN);
       Assert.assertEquals(5, pipelines1.size());
       pipelines1.forEach(p -> {
-        Assert.assertEquals(p.getType(), type);
-        Assert.assertEquals(p.getPipelineState(), Pipeline.PipelineState.OPEN);
+        Assert.assertEquals(type, p.getType());
+        Assert.assertEquals(Pipeline.PipelineState.OPEN, p.getPipelineState());
       });
 
       pipelines1 = stateManager
@@ -219,6 +219,59 @@ public class TestPipelineStateManager {
     }
   }
 
+  @Test
+  public void testGetPipelinesByTypeFactorAndState() throws IOException {
+    Set<Pipeline> pipelines = new HashSet<>();
+    for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
+        .values()) {
+      for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+          .values()) {
+        for (int i = 0; i < 5; i++) {
+          // 5 pipelines in allocated state for each type and factor
+          Pipeline pipeline =
+              createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          pipelines.add(pipeline);
+
+          // 5 pipelines in open state for each type and factor
+          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          stateManager.openPipeline(pipeline.getId());
+          pipelines.add(pipeline);
+
+          // 5 pipelines in closed state for each type and factor
+          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          stateManager.addPipeline(pipeline);
+          stateManager.finalizePipeline(pipeline.getId());
+          pipelines.add(pipeline);
+        }
+      }
+    }
+
+    for (HddsProtos.ReplicationType type : HddsProtos.ReplicationType
+        .values()) {
+      for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
+          .values()) {
+        for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
+          // verify pipelines received
+          List<Pipeline> pipelines1 =
+              stateManager.getPipelines(type, factor, state);
+          Assert.assertEquals(5, pipelines1.size());
+          pipelines1.forEach(p -> {
+            Assert.assertEquals(type, p.getType());
+            Assert.assertEquals(factor, p.getFactor());
+            Assert.assertEquals(state, p.getPipelineState());
+          });
+        }
+      }
+    }
+
+    //clean up
+    for (Pipeline pipeline : pipelines) {
+      removePipeline(pipeline);
+    }
+  }
+
   @Test
   public void testAddAndGetContainer() throws IOException {
     long containerID = 0;