Bladeren bron

HDDS-1372. getContainerWithPipeline for a standalone pipeline fails with ConcurrentModificationException. (#682)

Nanda kumar 6 jaren geleden
bovenliggende
commit
73f43ac2dc

+ 1 - 2
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
 import java.util.Set;
 import java.util.Collections;
 import java.util.Map;
@@ -138,7 +137,7 @@ public class ContainerStateMap {
       ownerMap.insert(info.getOwner(), id);
       factorMap.insert(info.getReplicationFactor(), id);
       typeMap.insert(info.getReplicationType(), id);
-      replicaMap.put(id, new HashSet<>());
+      replicaMap.put(id, ConcurrentHashMap.newKeySet());
 
       // Flush the cache of this container type, will be added later when
       // get container queries are executed.

+ 44 - 0
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java

@@ -44,12 +44,15 @@ import org.junit.rules.ExpectedException;
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Tests for Container ContainerManager.
@@ -194,6 +197,47 @@ public class TestSCMContainerManager {
     Assert.assertTrue(replicaNodes.contains(dn1));
   }
 
+  @Test
+  public void testGetContainerReplicaWithParallelUpdate() throws Exception {
+    testGetContainerWithPipeline();
+    final Optional<ContainerID> id = containerManager.getContainerIDs()
+        .stream().findFirst();
+    Assert.assertTrue(id.isPresent());
+    final ContainerID cId = id.get();
+    final Optional<ContainerReplica> replica = containerManager
+        .getContainerReplicas(cId).stream().findFirst();
+    Assert.assertTrue(replica.isPresent());
+    final ContainerReplica cReplica = replica.get();
+    final AtomicBoolean runUpdaterThread =
+        new AtomicBoolean(true);
+
+    Thread updaterThread = new Thread(() -> {
+      while (runUpdaterThread.get()) {
+        try {
+          containerManager.removeContainerReplica(cId, cReplica);
+          containerManager.updateContainerReplica(cId, cReplica);
+        } catch (ContainerException e) {
+          Assert.fail("Container Exception: " + e.getMessage());
+        }
+      }
+    });
+
+    updaterThread.setDaemon(true);
+    updaterThread.start();
+
+    IntStream.range(0, 100).forEach(i -> {
+      try {
+        Assert.assertNotNull(containerManager
+            .getContainerReplicas(cId)
+            .stream().map(ContainerReplica::getDatanodeDetails)
+            .collect(Collectors.toSet()));
+      } catch (ContainerNotFoundException e) {
+        Assert.fail("Missing Container " + id);
+      }
+    });
+    runUpdaterThread.set(false);
+  }
+
   @Test
   public void testgetNoneExistentContainer() {
     try {