|
@@ -35,6 +35,7 @@ import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Optional;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -49,7 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
*/
|
|
|
public class MiniOzoneLoadGenerator {
|
|
|
|
|
|
- static final Logger LOG =
|
|
|
+ private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(MiniOzoneLoadGenerator.class);
|
|
|
|
|
|
private static String keyNameDelimiter = "_";
|
|
@@ -113,7 +114,7 @@ public class MiniOzoneLoadGenerator {
|
|
|
int index = RandomUtils.nextInt();
|
|
|
String keyName = writeData(index, bucket, threadName);
|
|
|
|
|
|
- readData(bucket, keyName);
|
|
|
+ readData(bucket, keyName, index);
|
|
|
|
|
|
deleteKey(bucket, keyName);
|
|
|
} catch (Exception e) {
|
|
@@ -133,11 +134,13 @@ public class MiniOzoneLoadGenerator {
|
|
|
ByteBuffer buffer = buffers.get(keyIndex % numBuffers);
|
|
|
int bufferCapacity = buffer.capacity();
|
|
|
|
|
|
- String keyName = threadName + keyNameDelimiter + keyIndex;
|
|
|
+ String keyName = getKeyName(keyIndex, threadName);
|
|
|
+ LOG.trace("LOADGEN: Writing key {}", keyName);
|
|
|
try (OzoneOutputStream stream = bucket.createKey(keyName,
|
|
|
bufferCapacity, ReplicationType.RATIS, ReplicationFactor.THREE,
|
|
|
new HashMap<>())) {
|
|
|
stream.write(buffer.array());
|
|
|
+ LOG.trace("LOADGEN: Written key {}", keyName);
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("LOADGEN: Create key:{} failed with exception, skipping",
|
|
|
keyName, t);
|
|
@@ -147,9 +150,9 @@ public class MiniOzoneLoadGenerator {
|
|
|
return keyName;
|
|
|
}
|
|
|
|
|
|
- private void readData(OzoneBucket bucket, String keyName) throws Exception {
|
|
|
- int index = Integer.valueOf(keyName.split(keyNameDelimiter)[1]);
|
|
|
-
|
|
|
+ private void readData(OzoneBucket bucket, String keyName, int index)
|
|
|
+ throws Exception {
|
|
|
+ LOG.trace("LOADGEN: Reading key {}", keyName);
|
|
|
|
|
|
ByteBuffer buffer = buffers.get(index % numBuffers);
|
|
|
int bufferCapacity = buffer.capacity();
|
|
@@ -168,6 +171,7 @@ public class MiniOzoneLoadGenerator {
|
|
|
throw new IOException("Read mismatch, key:" + keyName +
|
|
|
" read data does not match the written data");
|
|
|
}
|
|
|
+ LOG.trace("LOADGEN: Read key {}", keyName);
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("LOADGEN: Read key:{} failed with exception", keyName, t);
|
|
|
throw t;
|
|
@@ -175,18 +179,21 @@ public class MiniOzoneLoadGenerator {
|
|
|
}
|
|
|
|
|
|
private void deleteKey(OzoneBucket bucket, String keyName) throws Exception {
|
|
|
+ LOG.trace("LOADGEN: Deleting key {}", keyName);
|
|
|
try {
|
|
|
bucket.deleteKey(keyName);
|
|
|
+ LOG.trace("LOADGEN: Deleted key {}", keyName);
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("LOADGEN: Unable to delete key:{}", keyName, t);
|
|
|
throw t;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private String getKeyToRead() {
|
|
|
+ private Optional<Integer> randomKeyToRead() {
|
|
|
int currentIndex = agedFileWrittenIndex.get();
|
|
|
- return currentIndex != 0 ?
|
|
|
- String.valueOf(RandomUtils.nextInt(0, currentIndex)): null;
|
|
|
+ return currentIndex != 0
|
|
|
+ ? Optional.of(RandomUtils.nextInt(0, currentIndex))
|
|
|
+ : Optional.empty();
|
|
|
}
|
|
|
|
|
|
private void startAgedFilesLoad(long runTimeMillis) {
|
|
@@ -201,12 +208,13 @@ public class MiniOzoneLoadGenerator {
|
|
|
String keyName = null;
|
|
|
try {
|
|
|
if (agedWriteProbability.isTrue()) {
|
|
|
- keyName = writeData(agedFileWrittenIndex.incrementAndGet(),
|
|
|
+ keyName = writeData(agedFileWrittenIndex.getAndIncrement(),
|
|
|
agedLoadBucket, threadName);
|
|
|
} else {
|
|
|
- keyName = getKeyToRead();
|
|
|
- if (keyName != null) {
|
|
|
- readData(agedLoadBucket, keyName);
|
|
|
+ Optional<Integer> index = randomKeyToRead();
|
|
|
+ if (index.isPresent()) {
|
|
|
+ keyName = getKeyName(index.get(), threadName);
|
|
|
+ readData(agedLoadBucket, keyName, index.get());
|
|
|
}
|
|
|
}
|
|
|
} catch (Throwable t) {
|
|
@@ -253,4 +261,8 @@ public class MiniOzoneLoadGenerator {
|
|
|
LOG.error("error while closing ", e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private static String getKeyName(int keyIndex, String threadName) {
|
|
|
+ return threadName + keyNameDelimiter + keyIndex;
|
|
|
+ }
|
|
|
}
|