Ver código fonte

HDDS-1517. AllocateBlock call fails with ContainerNotFoundException (#826). Contributed by Shashikant Banerjee.

Shashikant Banerjee 6 anos atrás
pai
commit
a315913c48

+ 7 - 3
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java

@@ -263,11 +263,15 @@ public class ContainerStateManager {
       }
       pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
     }
-    return allocateContainer(pipelineManager, owner, pipeline);
+    synchronized (pipeline) {
+      return allocateContainer(pipelineManager, owner, pipeline);
+    }
   }
 
   /**
    * Allocates a new container based on the type, replication etc.
+   * This method should be called only after the lock on the pipeline is held
+   * on which the container will be allocated.
    *
    * @param pipelineManager   - Pipeline Manager class.
    * @param owner             - Owner of the container.
@@ -296,10 +300,10 @@ public class ContainerStateManager {
         .setReplicationFactor(pipeline.getFactor())
         .setReplicationType(pipeline.getType())
         .build();
-    pipelineManager.addContainerToPipeline(pipeline.getId(),
-        ContainerID.valueof(containerID));
     Preconditions.checkNotNull(containerInfo);
     containers.addContainer(containerInfo);
+    pipelineManager.addContainerToPipeline(pipeline.getId(),
+        ContainerID.valueof(containerID));
     containerStateCount.incrementAndGet(containerInfo.getState());
     LOG.trace("New container allocated: {}", containerInfo);
     return containerInfo;

+ 7 - 8
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java

@@ -386,18 +386,17 @@ public class SCMContainerManager implements ContainerManager {
 
   public ContainerInfo getMatchingContainer(final long sizeRequired,
       String owner, Pipeline pipeline, List<ContainerID> excludedContainers) {
+    NavigableSet<ContainerID> containerIDs;
     try {
-      //TODO: #CLUTIL See if lock is required here
-      NavigableSet<ContainerID> containerIDs =
-          pipelineManager.getContainersInPipeline(pipeline.getId());
+      synchronized (pipeline) {
+        //TODO: #CLUTIL See if lock is required here
+        containerIDs =
+            pipelineManager.getContainersInPipeline(pipeline.getId());
 
-      containerIDs = getContainersForOwner(containerIDs, owner);
-      if (containerIDs.size() < numContainerPerOwnerInPipeline) {
-        synchronized (pipeline) {
+        containerIDs = getContainersForOwner(containerIDs, owner);
+        if (containerIDs.size() < numContainerPerOwnerInPipeline) {
           // TODO: #CLUTIL Maybe we can add selection logic inside synchronized
           // as well
-          containerIDs = getContainersForOwner(
-              pipelineManager.getContainersInPipeline(pipeline.getId()), owner);
           if (containerIDs.size() < numContainerPerOwnerInPipeline) {
             ContainerInfo containerInfo =
                 containerStateManager.allocateContainer(pipelineManager, owner,

+ 43 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java

@@ -19,7 +19,13 @@ package org.apache.hadoop.hdds.scm.block;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
+
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -132,6 +138,43 @@ public class TestBlockManager {
     Assert.assertNotNull(block);
   }
 
+  @Test
+  public void testAllocateBlockInParallel() throws Exception {
+    eventQueue.fireEvent(SCMEvents.SAFE_MODE_STATUS, safeModeStatus);
+    GenericTestUtils.waitFor(() -> {
+      return !blockManager.isScmInSafeMode();
+    }, 10, 1000 * 5);
+    int threadCount = 20;
+    List<ExecutorService> executors = new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      executors.add(Executors.newSingleThreadExecutor());
+    }
+    List<CompletableFuture<AllocatedBlock>> futureList =
+        new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      final CompletableFuture<AllocatedBlock> future =
+          new CompletableFuture<>();
+      CompletableFuture.supplyAsync(() -> {
+        try {
+          future.complete(blockManager
+              .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, containerOwner,
+                  new ExcludeList()));
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+        return future;
+      }, executors.get(i));
+      futureList.add(future);
+    }
+    try {
+      CompletableFuture
+          .allOf(futureList.toArray(new CompletableFuture[futureList.size()]))
+          .get();
+    } catch (Exception e) {
+      Assert.fail("testAllocateBlockInParallel failed");
+    }
+  }
+
   @Test
   public void testAllocateOversizedBlock() throws Exception {
     eventQueue.fireEvent(SCMEvents.SAFE_MODE_STATUS, safeModeStatus);

+ 45 - 2
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java

@@ -43,14 +43,20 @@ import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -144,6 +150,43 @@ public class TestSCMContainerManager {
     Assert.assertTrue(pipelineList.size() > 5);
   }
 
+  @Test
+  public void testAllocateContainerInParallel() throws Exception {
+    int threadCount = 20;
+    List<ExecutorService> executors = new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      executors.add(Executors.newSingleThreadExecutor());
+    }
+    List<CompletableFuture<ContainerInfo>> futureList =
+        new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      final CompletableFuture<ContainerInfo> future = new CompletableFuture<>();
+      CompletableFuture.supplyAsync(() -> {
+        try {
+          ContainerInfo containerInfo = containerManager
+              .allocateContainer(xceiverClientManager.getType(),
+                  xceiverClientManager.getFactor(), containerOwner);
+
+          Assert.assertNotNull(containerInfo);
+          Assert.assertNotNull(containerInfo.getPipelineID());
+          future.complete(containerInfo);
+          return containerInfo;
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+        return future;
+      }, executors.get(i));
+      futureList.add(future);
+    }
+    try {
+      CompletableFuture
+          .allOf(futureList.toArray(new CompletableFuture[futureList.size()]))
+          .get();
+    } catch (Exception e) {
+      Assert.fail("testAllocateBlockInParallel failed");
+    }
+  }
+
   @Test
   public void testGetContainer() throws IOException {
     ContainerInfo containerInfo = containerManager.allocateContainer(