فهرست منبع

HDDS-1785. OOM error in Freon due to the concurrency handling

Closes #1085
Doroszlai, Attila 5 سال پیش
والد
کامیت
256fcc160e
1فایلهای تغییر یافته به همراه174 افزوده شده و 134 حذف شده
  1. 174 134
      hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java

+ 174 - 134
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java

@@ -25,9 +25,11 @@ import java.io.PrintStream;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -95,8 +97,6 @@ public final class RandomKeyGenerator implements Callable<Void> {
     KEY_WRITE
   }
 
-  private static final String RATIS = "ratis";
-
   private static final String DURATION_FORMAT = "HH:mm:ss,SSS";
 
   private static final int QUANTILES = 10;
@@ -112,8 +112,8 @@ public final class RandomKeyGenerator implements Callable<Void> {
   private static final Logger LOG =
       LoggerFactory.getLogger(RandomKeyGenerator.class);
 
-  private boolean completed = false;
-  private Exception exception = null;
+  private volatile boolean completed = false;
+  private volatile Exception exception = null;
 
   @Option(names = "--numOfThreads",
       description = "number of threads to be launched for the run",
@@ -193,6 +193,14 @@ public final class RandomKeyGenerator implements Callable<Void> {
 
   private AtomicLong totalBytesWritten;
 
+  private int totalBucketCount;
+  private long totalKeyCount;
+  private AtomicInteger volumeCounter;
+  private AtomicInteger bucketCounter;
+  private AtomicLong keyCounter;
+  private Map<Integer, OzoneVolume> volumes;
+  private Map<Integer, OzoneBucket> buckets;
+
   private AtomicInteger numberOfVolumesCreated;
   private AtomicInteger numberOfBucketsCreated;
   private AtomicLong numberOfKeysAdded;
@@ -226,6 +234,11 @@ public final class RandomKeyGenerator implements Callable<Void> {
     numberOfVolumesCreated = new AtomicInteger();
     numberOfBucketsCreated = new AtomicInteger();
     numberOfKeysAdded = new AtomicLong();
+    volumeCounter = new AtomicInteger();
+    bucketCounter = new AtomicInteger();
+    keyCounter = new AtomicLong();
+    volumes = new ConcurrentHashMap<>();
+    buckets = new ConcurrentHashMap<>();
     ozoneClient = OzoneClientFactory.getClient(configuration);
     objectStore = ozoneClient.getObjectStore();
     for (FreonOps ops : FreonOps.values()) {
@@ -259,6 +272,9 @@ public final class RandomKeyGenerator implements Callable<Void> {
       }
     }
 
+    totalBucketCount = numOfVolumes * numOfBuckets;
+    totalKeyCount = totalBucketCount * numOfKeys;
+
     LOG.info("Number of Threads: " + numOfThreads);
     threadPoolSize = numOfThreads;
     executor = Executors.newFixedThreadPool(threadPoolSize);
@@ -269,9 +285,8 @@ public final class RandomKeyGenerator implements Callable<Void> {
     LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
     LOG.info("Key size: {} bytes", keySize);
     LOG.info("Buffer size: {} bytes", bufferSize);
-    for (int i = 0; i < numOfVolumes; i++) {
-      String volumeName = "vol-" + i + "-" + RandomStringUtils.randomNumeric(5);
-      executor.submit(new VolumeProcessor(volumeName));
+    for (int i = 0; i < numOfThreads; i++) {
+      executor.submit(new ObjectCreator());
     }
 
     Thread validator = null;
@@ -286,22 +301,15 @@ public final class RandomKeyGenerator implements Callable<Void> {
       LOG.info("Data validation is enabled.");
     }
 
-    Supplier<Long> currentValue;
-    long maxValue;
-
-    currentValue = () -> numberOfKeysAdded.get();
-    maxValue = numOfVolumes *
-            numOfBuckets *
-            numOfKeys;
-
-    progressbar = new ProgressBar(System.out, maxValue, currentValue);
+    Supplier<Long> currentValue = numberOfKeysAdded::get;
+    progressbar = new ProgressBar(System.out, totalKeyCount, currentValue);
 
     LOG.info("Starting progress bar Thread.");
 
     progressbar.start();
 
     // wait until all keys are added or exception occurred.
-    while ((numberOfKeysAdded.get() != numOfVolumes * numOfBuckets * numOfKeys)
+    while ((numberOfKeysAdded.get() != totalKeyCount)
            && exception == null) {
       try {
         Thread.sleep(CHECK_INTERVAL_MILLIS);
@@ -570,7 +578,7 @@ public final class RandomKeyGenerator implements Callable<Void> {
      *
      * @param bucket    bucket part
      * @param keyName   key part
-     * @param keyName   digest of this key's full value
+     * @param digest    digest of this key's full value
      */
     KeyValidate(OzoneBucket bucket, String keyName, byte[] digest) {
       this.bucket = bucket;
@@ -579,146 +587,178 @@ public final class RandomKeyGenerator implements Callable<Void> {
     }
   }
 
-  private class VolumeProcessor implements Runnable {
-    private String volumeName;
-
-    VolumeProcessor(String volumeName) {
-      this.volumeName = volumeName;
-    }
-
+  private class ObjectCreator implements Runnable {
     @Override
-    @SuppressFBWarnings("REC_CATCH_EXCEPTION")
     public void run() {
-      LOG.trace("Creating volume: {}", volumeName);
-      long start = System.nanoTime();
-      OzoneVolume volume;
-      try (Scope scope = GlobalTracer.get().buildSpan("createVolume")
-          .startActive(true)) {
-        objectStore.createVolume(volumeName);
-        long volumeCreationDuration = System.nanoTime() - start;
-        volumeCreationTime.getAndAdd(volumeCreationDuration);
-        histograms.get(FreonOps.VOLUME_CREATE.ordinal())
-            .update(volumeCreationDuration);
-        numberOfVolumesCreated.getAndIncrement();
-        volume = objectStore.getVolume(volumeName);
-      } catch (IOException e) {
-        exception = e;
-        LOG.error("Could not create volume", e);
-        return;
+      int v;
+      while ((v = volumeCounter.getAndIncrement()) < numOfVolumes) {
+        if (!createVolume(v)) {
+          return;
+        }
+      }
+
+      int b;
+      while ((b = bucketCounter.getAndIncrement()) < totalBucketCount) {
+        if (!createBucket(b)) {
+          return;
+        }
       }
 
-      for (int i = 0; i < numOfBuckets; i++) {
-        String bucketName = "bucket-" + i + "-" +
-            RandomStringUtils.randomNumeric(5);
-        BucketProcessor bp = new BucketProcessor(volume, bucketName);
-        executor.submit(bp);
+      long k;
+      while ((k = keyCounter.getAndIncrement()) < totalKeyCount) {
+        if (!createKey(k)) {
+          return;
+        }
       }
     }
   }
 
-  private class BucketProcessor implements Runnable {
-    private OzoneVolume volume;
-    private String bucketName;
-
-    BucketProcessor(OzoneVolume volume, String bucketName) {
-      this.volume = volume;
-      this.bucketName = bucketName;
+  private boolean createVolume(int volumeNumber) {
+    String volumeName = "vol-" + volumeNumber + "-"
+        + RandomStringUtils.randomNumeric(5);
+    LOG.trace("Creating volume: {}", volumeName);
+    try (Scope ignored = GlobalTracer.get().buildSpan("createVolume")
+        .startActive(true)) {
+      long start = System.nanoTime();
+      objectStore.createVolume(volumeName);
+      long volumeCreationDuration = System.nanoTime() - start;
+      volumeCreationTime.getAndAdd(volumeCreationDuration);
+      histograms.get(FreonOps.VOLUME_CREATE.ordinal())
+          .update(volumeCreationDuration);
+      numberOfVolumesCreated.getAndIncrement();
+
+      OzoneVolume volume = objectStore.getVolume(volumeName);
+      volumes.put(volumeNumber, volume);
+      return true;
+    } catch (IOException e) {
+      exception = e;
+      LOG.error("Could not create volume", e);
+      return false;
     }
+  }
 
-    @Override
-    @SuppressFBWarnings("REC_CATCH_EXCEPTION")
-    public void run() {
-      LOG.trace("Creating bucket: {} in volume: {}",
-              bucketName, volume.getName());
+  private boolean createBucket(int globalBucketNumber) {
+    int volumeNumber = globalBucketNumber % numOfVolumes;
+    int bucketNumber = globalBucketNumber / numOfVolumes;
+    OzoneVolume volume = getVolume(volumeNumber);
+    if (volume == null) {
+      return false;
+    }
+    String bucketName = "bucket-" + bucketNumber + "-" +
+        RandomStringUtils.randomNumeric(5);
+    LOG.trace("Creating bucket: {} in volume: {}",
+        bucketName, volume.getName());
+    try (Scope ignored = GlobalTracer.get().buildSpan("createBucket")
+        .startActive(true)) {
       long start = System.nanoTime();
-      OzoneBucket bucket;
-      try (Scope scope = GlobalTracer.get().buildSpan("createBucket")
-          .startActive(true)) {
-        volume.createBucket(bucketName);
-        long bucketCreationDuration = System.nanoTime() - start;
-        histograms.get(FreonOps.BUCKET_CREATE.ordinal())
-                  .update(bucketCreationDuration);
-        bucketCreationTime.getAndAdd(bucketCreationDuration);
-        numberOfBucketsCreated.getAndIncrement();
-
-        bucket = volume.getBucket(bucketName);
-      } catch (IOException e) {
-        exception = e;
-        LOG.error("Could not create bucket ", e);
-        return;
-      }
-
-      for (int i = 0; i < numOfKeys; i++) {
-        String keyName = "key-" + i + "-" + RandomStringUtils.randomNumeric(5);
-        KeyProcessor kp = new KeyProcessor(bucket, keyName);
-        executor.submit(kp);
-      }
+      volume.createBucket(bucketName);
+      long bucketCreationDuration = System.nanoTime() - start;
+      histograms.get(FreonOps.BUCKET_CREATE.ordinal())
+          .update(bucketCreationDuration);
+      bucketCreationTime.getAndAdd(bucketCreationDuration);
+      numberOfBucketsCreated.getAndIncrement();
+
+      OzoneBucket bucket = volume.getBucket(bucketName);
+      buckets.put(globalBucketNumber, bucket);
+      return true;
+    } catch (IOException e) {
+      exception = e;
+      LOG.error("Could not create bucket ", e);
+      return false;
     }
   }
 
-  private class KeyProcessor implements Runnable {
-    private OzoneBucket bucket;
-    private String keyName;
-
-    KeyProcessor(OzoneBucket bucket, String keyName) {
-      this.bucket = bucket;
-      this.keyName = keyName;
+  @SuppressFBWarnings("REC_CATCH_EXCEPTION")
+  private boolean createKey(long globalKeyNumber) {
+    int globalBucketNumber = (int) (globalKeyNumber % totalBucketCount);
+    long keyNumber = globalKeyNumber / totalBucketCount;
+    OzoneBucket bucket = getBucket(globalBucketNumber);
+    if (bucket == null) {
+      return false;
     }
-
-    @Override
-    @SuppressFBWarnings("REC_CATCH_EXCEPTION")
-    public void run() {
-      String bucketName = bucket.getName();
-      String volumeName = bucket.getVolumeName();
-      LOG.trace("Adding key: {} in bucket: {} of volume: {}",
-          keyName, bucketName, volumeName);
-      byte[] randomValue = DFSUtil.string2Bytes(UUID.randomUUID().toString());
-      try {
+    String bucketName = bucket.getName();
+    String volumeName = bucket.getVolumeName();
+    String keyName = "key-" + keyNumber + "-"
+        + RandomStringUtils.randomNumeric(5);
+    LOG.trace("Adding key: {} in bucket: {} of volume: {}",
+        keyName, bucketName, volumeName);
+    byte[] randomValue = DFSUtil.string2Bytes(UUID.randomUUID().toString());
+    try {
+      try (Scope scope = GlobalTracer.get().buildSpan("createKey")
+          .startActive(true)) {
         long keyCreateStart = System.nanoTime();
-        try (Scope scope = GlobalTracer.get().buildSpan("createKey")
+        OzoneOutputStream os = bucket.createKey(keyName, keySize, type,
+            factor, new HashMap<>());
+        long keyCreationDuration = System.nanoTime() - keyCreateStart;
+        histograms.get(FreonOps.KEY_CREATE.ordinal())
+            .update(keyCreationDuration);
+        keyCreationTime.getAndAdd(keyCreationDuration);
+
+        try (Scope writeScope = GlobalTracer.get().buildSpan("writeKeyData")
             .startActive(true)) {
-          OzoneOutputStream os = bucket.createKey(keyName, keySize, type,
-                                                  factor, new HashMap<>());
-          long keyCreationDuration = System.nanoTime() - keyCreateStart;
-          histograms.get(FreonOps.KEY_CREATE.ordinal())
-                    .update(keyCreationDuration);
-          keyCreationTime.getAndAdd(keyCreationDuration);
-
           long keyWriteStart = System.nanoTime();
-          try (Scope writeScope = GlobalTracer.get().buildSpan("writeKeyData")
-              .startActive(true)) {
-            for (long nrRemaining = keySize - randomValue.length;
-                 nrRemaining > 0; nrRemaining -= bufferSize) {
-              int curSize = (int)Math.min(bufferSize, nrRemaining);
-              os.write(keyValueBuffer, 0, curSize);
-            }
-            os.write(randomValue);
-            os.close();
-
-            long keyWriteDuration = System.nanoTime() - keyWriteStart;
-            histograms.get(FreonOps.KEY_WRITE.ordinal())
-                      .update(keyWriteDuration);
-            keyWriteTime.getAndAdd(keyWriteDuration);
-            totalBytesWritten.getAndAdd(keySize);
-            numberOfKeysAdded.getAndIncrement();
+          for (long nrRemaining = keySize - randomValue.length;
+               nrRemaining > 0; nrRemaining -= bufferSize) {
+            int curSize = (int) Math.min(bufferSize, nrRemaining);
+            os.write(keyValueBuffer, 0, curSize);
           }
+          os.write(randomValue);
+          os.close();
+
+          long keyWriteDuration = System.nanoTime() - keyWriteStart;
+          histograms.get(FreonOps.KEY_WRITE.ordinal())
+              .update(keyWriteDuration);
+          keyWriteTime.getAndAdd(keyWriteDuration);
+          totalBytesWritten.getAndAdd(keySize);
+          numberOfKeysAdded.getAndIncrement();
         }
+      }
 
-        if (validateWrites) {
-          MessageDigest tmpMD = (MessageDigest)commonInitialMD.clone();
-          tmpMD.update(randomValue);
-          boolean validate = validationQueue.offer(
-                    new KeyValidate(bucket, keyName, tmpMD.digest()));
-          if (validate) {
-            LOG.trace("Key {}, is queued for validation.", keyName);
-          }
+      if (validateWrites) {
+        MessageDigest tmpMD = (MessageDigest) commonInitialMD.clone();
+        tmpMD.update(randomValue);
+        boolean validate = validationQueue.offer(
+            new KeyValidate(bucket, keyName, tmpMD.digest()));
+        if (validate) {
+          LOG.trace("Key {} is queued for validation.", keyName);
         }
-      } catch (Exception e) {
-        exception = e;
-        LOG.error("Exception while adding key: {} in bucket: {}" +
-            " of volume: {}.", keyName, bucketName, volumeName, e);
+      }
+
+      return true;
+    } catch (Exception e) {
+      exception = e;
+      LOG.error("Exception while adding key: {} in bucket: {}" +
+          " of volume: {}.", keyName, bucketName, volumeName, e);
+      return false;
+    }
+  }
+
+  private OzoneVolume getVolume(Integer volumeNumber) {
+    return waitUntilAddedToMap(volumes, volumeNumber);
+  }
+
+  private OzoneBucket getBucket(Integer bucketNumber) {
+    return waitUntilAddedToMap(buckets, bucketNumber);
+  }
+
+  /**
+   * Looks up volume or bucket from the cache.  Waits for it to be created if
+   * needed (can happen for the last few items depending on the number of
+   * threads).
+   *
+   * @return may return null if this thread is interrupted, or if any other
+   *   thread encounters an exception (and stores it to {@code exception})
+   */
+  private <T> T waitUntilAddedToMap(Map<Integer, T> map, Integer i) {
+    while (exception == null && !map.containsKey(i)) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
       }
     }
+    return map.get(i);
   }
 
   private final class FreonJobInfo {