|
@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster;
|
|
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
|
import org.apache.hadoop.conf.OzoneConfiguration;
|
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
|
+import org.apache.hadoop.ozone.common.BlockGroup;
|
|
|
import org.apache.hadoop.ozone.client.rest.OzoneException;
|
|
|
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
|
|
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
|
@@ -69,7 +70,10 @@ import java.util.Set;
|
|
|
import java.util.List;
|
|
|
import java.util.UUID;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
|
|
|
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
|
|
|
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
|
|
/**
|
|
|
* Test Key Space Manager operation in distributed handler scenario.
|
|
@@ -101,6 +105,8 @@ public class TestKeySpaceManager {
|
|
|
scmId = UUID.randomUUID().toString();
|
|
|
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
|
|
|
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
|
|
|
+ conf.setInt(OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS, 2);
|
|
|
+ conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
|
|
|
cluster = new MiniOzoneClassicCluster.Builder(conf)
|
|
|
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
|
|
|
.setClusterId(clusterId)
|
|
@@ -955,7 +961,7 @@ public class TestKeySpaceManager {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testGetKeyInfo() throws IOException,
|
|
|
- OzoneException, ParseException {
|
|
|
+ OzoneException, ParseException {
|
|
|
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
|
|
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
|
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
|
@@ -1053,4 +1059,75 @@ public class TestKeySpaceManager {
|
|
|
Assert.assertEquals(clusterId, info.getClusterId());
|
|
|
Assert.assertEquals(scmId, info.getScmId());
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testExpiredOpenKey() throws Exception {
|
|
|
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
|
|
|
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
|
|
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
|
|
+ String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
|
|
|
+
|
|
|
+ VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
|
|
|
+ createVolumeArgs.setUserName(userName);
|
|
|
+ createVolumeArgs.setAdminName(adminName);
|
|
|
+ storageHandler.createVolume(createVolumeArgs);
|
|
|
+
|
|
|
+ BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
|
|
|
+ bucketArgs.setAddAcls(new LinkedList<>());
|
|
|
+ bucketArgs.setRemoveAcls(new LinkedList<>());
|
|
|
+ bucketArgs.setStorageType(StorageType.DISK);
|
|
|
+ storageHandler.createBucket(bucketArgs);
|
|
|
+
|
|
|
+ // open some keys.
|
|
|
+
|
|
|
+ Thread.sleep(1000);
|
|
|
+
|
|
|
+ KeyArgs keyArgs1 = new KeyArgs("testKey1", bucketArgs);
|
|
|
+ KeyArgs keyArgs2 = new KeyArgs("testKey2", bucketArgs);
|
|
|
+ KeyArgs keyArgs3 = new KeyArgs("testKey3", bucketArgs);
|
|
|
+ KeyArgs keyArgs4 = new KeyArgs("testKey4", bucketArgs);
|
|
|
+ List<BlockGroup> openKeys;
|
|
|
+ try (OutputStream s1 = storageHandler.newKeyWriter(keyArgs1);
|
|
|
+ OutputStream s2 = storageHandler.newKeyWriter(keyArgs2)) {
|
|
|
+ storageHandler.newKeyWriter(keyArgs3);
|
|
|
+ storageHandler.newKeyWriter(keyArgs4);
|
|
|
+ // now all k1-k4 should be in open state
|
|
|
+ openKeys = cluster.getKeySpaceManager()
|
|
|
+ .getMetadataManager().getExpiredOpenKeys();
|
|
|
+ Assert.assertEquals(0, openKeys.size());
|
|
|
+
|
|
|
+ Thread.sleep(2000);
|
|
|
+
|
|
|
+ openKeys = cluster.getKeySpaceManager().getMetadataManager()
|
|
|
+ .getExpiredOpenKeys();
|
|
|
+ Assert.assertEquals(4, openKeys.size());
|
|
|
+
|
|
|
+ Set<String> expected = Stream.of(
|
|
|
+ "testKey1", "testKey2", "testKey3", "testKey4")
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ openKeys =
|
|
|
+ cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
|
|
|
+ for (BlockGroup bg : openKeys) {
|
|
|
+ String[] subs = bg.getGroupID().split("/");
|
|
|
+ String keyName = subs[subs.length - 1];
|
|
|
+ Assert.assertTrue(expected.remove(keyName));
|
|
|
+ }
|
|
|
+ Assert.assertEquals(0, expected.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ KeyArgs keyArgs5 = new KeyArgs("testKey5", bucketArgs);
|
|
|
+ storageHandler.newKeyWriter(keyArgs5);
|
|
|
+
|
|
|
+ // k1 and k2 are closed, so should be removed from meta data, k3 and k4
|
|
|
+ // should still be there.
|
|
|
+ Thread.sleep(2000);
|
|
|
+
|
|
|
+ openKeys =
|
|
|
+ cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
|
|
|
+ Assert.assertEquals(1, openKeys.size());
|
|
|
+ String[] subs = openKeys.get(0).getGroupID().split("/");
|
|
|
+ String keyName = subs[subs.length - 1];
|
|
|
+ Assert.assertEquals("testKey5", keyName);
|
|
|
+ }
|
|
|
}
|