|
@@ -20,10 +20,9 @@ package org.apache.hadoop.ozone.scm.node;
|
|
|
import com.google.common.base.Supplier;
|
|
|
import static java.util.concurrent.TimeUnit.*;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
+import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
|
|
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
|
import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
|
|
|
-import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
|
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
|
|
import org.apache.hadoop.hdsl.protocol.proto.HdslProtos;
|
|
|
import org.apache.hadoop.hdsl.protocol.proto
|
|
@@ -38,7 +37,6 @@ import org.apache.hadoop.scm.ScmConfigKeys;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.test.PathUtils;
|
|
|
|
|
|
-import static org.apache.hadoop.ozone.scm.TestUtils.getDatanodeID;
|
|
|
import org.hamcrest.CoreMatchers;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.After;
|
|
@@ -69,7 +67,8 @@ import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
|
|
|
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL;
|
|
|
import static org.apache.hadoop.scm.ScmConfigKeys
|
|
|
.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
|
|
|
-import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS;
|
|
|
+import static org.apache.hadoop.scm.ScmConfigKeys
|
|
|
+ .OZONE_SCM_MAX_HB_COUNT_TO_PROCESS;
|
|
|
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
|
|
|
import static org.hamcrest.CoreMatchers.containsString;
|
|
|
import static org.hamcrest.MatcherAssert.assertThat;
|
|
@@ -153,8 +152,10 @@ public class TestNodeManager {
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
|
|
|
// Send some heartbeats from different nodes.
|
|
|
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
|
|
|
- DatanodeID datanodeID = getDatanodeID(nodeManager);
|
|
|
- nodeManager.sendHeartbeat(datanodeID, null, reportState);
|
|
|
+ DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
|
|
|
+ nodeManager);
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ null, reportState);
|
|
|
}
|
|
|
|
|
|
// Wait for 4 seconds max.
|
|
@@ -200,7 +201,8 @@ public class TestNodeManager {
|
|
|
|
|
|
// Need 100 nodes to come out of chill mode, only one node is sending HB.
|
|
|
nodeManager.setMinimumChillModeNodes(100);
|
|
|
- nodeManager.sendHeartbeat(TestUtils.getDatanodeID(nodeManager),
|
|
|
+ nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager)
|
|
|
+ .getProtoBufMessage(),
|
|
|
null, reportState);
|
|
|
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
|
|
|
100, 4 * 1000);
|
|
@@ -223,11 +225,13 @@ public class TestNodeManager {
|
|
|
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
|
|
|
nodeManager.setMinimumChillModeNodes(3);
|
|
|
- DatanodeID datanodeID = TestUtils.getDatanodeID(nodeManager);
|
|
|
+ DatanodeDetails datanodeDetails = TestUtils
|
|
|
+ .getDatanodeDetails(nodeManager);
|
|
|
|
|
|
// Send 10 heartbeats from the same node, and assert we never leave chill mode.
|
|
|
for (int x = 0; x < 10; x++) {
|
|
|
- nodeManager.sendHeartbeat(datanodeID, null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ null, reportState);
|
|
|
}
|
|
|
|
|
|
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
|
|
@@ -253,11 +257,12 @@ public class TestNodeManager {
|
|
|
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
|
|
|
100, TimeUnit.MILLISECONDS);
|
|
|
SCMNodeManager nodeManager = createNodeManager(conf);
|
|
|
- DatanodeID datanodeID = TestUtils.getDatanodeID(nodeManager);
|
|
|
+ DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(nodeManager);
|
|
|
nodeManager.close();
|
|
|
|
|
|
// These should never be processed.
|
|
|
- nodeManager.sendHeartbeat(datanodeID, null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ null, reportState);
|
|
|
|
|
|
// Let us just wait for 2 seconds to prove that HBs are not processed.
|
|
|
Thread.sleep(2 * 1000);
|
|
@@ -277,12 +282,13 @@ public class TestNodeManager {
|
|
|
OzoneConfiguration conf = getConf();
|
|
|
conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
|
|
|
100, TimeUnit.MILLISECONDS);
|
|
|
- DatanodeID datanodeID = TestUtils.getDatanodeID();
|
|
|
+ DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
|
|
|
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
|
|
|
- nodemanager.register(datanodeID);
|
|
|
- List<SCMCommand> command = nodemanager.sendHeartbeat(datanodeID,
|
|
|
+ nodemanager.register(datanodeDetails.getProtoBufMessage());
|
|
|
+ List<SCMCommand> command = nodemanager.sendHeartbeat(
|
|
|
+ datanodeDetails.getProtoBufMessage(),
|
|
|
null, reportState);
|
|
|
- Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeID));
|
|
|
+ Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
|
|
|
Assert.assertTrue("On regular HB calls, SCM responses a "
|
|
|
+ "datanode with an empty command list", command.isEmpty());
|
|
|
}
|
|
@@ -291,7 +297,7 @@ public class TestNodeManager {
|
|
|
// This happens when SCM restarts.
|
|
|
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
|
|
|
Assert.assertFalse(nodemanager
|
|
|
- .getAllNodes().contains(datanodeID));
|
|
|
+ .getAllNodes().contains(datanodeDetails));
|
|
|
try {
|
|
|
// SCM handles heartbeat asynchronously.
|
|
|
// It may need more than one heartbeat processing to
|
|
@@ -299,8 +305,8 @@ public class TestNodeManager {
|
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
@Override public Boolean get() {
|
|
|
List<SCMCommand> command =
|
|
|
- nodemanager.sendHeartbeat(datanodeID, null,
|
|
|
- reportState);
|
|
|
+ nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ null, reportState);
|
|
|
return command.size() == 1 && command.get(0).getType()
|
|
|
.equals(SCMCmdType.reregisterCommand);
|
|
|
}
|
|
@@ -329,8 +335,10 @@ public class TestNodeManager {
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
|
|
|
for (int x = 0; x < count; x++) {
|
|
|
- DatanodeID datanodeID = TestUtils.getDatanodeID(nodeManager);
|
|
|
- nodeManager.sendHeartbeat(datanodeID, null, reportState);
|
|
|
+ DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
|
|
|
+ nodeManager);
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ null, reportState);
|
|
|
}
|
|
|
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
|
|
|
100, 4 * 1000);
|
|
@@ -415,41 +423,42 @@ public class TestNodeManager {
|
|
|
|
|
|
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
- List<DatanodeID> nodeList = createNodeSet(nodeManager, nodeCount,
|
|
|
+ List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount,
|
|
|
"Node");
|
|
|
|
|
|
- DatanodeID staleNode = TestUtils.getDatanodeID(nodeManager);
|
|
|
+ DatanodeDetails staleNode = TestUtils.getDatanodeDetails(nodeManager);
|
|
|
|
|
|
// Heartbeat once
|
|
|
- nodeManager.sendHeartbeat(staleNode, null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(),
|
|
|
+ null, reportState);
|
|
|
|
|
|
// Heartbeat all other nodes.
|
|
|
- for (DatanodeID dn : nodeList) {
|
|
|
- nodeManager.sendHeartbeat(dn, null, reportState);
|
|
|
+ for (DatanodeDetails dn : nodeList) {
|
|
|
+ nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
|
|
|
}
|
|
|
|
|
|
// Wait for 2 seconds .. and heartbeat good nodes again.
|
|
|
Thread.sleep(2 * 1000);
|
|
|
|
|
|
- for (DatanodeID dn : nodeList) {
|
|
|
- nodeManager.sendHeartbeat(dn, null, reportState);
|
|
|
+ for (DatanodeDetails dn : nodeList) {
|
|
|
+ nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
|
|
|
}
|
|
|
|
|
|
// Wait for 2 seconds, wait a total of 4 seconds to make sure that the
|
|
|
// node moves into stale state.
|
|
|
Thread.sleep(2 * 1000);
|
|
|
- List<DatanodeID> staleNodeList = nodeManager.getNodes(STALE);
|
|
|
+ List<DatanodeDetails> staleNodeList = nodeManager.getNodes(STALE);
|
|
|
assertEquals("Expected to find 1 stale node",
|
|
|
1, nodeManager.getNodeCount(STALE));
|
|
|
assertEquals("Expected to find 1 stale node",
|
|
|
1, staleNodeList.size());
|
|
|
assertEquals("Stale node is not the expected ID", staleNode
|
|
|
- .getDatanodeUuid(), staleNodeList.get(0).getDatanodeUuid());
|
|
|
+ .getUuid(), staleNodeList.get(0).getUuid());
|
|
|
Thread.sleep(1000);
|
|
|
|
|
|
// heartbeat good nodes again.
|
|
|
- for (DatanodeID dn : nodeList) {
|
|
|
- nodeManager.sendHeartbeat(dn, null, reportState);
|
|
|
+ for (DatanodeDetails dn : nodeList) {
|
|
|
+ nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
|
|
|
}
|
|
|
|
|
|
// 6 seconds is the dead window for this test, so we wait a total of
|
|
@@ -464,13 +473,13 @@ public class TestNodeManager {
|
|
|
0, staleNodeList.size());
|
|
|
|
|
|
// Check for the dead node now.
|
|
|
- List<DatanodeID> deadNodeList = nodeManager.getNodes(DEAD);
|
|
|
+ List<DatanodeDetails> deadNodeList = nodeManager.getNodes(DEAD);
|
|
|
assertEquals("Expected to find 1 dead node", 1,
|
|
|
nodeManager.getNodeCount(DEAD));
|
|
|
assertEquals("Expected to find 1 dead node",
|
|
|
1, deadNodeList.size());
|
|
|
assertEquals("Dead node is not the expected ID", staleNode
|
|
|
- .getDatanodeUuid(), deadNodeList.get(0).getDatanodeUuid());
|
|
|
+ .getUuid(), deadNodeList.get(0).getUuid());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -558,15 +567,18 @@ public class TestNodeManager {
|
|
|
* Cluster state: Healthy: All nodes are heartbeat-ing like normal.
|
|
|
*/
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
- DatanodeID healthyNode =
|
|
|
- TestUtils.getDatanodeID(nodeManager, "HealthyNode");
|
|
|
- DatanodeID staleNode =
|
|
|
- TestUtils.getDatanodeID(nodeManager, "StaleNode");
|
|
|
- DatanodeID deadNode =
|
|
|
- TestUtils.getDatanodeID(nodeManager, "DeadNode");
|
|
|
- nodeManager.sendHeartbeat(healthyNode, null, reportState);
|
|
|
- nodeManager.sendHeartbeat(staleNode, null, reportState);
|
|
|
- nodeManager.sendHeartbeat(deadNode, null, reportState);
|
|
|
+ DatanodeDetails healthyNode =
|
|
|
+ TestUtils.getDatanodeDetails(nodeManager, "HealthyNode");
|
|
|
+ DatanodeDetails staleNode =
|
|
|
+ TestUtils.getDatanodeDetails(nodeManager, "StaleNode");
|
|
|
+ DatanodeDetails deadNode =
|
|
|
+ TestUtils.getDatanodeDetails(nodeManager, "DeadNode");
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ healthyNode.getProtoBufMessage(), null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ staleNode.getProtoBufMessage(), null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ deadNode.getProtoBufMessage(), null, reportState);
|
|
|
|
|
|
// Sleep so that heartbeat processing thread gets to run.
|
|
|
Thread.sleep(500);
|
|
@@ -592,12 +604,16 @@ public class TestNodeManager {
|
|
|
* the 3 second windows.
|
|
|
*/
|
|
|
|
|
|
- nodeManager.sendHeartbeat(healthyNode, null, reportState);
|
|
|
- nodeManager.sendHeartbeat(staleNode, null, reportState);
|
|
|
- nodeManager.sendHeartbeat(deadNode, null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ healthyNode.getProtoBufMessage(), null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ staleNode.getProtoBufMessage(), null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ deadNode.getProtoBufMessage(), null, reportState);
|
|
|
|
|
|
Thread.sleep(1500);
|
|
|
- nodeManager.sendHeartbeat(healthyNode, null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ healthyNode.getProtoBufMessage(), null, reportState);
|
|
|
Thread.sleep(2 * 1000);
|
|
|
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
|
|
|
|
|
@@ -605,10 +621,10 @@ public class TestNodeManager {
|
|
|
// 3.5 seconds from last heartbeat for the stale and deadNode. So those
|
|
|
// 2 nodes must move to Stale state and the healthy node must
|
|
|
// remain in the healthy State.
|
|
|
- List<DatanodeID> healthyList = nodeManager.getNodes(HEALTHY);
|
|
|
+ List<DatanodeDetails> healthyList = nodeManager.getNodes(HEALTHY);
|
|
|
assertEquals("Expected one healthy node", 1, healthyList.size());
|
|
|
assertEquals("Healthy node is not the expected ID", healthyNode
|
|
|
- .getDatanodeUuid(), healthyList.get(0).getDatanodeUuid());
|
|
|
+ .getUuid(), healthyList.get(0).getUuid());
|
|
|
|
|
|
assertEquals(2, nodeManager.getNodeCount(STALE));
|
|
|
|
|
@@ -617,18 +633,21 @@ public class TestNodeManager {
|
|
|
* staleNode to move to stale state and deadNode to move to dead state.
|
|
|
*/
|
|
|
|
|
|
- nodeManager.sendHeartbeat(healthyNode, null, reportState);
|
|
|
- nodeManager.sendHeartbeat(staleNode, null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ healthyNode.getProtoBufMessage(), null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ staleNode.getProtoBufMessage(), null, reportState);
|
|
|
Thread.sleep(1500);
|
|
|
- nodeManager.sendHeartbeat(healthyNode, null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ healthyNode.getProtoBufMessage(), null, reportState);
|
|
|
Thread.sleep(2 * 1000);
|
|
|
|
|
|
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
|
|
|
// 7 seconds have elapsed for dead node, so it moves into dead.
|
|
|
// 2 seconds have elapsed for healthy node, so it stays in healthy state.
|
|
|
healthyList = nodeManager.getNodes(HEALTHY);
|
|
|
- List<DatanodeID> staleList = nodeManager.getNodes(STALE);
|
|
|
- List<DatanodeID> deadList = nodeManager.getNodes(DEAD);
|
|
|
+ List<DatanodeDetails> staleList = nodeManager.getNodes(STALE);
|
|
|
+ List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD);
|
|
|
|
|
|
assertEquals(3, nodeManager.getAllNodes().size());
|
|
|
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
|
|
@@ -638,24 +657,27 @@ public class TestNodeManager {
|
|
|
assertEquals("Expected one healthy node",
|
|
|
1, healthyList.size());
|
|
|
assertEquals("Healthy node is not the expected ID", healthyNode
|
|
|
- .getDatanodeUuid(), healthyList.get(0).getDatanodeUuid());
|
|
|
+ .getUuid(), healthyList.get(0).getUuid());
|
|
|
|
|
|
assertEquals("Expected one stale node",
|
|
|
1, staleList.size());
|
|
|
assertEquals("Stale node is not the expected ID", staleNode
|
|
|
- .getDatanodeUuid(), staleList.get(0).getDatanodeUuid());
|
|
|
+ .getUuid(), staleList.get(0).getUuid());
|
|
|
|
|
|
assertEquals("Expected one dead node",
|
|
|
1, deadList.size());
|
|
|
assertEquals("Dead node is not the expected ID", deadNode
|
|
|
- .getDatanodeUuid(), deadList.get(0).getDatanodeUuid());
|
|
|
+ .getUuid(), deadList.get(0).getUuid());
|
|
|
/**
|
|
|
* Cluster State : let us heartbeat all the nodes and verify that we get
|
|
|
* back all the nodes in healthy state.
|
|
|
*/
|
|
|
- nodeManager.sendHeartbeat(healthyNode, null, reportState);
|
|
|
- nodeManager.sendHeartbeat(staleNode, null, reportState);
|
|
|
- nodeManager.sendHeartbeat(deadNode, null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ healthyNode.getProtoBufMessage(), null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ staleNode.getProtoBufMessage(), null, reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ deadNode.getProtoBufMessage(), null, reportState);
|
|
|
Thread.sleep(500);
|
|
|
//Assert all nodes are healthy.
|
|
|
assertEquals(3, nodeManager.getAllNodes().size());
|
|
@@ -671,11 +693,12 @@ public class TestNodeManager {
|
|
|
* @param sleepDuration - Duration to sleep between heartbeats.
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
- private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeID> list,
|
|
|
- int sleepDuration) throws InterruptedException {
|
|
|
+ private void heartbeatNodeSet(SCMNodeManager manager,
|
|
|
+ List<DatanodeDetails> list,
|
|
|
+ int sleepDuration) throws InterruptedException {
|
|
|
while (!Thread.currentThread().isInterrupted()) {
|
|
|
- for (DatanodeID dn : list) {
|
|
|
- manager.sendHeartbeat(dn, null, reportState);
|
|
|
+ for (DatanodeDetails dn : list) {
|
|
|
+ manager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
|
|
|
}
|
|
|
Thread.sleep(sleepDuration);
|
|
|
}
|
|
@@ -688,12 +711,12 @@ public class TestNodeManager {
|
|
|
* @param prefix - A prefix string that can be used in verification.
|
|
|
* @return List of Nodes.
|
|
|
*/
|
|
|
- private List<DatanodeID> createNodeSet(SCMNodeManager nodeManager, int
|
|
|
+ private List<DatanodeDetails> createNodeSet(SCMNodeManager nodeManager, int
|
|
|
count, String
|
|
|
prefix) {
|
|
|
- List<DatanodeID> list = new LinkedList<>();
|
|
|
+ List<DatanodeDetails> list = new LinkedList<>();
|
|
|
for (int x = 0; x < count; x++) {
|
|
|
- list.add(TestUtils.getDatanodeID(nodeManager, prefix + x));
|
|
|
+ list.add(TestUtils.getDatanodeDetails(nodeManager, prefix + x));
|
|
|
}
|
|
|
return list;
|
|
|
}
|
|
@@ -734,11 +757,11 @@ public class TestNodeManager {
|
|
|
|
|
|
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
- List<DatanodeID> healthyNodeList = createNodeSet(nodeManager,
|
|
|
+ List<DatanodeDetails> healthyNodeList = createNodeSet(nodeManager,
|
|
|
healthyCount, "Healthy");
|
|
|
- List<DatanodeID> staleNodeList = createNodeSet(nodeManager, staleCount,
|
|
|
- "Stale");
|
|
|
- List<DatanodeID> deadNodeList = createNodeSet(nodeManager, deadCount,
|
|
|
+ List<DatanodeDetails> staleNodeList = createNodeSet(nodeManager,
|
|
|
+ staleCount, "Stale");
|
|
|
+ List<DatanodeDetails> deadNodeList = createNodeSet(nodeManager, deadCount,
|
|
|
"Dead");
|
|
|
|
|
|
Runnable healthyNodeTask = () -> {
|
|
@@ -761,8 +784,8 @@ public class TestNodeManager {
|
|
|
|
|
|
// No thread here: heartbeat the node manager just once, so that these
|
|
|
// nodes will eventually be marked as dead.
|
|
|
- for (DatanodeID dn : deadNodeList) {
|
|
|
- nodeManager.sendHeartbeat(dn, null, reportState);
|
|
|
+ for (DatanodeDetails dn : deadNodeList) {
|
|
|
+ nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null, reportState);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -784,9 +807,9 @@ public class TestNodeManager {
|
|
|
|
|
|
assertEquals(deadCount, nodeManager.getNodeCount(DEAD));
|
|
|
|
|
|
- List<DatanodeID> deadList = nodeManager.getNodes(DEAD);
|
|
|
+ List<DatanodeDetails> deadList = nodeManager.getNodes(DEAD);
|
|
|
|
|
|
- for (DatanodeID node : deadList) {
|
|
|
+ for (DatanodeDetails node : deadList) {
|
|
|
assertThat(node.getHostName(), CoreMatchers.startsWith("Dead"));
|
|
|
}
|
|
|
|
|
@@ -825,9 +848,10 @@ public class TestNodeManager {
|
|
|
MILLISECONDS);
|
|
|
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
- List<DatanodeID> healthyList = createNodeSet(nodeManager,
|
|
|
+ List<DatanodeDetails> healthyList = createNodeSet(nodeManager,
|
|
|
healthyCount, "h");
|
|
|
- List<DatanodeID> staleList = createNodeSet(nodeManager, staleCount, "s");
|
|
|
+ List<DatanodeDetails> staleList = createNodeSet(nodeManager,
|
|
|
+ staleCount, "s");
|
|
|
|
|
|
Runnable healthyNodeTask = () -> {
|
|
|
try {
|
|
@@ -886,8 +910,8 @@ public class TestNodeManager {
|
|
|
conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500);
|
|
|
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
- List<DatanodeID> healthyList = createNodeSet(nodeManager, healthyCount,
|
|
|
- "h");
|
|
|
+ List<DatanodeDetails> healthyList = createNodeSet(nodeManager,
|
|
|
+ healthyCount, "h");
|
|
|
GenericTestUtils.LogCapturer logCapturer =
|
|
|
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
|
|
|
Runnable healthyNodeTask = () -> {
|
|
@@ -921,8 +945,10 @@ public class TestNodeManager {
|
|
|
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
nodeManager.setMinimumChillModeNodes(10);
|
|
|
- DatanodeID datanodeID = TestUtils.getDatanodeID(nodeManager);
|
|
|
- nodeManager.sendHeartbeat(datanodeID, null, reportState);
|
|
|
+ DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
|
|
|
+ nodeManager);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ datanodeDetails.getProtoBufMessage(), null, reportState);
|
|
|
String status = nodeManager.getChillModeStatus();
|
|
|
Assert.assertThat(status, containsString("Still in chill " +
|
|
|
"mode, waiting on nodes to report in."));
|
|
@@ -948,8 +974,9 @@ public class TestNodeManager {
|
|
|
|
|
|
// Assert that node manager force enter cannot be overridden by nodes HBs.
|
|
|
for (int x = 0; x < 20; x++) {
|
|
|
- DatanodeID datanode = TestUtils.getDatanodeID(nodeManager);
|
|
|
- nodeManager.sendHeartbeat(datanode, null, reportState);
|
|
|
+ DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager);
|
|
|
+ nodeManager.sendHeartbeat(datanode.getProtoBufMessage(),
|
|
|
+ null, reportState);
|
|
|
}
|
|
|
|
|
|
Thread.sleep(500);
|
|
@@ -985,14 +1012,15 @@ public class TestNodeManager {
|
|
|
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
for (int x = 0; x < nodeCount; x++) {
|
|
|
- DatanodeID datanodeID = TestUtils.getDatanodeID(nodeManager);
|
|
|
+ DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
|
|
|
+ nodeManager);
|
|
|
|
|
|
SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
|
|
|
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
|
|
|
srb.setStorageUuid(UUID.randomUUID().toString());
|
|
|
srb.setCapacity(capacity).setScmUsed(used).
|
|
|
setRemaining(capacity - used).build();
|
|
|
- nodeManager.sendHeartbeat(datanodeID,
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
nrb.addStorageReport(srb).build(), reportState);
|
|
|
}
|
|
|
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
|
|
@@ -1029,7 +1057,8 @@ public class TestNodeManager {
|
|
|
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
|
|
|
|
|
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
|
|
- DatanodeID datanodeID = TestUtils.getDatanodeID(nodeManager);
|
|
|
+ DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
|
|
|
+ nodeManager);
|
|
|
final long capacity = 2000;
|
|
|
final long usedPerHeartbeat = 100;
|
|
|
|
|
@@ -1041,7 +1070,8 @@ public class TestNodeManager {
|
|
|
.setRemaining(capacity - x * usedPerHeartbeat).build();
|
|
|
nrb.addStorageReport(srb);
|
|
|
|
|
|
- nodeManager.sendHeartbeat(datanodeID, nrb.build(), reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ datanodeDetails.getProtoBufMessage(), nrb.build(), reportState);
|
|
|
Thread.sleep(100);
|
|
|
}
|
|
|
|
|
@@ -1063,23 +1093,23 @@ public class TestNodeManager {
|
|
|
|
|
|
// Test NodeManager#getNodeStats
|
|
|
assertEquals(nodeCount, nodeManager.getNodeStats().size());
|
|
|
- long nodeCapacity = nodeManager.getNodeStat(datanodeID).get()
|
|
|
+ long nodeCapacity = nodeManager.getNodeStat(datanodeDetails).get()
|
|
|
.getCapacity().get();
|
|
|
assertEquals(capacity, nodeCapacity);
|
|
|
|
|
|
- foundScmUsed = nodeManager.getNodeStat(datanodeID).get().getScmUsed()
|
|
|
+ foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed()
|
|
|
.get();
|
|
|
assertEquals(expectedScmUsed, foundScmUsed);
|
|
|
|
|
|
- foundRemaining = nodeManager.getNodeStat(datanodeID).get()
|
|
|
+ foundRemaining = nodeManager.getNodeStat(datanodeDetails).get()
|
|
|
.getRemaining().get();
|
|
|
assertEquals(expectedRemaining, foundRemaining);
|
|
|
|
|
|
// Compare the result from
|
|
|
// NodeManager#getNodeStats and NodeManager#getNodeStat
|
|
|
SCMNodeStat stat1 = nodeManager.getNodeStats().
|
|
|
- get(datanodeID.getDatanodeUuid());
|
|
|
- SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeID).get();
|
|
|
+ get(datanodeDetails);
|
|
|
+ SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeDetails).get();
|
|
|
assertEquals(stat1, stat2);
|
|
|
|
|
|
// Wait up to 4s so that the node becomes stale
|
|
@@ -1089,14 +1119,14 @@ public class TestNodeManager {
|
|
|
4 * 1000);
|
|
|
assertEquals(nodeCount, nodeManager.getNodeStats().size());
|
|
|
|
|
|
- foundCapacity = nodeManager.getNodeStat(datanodeID).get()
|
|
|
+ foundCapacity = nodeManager.getNodeStat(datanodeDetails).get()
|
|
|
.getCapacity().get();
|
|
|
assertEquals(capacity, foundCapacity);
|
|
|
- foundScmUsed = nodeManager.getNodeStat(datanodeID).get()
|
|
|
+ foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get()
|
|
|
.getScmUsed().get();
|
|
|
assertEquals(expectedScmUsed, foundScmUsed);
|
|
|
|
|
|
- foundRemaining = nodeManager.getNodeStat(datanodeID).get().
|
|
|
+ foundRemaining = nodeManager.getNodeStat(datanodeDetails).get().
|
|
|
getRemaining().get();
|
|
|
assertEquals(expectedRemaining, foundRemaining);
|
|
|
|
|
@@ -1123,7 +1153,8 @@ public class TestNodeManager {
|
|
|
srb.setCapacity(capacity).setScmUsed(expectedScmUsed)
|
|
|
.setRemaining(expectedRemaining).build();
|
|
|
nrb.addStorageReport(srb);
|
|
|
- nodeManager.sendHeartbeat(datanodeID, nrb.build(), reportState);
|
|
|
+ nodeManager.sendHeartbeat(
|
|
|
+ datanodeDetails.getProtoBufMessage(), nrb.build(), reportState);
|
|
|
|
|
|
// Wait up to 5 seconds so that the dead node becomes healthy
|
|
|
// Verify usage info should be updated.
|
|
@@ -1134,13 +1165,13 @@ public class TestNodeManager {
|
|
|
() -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
|
|
|
100, 4 * 1000);
|
|
|
assertEquals(nodeCount, nodeManager.getNodeStats().size());
|
|
|
- foundCapacity = nodeManager.getNodeStat(datanodeID).get()
|
|
|
+ foundCapacity = nodeManager.getNodeStat(datanodeDetails).get()
|
|
|
.getCapacity().get();
|
|
|
assertEquals(capacity, foundCapacity);
|
|
|
- foundScmUsed = nodeManager.getNodeStat(datanodeID).get().getScmUsed()
|
|
|
+ foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed()
|
|
|
.get();
|
|
|
assertEquals(expectedScmUsed, foundScmUsed);
|
|
|
- foundRemaining = nodeManager.getNodeStat(datanodeID).get()
|
|
|
+ foundRemaining = nodeManager.getNodeStat(datanodeDetails).get()
|
|
|
.getRemaining().get();
|
|
|
assertEquals(expectedRemaining, foundRemaining);
|
|
|
}
|