浏览代码

HDDS-298. Implement SCMClientProtocolServer.getContainerWithPipeline for closed containers. Contributed by Ajay Kumar.

Xiaoyu Yao 6 年之前
父节点
当前提交
75fc51588d

+ 20 - 8
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java

@@ -58,6 +58,7 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -201,14 +202,25 @@ public class ContainerMapping implements Mapping {
       HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER
           .parseFrom(containerBytes);
       contInfo = ContainerInfo.fromProtobuf(temp);
-      Pipeline pipeline = pipelineSelector
-          .getPipeline(contInfo.getPipelineID(),
-              contInfo.getReplicationType());
-
-      if(pipeline == null) {
-        pipeline = pipelineSelector
-            .getReplicationPipeline(contInfo.getReplicationType(),
-                contInfo.getReplicationFactor());
+
+      Pipeline pipeline;
+      if (contInfo.isContainerOpen()) {
+        // If pipeline with given pipeline Id already exist return it
+        pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID(),
+            contInfo.getReplicationType());
+        if (pipeline == null) {
+          pipeline = pipelineSelector
+              .getReplicationPipeline(contInfo.getReplicationType(),
+                  contInfo.getReplicationFactor());
+        }
+      } else {
+        // For close containers create pipeline from datanodes with replicas
+        Set<DatanodeDetails> dnWithReplicas = containerStateManager
+            .getContainerReplicas(contInfo.containerID());
+        pipeline = new Pipeline(dnWithReplicas.iterator().next().getHostName(),
+            contInfo.getState(), ReplicationType.STAND_ALONE,
+            contInfo.getReplicationFactor(), PipelineID.randomId());
+        dnWithReplicas.forEach(pipeline::addMember);
       }
       return new ContainerWithPipeline(contInfo, pipeline);
     } finally {

+ 1 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java

@@ -199,7 +199,7 @@ public class ContainerStateMap {
     }
     throw new SCMException(
         "No entry exist for containerId: " + containerID + " in replica map.",
-        ResultCodes.FAILED_TO_FIND_CONTAINER);
+        ResultCodes.NO_REPLICA_FOUND);
   }
 
   /**

+ 2 - 1
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java

@@ -117,6 +117,7 @@ public class SCMException extends IOException {
     UNEXPECTED_CONTAINER_STATE,
     SCM_NOT_INITIALIZED,
     DUPLICATE_DATANODE,
-    NO_SUCH_DATANODE
+    NO_SUCH_DATANODE,
+    NO_REPLICA_FOUND
   }
 }

+ 50 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java

@@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -30,10 +32,12 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -149,6 +153,52 @@ public class TestContainerMapping {
         newPipeline.getLeader().getUuid());
   }
 
+  @Test
+  public void testGetContainerWithPipeline() throws Exception {
+    ContainerWithPipeline containerWithPipeline = mapping.allocateContainer(
+        xceiverClientManager.getType(),
+        xceiverClientManager.getFactor(),
+        containerOwner);
+    ContainerInfo contInfo = containerWithPipeline.getContainerInfo();
+    // Add dummy replicas for container.
+    DatanodeDetails dn1 = DatanodeDetails.newBuilder()
+        .setHostName("host1")
+        .setIpAddress("1.1.1.1")
+        .setUuid(UUID.randomUUID().toString()).build();
+    DatanodeDetails dn2 = DatanodeDetails.newBuilder()
+        .setHostName("host2")
+        .setIpAddress("2.2.2.2")
+        .setUuid(UUID.randomUUID().toString()).build();
+    mapping
+        .updateContainerState(contInfo.getContainerID(), LifeCycleEvent.CREATE);
+    mapping.updateContainerState(contInfo.getContainerID(),
+        LifeCycleEvent.CREATED);
+    mapping.updateContainerState(contInfo.getContainerID(),
+        LifeCycleEvent.FINALIZE);
+    mapping
+        .updateContainerState(contInfo.getContainerID(), LifeCycleEvent.CLOSE);
+    ContainerInfo finalContInfo = contInfo;
+    LambdaTestUtils.intercept(SCMException.class,"No entry exist for "
+        + "containerId:" , () -> mapping.getContainerWithPipeline(
+        finalContInfo.getContainerID()));
+
+    mapping.getStateManager().getContainerStateMap()
+        .addContainerReplica(contInfo.containerID(), dn1, dn2);
+
+    contInfo = mapping.getContainer(contInfo.getContainerID());
+    Assert.assertEquals(contInfo.getState(), LifeCycleState.CLOSED);
+    Pipeline pipeline = containerWithPipeline.getPipeline();
+    mapping.getPipelineSelector().finalizePipeline(pipeline);
+
+    ContainerWithPipeline containerWithPipeline2 = mapping
+        .getContainerWithPipeline(contInfo.getContainerID());
+    pipeline = containerWithPipeline2.getPipeline();
+    Assert.assertNotEquals(containerWithPipeline, containerWithPipeline2);
+    Assert.assertNotNull("Pipeline should not be null", pipeline);
+    Assert.assertTrue(pipeline.getDatanodeHosts().contains(dn1.getHostName()));
+    Assert.assertTrue(pipeline.getDatanodeHosts().contains(dn2.getHostName()));
+  }
+
   @Test
   public void testgetNoneExistentContainer() throws IOException {
     thrown.expectMessage("Specified key does not exist.");