|
@@ -17,33 +17,37 @@
|
|
|
|
|
|
package org.apache.hadoop.ozone.tools;
|
|
|
|
|
|
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
|
|
+import com.fasterxml.jackson.annotation.PropertyAccessor;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.fasterxml.jackson.databind.ObjectWriter;
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.cli.CommandLine;
|
|
|
import org.apache.commons.cli.Option;
|
|
|
import org.apache.commons.cli.OptionBuilder;
|
|
|
import org.apache.commons.cli.Options;
|
|
|
+import org.apache.commons.lang.ArrayUtils;
|
|
|
import org.apache.commons.lang.RandomStringUtils;
|
|
|
+import org.apache.commons.lang.time.DurationFormatUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.conf.Configured;
|
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
|
import org.apache.hadoop.conf.OzoneConfiguration;
|
|
|
+import org.apache.hadoop.ozone.OzoneConsts;
|
|
|
+import org.apache.hadoop.ozone.client.*;
|
|
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
|
|
-import org.apache.hadoop.ozone.client.ObjectStore;
|
|
|
-import org.apache.hadoop.ozone.client.OzoneBucket;
|
|
|
-import org.apache.hadoop.ozone.client.OzoneClient;
|
|
|
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
|
|
-import org.apache.hadoop.ozone.client.OzoneVolume;
|
|
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
|
|
+import org.apache.hadoop.util.*;
|
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
|
|
-import org.apache.hadoop.util.GenericOptionsParser;
|
|
|
-import org.apache.hadoop.util.Tool;
|
|
|
-import org.apache.hadoop.util.ToolRunner;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.io.FileNotFoundException;
|
|
|
+import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintStream;
|
|
|
-import java.util.Arrays;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
@@ -53,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
+import static java.lang.Math.max;
|
|
|
+import static java.lang.Math.min;
|
|
|
+
|
|
|
/**
|
|
|
* Corona - A tool to populate ozone with data for testing.<br>
|
|
|
* This is not a map-reduce program and this is not for benchmarking
|
|
@@ -90,6 +97,7 @@ public final class Corona extends Configured implements Tool {
|
|
|
private static final String MODE = "mode";
|
|
|
private static final String SOURCE = "source";
|
|
|
private static final String VALIDATE_WRITE = "validateWrites";
|
|
|
+ private static final String JSON_WRITE_DIRECTORY = "jsonDir";
|
|
|
private static final String NUM_OF_THREADS = "numOfThreads";
|
|
|
private static final String NUM_OF_VOLUMES = "numOfVolumes";
|
|
|
private static final String NUM_OF_BUCKETS = "numOfBuckets";
|
|
@@ -105,6 +113,7 @@ public final class Corona extends Configured implements Tool {
|
|
|
private static final String NUM_OF_VOLUMES_DEFAULT = "10";
|
|
|
private static final String NUM_OF_BUCKETS_DEFAULT = "1000";
|
|
|
private static final String NUM_OF_KEYS_DEFAULT = "500000";
|
|
|
+ private static final String DURATION_FORMAT = "HH:mm:ss,SSS";
|
|
|
|
|
|
private static final int KEY_SIZE_DEFAULT = 10240;
|
|
|
|
|
@@ -121,10 +130,12 @@ public final class Corona extends Configured implements Tool {
|
|
|
private String numOfVolumes;
|
|
|
private String numOfBuckets;
|
|
|
private String numOfKeys;
|
|
|
+ private String jsonDir;
|
|
|
private boolean useRatis;
|
|
|
private int replicationFactor = 0;
|
|
|
|
|
|
private int keySize;
|
|
|
+ private byte[] keyValue = null;
|
|
|
|
|
|
private boolean validateWrites;
|
|
|
|
|
@@ -133,6 +144,7 @@ public final class Corona extends Configured implements Tool {
|
|
|
private ExecutorService processor;
|
|
|
|
|
|
private long startTime;
|
|
|
+ private long jobStartTime;
|
|
|
|
|
|
private AtomicLong volumeCreationTime;
|
|
|
private AtomicLong bucketCreationTime;
|
|
@@ -150,10 +162,12 @@ public final class Corona extends Configured implements Tool {
|
|
|
private Long writeValidationFailureCount;
|
|
|
|
|
|
private BlockingQueue<KeyValue> validationQueue;
|
|
|
+ private List<Double> threadThroughput;
|
|
|
|
|
|
@VisibleForTesting
|
|
|
Corona(Configuration conf) throws IOException {
|
|
|
startTime = System.nanoTime();
|
|
|
+ jobStartTime = System.currentTimeMillis();
|
|
|
volumeCreationTime = new AtomicLong();
|
|
|
bucketCreationTime = new AtomicLong();
|
|
|
keyCreationTime = new AtomicLong();
|
|
@@ -185,6 +199,12 @@ public final class Corona extends Configured implements Tool {
|
|
|
usage();
|
|
|
return 0;
|
|
|
}
|
|
|
+
|
|
|
+ threadThroughput = Collections.synchronizedList(new ArrayList<Double>());
|
|
|
+
|
|
|
+ keyValue =
|
|
|
+ DFSUtil.string2Bytes(RandomStringUtils.randomAscii(keySize - 36));
|
|
|
+
|
|
|
LOG.info("Number of Threads: " + numOfThreads);
|
|
|
processor = Executors.newFixedThreadPool(Integer.parseInt(numOfThreads));
|
|
|
addShutdownHook();
|
|
@@ -251,6 +271,11 @@ public final class Corona extends Configured implements Tool {
|
|
|
"data written into ozone, only subset of data is validated.");
|
|
|
Option optValidateWrite = OptionBuilder.create(VALIDATE_WRITE);
|
|
|
|
|
|
+
|
|
|
+ OptionBuilder.withDescription("directory where json is created");
|
|
|
+ OptionBuilder.hasArg();
|
|
|
+ Option optJsonDir = OptionBuilder.create(JSON_WRITE_DIRECTORY);
|
|
|
+
|
|
|
OptionBuilder.withArgName("value");
|
|
|
OptionBuilder.hasArg();
|
|
|
OptionBuilder.withDescription("number of threads to be launched " +
|
|
@@ -291,6 +316,7 @@ public final class Corona extends Configured implements Tool {
|
|
|
options.addOption(optMode);
|
|
|
options.addOption(optSource);
|
|
|
options.addOption(optValidateWrite);
|
|
|
+ options.addOption(optJsonDir);
|
|
|
options.addOption(optNumOfThreads);
|
|
|
options.addOption(optNumOfVolumes);
|
|
|
options.addOption(optNumOfBuckets);
|
|
@@ -303,33 +329,37 @@ public final class Corona extends Configured implements Tool {
|
|
|
private void parseOptions(CommandLine cmdLine) {
|
|
|
printUsage = cmdLine.hasOption(HELP);
|
|
|
|
|
|
- mode = cmdLine.hasOption(MODE) ?
|
|
|
- cmdLine.getOptionValue(MODE) : MODE_DEFAULT;
|
|
|
+ mode = cmdLine.getOptionValue(MODE, MODE_DEFAULT);
|
|
|
|
|
|
- source = cmdLine.hasOption(SOURCE) ?
|
|
|
- cmdLine.getOptionValue(SOURCE) : SOURCE_DEFAULT;
|
|
|
+ source = cmdLine.getOptionValue(SOURCE, SOURCE_DEFAULT);
|
|
|
|
|
|
- numOfThreads = cmdLine.hasOption(NUM_OF_THREADS) ?
|
|
|
- cmdLine.getOptionValue(NUM_OF_THREADS) : NUM_OF_THREADS_DEFAULT;
|
|
|
+ numOfThreads =
|
|
|
+ cmdLine.getOptionValue(NUM_OF_THREADS, NUM_OF_THREADS_DEFAULT);
|
|
|
|
|
|
validateWrites = cmdLine.hasOption(VALIDATE_WRITE);
|
|
|
|
|
|
- numOfVolumes = cmdLine.hasOption(NUM_OF_VOLUMES) ?
|
|
|
- cmdLine.getOptionValue(NUM_OF_VOLUMES) : NUM_OF_VOLUMES_DEFAULT;
|
|
|
+ jsonDir = cmdLine.getOptionValue(JSON_WRITE_DIRECTORY);
|
|
|
|
|
|
- numOfBuckets = cmdLine.hasOption(NUM_OF_BUCKETS) ?
|
|
|
- cmdLine.getOptionValue(NUM_OF_BUCKETS) : NUM_OF_BUCKETS_DEFAULT;
|
|
|
+ numOfVolumes =
|
|
|
+ cmdLine.getOptionValue(NUM_OF_VOLUMES, NUM_OF_VOLUMES_DEFAULT);
|
|
|
|
|
|
- numOfKeys = cmdLine.hasOption(NUM_OF_KEYS) ?
|
|
|
- cmdLine.getOptionValue(NUM_OF_KEYS) : NUM_OF_KEYS_DEFAULT;
|
|
|
+ numOfBuckets =
|
|
|
+ cmdLine.getOptionValue(NUM_OF_BUCKETS, NUM_OF_BUCKETS_DEFAULT);
|
|
|
+
|
|
|
+ numOfKeys = cmdLine.getOptionValue(NUM_OF_KEYS, NUM_OF_KEYS_DEFAULT);
|
|
|
|
|
|
keySize = cmdLine.hasOption(KEY_SIZE) ?
|
|
|
Integer.parseInt(cmdLine.getOptionValue(KEY_SIZE)) : KEY_SIZE_DEFAULT;
|
|
|
+ if (keySize < 1024) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "keySize can not be less than 1024 bytes");
|
|
|
+ }
|
|
|
+
|
|
|
useRatis = cmdLine.hasOption(RATIS);
|
|
|
|
|
|
//To-do if replication factor is not mentioned throw an exception
|
|
|
- replicationFactor = useRatis ?
|
|
|
- Integer.parseInt(cmdLine.getOptionValue(RATIS)) : 0;
|
|
|
+ replicationFactor =
|
|
|
+ useRatis ? Integer.parseInt(cmdLine.getOptionValue(RATIS)) : 0;
|
|
|
}
|
|
|
|
|
|
private void usage() {
|
|
@@ -339,6 +369,8 @@ public final class Corona extends Configured implements Tool {
|
|
|
System.out.println("-validateWrites "
|
|
|
+ "do random validation of data written into ozone, " +
|
|
|
"only subset of data is validated.");
|
|
|
+ System.out.println("-jsonDir "
|
|
|
+ + "directory where json is created.");
|
|
|
System.out.println("-mode [online | offline] "
|
|
|
+ "specifies the mode in which Corona should run.");
|
|
|
System.out.println("-source <url> "
|
|
@@ -394,76 +426,60 @@ public final class Corona extends Configured implements Tool {
|
|
|
int threadCount = Integer.parseInt(numOfThreads);
|
|
|
|
|
|
long endTime = System.nanoTime() - startTime;
|
|
|
- String execTime = String.format("%02d:%02d:%02d",
|
|
|
- TimeUnit.NANOSECONDS.toHours(endTime),
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(endTime) -
|
|
|
- TimeUnit.HOURS.toMinutes(
|
|
|
- TimeUnit.NANOSECONDS.toHours(endTime)),
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(endTime) -
|
|
|
- TimeUnit.MINUTES.toSeconds(
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(endTime)));
|
|
|
-
|
|
|
- long volumeTime = volumeCreationTime.longValue();
|
|
|
- String prettyVolumeTime = String.format("%02d:%02d:%02d:%02d",
|
|
|
- TimeUnit.NANOSECONDS.toHours(volumeTime),
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(volumeTime) -
|
|
|
- TimeUnit.HOURS.toMinutes(
|
|
|
- TimeUnit.NANOSECONDS.toHours(volumeTime)),
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(volumeTime) -
|
|
|
- TimeUnit.MINUTES.toSeconds(
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(volumeTime)),
|
|
|
- TimeUnit.NANOSECONDS.toMillis(volumeTime) -
|
|
|
- TimeUnit.SECONDS.toMillis(
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(volumeTime)));
|
|
|
-
|
|
|
- long bucketTime = bucketCreationTime.longValue() / threadCount;
|
|
|
- String prettyBucketTime = String.format("%02d:%02d:%02d:%02d",
|
|
|
- TimeUnit.NANOSECONDS.toHours(bucketTime),
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(bucketTime) -
|
|
|
- TimeUnit.HOURS.toMinutes(
|
|
|
- TimeUnit.NANOSECONDS.toHours(bucketTime)),
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(bucketTime) -
|
|
|
- TimeUnit.MINUTES.toSeconds(
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(bucketTime)),
|
|
|
- TimeUnit.NANOSECONDS.toMillis(bucketTime) -
|
|
|
- TimeUnit.SECONDS.toMillis(
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(bucketTime)));
|
|
|
-
|
|
|
- long totalKeyCreationTime = keyCreationTime.longValue() / threadCount;
|
|
|
- String prettyKeyCreationTime = String.format("%02d:%02d:%02d:%02d",
|
|
|
- TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime),
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime) -
|
|
|
- TimeUnit.HOURS.toMinutes(
|
|
|
- TimeUnit.NANOSECONDS.toHours(totalKeyCreationTime)),
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime) -
|
|
|
- TimeUnit.MINUTES.toSeconds(
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(totalKeyCreationTime)),
|
|
|
- TimeUnit.NANOSECONDS.toMillis(totalKeyCreationTime) -
|
|
|
- TimeUnit.SECONDS.toMillis(
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(totalKeyCreationTime)));
|
|
|
-
|
|
|
- long totalKeyWriteTime = keyWriteTime.longValue() / threadCount;
|
|
|
- String prettyKeyWriteTime = String.format("%02d:%02d:%02d:%02d",
|
|
|
- TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime),
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime) -
|
|
|
- TimeUnit.HOURS.toMinutes(
|
|
|
- TimeUnit.NANOSECONDS.toHours(totalKeyWriteTime)),
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime) -
|
|
|
- TimeUnit.MINUTES.toSeconds(
|
|
|
- TimeUnit.NANOSECONDS.toMinutes(totalKeyWriteTime)),
|
|
|
- TimeUnit.NANOSECONDS.toMillis(totalKeyWriteTime) -
|
|
|
- TimeUnit.SECONDS.toMillis(
|
|
|
- TimeUnit.NANOSECONDS.toSeconds(totalKeyWriteTime)));
|
|
|
+ String execTime = DurationFormatUtils
|
|
|
+ .formatDuration(TimeUnit.NANOSECONDS.toMillis(endTime),
|
|
|
+ DURATION_FORMAT);
|
|
|
+ String prettyTotalVolumeTime = DurationFormatUtils
|
|
|
+ .formatDuration(TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()),
|
|
|
+ DURATION_FORMAT);
|
|
|
+ String prettyTotalBucketTime = DurationFormatUtils
|
|
|
+ .formatDuration(TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()),
|
|
|
+ DURATION_FORMAT);
|
|
|
+ String prettyTotalKeyCreationTime = DurationFormatUtils
|
|
|
+ .formatDuration(TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()),
|
|
|
+ DURATION_FORMAT);
|
|
|
+ String prettyTotalKeyWriteTime = DurationFormatUtils
|
|
|
+ .formatDuration(TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()),
|
|
|
+ DURATION_FORMAT);
|
|
|
+
|
|
|
+ long volumeTime =
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(volumeCreationTime.get()) / threadCount;
|
|
|
+ String prettyAverageVolumeTime =
|
|
|
+ DurationFormatUtils.formatDuration(volumeTime, DURATION_FORMAT);
|
|
|
+
|
|
|
+ long bucketTime =
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(bucketCreationTime.get()) / threadCount;
|
|
|
+ String prettyAverageBucketTime =
|
|
|
+ DurationFormatUtils.formatDuration(bucketTime, DURATION_FORMAT);
|
|
|
+
|
|
|
+ long averageKeyCreationTime =
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(keyCreationTime.get()) / threadCount;
|
|
|
+ String prettyAverageKeyCreationTime = DurationFormatUtils
|
|
|
+ .formatDuration(averageKeyCreationTime, DURATION_FORMAT);
|
|
|
+
|
|
|
+ long averageKeyWriteTime =
|
|
|
+ TimeUnit.NANOSECONDS.toMillis(keyWriteTime.get()) / threadCount;
|
|
|
+ String prettyAverageKeyWriteTime = DurationFormatUtils
|
|
|
+ .formatDuration(averageKeyWriteTime, DURATION_FORMAT);
|
|
|
|
|
|
out.println();
|
|
|
out.println("***************************************************");
|
|
|
+ out.println("Git Base Revision: " + VersionInfo.getRevision());
|
|
|
out.println("Number of Volumes created: " + numberOfVolumesCreated);
|
|
|
out.println("Number of Buckets created: " + numberOfBucketsCreated);
|
|
|
out.println("Number of Keys added: " + numberOfKeysAdded);
|
|
|
- out.println("Time spent in volume creation: " + prettyVolumeTime);
|
|
|
- out.println("Time spent in bucket creation: " + prettyBucketTime);
|
|
|
- out.println("Time spent in key creation: " + prettyKeyCreationTime);
|
|
|
- out.println("Time spent in writing keys: " + prettyKeyWriteTime);
|
|
|
+ out.println("Time spent in volume creation: " + prettyTotalVolumeTime);
|
|
|
+ out.println("Time spent in bucket creation: " + prettyTotalBucketTime);
|
|
|
+ out.println("Time spent in key creation: " + prettyTotalKeyCreationTime);
|
|
|
+ out.println("Time spent in key write: " + prettyTotalKeyWriteTime);
|
|
|
+ out.println(
|
|
|
+ "Average Time spent in volume creation: " + prettyAverageVolumeTime);
|
|
|
+ out.println(
|
|
|
+ "Average Time spent in bucket creation: " + prettyAverageBucketTime);
|
|
|
+ out.println(
|
|
|
+ "Average Time spent in key creation: " + prettyAverageKeyCreationTime);
|
|
|
+ out.println(
|
|
|
+ "Average Time spent in key write: " + prettyAverageKeyWriteTime);
|
|
|
out.println("Total bytes written: " + totalBytesWritten);
|
|
|
if (validateWrites) {
|
|
|
out.println("Total number of writes validated: " +
|
|
@@ -478,6 +494,46 @@ public final class Corona extends Configured implements Tool {
|
|
|
}
|
|
|
out.println("Total Execution time: " + execTime);
|
|
|
out.println("***************************************************");
|
|
|
+
|
|
|
+ if (jsonDir != null) {
|
|
|
+ CoronaJobInfo jobInfo = new CoronaJobInfo()
|
|
|
+ .setExecTime(execTime)
|
|
|
+ .setGitBaseRevision(VersionInfo.getRevision())
|
|
|
+ .setAverageVolumeCreationTime(prettyAverageVolumeTime)
|
|
|
+ .setAverageBucketCreationTime(prettyAverageBucketTime)
|
|
|
+ .setAverageKeyCreationTime(prettyAverageKeyCreationTime)
|
|
|
+ .setAverageKeyWriteTime(prettyAverageKeyWriteTime)
|
|
|
+ .setTotalVolumeCreationTime(prettyTotalVolumeTime)
|
|
|
+ .setTotalBucketCreationTime(prettyTotalBucketTime)
|
|
|
+ .setTotalKeyCreationTime(prettyTotalKeyCreationTime)
|
|
|
+ .setTotalKeyWriteTime(prettyTotalKeyWriteTime);
|
|
|
+ String jsonName =
|
|
|
+ new SimpleDateFormat("yyyyMMddHHmmss").format(Time.now()) + ".json";
|
|
|
+ String jsonPath = jsonDir + "/" + jsonName;
|
|
|
+ FileOutputStream os = null;
|
|
|
+ try {
|
|
|
+ os = new FileOutputStream(jsonPath);
|
|
|
+ ObjectMapper mapper = new ObjectMapper();
|
|
|
+ mapper.setVisibility(PropertyAccessor.FIELD,
|
|
|
+ JsonAutoDetect.Visibility.ANY);
|
|
|
+ ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
|
|
|
+ writer.writeValue(os, jobInfo);
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ out.println("Json File could not be created for the path: " + jsonPath);
|
|
|
+ out.println(e);
|
|
|
+ } catch (IOException e) {
|
|
|
+ out.println("Json object could not be created");
|
|
|
+ out.println(e);
|
|
|
+ } finally {
|
|
|
+ try {
|
|
|
+ if (os != null) {
|
|
|
+ os.close();
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Could not close the output stream for json", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -543,6 +599,15 @@ public final class Corona extends Configured implements Tool {
|
|
|
return writeValidationFailureCount;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the length of the common key value initialized.
|
|
|
+ * @return key value length initialized.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ long getKeyValueLength(){
|
|
|
+ return keyValue.length;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Wrapper to hold ozone key-value pair.
|
|
|
*/
|
|
@@ -602,8 +667,9 @@ public final class Corona extends Configured implements Tool {
|
|
|
factor = replicationFactor != 0 ?
|
|
|
OzoneProtos.ReplicationFactor.valueOf(replicationFactor) :
|
|
|
OzoneProtos.ReplicationFactor.THREE;
|
|
|
-
|
|
|
}
|
|
|
+
|
|
|
+ Long threadKeyWriteTime = 0L;
|
|
|
for (int j = 0; j < totalBuckets; j++) {
|
|
|
String bucketName = "bucket-" + j + "-" +
|
|
|
RandomStringUtils.randomNumeric(5);
|
|
@@ -618,22 +684,24 @@ public final class Corona extends Configured implements Tool {
|
|
|
for (int k = 0; k < totalKeys; k++) {
|
|
|
String key = "key-" + k + "-" +
|
|
|
RandomStringUtils.randomNumeric(5);
|
|
|
- byte[] value = DFSUtil.string2Bytes(
|
|
|
- RandomStringUtils.randomAscii(keySize));
|
|
|
+ byte[] randomValue =
|
|
|
+ DFSUtil.string2Bytes(UUID.randomUUID().toString());
|
|
|
try {
|
|
|
LOG.trace("Adding key: {} in bucket: {} of volume: {}",
|
|
|
key, bucket, volume);
|
|
|
long keyCreateStart = System.nanoTime();
|
|
|
- OzoneOutputStream os = bucket.createKey(key, value.length,
|
|
|
- type, factor);
|
|
|
+ OzoneOutputStream os =
|
|
|
+ bucket.createKey(key, keySize, type, factor);
|
|
|
keyCreationTime.getAndAdd(System.nanoTime() - keyCreateStart);
|
|
|
long keyWriteStart = System.nanoTime();
|
|
|
- os.write(value);
|
|
|
+ os.write(keyValue);
|
|
|
+ os.write(randomValue);
|
|
|
os.close();
|
|
|
- keyWriteTime.getAndAdd(System.nanoTime() - keyWriteStart);
|
|
|
- totalBytesWritten.getAndAdd(value.length);
|
|
|
+ threadKeyWriteTime += System.nanoTime() - keyWriteStart;
|
|
|
+ totalBytesWritten.getAndAdd(keySize);
|
|
|
numberOfKeysAdded.getAndIncrement();
|
|
|
if (validateWrites) {
|
|
|
+ byte[] value = ArrayUtils.addAll(keyValue, randomValue);
|
|
|
boolean validate = validationQueue.offer(
|
|
|
new KeyValue(bucket, key, value));
|
|
|
if (validate) {
|
|
@@ -652,7 +720,238 @@ public final class Corona extends Configured implements Tool {
|
|
|
" in volume: {}.", bucketName, volume, e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ keyWriteTime.getAndAdd(threadKeyWriteTime);
|
|
|
+ boolean success = threadThroughput.add(
|
|
|
+ (totalBuckets * totalKeys * keySize * 1.0) / TimeUnit.NANOSECONDS
|
|
|
+ .toSeconds(threadKeyWriteTime));
|
|
|
+ if (!success) {
|
|
|
+ LOG.warn("Throughput could not be added for thread id: {}",
|
|
|
+ Thread.currentThread().getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private final class CoronaJobInfo {
|
|
|
+
|
|
|
+ private String gitBaseRevision;
|
|
|
+ private String jobStartTime;
|
|
|
+ private String numOfVolumes;
|
|
|
+ private String numOfBuckets;
|
|
|
+ private String numOfKeys;
|
|
|
+ private String numOfThreads;
|
|
|
+ private String mode;
|
|
|
+ private String totalBucketCreationTime;
|
|
|
+ private String totalVolumeCreationTime;
|
|
|
+ private String totalKeyCreationTime;
|
|
|
+ private String totalKeyWriteTime;
|
|
|
+ private String averageBucketCreationTime;
|
|
|
+ private String averageVolumeCreationTime;
|
|
|
+ private String averageKeyCreationTime;
|
|
|
+ private String averageKeyWriteTime;
|
|
|
+ private String dataWritten;
|
|
|
+ private String execTime;
|
|
|
+
|
|
|
+ private int keySize;
|
|
|
+
|
|
|
+ private String[] threadThroughputPerSecond;
|
|
|
+ private String minThreadThroughputPerSecond;
|
|
|
+ private String maxThreadThroughputPerSecond;
|
|
|
+ private String totalThroughputPerSecond;
|
|
|
+
|
|
|
+ private CoronaJobInfo() {
|
|
|
+ this.numOfVolumes = Corona.this.numOfVolumes;
|
|
|
+ this.numOfBuckets = Corona.this.numOfBuckets;
|
|
|
+ this.numOfKeys = Corona.this.numOfKeys;
|
|
|
+ this.numOfThreads = Corona.this.numOfThreads;
|
|
|
+ this.keySize = Corona.this.keySize;
|
|
|
+ this.mode = Corona.this.mode;
|
|
|
+ this.jobStartTime = Time.formatTime(Corona.this.jobStartTime);
|
|
|
+
|
|
|
+ long totalBytes =
|
|
|
+ Long.parseLong(numOfVolumes) * Long.parseLong(numOfBuckets) * Long
|
|
|
+ .parseLong(numOfKeys) * keySize;
|
|
|
+ this.dataWritten = getInStorageUnits((double) totalBytes);
|
|
|
+
|
|
|
+ threadThroughputPerSecond = new String[Integer.parseInt(numOfThreads)];
|
|
|
+ double minThreadThroughput = Double.MAX_VALUE, maxThreadThroughput = 0.0,
|
|
|
+ totalThroughput = 0.0;
|
|
|
+ int i = 0;
|
|
|
+ for (Double throughput : Corona.this.threadThroughput) {
|
|
|
+ minThreadThroughput = min(throughput, minThreadThroughput);
|
|
|
+ maxThreadThroughput = max(throughput, maxThreadThroughput);
|
|
|
+ totalThroughput += throughput;
|
|
|
+ threadThroughputPerSecond[i++] = getInStorageUnits(throughput);
|
|
|
+ }
|
|
|
+ minThreadThroughputPerSecond = getInStorageUnits(minThreadThroughput);
|
|
|
+ maxThreadThroughputPerSecond = getInStorageUnits(maxThreadThroughput);
|
|
|
+ totalThroughputPerSecond = getInStorageUnits(totalThroughput);
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getInStorageUnits(Double value) {
|
|
|
+ double size;
|
|
|
+ OzoneQuota.Units unit;
|
|
|
+ if ((long) (value / OzoneConsts.KB) == 0) {
|
|
|
+ size = value / OzoneConsts.KB;
|
|
|
+ unit = OzoneQuota.Units.KB;
|
|
|
+ } else if ((long) (value / OzoneConsts.MB) == 0) {
|
|
|
+ size = value / OzoneConsts.MB;
|
|
|
+ unit = OzoneQuota.Units.MB;
|
|
|
+ } else if ((long) (value / OzoneConsts.GB) == 0) {
|
|
|
+ size = value / OzoneConsts.GB;
|
|
|
+ unit = OzoneQuota.Units.GB;
|
|
|
+ } else if ((long) (value / OzoneConsts.TB) == 0) {
|
|
|
+ size = value / OzoneConsts.TB;
|
|
|
+ unit = OzoneQuota.Units.TB;
|
|
|
+ } else {
|
|
|
+ size = value;
|
|
|
+ unit = OzoneQuota.Units.BYTES;
|
|
|
+ }
|
|
|
+ return size + " " + unit;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setGitBaseRevision(String gitBaseRevisionVal) {
|
|
|
+ gitBaseRevision = gitBaseRevisionVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setTotalBucketCreationTime(
|
|
|
+ String totalBucketCreationTimeVal) {
|
|
|
+ totalBucketCreationTime = totalBucketCreationTimeVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setTotalVolumeCreationTime(
|
|
|
+ String totalVolumeCreationTimeVal) {
|
|
|
+ totalVolumeCreationTime = totalVolumeCreationTimeVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setTotalKeyCreationTime(
|
|
|
+ String totalKeyCreationTimeVal) {
|
|
|
+ totalKeyCreationTime = totalKeyCreationTimeVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setTotalKeyWriteTime(String totalKeyWriteTimeVal) {
|
|
|
+ totalKeyWriteTime = totalKeyWriteTimeVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setAverageBucketCreationTime(
|
|
|
+ String averageBucketCreationTimeVal) {
|
|
|
+ averageBucketCreationTime = averageBucketCreationTimeVal;
|
|
|
+ return this;
|
|
|
}
|
|
|
+
|
|
|
+ public CoronaJobInfo setAverageVolumeCreationTime(
|
|
|
+ String averageVolumeCreationTimeVal) {
|
|
|
+ averageVolumeCreationTime = averageVolumeCreationTimeVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setAverageKeyCreationTime(
|
|
|
+ String averageKeyCreationTimeVal) {
|
|
|
+ averageKeyCreationTime = averageKeyCreationTimeVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setAverageKeyWriteTime(
|
|
|
+ String averageKeyWriteTimeVal) {
|
|
|
+ averageKeyWriteTime = averageKeyWriteTimeVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public CoronaJobInfo setExecTime(String execTimeVal) {
|
|
|
+ execTime = execTimeVal;
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getJobStartTime() {
|
|
|
+ return jobStartTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getNumOfVolumes() {
|
|
|
+ return numOfVolumes;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getNumOfBuckets() {
|
|
|
+ return numOfBuckets;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getNumOfKeys() {
|
|
|
+ return numOfKeys;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getNumOfThreads() {
|
|
|
+ return numOfThreads;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getMode() {
|
|
|
+ return mode;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getTotalBucketCreationTime() {
|
|
|
+ return totalBucketCreationTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getTotalVolumeCreationTime() {
|
|
|
+ return totalVolumeCreationTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getTotalKeyCreationTime() {
|
|
|
+ return totalKeyCreationTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getAverageBucketCreationTime() {
|
|
|
+ return averageBucketCreationTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getTotalKeyWriteTime() {
|
|
|
+ return totalKeyWriteTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getAverageKeyWriteTime() {
|
|
|
+ return averageKeyWriteTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getAverageVolumeCreationTime() {
|
|
|
+ return averageVolumeCreationTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getAverageKeyCreationTime() {
|
|
|
+ return averageKeyCreationTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getExecTime() {
|
|
|
+ return execTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getKeySize() {
|
|
|
+ return keySize;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getGitBaseRevision() {
|
|
|
+ return gitBaseRevision;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getMinThreadThroughputPerSecond() {
|
|
|
+ return minThreadThroughputPerSecond;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getMaxThreadThroughputPerSecond() {
|
|
|
+ return maxThreadThroughputPerSecond;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getDataWritten() {
|
|
|
+ return dataWritten;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getTotalThroughput() {
|
|
|
+ return totalThroughputPerSecond;
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private class ProgressBar implements Runnable {
|