|
@@ -24,6 +24,18 @@ import java.util.UUID;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
+import org.apache.commons.lang3.RandomStringUtils;
|
|
|
+import org.apache.hadoop.ozone.audit.AuditLogger;
|
|
|
+import org.apache.hadoop.ozone.audit.AuditMessage;
|
|
|
+import org.apache.hadoop.ozone.om.OMConfigKeys;
|
|
|
+import org.apache.hadoop.ozone.om.OMMetrics;
|
|
|
+import org.apache.hadoop.ozone.om.OzoneManager;
|
|
|
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
|
|
|
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
|
|
|
+import org.apache.hadoop.ozone.om.request.bucket.OMBucketCreateRequest;
|
|
|
+import org.apache.hadoop.ozone.om.request.bucket.OMBucketDeleteRequest;
|
|
|
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
|
|
|
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
|
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
@@ -38,30 +50,30 @@ import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
|
|
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
|
|
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
|
|
import org.apache.hadoop.ozone.om.response.volume.OMVolumeCreateResponse;
|
|
|
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
|
|
- .CreateBucketResponse;
|
|
|
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
|
|
- .CreateVolumeResponse;
|
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
|
|
.DeleteBucketResponse;
|
|
|
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
|
|
.OMResponse;
|
|
|
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
|
|
|
- .VolumeList;
|
|
|
import org.apache.hadoop.ozone.om.response.bucket.OMBucketCreateResponse;
|
|
|
import org.apache.hadoop.ozone.om.response.bucket.OMBucketDeleteResponse;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
-import org.apache.hadoop.util.Time;
|
|
|
+import org.mockito.Mockito;
|
|
|
|
|
|
-import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
|
|
|
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY;
|
|
|
import static org.junit.Assert.fail;
|
|
|
+import static org.mockito.ArgumentMatchers.any;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
|
|
|
/**
|
|
|
* This class tests OzoneManagerDouble Buffer with actual OMResponse classes.
|
|
|
*/
|
|
|
public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
|
|
|
+ private OzoneManager ozoneManager;
|
|
|
+ private OMMetrics omMetrics;
|
|
|
+ private AuditLogger auditLogger;
|
|
|
+ private OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper;
|
|
|
private OMMetadataManager omMetadataManager;
|
|
|
private OzoneManagerDoubleBuffer doubleBuffer;
|
|
|
private final AtomicLong trxId = new AtomicLong(0);
|
|
@@ -73,16 +85,25 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
|
|
|
@Before
|
|
|
public void setup() throws IOException {
|
|
|
- OzoneConfiguration configuration = new OzoneConfiguration();
|
|
|
- configuration.set(OZONE_METADATA_DIRS,
|
|
|
+ ozoneManager = Mockito.mock(OzoneManager.class);
|
|
|
+ omMetrics = OMMetrics.create();
|
|
|
+ OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
|
|
|
+ ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
|
|
|
folder.newFolder().getAbsolutePath());
|
|
|
- omMetadataManager =
|
|
|
- new OmMetadataManagerImpl(configuration);
|
|
|
+ ozoneConfiguration.setInt(HDDS_LOCK_MAX_CONCURRENCY, 1000);
|
|
|
+ omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
|
|
|
+ when(ozoneManager.getMetrics()).thenReturn(omMetrics);
|
|
|
+ when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
|
|
|
+ when(ozoneManager.getMaxUserVolumeCount()).thenReturn(10L);
|
|
|
+ auditLogger = Mockito.mock(AuditLogger.class);
|
|
|
+ when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
|
|
|
+ Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
|
|
|
ozoneManagerRatisSnapshot = index -> {
|
|
|
lastAppliedIndex = index;
|
|
|
};
|
|
|
doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager,
|
|
|
ozoneManagerRatisSnapshot);
|
|
|
+ ozoneManagerDoubleBufferHelper = doubleBuffer::add;
|
|
|
}
|
|
|
|
|
|
@After
|
|
@@ -104,7 +125,7 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
testDoubleBuffer(1, 10);
|
|
|
testDoubleBuffer(10, 100);
|
|
|
testDoubleBuffer(100, 100);
|
|
|
- testDoubleBuffer(1000, 1000);
|
|
|
+ testDoubleBuffer(1000, 100);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -123,9 +144,9 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
new ConcurrentLinkedQueue<>();
|
|
|
|
|
|
String volumeName = UUID.randomUUID().toString();
|
|
|
- OMVolumeCreateResponse omVolumeCreateResponse = createVolume(volumeName);
|
|
|
- doubleBuffer.add(omVolumeCreateResponse, trxId.incrementAndGet());
|
|
|
-
|
|
|
+ OMVolumeCreateResponse omVolumeCreateResponse =
|
|
|
+ (OMVolumeCreateResponse) createVolume(volumeName,
|
|
|
+ trxId.incrementAndGet());
|
|
|
|
|
|
int bucketCount = 10;
|
|
|
|
|
@@ -174,16 +195,16 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
new ConcurrentLinkedQueue<>();
|
|
|
|
|
|
String volumeName1 = UUID.randomUUID().toString();
|
|
|
+
|
|
|
OMVolumeCreateResponse omVolumeCreateResponse1 =
|
|
|
- createVolume(volumeName1);
|
|
|
+ (OMVolumeCreateResponse) createVolume(volumeName1,
|
|
|
+ trxId.incrementAndGet());
|
|
|
|
|
|
String volumeName2 = UUID.randomUUID().toString();
|
|
|
OMVolumeCreateResponse omVolumeCreateResponse2 =
|
|
|
- createVolume(volumeName2);
|
|
|
-
|
|
|
- doubleBuffer.add(omVolumeCreateResponse1, trxId.incrementAndGet());
|
|
|
+ (OMVolumeCreateResponse) createVolume(volumeName2,
|
|
|
+ trxId.incrementAndGet());
|
|
|
|
|
|
- doubleBuffer.add(omVolumeCreateResponse2, trxId.incrementAndGet());
|
|
|
|
|
|
Daemon daemon1 = new Daemon(() -> doMixTransactions(volumeName1, 10,
|
|
|
deleteBucketQueue, bucketQueue));
|
|
@@ -235,14 +256,14 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
Queue<OMBucketCreateResponse> bucketQueue) {
|
|
|
for (int i=0; i < bucketCount; i++) {
|
|
|
String bucketName = UUID.randomUUID().toString();
|
|
|
+ long transactionID = trxId.incrementAndGet();
|
|
|
OMBucketCreateResponse omBucketCreateResponse = createBucket(volumeName,
|
|
|
- bucketName);
|
|
|
- doubleBuffer.add(omBucketCreateResponse, trxId.incrementAndGet());
|
|
|
+ bucketName, transactionID);
|
|
|
// For every 2 transactions have a deleted bucket.
|
|
|
if (i % 2 == 0) {
|
|
|
OMBucketDeleteResponse omBucketDeleteResponse =
|
|
|
- deleteBucket(volumeName, bucketName);
|
|
|
- doubleBuffer.add(omBucketDeleteResponse, trxId.incrementAndGet());
|
|
|
+ (OMBucketDeleteResponse) deleteBucket(volumeName, bucketName,
|
|
|
+ trxId.incrementAndGet());
|
|
|
deleteBucketQueue.add(omBucketDeleteResponse);
|
|
|
} else {
|
|
|
bucketQueue.add(omBucketCreateResponse);
|
|
@@ -250,6 +271,18 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private OMClientResponse deleteBucket(String volumeName, String bucketName,
|
|
|
+ long transactionID) {
|
|
|
+ OzoneManagerProtocolProtos.OMRequest omRequest =
|
|
|
+ TestOMRequestUtils.createDeleteBucketRequest(volumeName, bucketName);
|
|
|
+
|
|
|
+ OMBucketDeleteRequest omBucketDeleteRequest =
|
|
|
+ new OMBucketDeleteRequest(omRequest);
|
|
|
+
|
|
|
+ return omBucketDeleteRequest.validateAndUpdateCache(ozoneManager,
|
|
|
+ transactionID, ozoneManagerDoubleBufferHelper);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Verifies volume table data is matching with actual response added to
|
|
|
* double buffer.
|
|
@@ -340,7 +373,7 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
setup();
|
|
|
for (int i = 0; i < iterations; i++) {
|
|
|
Daemon d1 = new Daemon(() ->
|
|
|
- doTransactions(UUID.randomUUID().toString(), bucketCount));
|
|
|
+ doTransactions(RandomStringUtils.randomAlphabetic(5), bucketCount));
|
|
|
d1.start();
|
|
|
}
|
|
|
|
|
@@ -353,13 +386,30 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
doubleBuffer.getFlushedTransactionCount()
|
|
|
);
|
|
|
|
|
|
- Assert.assertEquals(iterations,
|
|
|
- omMetadataManager.countRowsInTable(omMetadataManager.getVolumeTable())
|
|
|
- );
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ long count = 0L;
|
|
|
+ try {
|
|
|
+ count =
|
|
|
+ omMetadataManager.countRowsInTable(
|
|
|
+ omMetadataManager.getVolumeTable());
|
|
|
+ } catch (IOException ex) {
|
|
|
+ fail("testDoubleBuffer failed");
|
|
|
+ }
|
|
|
+ return count == iterations;
|
|
|
+
|
|
|
+ }, 300, 40000);
|
|
|
|
|
|
- Assert.assertEquals(bucketCount * iterations,
|
|
|
- omMetadataManager.countRowsInTable(omMetadataManager.getBucketTable())
|
|
|
- );
|
|
|
+
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ long count = 0L;
|
|
|
+ try {
|
|
|
+ count = omMetadataManager.countRowsInTable(
|
|
|
+ omMetadataManager.getBucketTable());
|
|
|
+ } catch (IOException ex) {
|
|
|
+ fail("testDoubleBuffer failed");
|
|
|
+ }
|
|
|
+ return count == bucketCount * iterations;
|
|
|
+ }, 300, 40000);
|
|
|
|
|
|
Assert.assertTrue(doubleBuffer.getFlushIterations() > 0);
|
|
|
} finally {
|
|
@@ -374,9 +424,9 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
* @param bucketCount
|
|
|
*/
|
|
|
public void doTransactions(String volumeName, int bucketCount) {
|
|
|
- doubleBuffer.add(createVolume(volumeName), trxId.incrementAndGet());
|
|
|
+ createVolume(volumeName, trxId.incrementAndGet());
|
|
|
for (int i=0; i< bucketCount; i++) {
|
|
|
- doubleBuffer.add(createBucket(volumeName, UUID.randomUUID().toString()),
|
|
|
+ createBucket(volumeName, UUID.randomUUID().toString(),
|
|
|
trxId.incrementAndGet());
|
|
|
// For every 100 buckets creation adding 100ms delay
|
|
|
|
|
@@ -395,22 +445,19 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
* @param volumeName
|
|
|
* @return OMVolumeCreateResponse
|
|
|
*/
|
|
|
- private OMVolumeCreateResponse createVolume(String volumeName) {
|
|
|
- OmVolumeArgs omVolumeArgs =
|
|
|
- OmVolumeArgs.newBuilder()
|
|
|
- .setAdminName(UUID.randomUUID().toString())
|
|
|
- .setOwnerName(UUID.randomUUID().toString())
|
|
|
- .setVolume(volumeName)
|
|
|
- .setCreationTime(Time.now()).build();
|
|
|
-
|
|
|
- VolumeList volumeList = VolumeList.newBuilder()
|
|
|
- .addVolumeNames(volumeName).build();
|
|
|
- return new OMVolumeCreateResponse(omVolumeArgs, volumeList,
|
|
|
- OMResponse.newBuilder()
|
|
|
- .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
|
|
|
- .setStatus(OzoneManagerProtocolProtos.Status.OK)
|
|
|
- .setCreateVolumeResponse(CreateVolumeResponse.newBuilder().build())
|
|
|
- .build());
|
|
|
+ private OMClientResponse createVolume(String volumeName,
|
|
|
+ long transactionId) {
|
|
|
+
|
|
|
+ String admin = "ozone";
|
|
|
+ String owner = UUID.randomUUID().toString();
|
|
|
+ OzoneManagerProtocolProtos.OMRequest omRequest =
|
|
|
+ TestOMRequestUtils.createVolumeRequest(volumeName, admin, owner);
|
|
|
+
|
|
|
+ OMVolumeCreateRequest omVolumeCreateRequest =
|
|
|
+ new OMVolumeCreateRequest(omRequest);
|
|
|
+
|
|
|
+ return omVolumeCreateRequest.validateAndUpdateCache(ozoneManager,
|
|
|
+ transactionId, ozoneManagerDoubleBufferHelper);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -420,15 +467,19 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
|
|
|
* @return OMBucketCreateResponse
|
|
|
*/
|
|
|
private OMBucketCreateResponse createBucket(String volumeName,
|
|
|
- String bucketName) {
|
|
|
- OmBucketInfo omBucketInfo =
|
|
|
- OmBucketInfo.newBuilder().setVolumeName(volumeName)
|
|
|
- .setBucketName(bucketName).setCreationTime(Time.now()).build();
|
|
|
- return new OMBucketCreateResponse(omBucketInfo, OMResponse.newBuilder()
|
|
|
- .setCmdType(OzoneManagerProtocolProtos.Type.CreateBucket)
|
|
|
- .setStatus(OzoneManagerProtocolProtos.Status.OK)
|
|
|
- .setCreateBucketResponse(CreateBucketResponse.newBuilder().build())
|
|
|
- .build());
|
|
|
+ String bucketName, long transactionID) {
|
|
|
+
|
|
|
+ OzoneManagerProtocolProtos.OMRequest omRequest =
|
|
|
+ TestOMRequestUtils.createBucketRequest(bucketName, volumeName, false,
|
|
|
+ OzoneManagerProtocolProtos.StorageTypeProto.DISK);
|
|
|
+
|
|
|
+ OMBucketCreateRequest omBucketCreateRequest =
|
|
|
+ new OMBucketCreateRequest(omRequest);
|
|
|
+
|
|
|
+ return (OMBucketCreateResponse) omBucketCreateRequest
|
|
|
+ .validateAndUpdateCache(ozoneManager, transactionID,
|
|
|
+ ozoneManagerDoubleBufferHelper);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|