Explorar el Código

HDDS-1380. Add functonality to write from multiple clients in MiniOzoneChaosCluster. Contributed by Shashikant Banerjee.

Shashikant Banerjee hace 6 años
padre
commit
bfcb6534cd

+ 8 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneLoadGenerator.java

@@ -56,10 +56,11 @@ public class MiniOzoneLoadGenerator {
 
   private AtomicBoolean isWriteThreadRunning;
 
-  private final OzoneBucket ozoneBucket;
+  private final List<OzoneBucket> ozoneBuckets;
 
-  MiniOzoneLoadGenerator(OzoneBucket bucket, int numThreads, int numBuffers) {
-    this.ozoneBucket = bucket;
+  MiniOzoneLoadGenerator(List<OzoneBucket> bucket, int numThreads,
+      int numBuffers) {
+    this.ozoneBuckets = bucket;
     this.numWriteThreads = numThreads;
     this.numBuffers = numBuffers;
     this.writeExecutor = new ThreadPoolExecutor(numThreads, numThreads, 100,
@@ -94,7 +95,9 @@ public class MiniOzoneLoadGenerator {
       int bufferCapacity = buffer.capacity();
 
       String keyName = threadName + "-" + index;
-      try (OzoneOutputStream stream = ozoneBucket.createKey(keyName,
+      OzoneBucket bucket =
+          ozoneBuckets.get((int) (Math.random() * ozoneBuckets.size()));
+      try (OzoneOutputStream stream = bucket.createKey(keyName,
           bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
           new HashMap<>())) {
         stream.write(buffer.array());
@@ -106,7 +109,7 @@ public class MiniOzoneLoadGenerator {
         //  to closed container. add a break here once that is fixed.
       }
 
-      try (OzoneInputStream stream = ozoneBucket.readKey(keyName)) {
+      try (OzoneInputStream stream = bucket.readKey(keyName)) {
         byte[] readBuffer = new byte[bufferCapacity];
         int readLen = stream.read(readBuffer);
 

+ 11 - 3
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniChaosOzoneCluster.java

@@ -29,7 +29,8 @@ import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
 import picocli.CommandLine;
 
-
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -55,6 +56,10 @@ public class TestMiniChaosOzoneCluster implements Runnable {
       description = "total run time")
   private static int numMinutes = 1440; // 1 day by default
 
+  @Option(names = {"-n", "--numClients"},
+      description = "no of clients writing to OM")
+  private static int numClients = 3;
+
   @Option(names = {"-i", "--failureInterval"},
       description = "time between failure events in seconds")
   private static int failureInterval = 5; // 5 second period between failures.
@@ -74,9 +79,12 @@ public class TestMiniChaosOzoneCluster implements Runnable {
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
     volume.createBucket(bucketName);
-    OzoneBucket ozoneBucket = volume.getBucket(bucketName);
+    List<OzoneBucket> ozoneBuckets = new ArrayList<>(numClients);
+    for (int i = 0; i < numClients; i++) {
+      ozoneBuckets.add(volume.getBucket(bucketName));
+    }
     loadGenerator =
-        new MiniOzoneLoadGenerator(ozoneBucket, numThreads, numBuffers);
+        new MiniOzoneLoadGenerator(ozoneBuckets, numThreads, numBuffers);
   }
 
   /**