|
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.verify;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
+import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.Field;
|
|
|
import java.lang.reflect.Modifier;
|
|
@@ -41,6 +42,7 @@ import java.util.Set;
|
|
|
import java.util.UUID;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import org.apache.commons.io.FileUtils;
|
|
|
import org.apache.commons.lang3.RandomUtils;
|
|
|
import org.apache.hadoop.hdds.HddsConfigKeys;
|
|
|
import org.apache.hadoop.hdds.HddsUtils;
|
|
@@ -92,6 +94,8 @@ import org.junit.rules.TemporaryFolder;
|
|
|
import org.junit.rules.Timeout;
|
|
|
import org.mockito.ArgumentMatcher;
|
|
|
import org.mockito.Mockito;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.google.common.collect.Maps;
|
|
@@ -101,6 +105,9 @@ import com.google.common.collect.Maps;
|
|
|
*/
|
|
|
public class TestStorageContainerManager {
|
|
|
private static XceiverClientManager xceiverClientManager;
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(
|
|
|
+ TestStorageContainerManager.class);
|
|
|
+
|
|
|
/**
|
|
|
* Set the timeout for every test.
|
|
|
*/
|
|
@@ -306,9 +313,7 @@ public class TestStorageContainerManager {
|
|
|
}
|
|
|
}, 1000, 10000);
|
|
|
} finally {
|
|
|
- if (cluster != null) {
|
|
|
- cluster.shutdown();
|
|
|
- }
|
|
|
+ cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -329,50 +334,54 @@ public class TestStorageContainerManager {
|
|
|
.build();
|
|
|
cluster.waitForClusterToBeReady();
|
|
|
|
|
|
- DeletedBlockLog delLog = cluster.getStorageContainerManager()
|
|
|
- .getScmBlockManager().getDeletedBlockLog();
|
|
|
- Assert.assertEquals(0, delLog.getNumOfValidTransactions());
|
|
|
-
|
|
|
- int limitSize = 1;
|
|
|
- // Reset limit value to 1, so that we only allow one TX is dealt per
|
|
|
- // datanode.
|
|
|
- SCMBlockDeletingService delService = cluster.getStorageContainerManager()
|
|
|
- .getScmBlockManager().getSCMBlockDeletingService();
|
|
|
- delService.setBlockDeleteTXNum(limitSize);
|
|
|
-
|
|
|
- // Create {numKeys} random names keys.
|
|
|
- TestStorageContainerManagerHelper helper =
|
|
|
- new TestStorageContainerManagerHelper(cluster, conf);
|
|
|
- Map<String, OmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
|
|
|
- // Wait for container report
|
|
|
- Thread.sleep(5000);
|
|
|
- for (OmKeyInfo keyInfo : keyLocations.values()) {
|
|
|
- OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(),
|
|
|
- cluster.getStorageContainerManager());
|
|
|
- }
|
|
|
+ try {
|
|
|
+ DeletedBlockLog delLog = cluster.getStorageContainerManager()
|
|
|
+ .getScmBlockManager().getDeletedBlockLog();
|
|
|
+ Assert.assertEquals(0, delLog.getNumOfValidTransactions());
|
|
|
+
|
|
|
+ int limitSize = 1;
|
|
|
+ // Reset limit value to 1, so that we only allow one TX is dealt per
|
|
|
+ // datanode.
|
|
|
+ SCMBlockDeletingService delService = cluster.getStorageContainerManager()
|
|
|
+ .getScmBlockManager().getSCMBlockDeletingService();
|
|
|
+ delService.setBlockDeleteTXNum(limitSize);
|
|
|
+
|
|
|
+ // Create {numKeys} random names keys.
|
|
|
+ TestStorageContainerManagerHelper helper =
|
|
|
+ new TestStorageContainerManagerHelper(cluster, conf);
|
|
|
+ Map<String, OmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
|
|
|
+ // Wait for container report
|
|
|
+ Thread.sleep(5000);
|
|
|
+ for (OmKeyInfo keyInfo : keyLocations.values()) {
|
|
|
+ OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(),
|
|
|
+ cluster.getStorageContainerManager());
|
|
|
+ }
|
|
|
|
|
|
- createDeleteTXLog(delLog, keyLocations, helper);
|
|
|
- // Verify a few TX gets created in the TX log.
|
|
|
- Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
|
|
|
-
|
|
|
- // Verify the size in delete commands is expected.
|
|
|
- GenericTestUtils.waitFor(() -> {
|
|
|
- NodeManager nodeManager = cluster.getStorageContainerManager()
|
|
|
- .getScmNodeManager();
|
|
|
- List<SCMCommand> commands = nodeManager.processHeartbeat(
|
|
|
- nodeManager.getNodes(NodeState.HEALTHY).get(0));
|
|
|
-
|
|
|
- if (commands != null) {
|
|
|
- for (SCMCommand cmd : commands) {
|
|
|
- if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
|
|
|
- List<DeletedBlocksTransaction> deletedTXs =
|
|
|
- ((DeleteBlocksCommand) cmd).blocksTobeDeleted();
|
|
|
- return deletedTXs != null && deletedTXs.size() == limitSize;
|
|
|
+ createDeleteTXLog(delLog, keyLocations, helper);
|
|
|
+ // Verify a few TX gets created in the TX log.
|
|
|
+ Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
|
|
|
+
|
|
|
+ // Verify the size in delete commands is expected.
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ NodeManager nodeManager = cluster.getStorageContainerManager()
|
|
|
+ .getScmNodeManager();
|
|
|
+ List<SCMCommand> commands = nodeManager.processHeartbeat(
|
|
|
+ nodeManager.getNodes(NodeState.HEALTHY).get(0));
|
|
|
+
|
|
|
+ if (commands != null) {
|
|
|
+ for (SCMCommand cmd : commands) {
|
|
|
+ if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
|
|
|
+ List<DeletedBlocksTransaction> deletedTXs =
|
|
|
+ ((DeleteBlocksCommand) cmd).blocksTobeDeleted();
|
|
|
+ return deletedTXs != null && deletedTXs.size() == limitSize;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- return false;
|
|
|
- }, 500, 10000);
|
|
|
+ return false;
|
|
|
+ }, 500, 10000);
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private Map<Long, List<Long>> createDeleteTXLog(DeletedBlockLog delLog,
|
|
@@ -450,12 +459,15 @@ public class TestStorageContainerManager {
|
|
|
MiniOzoneCluster cluster =
|
|
|
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
|
|
|
cluster.waitForClusterToBeReady();
|
|
|
- // This will initialize SCM
|
|
|
- StorageContainerManager.scmInit(conf, "testClusterId");
|
|
|
- SCMStorageConfig scmStore = new SCMStorageConfig(conf);
|
|
|
- Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
|
|
|
- Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
|
|
|
- cluster.shutdown();
|
|
|
+ try {
|
|
|
+ // This will initialize SCM
|
|
|
+ StorageContainerManager.scmInit(conf, "testClusterId");
|
|
|
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
|
|
|
+ Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
|
|
|
+ Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -478,25 +490,29 @@ public class TestStorageContainerManager {
|
|
|
OzoneConfiguration conf = new OzoneConfiguration();
|
|
|
final String path =
|
|
|
GenericTestUtils.getTempPath(UUID.randomUUID().toString());
|
|
|
- Path scmPath = Paths.get(path, "scm-meta");
|
|
|
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
|
|
|
- conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
|
|
|
- SCMStorageConfig scmStore = new SCMStorageConfig(conf);
|
|
|
- String clusterId = UUID.randomUUID().toString();
|
|
|
- String scmId = UUID.randomUUID().toString();
|
|
|
- scmStore.setClusterId(clusterId);
|
|
|
- scmStore.setScmId(scmId);
|
|
|
- // writes the version file properties
|
|
|
- scmStore.initialize();
|
|
|
- StorageContainerManager scm = StorageContainerManager.createSCM(conf);
|
|
|
- //Reads the SCM Info from SCM instance
|
|
|
- ScmInfo scmInfo = scm.getClientProtocolServer().getScmInfo();
|
|
|
- Assert.assertEquals(clusterId, scmInfo.getClusterId());
|
|
|
- Assert.assertEquals(scmId, scmInfo.getScmId());
|
|
|
-
|
|
|
- String expectedVersion = HddsVersionInfo.HDDS_VERSION_INFO.getVersion();
|
|
|
- String actualVersion = scm.getSoftwareVersion();
|
|
|
- Assert.assertEquals(expectedVersion, actualVersion);
|
|
|
+ try {
|
|
|
+ Path scmPath = Paths.get(path, "scm-meta");
|
|
|
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
|
|
|
+ conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
|
|
|
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
|
|
|
+ String clusterId = UUID.randomUUID().toString();
|
|
|
+ String scmId = UUID.randomUUID().toString();
|
|
|
+ scmStore.setClusterId(clusterId);
|
|
|
+ scmStore.setScmId(scmId);
|
|
|
+ // writes the version file properties
|
|
|
+ scmStore.initialize();
|
|
|
+ StorageContainerManager scm = StorageContainerManager.createSCM(conf);
|
|
|
+ //Reads the SCM Info from SCM instance
|
|
|
+ ScmInfo scmInfo = scm.getClientProtocolServer().getScmInfo();
|
|
|
+ Assert.assertEquals(clusterId, scmInfo.getClusterId());
|
|
|
+ Assert.assertEquals(scmId, scmInfo.getScmId());
|
|
|
+
|
|
|
+ String expectedVersion = HddsVersionInfo.HDDS_VERSION_INFO.getVersion();
|
|
|
+ String actualVersion = scm.getSoftwareVersion();
|
|
|
+ Assert.assertEquals(expectedVersion, actualVersion);
|
|
|
+ } finally {
|
|
|
+ FileUtils.deleteQuietly(new File(path));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -564,48 +580,52 @@ public class TestStorageContainerManager {
|
|
|
.build();
|
|
|
cluster.waitForClusterToBeReady();
|
|
|
|
|
|
- TestStorageContainerManagerHelper helper =
|
|
|
- new TestStorageContainerManagerHelper(cluster, conf);
|
|
|
-
|
|
|
- helper.createKeys(10, 4096);
|
|
|
- Thread.sleep(5000);
|
|
|
+ try {
|
|
|
+ TestStorageContainerManagerHelper helper =
|
|
|
+ new TestStorageContainerManagerHelper(cluster, conf);
|
|
|
|
|
|
- StorageContainerManager scm = cluster.getStorageContainerManager();
|
|
|
- List<ContainerInfo> containers = cluster.getStorageContainerManager()
|
|
|
- .getContainerManager().getContainers();
|
|
|
- Assert.assertNotNull(containers);
|
|
|
- ContainerInfo selectedContainer = containers.iterator().next();
|
|
|
-
|
|
|
- // Stop processing HB
|
|
|
- scm.getDatanodeProtocolServer().stop();
|
|
|
-
|
|
|
- scm.getContainerManager().updateContainerState(selectedContainer
|
|
|
- .containerID(), HddsProtos.LifeCycleEvent.FINALIZE);
|
|
|
- cluster.restartStorageContainerManager(true);
|
|
|
- scm = cluster.getStorageContainerManager();
|
|
|
- EventPublisher publisher = mock(EventPublisher.class);
|
|
|
- ReplicationManager replicationManager = scm.getReplicationManager();
|
|
|
- Field f = replicationManager.getClass().getDeclaredField("eventPublisher");
|
|
|
- f.setAccessible(true);
|
|
|
- Field modifiersField = Field.class.getDeclaredField("modifiers");
|
|
|
- modifiersField.setAccessible(true);
|
|
|
- modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
|
|
|
- f.set(replicationManager, publisher);
|
|
|
- scm.getReplicationManager().start();
|
|
|
- Thread.sleep(2000);
|
|
|
-
|
|
|
- UUID dnUuid = cluster.getHddsDatanodes().iterator().next()
|
|
|
- .getDatanodeDetails().getUuid();
|
|
|
-
|
|
|
- CloseContainerCommand closeContainerCommand =
|
|
|
- new CloseContainerCommand(selectedContainer.getContainerID(),
|
|
|
- selectedContainer.getPipelineID(), false);
|
|
|
-
|
|
|
- CommandForDatanode commandForDatanode = new CommandForDatanode(
|
|
|
- dnUuid, closeContainerCommand);
|
|
|
-
|
|
|
- verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new
|
|
|
- CloseContainerCommandMatcher(dnUuid, commandForDatanode)));
|
|
|
+ helper.createKeys(10, 4096);
|
|
|
+ Thread.sleep(5000);
|
|
|
+
|
|
|
+ StorageContainerManager scm = cluster.getStorageContainerManager();
|
|
|
+ List<ContainerInfo> containers = cluster.getStorageContainerManager()
|
|
|
+ .getContainerManager().getContainers();
|
|
|
+ Assert.assertNotNull(containers);
|
|
|
+ ContainerInfo selectedContainer = containers.iterator().next();
|
|
|
+
|
|
|
+ // Stop processing HB
|
|
|
+ scm.getDatanodeProtocolServer().stop();
|
|
|
+
|
|
|
+ scm.getContainerManager().updateContainerState(selectedContainer
|
|
|
+ .containerID(), HddsProtos.LifeCycleEvent.FINALIZE);
|
|
|
+ cluster.restartStorageContainerManager(true);
|
|
|
+ scm = cluster.getStorageContainerManager();
|
|
|
+ EventPublisher publisher = mock(EventPublisher.class);
|
|
|
+ ReplicationManager replicationManager = scm.getReplicationManager();
|
|
|
+ Field f = ReplicationManager.class.getDeclaredField("eventPublisher");
|
|
|
+ f.setAccessible(true);
|
|
|
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
|
|
|
+ modifiersField.setAccessible(true);
|
|
|
+ modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
|
|
|
+ f.set(replicationManager, publisher);
|
|
|
+ scm.getReplicationManager().start();
|
|
|
+ Thread.sleep(2000);
|
|
|
+
|
|
|
+ UUID dnUuid = cluster.getHddsDatanodes().iterator().next()
|
|
|
+ .getDatanodeDetails().getUuid();
|
|
|
+
|
|
|
+ CloseContainerCommand closeContainerCommand =
|
|
|
+ new CloseContainerCommand(selectedContainer.getContainerID(),
|
|
|
+ selectedContainer.getPipelineID(), false);
|
|
|
+
|
|
|
+ CommandForDatanode commandForDatanode = new CommandForDatanode(
|
|
|
+ dnUuid, closeContainerCommand);
|
|
|
+
|
|
|
+ verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new
|
|
|
+ CloseContainerCommandMatcher(dnUuid, commandForDatanode)));
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("visibilitymodifier")
|