@@ -22,23 +22,32 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -46,6 +55,7 @@ import org.junit.Test;

import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -54,7 +64,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
    HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.
    HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
+    ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
    HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -77,7 +88,7 @@ public class TestContainerStateMachineFailures {
  private static String volumeName;
  private static String bucketName;
  private static String path;
-  private static int chunkSize;
+  private static XceiverClientManager xceiverClientManager;

  /**
   * Create a MiniOzoneCluster for testing.
@@ -101,6 +112,11 @@ public class TestContainerStateMachineFailures {
    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
    conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
        TimeUnit.SECONDS);
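+    // cap Ratis client retries (10 attempts, 1s apart) and take a snapshot
+    // after every applied transaction (threshold 1) so that failures and
+    // snapshots surface quickly in these tests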
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+        1, TimeUnit.SECONDS);
+    conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
    conf.setQuietMode(false);
    cluster =
        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
@@ -109,6 +125,7 @@ public class TestContainerStateMachineFailures {
    // the easiest way to create an open container is creating a key
    client = OzoneClientFactory.getClient(conf);
    objectStore = client.getObjectStore();
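+    // manager that hands out raw Xceiver clients, used later to send
+    // container commands directly to the datanode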
+    xceiverClientManager = new XceiverClientManager(conf);
    volumeName = "testcontainerstatemachinefailures";
    bucketName = volumeName;
    objectStore.createVolume(volumeName);
@@ -132,19 +149,10 @@
        .createKey("ratis", 1024, ReplicationType.RATIS,
            ReplicationFactor.ONE, new HashMap<>());
    byte[] testData = "ratis".getBytes();
-    long written = 0;
    // First write and flush creates a container in the datanode
    key.write(testData);
-    written += testData.length;
    key.flush();
    key.write(testData);
-    written += testData.length;
-
-    //get the name of a valid container
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
-        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
-        .build();
    KeyOutputStream groupOutputStream =
        (KeyOutputStream) key.getOutputStream();
    List<OmKeyLocationInfo> locationInfoList =
@@ -157,7 +165,14 @@
        .getContainer().getContainerSet()
        .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
        .getContainerPath()));
-    key.close();
+    try {
+      // there is only 1 datanode in the pipeline; the pipeline will be
+      // closed, and allocation of a new pipeline will fail because there
+      // is no other datanode in the cluster
+      key.close();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof OMException);
+    }
    long containerID = omKeyLocationInfo.getContainerID();

    // Make sure the container is marked unhealthy
@@ -179,22 +194,6 @@
        .getDatanodeStateMachine().getContainer();
    Assert
        .assertNull(ozoneContainer.getContainerSet().getContainer(containerID));
-
-    OzoneKeyDetails keyDetails = objectStore.getVolume(volumeName)
-        .getBucket(bucketName).getKey("ratis");
-
-    /**
-     * Ensure length of data stored in key is equal to number of bytes written.
-     */
-    Assert.assertTrue("Number of bytes stored in the key is not equal " +
-        "to number of bytes written.", keyDetails.getDataSize() == written);
-
-    /**
-     * Pending data from the second write should get written to a new container
-     * during key.close() because the first container is UNHEALTHY by that time
-     */
-    Assert.assertTrue("Expect Key to be stored in 2 separate containers",
-        keyDetails.getOzoneKeyLocations().size() == 2);
  }

  @Test
@@ -207,12 +206,6 @@
    key.write("ratis".getBytes());
    key.flush();
    key.write("ratis".getBytes());
-
-    //get the name of a valid container
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
-        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
-        .build();
    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
    List<OmKeyLocationInfo> locationInfoList =
        groupOutputStream.getLocationInfoList();
@@ -228,8 +221,14 @@
        (KeyValueContainerData) containerData;
    // delete the container chunks directory
    FileUtil.fullyDelete(new File(keyValueContainerData.getChunksPath()));
-
-    key.close();
+    try {
+      // there is only 1 datanode in the pipeline; the pipeline will be
+      // closed, and allocation of a new pipeline will fail because there
+      // is no other datanode in the cluster
+      key.close();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof OMException);
+    }

    long containerID = omKeyLocationInfo.getContainerID();

@@ -270,4 +269,83 @@
    Assert.assertEquals(ContainerProtos.Result.CONTAINER_UNHEALTHY,
        dispatcher.dispatch(request.build(), null).getResult());
  }
+
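+  // Regression test: a failure in applyTransaction should mark the
+  // container unhealthy, prevent further Ratis snapshots, and surface to
+  // the client as a RaftRetryFailureException.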
+  @Test
+  public void testApplyTransactionFailure() throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 1024, ReplicationType.RATIS,
+                ReplicationFactor.ONE, new HashMap<>());
+    // First write and flush creates a container in the datanode
+    key.write("ratis".getBytes());
+    key.flush();
+    key.write("ratis".getBytes());
+    KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    ContainerData containerData =
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainerData();
+    Assert.assertTrue(containerData instanceof KeyValueContainerData);
+    KeyValueContainerData keyValueContainerData =
+        (KeyValueContainerData) containerData;
+    key.close();
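+    // fetch the ContainerStateMachine and its snapshot storage from the
+    // single datanode's Ratis server via the test helper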
+    ContainerStateMachine stateMachine =
+        (ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
+    SimpleStateMachineStorage storage =
+        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
+    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
+    // Since the snapshot threshold is set to 1 and transactions have been
+    // applied, we should already see snapshot files
+    Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
+    FileInfo snapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertNotNull(snapshot);
+    long containerID = omKeyLocationInfo.getContainerID();
+    // delete the container directory so that the next container transaction
+    // fails in applyTransaction on the datanode
+    FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
+    Pipeline pipeline = cluster.getStorageContainerLocationClient()
+        .getContainerWithPipeline(containerID).getPipeline();
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
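+    // hand-build a CloseContainer command to send over the raw client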
+    ContainerProtos.ContainerCommandRequestProto.Builder request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder();
+    request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
+    request.setCmdType(ContainerProtos.Type.CloseContainer);
+    request.setContainerID(containerID);
+    request.setCloseContainer(
+        ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
+    // the close container transaction will fail over Ratis and will
+    // initiate a pipeline close action
+
+    // Since the applyTransaction failure is propagated to Ratis,
+    // stateMachineUpdater will hit the exception while taking the next
+    // snapshot and should shut down the RaftServerImpl. The client request
+    // will fail with RaftRetryFailureException.
+    try {
+      xceiverClient.sendCommand(request.build());
+      Assert.fail("Expected exception not thrown");
+    } catch (IOException e) {
+      Assert.assertTrue(HddsClientUtils
+          .checkForException(e) instanceof RaftRetryFailureException);
+    }
+    // Make sure the container is marked unhealthy
+    Assert.assertTrue(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet().getContainer(containerID)
+            .getContainerState()
+            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+    try {
+      // try to take a new snapshot; it should fail with a
+      // StateMachineException
+      stateMachine.takeSnapshot();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof StateMachineException);
+    }
+    // Make sure the latest snapshot is the same as the previous one
+    FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
+    Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
+  }
}