|
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto
|
|
|
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
|
|
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
|
|
import org.apache.hadoop.hdds.scm.TestUtils;
|
|
|
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
|
|
@@ -26,7 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
|
- .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
|
|
|
+ .StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
|
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
|
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
@@ -63,8 +65,6 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
|
|
|
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
|
|
|
.HEALTHY;
|
|
|
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
|
|
|
-import static org.apache.hadoop.hdds.protocol.proto
|
|
|
- .StorageContainerDatanodeProtocolProtos.SCMCmdType;
|
|
|
import static org.hamcrest.CoreMatchers.containsString;
|
|
|
import static org.hamcrest.core.StringStartsWith.startsWith;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
@@ -144,7 +144,7 @@ public class TestNodeManager {
|
|
|
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
|
|
|
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
|
|
|
nodeManager);
|
|
|
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails,
|
|
|
null);
|
|
|
}
|
|
|
|
|
@@ -191,8 +191,8 @@ public class TestNodeManager {
|
|
|
|
|
|
// Need 100 nodes to come out of chill mode, only one node is sending HB.
|
|
|
nodeManager.setMinimumChillModeNodes(100);
|
|
|
- nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager)
|
|
|
- .getProtoBufMessage(), null);
|
|
|
+ nodeManager.sendHeartbeat(TestUtils.getDatanodeDetails(nodeManager),
|
|
|
+ null);
|
|
|
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
|
|
|
100, 4 * 1000);
|
|
|
assertFalse("Not enough heartbeat, Node manager should have" +
|
|
@@ -219,7 +219,7 @@ public class TestNodeManager {
|
|
|
|
|
|
// Send 10 heartbeat from same node, and assert we never leave chill mode.
|
|
|
for (int x = 0; x < 10; x++) {
|
|
|
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails,
|
|
|
null);
|
|
|
}
|
|
|
|
|
@@ -250,7 +250,7 @@ public class TestNodeManager {
|
|
|
nodeManager.close();
|
|
|
|
|
|
// These should never be processed.
|
|
|
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails,
|
|
|
null);
|
|
|
|
|
|
// Let us just wait for 2 seconds to prove that HBs are not processed.
|
|
@@ -274,13 +274,13 @@ public class TestNodeManager {
|
|
|
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
|
|
|
String dnId = datanodeDetails.getUuidString();
|
|
|
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
|
|
|
- List<SCMStorageReport> reports =
|
|
|
+ List<StorageReportProto> reports =
|
|
|
TestUtils.createStorageReport(100, 10, 90, storagePath, null, dnId, 1);
|
|
|
try (SCMNodeManager nodemanager = createNodeManager(conf)) {
|
|
|
- nodemanager.register(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodemanager.register(datanodeDetails,
|
|
|
TestUtils.createNodeReport(reports));
|
|
|
List<SCMCommand> command = nodemanager.sendHeartbeat(
|
|
|
- datanodeDetails.getProtoBufMessage(), null);
|
|
|
+ datanodeDetails, null);
|
|
|
Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeDetails));
|
|
|
Assert.assertTrue("On regular HB calls, SCM responses a "
|
|
|
+ "datanode with an empty command list", command.isEmpty());
|
|
@@ -298,10 +298,10 @@ public class TestNodeManager {
|
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
@Override public Boolean get() {
|
|
|
List<SCMCommand> command =
|
|
|
- nodemanager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodemanager.sendHeartbeat(datanodeDetails,
|
|
|
null);
|
|
|
return command.size() == 1 && command.get(0).getType()
|
|
|
- .equals(SCMCmdType.reregisterCommand);
|
|
|
+ .equals(SCMCommandProto.Type.reregisterCommand);
|
|
|
}
|
|
|
}, 100, 3 * 1000);
|
|
|
} catch (TimeoutException e) {
|
|
@@ -330,7 +330,7 @@ public class TestNodeManager {
|
|
|
for (int x = 0; x < count; x++) {
|
|
|
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
|
|
|
nodeManager);
|
|
|
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails,
|
|
|
null);
|
|
|
}
|
|
|
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
|
|
@@ -422,19 +422,19 @@ public class TestNodeManager {
|
|
|
DatanodeDetails staleNode = TestUtils.getDatanodeDetails(nodeManager);
|
|
|
|
|
|
// Heartbeat once
|
|
|
- nodeManager.sendHeartbeat(staleNode.getProtoBufMessage(),
|
|
|
+ nodeManager.sendHeartbeat(staleNode,
|
|
|
null);
|
|
|
|
|
|
// Heartbeat all other nodes.
|
|
|
for (DatanodeDetails dn : nodeList) {
|
|
|
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
|
|
|
+ nodeManager.sendHeartbeat(dn, null);
|
|
|
}
|
|
|
|
|
|
// Wait for 2 seconds .. and heartbeat good nodes again.
|
|
|
Thread.sleep(2 * 1000);
|
|
|
|
|
|
for (DatanodeDetails dn : nodeList) {
|
|
|
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
|
|
|
+ nodeManager.sendHeartbeat(dn, null);
|
|
|
}
|
|
|
|
|
|
// Wait for 2 seconds, wait a total of 4 seconds to make sure that the
|
|
@@ -451,7 +451,7 @@ public class TestNodeManager {
|
|
|
|
|
|
// heartbeat good nodes again.
|
|
|
for (DatanodeDetails dn : nodeList) {
|
|
|
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
|
|
|
+ nodeManager.sendHeartbeat(dn, null);
|
|
|
}
|
|
|
|
|
|
// 6 seconds is the dead window for this test , so we wait a total of
|
|
@@ -565,11 +565,11 @@ public class TestNodeManager {
|
|
|
DatanodeDetails deadNode =
|
|
|
TestUtils.getDatanodeDetails(nodeManager);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- healthyNode.getProtoBufMessage(), null);
|
|
|
+ healthyNode, null);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- staleNode.getProtoBufMessage(), null);
|
|
|
+ staleNode, null);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- deadNode.getProtoBufMessage(), null);
|
|
|
+ deadNode, null);
|
|
|
|
|
|
// Sleep so that heartbeat processing thread gets to run.
|
|
|
Thread.sleep(500);
|
|
@@ -596,15 +596,15 @@ public class TestNodeManager {
|
|
|
*/
|
|
|
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- healthyNode.getProtoBufMessage(), null);
|
|
|
+ healthyNode, null);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- staleNode.getProtoBufMessage(), null);
|
|
|
+ staleNode, null);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- deadNode.getProtoBufMessage(), null);
|
|
|
+ deadNode, null);
|
|
|
|
|
|
Thread.sleep(1500);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- healthyNode.getProtoBufMessage(), null);
|
|
|
+ healthyNode, null);
|
|
|
Thread.sleep(2 * 1000);
|
|
|
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
|
|
|
|
|
@@ -625,12 +625,12 @@ public class TestNodeManager {
|
|
|
*/
|
|
|
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- healthyNode.getProtoBufMessage(), null);
|
|
|
+ healthyNode, null);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- staleNode.getProtoBufMessage(), null);
|
|
|
+ staleNode, null);
|
|
|
Thread.sleep(1500);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- healthyNode.getProtoBufMessage(), null);
|
|
|
+ healthyNode, null);
|
|
|
Thread.sleep(2 * 1000);
|
|
|
|
|
|
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
|
|
@@ -664,11 +664,11 @@ public class TestNodeManager {
|
|
|
* back all the nodes in healthy state.
|
|
|
*/
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- healthyNode.getProtoBufMessage(), null);
|
|
|
+ healthyNode, null);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- staleNode.getProtoBufMessage(), null);
|
|
|
+ staleNode, null);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- deadNode.getProtoBufMessage(), null);
|
|
|
+ deadNode, null);
|
|
|
Thread.sleep(500);
|
|
|
//Assert all nodes are healthy.
|
|
|
assertEquals(3, nodeManager.getAllNodes().size());
|
|
@@ -689,7 +689,7 @@ public class TestNodeManager {
|
|
|
int sleepDuration) throws InterruptedException {
|
|
|
while (!Thread.currentThread().isInterrupted()) {
|
|
|
for (DatanodeDetails dn : list) {
|
|
|
- manager.sendHeartbeat(dn.getProtoBufMessage(), null);
|
|
|
+ manager.sendHeartbeat(dn, null);
|
|
|
}
|
|
|
Thread.sleep(sleepDuration);
|
|
|
}
|
|
@@ -775,7 +775,7 @@ public class TestNodeManager {
|
|
|
// No Thread just one time HBs the node manager, so that these will be
|
|
|
// marked as dead nodes eventually.
|
|
|
for (DatanodeDetails dn : deadNodeList) {
|
|
|
- nodeManager.sendHeartbeat(dn.getProtoBufMessage(), null);
|
|
|
+ nodeManager.sendHeartbeat(dn, null);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -940,7 +940,7 @@ public class TestNodeManager {
|
|
|
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(
|
|
|
nodeManager);
|
|
|
nodeManager.sendHeartbeat(
|
|
|
- datanodeDetails.getProtoBufMessage(), null);
|
|
|
+ datanodeDetails, null);
|
|
|
String status = nodeManager.getChillModeStatus();
|
|
|
Assert.assertThat(status, containsString("Still in chill " +
|
|
|
"mode, waiting on nodes to report in."));
|
|
@@ -967,8 +967,7 @@ public class TestNodeManager {
|
|
|
// Assert that node manager force enter cannot be overridden by nodes HBs.
|
|
|
for (int x = 0; x < 20; x++) {
|
|
|
DatanodeDetails datanode = TestUtils.getDatanodeDetails(nodeManager);
|
|
|
- nodeManager.sendHeartbeat(datanode.getProtoBufMessage(),
|
|
|
- null);
|
|
|
+ nodeManager.sendHeartbeat(datanode, null);
|
|
|
}
|
|
|
|
|
|
Thread.sleep(500);
|
|
@@ -1009,10 +1008,10 @@ public class TestNodeManager {
|
|
|
String dnId = datanodeDetails.getUuidString();
|
|
|
long free = capacity - used;
|
|
|
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
|
|
|
- List<SCMStorageReport> reports = TestUtils
|
|
|
+ List<StorageReportProto> reports = TestUtils
|
|
|
.createStorageReport(capacity, used, free, storagePath,
|
|
|
null, dnId, 1);
|
|
|
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails,
|
|
|
TestUtils.createNodeReport(reports));
|
|
|
}
|
|
|
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
|
|
@@ -1058,11 +1057,11 @@ public class TestNodeManager {
|
|
|
long scmUsed = x * usedPerHeartbeat;
|
|
|
long remaining = capacity - scmUsed;
|
|
|
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
|
|
|
- List<SCMStorageReport> reports = TestUtils
|
|
|
+ List<StorageReportProto> reports = TestUtils
|
|
|
.createStorageReport(capacity, scmUsed, remaining, storagePath,
|
|
|
null, dnId, 1);
|
|
|
|
|
|
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails,
|
|
|
TestUtils.createNodeReport(reports));
|
|
|
Thread.sleep(100);
|
|
|
}
|
|
@@ -1140,10 +1139,10 @@ public class TestNodeManager {
|
|
|
|
|
|
// Send a new report to bring the dead node back to healthy
|
|
|
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
|
|
|
- List<SCMStorageReport> reports = TestUtils
|
|
|
+ List<StorageReportProto> reports = TestUtils
|
|
|
.createStorageReport(capacity, expectedScmUsed, expectedRemaining,
|
|
|
storagePath, null, dnId, 1);
|
|
|
- nodeManager.sendHeartbeat(datanodeDetails.getProtoBufMessage(),
|
|
|
+ nodeManager.sendHeartbeat(datanodeDetails,
|
|
|
TestUtils.createNodeReport(reports));
|
|
|
|
|
|
// Wait up to 5 seconds so that the dead node becomes healthy
|