|
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common;
|
|
import org.apache.commons.codec.digest.DigestUtils;
|
|
import org.apache.commons.codec.digest.DigestUtils;
|
|
import org.apache.commons.lang3.RandomUtils;
|
|
import org.apache.commons.lang3.RandomUtils;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.fs.FileSystemTestHelper;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
|
import org.apache.hadoop.hdds.scm.TestUtils;
|
|
import org.apache.hadoop.hdds.scm.TestUtils;
|
|
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto
|
|
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
|
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
|
|
.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
|
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
|
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
import org.apache.hadoop.hdds.protocol.proto
|
|
@@ -41,6 +43,7 @@ import org.apache.hadoop.ipc.RPC;
|
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
import org.apache.hadoop.ozone.OzoneConsts;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
|
|
|
|
+import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
|
|
import org.apache.hadoop.ozone.container.common.statemachine
|
|
import org.apache.hadoop.ozone.container.common.statemachine
|
|
.DatanodeStateMachine;
|
|
.DatanodeStateMachine;
|
|
import org.apache.hadoop.ozone.container.common.statemachine
|
|
import org.apache.hadoop.ozone.container.common.statemachine
|
|
@@ -53,17 +56,22 @@ import org.apache.hadoop.ozone.container.common.states.endpoint
|
|
import org.apache.hadoop.ozone.container.common.states.endpoint
|
|
import org.apache.hadoop.ozone.container.common.states.endpoint
|
|
.VersionEndpointTask;
|
|
.VersionEndpointTask;
|
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.PathUtils;
|
|
import org.apache.hadoop.test.PathUtils;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.junit.AfterClass;
|
|
import org.junit.AfterClass;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.BeforeClass;
|
|
import org.junit.BeforeClass;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+
|
|
|
|
+import static org.junit.Assert.fail;
|
|
import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.mock;
|
|
|
|
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Properties;
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
|
|
|
|
import static org.apache.hadoop.hdds.scm.TestUtils.getDatanodeDetails;
|
|
import static org.apache.hadoop.hdds.scm.TestUtils.getDatanodeDetails;
|
|
@@ -116,6 +124,11 @@ public class TestEndPoint {
|
|
responseProto.getKeys(0).getKey());
|
|
responseProto.getKeys(0).getKey());
|
|
Assert.assertEquals(VersionInfo.getLatestVersion().getDescription(),
|
|
Assert.assertEquals(VersionInfo.getLatestVersion().getDescription(),
|
|
responseProto.getKeys(0).getValue());
|
|
responseProto.getKeys(0).getValue());
|
|
|
|
+ Assert.assertEquals("scmUuid", responseProto.getKeys(
|
|
|
|
+ 1).getKey());
|
|
|
|
+ Assert.assertEquals(scmServerImpl.getScmUuid().toString(),
|
|
|
|
+ responseProto.getKeys(1).getValue());
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -126,11 +139,20 @@ public class TestEndPoint {
|
|
*/
|
|
*/
|
|
public void testGetVersionTask() throws Exception {
|
|
public void testGetVersionTask() throws Exception {
|
|
Configuration conf = SCMTestUtils.getConf();
|
|
Configuration conf = SCMTestUtils.getConf();
|
|
|
|
+ String path = new FileSystemTestHelper().getTestRootDir();
|
|
|
|
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, path);
|
|
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
serverAddress, 1000)) {
|
|
serverAddress, 1000)) {
|
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
|
|
+ OzoneContainer ozoneContainer = mock(OzoneContainer.class);
|
|
|
|
+ List<StorageLocation> pathList = new ArrayList<>();
|
|
|
|
+ for (String dir : conf.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
|
|
|
|
+ StorageLocation location = StorageLocation.parse(dir);
|
|
|
|
+ pathList.add(location);
|
|
|
|
+ }
|
|
|
|
+ when(ozoneContainer.getLocations()).thenReturn(pathList);
|
|
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
- conf);
|
|
|
|
|
|
+ conf, ozoneContainer);
|
|
EndpointStateMachine.EndPointStates newState = versionTask.call();
|
|
EndpointStateMachine.EndPointStates newState = versionTask.call();
|
|
|
|
|
|
// if version call worked the endpoint should automatically move to the
|
|
// if version call worked the endpoint should automatically move to the
|
|
@@ -140,9 +162,131 @@ public class TestEndPoint {
|
|
|
|
|
|
// Now rpcEndpoint should remember the version it got from SCM
|
|
// Now rpcEndpoint should remember the version it got from SCM
|
|
Assert.assertNotNull(rpcEndPoint.getVersion());
|
|
Assert.assertNotNull(rpcEndPoint.getVersion());
|
|
|
|
+ FileUtil.fullyDelete(new File(path));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testVersionCheckFail() throws Exception {
|
|
|
|
+ Configuration conf = SCMTestUtils.getConf();
|
|
|
|
+ String path = new FileSystemTestHelper().getTestRootDir();
|
|
|
|
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, path);
|
|
|
|
+ try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
|
|
+ serverAddress, 1000)) {
|
|
|
|
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
|
|
+ OzoneContainer ozoneContainer = mock(OzoneContainer.class);
|
|
|
|
+ List<StorageLocation> pathList = new ArrayList<>();
|
|
|
|
+ for (String dir : conf.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
|
|
|
|
+ StorageLocation location = StorageLocation.parse(dir);
|
|
|
|
+ pathList.add(location);
|
|
|
|
+ }
|
|
|
|
+ when(ozoneContainer.getLocations()).thenReturn(pathList);
|
|
|
|
+ VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
|
|
+ conf, ozoneContainer);
|
|
|
|
+ EndpointStateMachine.EndPointStates newState = versionTask.call();
|
|
|
|
+
|
|
|
|
+ // if version call worked the endpoint should automatically move to the
|
|
|
|
+ // next state.
|
|
|
|
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
|
|
|
|
+ newState);
|
|
|
|
+
|
|
|
|
+ // Now rpcEndpoint should remember the version it got from SCM
|
|
|
|
+ Assert.assertNotNull(rpcEndPoint.getVersion());
|
|
|
|
+
|
|
|
|
+ // Now call again version task with an incorrect layout version.
|
|
|
|
+ // This will fail with Incorrect layOutVersion error.
|
|
|
|
+ DatanodeVersionFile datanodeVersionFile = new DatanodeVersionFile(
|
|
|
|
+ scmServerImpl.getScmUuid().toString(), Time.now(), 2);
|
|
|
|
+ datanodeVersionFile.createVersionFile(DatanodeVersionFile
|
|
|
|
+ .getVersionFile(pathList.get(0), scmServerImpl.getScmUuid()
|
|
|
|
+ .toString()));
|
|
|
|
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
|
|
+ versionTask.call();
|
|
|
|
+ fail("Test fail");
|
|
|
|
+ } catch(Throwable t) {
|
|
|
|
+ GenericTestUtils.assertExceptionContains("Incorrect layOutVersion", t);
|
|
|
|
+ FileUtil.fullyDelete(new File(path));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testVersionCheckSuccess() throws Exception {
|
|
|
|
+ Configuration conf = SCMTestUtils.getConf();
|
|
|
|
+ String path = new FileSystemTestHelper().getTestRootDir();
|
|
|
|
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, path);
|
|
|
|
+ try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
|
|
+ serverAddress, 1000)) {
|
|
|
|
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
|
|
+ OzoneContainer ozoneContainer = mock(OzoneContainer.class);
|
|
|
|
+ List<StorageLocation> pathList = new ArrayList<>();
|
|
|
|
+ for (String dir : conf.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
|
|
|
|
+ StorageLocation location = StorageLocation.parse(dir);
|
|
|
|
+ pathList.add(location);
|
|
|
|
+ }
|
|
|
|
+ when(ozoneContainer.getLocations()).thenReturn(pathList);
|
|
|
|
+ VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
|
|
+ conf, ozoneContainer);
|
|
|
|
+ EndpointStateMachine.EndPointStates newState = versionTask.call();
|
|
|
|
+
|
|
|
|
+ // if version call worked the endpoint should automatically move to the
|
|
|
|
+ // next state.
|
|
|
|
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
|
|
|
|
+ newState);
|
|
|
|
+
|
|
|
|
+ // Now rpcEndpoint should remember the version it got from SCM
|
|
|
|
+ Assert.assertNotNull(rpcEndPoint.getVersion());
|
|
|
|
+
|
|
|
|
+ // Now call again Version Task, this time version check should succeed.
|
|
|
|
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
|
|
+ newState = versionTask.call();
|
|
|
|
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
|
|
|
|
+ newState);
|
|
|
|
+ FileUtil.fullyDelete(new File(path));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testVersionCheckFile() throws Exception {
|
|
|
|
+ Configuration conf = SCMTestUtils.getConf();
|
|
|
|
+ FileUtil.fullyDelete(new File("/tmp/hadoop"));
|
|
|
|
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, "/tmp/hadoop");
|
|
|
|
+ try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
|
|
+ serverAddress, 1000)) {
|
|
|
|
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
|
|
+ OzoneContainer ozoneContainer = mock(OzoneContainer.class);
|
|
|
|
+ List<StorageLocation> pathList = new ArrayList<>();
|
|
|
|
+ String dir = conf.get(DFS_DATANODE_DATA_DIR_KEY);
|
|
|
|
+ StorageLocation location = StorageLocation.parse(dir);
|
|
|
|
+ pathList.add(location);
|
|
|
|
+
|
|
|
|
+ when(ozoneContainer.getLocations()).thenReturn(pathList);
|
|
|
|
+ VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
|
|
+ conf, ozoneContainer);
|
|
|
|
+ EndpointStateMachine.EndPointStates newState = versionTask.call();
|
|
|
|
+
|
|
|
|
+ // if version call worked the endpoint should automatically move to the
|
|
|
|
+ // next state.
|
|
|
|
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
|
|
|
|
+ newState);
|
|
|
|
+
|
|
|
|
+ // Now rpcEndpoint should remember the version it got from SCM
|
|
|
|
+ Assert.assertNotNull(rpcEndPoint.getVersion());
|
|
|
|
+
|
|
|
|
+ // Check Version File created or not and content is expected or not.
|
|
|
|
+ File versionFile = DatanodeVersionFile.getVersionFile(pathList.get(0),
|
|
|
|
+ scmServerImpl.getScmUuid().toString());
|
|
|
|
+ Assert.assertTrue(versionFile.exists());
|
|
|
|
+
|
|
|
|
+ Properties props = DatanodeVersionFile.readFrom(versionFile);
|
|
|
|
+ DatanodeVersionFile.verifyCreationTime(props.getProperty(OzoneConsts
|
|
|
|
+ .CTIME));
|
|
|
|
+ DatanodeVersionFile.verifyScmUuid(scmServerImpl.getScmUuid().toString(),
|
|
|
|
+ props.getProperty(OzoneConsts.SCM_ID));
|
|
|
|
+ DatanodeVersionFile.verifyLayOutVersion(props.getProperty(OzoneConsts
|
|
|
|
+ .LAYOUTVERSION));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
@Test
|
|
@Test
|
|
/**
|
|
/**
|
|
* This test makes a call to end point where there is no SCM server. We
|
|
* This test makes a call to end point where there is no SCM server. We
|
|
@@ -152,11 +296,20 @@ public class TestEndPoint {
|
|
Configuration conf = SCMTestUtils.getConf();
|
|
Configuration conf = SCMTestUtils.getConf();
|
|
InetSocketAddress nonExistentServerAddress = SCMTestUtils
|
|
InetSocketAddress nonExistentServerAddress = SCMTestUtils
|
|
.getReuseableAddress();
|
|
.getReuseableAddress();
|
|
|
|
+ FileUtil.fullyDelete(new File("/tmp/hadoop"));
|
|
|
|
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, "/tmp/hadoop");
|
|
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
nonExistentServerAddress, 1000)) {
|
|
nonExistentServerAddress, 1000)) {
|
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
|
|
+ OzoneContainer ozoneContainer = mock(OzoneContainer.class);
|
|
|
|
+ List<StorageLocation> pathList = new ArrayList<>();
|
|
|
|
+ for (String dir : conf.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
|
|
|
|
+ StorageLocation location = StorageLocation.parse(dir);
|
|
|
|
+ pathList.add(location);
|
|
|
|
+ }
|
|
|
|
+ when(ozoneContainer.getLocations()).thenReturn(pathList);
|
|
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
- conf);
|
|
|
|
|
|
+ conf, ozoneContainer);
|
|
EndpointStateMachine.EndPointStates newState = versionTask.call();
|
|
EndpointStateMachine.EndPointStates newState = versionTask.call();
|
|
|
|
|
|
// This version call did NOT work, so endpoint should remain in the same
|
|
// This version call did NOT work, so endpoint should remain in the same
|
|
@@ -176,12 +329,20 @@ public class TestEndPoint {
|
|
final long rpcTimeout = 1000;
|
|
final long rpcTimeout = 1000;
|
|
final long tolerance = 100;
|
|
final long tolerance = 100;
|
|
Configuration conf = SCMTestUtils.getConf();
|
|
Configuration conf = SCMTestUtils.getConf();
|
|
-
|
|
|
|
|
|
+ FileUtil.fullyDelete(new File("/tmp/hadoop"));
|
|
|
|
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, "/tmp/hadoop");
|
|
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
|
|
serverAddress, (int) rpcTimeout)) {
|
|
serverAddress, (int) rpcTimeout)) {
|
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
|
|
|
|
+ OzoneContainer ozoneContainer = mock(OzoneContainer.class);
|
|
|
|
+ List<StorageLocation> pathList = new ArrayList<>();
|
|
|
|
+ for (String dir : conf.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
|
|
|
|
+ StorageLocation location = StorageLocation.parse(dir);
|
|
|
|
+ pathList.add(location);
|
|
|
|
+ }
|
|
|
|
+ when(ozoneContainer.getLocations()).thenReturn(pathList);
|
|
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
|
|
- conf);
|
|
|
|
|
|
+ conf, ozoneContainer);
|
|
|
|
|
|
scmServerImpl.setRpcResponseDelay(1500);
|
|
scmServerImpl.setRpcResponseDelay(1500);
|
|
long start = Time.monotonicNow();
|
|
long start = Time.monotonicNow();
|