浏览代码

HDDS-1532. Improve the concurrent testing framework of Freon. (#957)

Xudong Cao 5 年之前
父节点
当前提交
3d020e914f

+ 129 - 91
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java

@@ -64,7 +64,6 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.annotations.VisibleForTesting;
-import static java.lang.Math.min;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
@@ -102,6 +101,8 @@ public final class RandomKeyGenerator implements Callable<Void> {
 
   private static final int QUANTILES = 10;
 
+  private static final int CHECK_INTERVAL_MILLIS = 5000;
+
   private byte[] keyValueBuffer = null;
 
   private static final String DIGEST_ALGORITHM = "MD5";
@@ -180,7 +181,7 @@ public final class RandomKeyGenerator implements Callable<Void> {
 
   private OzoneClient ozoneClient;
   private ObjectStore objectStore;
-  private ExecutorService processor;
+  private ExecutorService executor;
 
   private long startTime;
   private long jobStartTime;
@@ -259,9 +260,8 @@ public final class RandomKeyGenerator implements Callable<Void> {
     }
 
     LOG.info("Number of Threads: " + numOfThreads);
-    threadPoolSize =
-        min(numOfVolumes, numOfThreads);
-    processor = Executors.newFixedThreadPool(threadPoolSize);
+    threadPoolSize = numOfThreads;
+    executor = Executors.newFixedThreadPool(threadPoolSize);
     addShutdownHook();
 
     LOG.info("Number of Volumes: {}.", numOfVolumes);
@@ -270,9 +270,8 @@ public final class RandomKeyGenerator implements Callable<Void> {
     LOG.info("Key size: {} bytes", keySize);
     LOG.info("Buffer size: {} bytes", bufferSize);
     for (int i = 0; i < numOfVolumes; i++) {
-      String volume = "vol-" + i + "-" +
-          RandomStringUtils.randomNumeric(5);
-      processor.submit(new OfflineProcessor(volume));
+      String volumeName = "vol-" + i + "-" + RandomStringUtils.randomNumeric(5);
+      executor.submit(new VolumeProcessor(volumeName));
     }
 
     Thread validator = null;
@@ -301,8 +300,17 @@ public final class RandomKeyGenerator implements Callable<Void> {
 
     progressbar.start();
 
-    processor.shutdown();
-    processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+    // wait until all keys are added or exception occurred.
+    while ((numberOfKeysAdded.get() != numOfVolumes * numOfBuckets * numOfKeys)
+           && exception == null) {
+      try {
+        Thread.sleep(CHECK_INTERVAL_MILLIS);
+      } catch (InterruptedException e) {
+        throw e;
+      }
+    }
+    executor.shutdown();
+    executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
     completed = true;
 
     if (exception != null) {
@@ -571,15 +579,10 @@ public final class RandomKeyGenerator implements Callable<Void> {
     }
   }
 
-  private class OfflineProcessor implements Runnable {
-
-    private int totalBuckets;
-    private int totalKeys;
+  private class VolumeProcessor implements Runnable {
     private String volumeName;
 
-    OfflineProcessor(String volumeName) {
-      this.totalBuckets = numOfBuckets;
-      this.totalKeys = numOfKeys;
+    VolumeProcessor(String volumeName) {
       this.volumeName = volumeName;
     }
 
@@ -604,88 +607,118 @@ public final class RandomKeyGenerator implements Callable<Void> {
         return;
       }
 
-      Long threadKeyWriteTime = 0L;
-      for (int j = 0; j < totalBuckets; j++) {
-        String bucketName = "bucket-" + j + "-" +
+      for (int i = 0; i < numOfBuckets; i++) {
+        String bucketName = "bucket-" + i + "-" +
             RandomStringUtils.randomNumeric(5);
-        try {
-          LOG.trace("Creating bucket: {} in volume: {}",
+        BucketProcessor bp = new BucketProcessor(volume, bucketName);
+        executor.submit(bp);
+      }
+    }
+  }
+
+  private class BucketProcessor implements Runnable {
+    private OzoneVolume volume;
+    private String bucketName;
+
+    BucketProcessor(OzoneVolume volume, String bucketName) {
+      this.volume = volume;
+      this.bucketName = bucketName;
+    }
+
+    @Override
+    @SuppressFBWarnings("REC_CATCH_EXCEPTION")
+    public void run() {
+      LOG.trace("Creating bucket: {} in volume: {}",
               bucketName, volume.getName());
-          start = System.nanoTime();
-          try (Scope scope = GlobalTracer.get().buildSpan("createBucket")
-              .startActive(true)) {
-            volume.createBucket(bucketName);
-            long bucketCreationDuration = System.nanoTime() - start;
-            histograms.get(FreonOps.BUCKET_CREATE.ordinal())
-                .update(bucketCreationDuration);
-            bucketCreationTime.getAndAdd(bucketCreationDuration);
-            numberOfBucketsCreated.getAndIncrement();
-          }
-          OzoneBucket bucket = volume.getBucket(bucketName);
-          for (int k = 0; k < totalKeys; k++) {
-            String key = "key-" + k + "-" +
-                RandomStringUtils.randomNumeric(5);
-            byte[] randomValue =
-                DFSUtil.string2Bytes(UUID.randomUUID().toString());
-            try {
-              LOG.trace("Adding key: {} in bucket: {} of volume: {}",
-                  key, bucket, volume);
-              long keyCreateStart = System.nanoTime();
-              try (Scope scope = GlobalTracer.get().buildSpan("createKey")
-                  .startActive(true)) {
-                OzoneOutputStream os =
-                    bucket
-                        .createKey(key, keySize, type, factor, new HashMap<>());
-                long keyCreationDuration = System.nanoTime() - keyCreateStart;
-                histograms.get(FreonOps.KEY_CREATE.ordinal())
+      long start = System.nanoTime();
+      OzoneBucket bucket;
+      try (Scope scope = GlobalTracer.get().buildSpan("createBucket")
+          .startActive(true)) {
+        volume.createBucket(bucketName);
+        long bucketCreationDuration = System.nanoTime() - start;
+        histograms.get(FreonOps.BUCKET_CREATE.ordinal())
+                  .update(bucketCreationDuration);
+        bucketCreationTime.getAndAdd(bucketCreationDuration);
+        numberOfBucketsCreated.getAndIncrement();
+
+        bucket = volume.getBucket(bucketName);
+      } catch (IOException e) {
+        exception = e;
+        LOG.error("Could not create bucket ", e);
+        return;
+      }
+
+      for (int i = 0; i < numOfKeys; i++) {
+        String keyName = "key-" + i + "-" + RandomStringUtils.randomNumeric(5);
+        KeyProcessor kp = new KeyProcessor(bucket, keyName);
+        executor.submit(kp);
+      }
+    }
+  }
+
+  private class KeyProcessor implements Runnable {
+    private OzoneBucket bucket;
+    private String keyName;
+
+    KeyProcessor(OzoneBucket bucket, String keyName) {
+      this.bucket = bucket;
+      this.keyName = keyName;
+    }
+
+    @Override
+    @SuppressFBWarnings("REC_CATCH_EXCEPTION")
+    public void run() {
+      String bucketName = bucket.getName();
+      String volumeName = bucket.getVolumeName();
+      LOG.trace("Adding key: {} in bucket: {} of volume: {}",
+          keyName, bucketName, volumeName);
+      byte[] randomValue = DFSUtil.string2Bytes(UUID.randomUUID().toString());
+      try {
+        long keyCreateStart = System.nanoTime();
+        try (Scope scope = GlobalTracer.get().buildSpan("createKey")
+            .startActive(true)) {
+          OzoneOutputStream os = bucket.createKey(keyName, keySize, type,
+                                                  factor, new HashMap<>());
+          long keyCreationDuration = System.nanoTime() - keyCreateStart;
+          histograms.get(FreonOps.KEY_CREATE.ordinal())
                     .update(keyCreationDuration);
-                keyCreationTime.getAndAdd(keyCreationDuration);
-                long keyWriteStart = System.nanoTime();
-                try (Scope writeScope = GlobalTracer.get()
-                    .buildSpan("writeKeyData")
-                    .startActive(true)) {
-                  for (long nrRemaining = keySize - randomValue.length;
-                        nrRemaining > 0; nrRemaining -= bufferSize) {
-                    int curSize = (int)Math.min(bufferSize, nrRemaining);
-                    os.write(keyValueBuffer, 0, curSize);
-                  }
-                  os.write(randomValue);
-                  os.close();
-                }
-
-                long keyWriteDuration = System.nanoTime() - keyWriteStart;
-
-                threadKeyWriteTime += keyWriteDuration;
-                histograms.get(FreonOps.KEY_WRITE.ordinal())
-                    .update(keyWriteDuration);
-                totalBytesWritten.getAndAdd(keySize);
-                numberOfKeysAdded.getAndIncrement();
-              }
-              if (validateWrites) {
-                MessageDigest tmpMD = (MessageDigest)commonInitialMD.clone();
-                tmpMD.update(randomValue);
-                boolean validate = validationQueue.offer(
-                    new KeyValidate(bucket, key, tmpMD.digest()));
-                if (validate) {
-                  LOG.trace("Key {}, is queued for validation.", key);
-                }
-              }
-            } catch (Exception e) {
-              exception = e;
-              LOG.error("Exception while adding key: {} in bucket: {}" +
-                  " of volume: {}.", key, bucket, volume, e);
+          keyCreationTime.getAndAdd(keyCreationDuration);
+
+          long keyWriteStart = System.nanoTime();
+          try (Scope writeScope = GlobalTracer.get().buildSpan("writeKeyData")
+              .startActive(true)) {
+            for (long nrRemaining = keySize - randomValue.length;
+                 nrRemaining > 0; nrRemaining -= bufferSize) {
+              int curSize = (int)Math.min(bufferSize, nrRemaining);
+              os.write(keyValueBuffer, 0, curSize);
             }
+            os.write(randomValue);
+            os.close();
+
+            long keyWriteDuration = System.nanoTime() - keyWriteStart;
+            histograms.get(FreonOps.KEY_WRITE.ordinal())
+                      .update(keyWriteDuration);
+            keyWriteTime.getAndAdd(keyWriteDuration);
+            totalBytesWritten.getAndAdd(keySize);
+            numberOfKeysAdded.getAndIncrement();
           }
-        } catch (Exception e) {
-          exception = e;
-          LOG.error("Exception while creating bucket: {}" +
-              " in volume: {}.", bucketName, volume, e);
         }
-      }
 
-      keyWriteTime.getAndAdd(threadKeyWriteTime);
+        if (validateWrites) {
+          MessageDigest tmpMD = (MessageDigest)commonInitialMD.clone();
+          tmpMD.update(randomValue);
+          boolean validate = validationQueue.offer(
+                    new KeyValidate(bucket, keyName, tmpMD.digest()));
+          if (validate) {
+            LOG.trace("Key {}, is queued for validation.", keyName);
+          }
+        }
+      } catch (Exception e) {
+        exception = e;
+        LOG.error("Exception while adding key: {} in bucket: {}" +
+            " of volume: {}.", keyName, bucketName, volumeName, e);
+      }
     }
-
   }
 
   private final class FreonJobInfo {
@@ -1028,4 +1061,9 @@ public final class RandomKeyGenerator implements Callable<Void> {
   public void setValidateWrites(boolean validateWrites) {
     this.validateWrites = validateWrites;
   }
+
+  @VisibleForTesting
+  public int getThreadPoolSize() {
+    return threadPoolSize;
+  }
 }

+ 15 - 0
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java

@@ -127,4 +127,19 @@ public class TestRandomKeyGenerator {
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
     Assert.assertEquals(1, randomKeyGenerator.getSuccessfulValidationCount());
   }
+
+  @Test
+  public void testThreadPoolSize() throws Exception {
+    RandomKeyGenerator randomKeyGenerator =
+        new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
+    randomKeyGenerator.setNumOfVolumes(1);
+    randomKeyGenerator.setNumOfBuckets(1);
+    randomKeyGenerator.setNumOfKeys(1);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
+    randomKeyGenerator.setNumOfThreads(10);
+    randomKeyGenerator.call();
+    Assert.assertEquals(10, randomKeyGenerator.getThreadPoolSize());
+    Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
+  }
 }