浏览代码

HDFS-12275. Ozone: Corona: Support for random validation of writes. Contributed by Nandakumar.

Anu Engineer 7 年之前
父节点
当前提交
2f4dfbc8fb

+ 231 - 24
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java

@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.ozone.tools;
 
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -27,10 +27,11 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -39,11 +40,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 /**
  * Corona - A tool to populate ozone with data for testing.<br>
@@ -81,6 +86,7 @@ public final class Corona extends Configured implements Tool {
   private static final String HELP = "help";
   private static final String MODE = "mode";
   private static final String SOURCE = "source";
+  private static final String VALIDATE_WRITE = "validateWrites";
   private static final String NUM_OF_THREADS = "numOfThreads";
   private static final String NUM_OF_VOLUMES = "numOfVolumes";
   private static final String NUM_OF_BUCKETS = "numOfBuckets";
@@ -109,6 +115,8 @@ public final class Corona extends Configured implements Tool {
   private String numOfBuckets;
   private String numOfKeys;
 
+  private boolean validateWrites;
+
   private OzoneClient ozoneClient;
   private ExecutorService processor;
 
@@ -125,7 +133,14 @@ public final class Corona extends Configured implements Tool {
   private AtomicInteger numberOfBucketsCreated;
   private AtomicLong numberOfKeysAdded;
 
-  private Corona(Configuration conf) throws IOException {
+  private Long totalWritesValidated;
+  private Long writeValidationSuccessCount;
+  private Long writeValidationFailureCount;
+
+  private BlockingQueue<KeyValue> validationQueue;
+
+  @VisibleForTesting
+  Corona(Configuration conf) throws IOException {
     startTime = System.nanoTime();
     volumeCreationTime = new AtomicLong();
     bucketCreationTime = new AtomicLong();
@@ -159,20 +174,35 @@ public final class Corona extends Configured implements Tool {
       LOG.info("Number of Volumes: {}.", numOfVolumes);
       LOG.info("Number of Buckets per Volume: {}.", numOfBuckets);
       LOG.info("Number of Keys per Bucket: {}.", numOfKeys);
-      for(int i = 0; i < Integer.parseInt(numOfVolumes); i++) {
+      for (int i = 0; i < Integer.parseInt(numOfVolumes); i++) {
         String volume = "vol-" + i + "-" +
             RandomStringUtils.randomNumeric(5);
         processor.submit(new OfflineProcessor(volume));
       }
-      Thread progressbar = getProgressBarThread();
-      LOG.info("Starting progress bar Thread.");
-      progressbar.start();
-      processor.shutdown();
-      processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
-      completed = true;
-      progressbar.join();
-      return 0;
     }
+    Thread validator = null;
+    if(validateWrites) {
+      totalWritesValidated = 0L;
+      writeValidationSuccessCount = 0L;
+      writeValidationFailureCount = 0L;
+
+      validationQueue =
+          new ArrayBlockingQueue<>(Integer.parseInt(numOfThreads));
+      validator = new Thread(new Validator());
+      validator.start();
+      LOG.info("Data validation is enabled.");
+    }
+    Thread progressbar = getProgressBarThread();
+    LOG.info("Starting progress bar Thread.");
+    progressbar.start();
+    processor.shutdown();
+    processor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+    completed = true;
+    progressbar.join();
+    if(validateWrites) {
+      validator.join();
+    }
+    return 0;
   }
 
   private Options getOzonePetaGenOptions() {
@@ -193,6 +223,10 @@ public final class Corona extends Configured implements Tool {
         "commoncrawl warc file to be used when the mode is online.");
     Option optSource = OptionBuilder.create(SOURCE);
 
+    OptionBuilder.withDescription("do random validation of " +
+        "data written into ozone, only subset of data is validated.");
+    Option optValidateWrite = OptionBuilder.create(VALIDATE_WRITE);
+
     OptionBuilder.withArgName("value");
     OptionBuilder.hasArg();
     OptionBuilder.withDescription("number of threads to be launched " +
@@ -220,6 +254,7 @@ public final class Corona extends Configured implements Tool {
     options.addOption(optHelp);
     options.addOption(optMode);
     options.addOption(optSource);
+    options.addOption(optValidateWrite);
     options.addOption(optNumOfThreads);
     options.addOption(optNumOfVolumes);
     options.addOption(optNumOfBuckets);
@@ -239,6 +274,8 @@ public final class Corona extends Configured implements Tool {
     numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ?
         cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT;
 
+    validateWrites = cmdLine.hasOption(VALIDATE_WRITE);
+
     numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ?
         cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT;
 
@@ -253,6 +290,9 @@ public final class Corona extends Configured implements Tool {
     System.out.println("Options supported are:");
     System.out.println("-numOfThreads <value>           "
         + "number of threads to be launched for the run.");
+    System.out.println("-validateWrites                 "
+        + "do random validation of data written into ozone, " +
+        "only subset of data is validated.");
     System.out.println("-mode [online | offline]        "
         + "specifies the mode in which Corona should run.");
     System.out.println("-source <url>                   "
@@ -304,6 +344,7 @@ public final class Corona extends Configured implements Tool {
                 RandomStringUtils.randomNumeric(5);
             byte[] value = DFSUtil.string2Bytes(
                 RandomStringUtils.randomAscii(10240));
+
             try {
               LOG.trace("Adding key: {} in bucket: {} of volume: {}",
                   key, bucket, volume);
@@ -317,6 +358,13 @@ public final class Corona extends Configured implements Tool {
               keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart);
               totalBytesWritten.getAndAdd(value.length);
               numberOfKeysAdded.getAndIncrement();
+              if(validateWrites) {
+                boolean validate = validationQueue.offer(
+                    new KeyValue(volume, bucket, key, value));
+                if(validate) {
+                  LOG.trace("Key {}, is queued for validation.", key);
+                }
+              }
             } catch (Exception e) {
               exception = true;
               LOG.error("Exception while adding key: {} in bucket: {}" +
@@ -341,11 +389,19 @@ public final class Corona extends Configured implements Tool {
   }
 
   private Thread getProgressBarThread() {
-    long maxValue = Integer.parseInt(numOfVolumes) *
-        Integer.parseInt(numOfBuckets) *
-        Integer.parseInt(numOfKeys);
+    Supplier<Long> currentValue;
+    long maxValue;
+
+    if(mode.equals("online")) {
+      throw new UnsupportedOperationException("Not yet implemented.");
+    } else {
+      currentValue = () -> numberOfKeysAdded.get();
+      maxValue = Long.parseLong(numOfVolumes) *
+          Long.parseLong(numOfBuckets) *
+          Long.parseLong(numOfKeys);
+    }
     Thread progressBarThread = new Thread(
-        new ProgressBar(System.out, maxValue));
+        new ProgressBar(System.out, currentValue, maxValue));
     progressBarThread.setName("ProgressBar");
     return progressBarThread;
   }
@@ -355,10 +411,13 @@ public final class Corona extends Configured implements Tool {
     private static final long REFRESH_INTERVAL = 1000L;
 
     private PrintStream stream;
+    private Supplier<Long> currentValue;
     private long maxValue;
 
-    ProgressBar(PrintStream stream, long maxValue) {
+    ProgressBar(PrintStream stream, Supplier<Long> currentValue,
+                long maxValue) {
       this.stream = stream;
+      this.currentValue = currentValue;
       this.maxValue = maxValue;
     }
 
@@ -366,9 +425,9 @@ public final class Corona extends Configured implements Tool {
     public void run() {
       try {
         stream.println();
-        long keys;
-        while((keys = numberOfKeysAdded.get()) < maxValue) {
-          print(keys);
+        long value;
+        while((value = currentValue.get()) < maxValue) {
+          print(value);
           if(completed) {
             break;
           }
@@ -389,11 +448,11 @@ public final class Corona extends Configured implements Tool {
     /**
      * Given current value prints the progress bar.
      *
-     * @param currentValue
+     * @param value
      */
-    private void print(long currentValue) {
+    private void print(long value) {
       stream.print('\r');
-      double percent = 100.0 * currentValue / maxValue;
+      double percent = 100.0 * value / maxValue;
       StringBuilder sb = new StringBuilder();
       sb.append(" " + String.format("%.2f", percent) + "% |");
 
@@ -404,7 +463,7 @@ public final class Corona extends Configured implements Tool {
         sb.append(' ');
       }
       sb.append("|  ");
-      sb.append(currentValue + "/" + maxValue);
+      sb.append(value + "/" + maxValue);
       long timeInSec = TimeUnit.SECONDS.convert(
           System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
       String timeToPrint = String.format("%d:%02d:%02d", timeInSec / 3600,
@@ -494,10 +553,158 @@ public final class Corona extends Configured implements Tool {
     out.println("Time spent in key creation: " + prettyKeyCreationTime);
     out.println("Time spent in writing keys: " + prettyKeyWriteTime);
     out.println("Total bytes written: " + totalBytesWritten);
+    if(validateWrites) {
+      out.println("Total number of writes validated: " +
+          totalWritesValidated);
+      out.println("Writes validated: " +
+          (100.0 * totalWritesValidated / numberOfKeysAdded.get())
+          + " %");
+      out.println("Successful validation: " +
+          writeValidationSuccessCount);
+      out.println("Unsuccessful validation: " +
+          writeValidationFailureCount);
+    }
     out.println("Total Execution time: " + execTime);
     out.println("***************************************************");
   }
 
+  /**
+   * Returns the number of volumes created.
+   * @return volume count.
+   */
+  @VisibleForTesting
+  int getNumberOfVolumesCreated() {
+    return numberOfVolumesCreated.get();
+  }
+
+  /**
+   * Returns the number of buckets created.
+   * @return bucket count.
+   */
+  @VisibleForTesting
+  int getNumberOfBucketsCreated() {
+    return  numberOfBucketsCreated.get();
+  }
+
+  /**
+   * Returns the number of keys added.
+   * @return keys count.
+   */
+  @VisibleForTesting
+  long getNumberOfKeysAdded() {
+    return  numberOfKeysAdded.get();
+  }
+
+  /**
+   * Returns true if random validation of write is enabled.
+   * @return validateWrites
+   */
+  @VisibleForTesting
+  boolean getValidateWrites() {
+    return validateWrites;
+  }
+
+  /**
+   * Returns the number of keys validated.
+   * @return validated key count.
+   */
+  @VisibleForTesting
+  long getTotalKeysValidated() {
+    return totalWritesValidated;
+  }
+
+  /**
+   * Returns the number of successful validation.
+   * @return successful validation count.
+   */
+  @VisibleForTesting
+  long getSuccessfulValidationCount() {
+    return writeValidationSuccessCount;
+  }
+
+  /**
+   * Returns the number of unsuccessful validation.
+   * @return unsuccessful validation count.
+   */
+  @VisibleForTesting
+  long getUnsuccessfulValidationCount() {
+    return writeValidationFailureCount;
+  }
+
+  /**
+   * Validates the write done in ozone cluster.
+   */
+  private class Validator implements Runnable {
+
+    @Override
+    public void run() {
+      while(!completed) {
+        try {
+          KeyValue kv = validationQueue.poll(5, TimeUnit.SECONDS);
+          if(kv != null) {
+            OzoneInputStream is = ozoneClient.
+                getKey(kv.volume, kv.bucket, kv.key);
+            byte[] value = new byte[kv.value.length];
+            int length = is.read(value);
+            totalWritesValidated++;
+            if (length == kv.value.length && Arrays.equals(value, kv.value)) {
+              writeValidationSuccessCount++;
+            } else {
+              writeValidationFailureCount++;
+              LOG.warn("Data validation error for key {}/{}/{}",
+                  kv.volume, kv.bucket, kv.key);
+              LOG.warn("Expected: {}, Actual: {}",
+                  DFSUtil.bytes2String(kv.value),
+                  DFSUtil.bytes2String(value));
+            }
+          }
+        } catch (IOException | InterruptedException ex) {
+          LOG.error("Exception while validating write: " + ex.getMessage());
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * Wrapper to hold ozone key-value pair.
+   */
+  private static class KeyValue {
+
+    /**
+     * Volume name associated with the key-value.
+     */
+    private String volume;
+
+    /**
+     * Bucket name associated with the key-value.
+     */
+    private String bucket;
+    /**
+     * Key name associated with the key-value.
+     */
+    private String key;
+    /**
+     * Value associated with the key-value.
+     */
+    private byte[] value;
+
+    /**
+     * Constructs a new ozone key-value pair.
+     *
+     * @param key   key part
+     * @param value value part
+     */
+    KeyValue(
+        String volume, String bucket, String key, byte[] value) {
+      this.volume = volume;
+      this.bucket = bucket;
+      this.key = key;
+      this.value = value;
+    }
+  }
+
   /**
    * @param args arguments
    */

+ 117 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/TestCorona.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.tools;
+
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests Corona, with MiniOzoneCluster.
+ */
+public class TestCorona {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = new MiniOzoneCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void defaultTest() throws Exception {
+    List<String> args = new ArrayList<>();
+    args.add("-numOfVolumes");
+    args.add("2");
+    args.add("-numOfBuckets");
+    args.add("5");
+    args.add("-numOfKeys");
+    args.add("10");
+    Corona corona = new Corona(conf);
+    int res = ToolRunner.run(conf, corona,
+        args.toArray(new String[0]));
+    Assert.assertEquals(2, corona.getNumberOfVolumesCreated());
+    Assert.assertEquals(10, corona.getNumberOfBucketsCreated());
+    Assert.assertEquals(100, corona.getNumberOfKeysAdded());
+    Assert.assertEquals(0, res);
+  }
+
+  @Test
+  public void validateWriteTest() throws Exception {
+    PrintStream originalStream = System.out;
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(outStream));
+    List<String> args = new ArrayList<>();
+    args.add("-validateWrites");
+    args.add("-numOfVolumes");
+    args.add("2");
+    args.add("-numOfBuckets");
+    args.add("5");
+    args.add("-numOfKeys");
+    args.add("10");
+    Corona corona = new Corona(conf);
+    int res = ToolRunner.run(conf, corona,
+        args.toArray(new String[0]));
+    Assert.assertEquals(0, res);
+    Assert.assertEquals(2, corona.getNumberOfVolumesCreated());
+    Assert.assertEquals(10, corona.getNumberOfBucketsCreated());
+    Assert.assertEquals(100, corona.getNumberOfKeysAdded());
+    Assert.assertTrue(corona.getValidateWrites());
+    Assert.assertNotEquals(0, corona.getTotalKeysValidated());
+    Assert.assertNotEquals(0, corona.getSuccessfulValidationCount());
+    Assert.assertEquals(0, corona.getUnsuccessfulValidationCount());
+    System.setOut(originalStream);
+  }
+
+}

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/tools/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.tools;
+/**
+ * Classes related to Ozone tools tests.
+ */