|
@@ -19,77 +19,258 @@
|
|
|
package org.apache.hadoop.fs.s3a;
|
|
|
|
|
|
import java.io.FileNotFoundException;
|
|
|
+import java.io.IOException;
|
|
|
+import java.nio.charset.Charset;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Optional;
|
|
|
|
|
|
+import com.amazonaws.AmazonClientException;
|
|
|
+import com.amazonaws.services.s3.AmazonS3;
|
|
|
+import com.amazonaws.services.s3.model.CopyObjectRequest;
|
|
|
+import com.amazonaws.services.s3.model.CopyObjectResult;
|
|
|
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
|
|
+import com.amazonaws.services.s3.model.GetObjectRequest;
|
|
|
+import com.amazonaws.services.s3.model.ObjectMetadata;
|
|
|
+import com.amazonaws.services.s3.model.S3Object;
|
|
|
+import com.google.common.base.Charsets;
|
|
|
import org.junit.Assume;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.runner.RunWith;
|
|
|
import org.junit.runners.Parameterized;
|
|
|
+import org.mockito.ArgumentMatchers;
|
|
|
+import org.mockito.Mockito;
|
|
|
+import org.mockito.invocation.InvocationOnMock;
|
|
|
+import org.mockito.stubbing.Answer;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import org.apache.commons.io.IOUtils;
|
|
|
+import org.apache.commons.lang3.tuple.Pair;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Mode;
|
|
|
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
|
|
|
+import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
|
|
|
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
|
|
|
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
|
|
|
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
|
|
|
|
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
|
|
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readUTF8;
|
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
|
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
|
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
|
|
|
+import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.CHANGE_DETECTED;
|
|
|
+import static org.apache.hadoop.fs.s3a.select.SelectConstants.S3_SELECT_CAPABILITY;
|
|
|
+import static org.apache.hadoop.fs.s3a.select.SelectConstants.SELECT_SQL;
|
|
|
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
|
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
|
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
|
|
|
+import static org.mockito.Mockito.doAnswer;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
|
|
|
/**
|
|
|
* Test S3A remote file change detection.
|
|
|
+ * This is a very parameterized test; the first three parameters
|
|
|
+ * define configuration options for the tests, while the final one
|
|
|
+ * declares the expected outcomes given those options.
|
|
|
+ *
|
|
|
+ * This test uses mocking to insert transient failures into the S3 client,
|
|
|
+ * underneath the S3A Filesystem instance.
|
|
|
+ *
|
|
|
+ * This is used to simulate eventual consistency, to force the change policy
|
|
|
+ * failure modes to be encountered.
|
|
|
+ *
|
|
|
+ * If changes are made to the filesystem such that the number of calls to
|
|
|
+ * operations such as {@link S3AFileSystem#getObjectMetadata(Path)} are
|
|
|
+ * changed, the number of failures which the mock layer must generate may
|
|
|
+ * change.
|
|
|
+ *
|
|
|
+ * As the S3Guard auth mode flag does control whether or not a HEAD is issued
|
|
|
+ * in a call to {@code getFileStatus()}, the test parameter {@link #authMode}
|
|
|
+ * is used to help predict this count.
|
|
|
+ *
|
|
|
+ * <i>Important:</i> if you are seeing failures in this test after changing
|
|
|
+ * one of the rename/copy/open operations, it may be that an increase,
|
|
|
+ * decrease or change in the number of low-level S3 HEAD/GET operations is
|
|
|
+ * triggering the failures.
|
|
|
+ * Please review the changes to see that you haven't unintentionally done this.
|
|
|
+ * If it is intentional, please update the parameters here.
|
|
|
+ *
|
|
|
+ * If you are seeing failures without such a change, and nobody else is,
|
|
|
+ * it is likely that you have a different bucket configuration option which
|
|
|
+ * is somehow triggering a regression. If you can work out which option
|
|
|
+ * this is, then extend {@link #createConfiguration()} to reset that parameter
|
|
|
+ * too.
|
|
|
+ *
|
|
|
+ * Note: to help debug these issues, set the log for this to DEBUG:
|
|
|
+ * <pre>
|
|
|
+ * log4j.logger.org.apache.hadoop.fs.s3a.ITestS3ARemoteFileChanged=DEBUG
|
|
|
+ * </pre>
|
|
|
+ * The debug information printed will include a trace of where operations
|
|
|
+ * are being called from, to help understand why the test is failing.
|
|
|
*/
|
|
|
@RunWith(Parameterized.class)
|
|
|
public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|
|
+
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(ITestS3ARemoteFileChanged.class);
|
|
|
|
|
|
+ private static final String TEST_DATA = "Some test data";
|
|
|
+
|
|
|
+ private static final byte[] TEST_DATA_BYTES = TEST_DATA.getBytes(
|
|
|
+ Charsets.UTF_8);
|
|
|
+ private static final int TEST_MAX_RETRIES = 5;
|
|
|
+ private static final String TEST_RETRY_INTERVAL = "10ms";
|
|
|
+ private static final String QUOTED_TEST_DATA =
|
|
|
+ "\"" + TEST_DATA + "\"";
|
|
|
+
|
|
|
+ private Optional<AmazonS3> originalS3Client = Optional.empty();
|
|
|
+
|
|
|
+ private enum InteractionType {
|
|
|
+ READ,
|
|
|
+ READ_AFTER_DELETE,
|
|
|
+ EVENTUALLY_CONSISTENT_READ,
|
|
|
+ COPY,
|
|
|
+ EVENTUALLY_CONSISTENT_COPY,
|
|
|
+ EVENTUALLY_CONSISTENT_METADATA,
|
|
|
+ SELECT,
|
|
|
+ EVENTUALLY_CONSISTENT_SELECT
|
|
|
+ }
|
|
|
+
|
|
|
private final String changeDetectionSource;
|
|
|
private final String changeDetectionMode;
|
|
|
- private final boolean expectChangeException;
|
|
|
- private final boolean expectFileNotFoundException;
|
|
|
+ private final boolean authMode;
|
|
|
+ private final Collection<InteractionType> expectedExceptionInteractions;
|
|
|
+ private S3AFileSystem fs;
|
|
|
|
|
|
- @Parameterized.Parameters
|
|
|
+ /**
|
|
|
+ * Test parameters.
|
|
|
+ * <ol>
|
|
|
+ * <li>Change detection source: etag or version.</li>
|
|
|
+ * <li>Change detection policy: server, client, client+warn, none</li>
|
|
|
+ * <li>Whether to enable auth mode on the filesystem.</li>
|
|
|
+ * <li>Expected outcomes.</li>
|
|
|
+ * </ol>
|
|
|
+ * @return the test configuration.
|
|
|
+ */
|
|
|
+ @Parameterized.Parameters(name = "{0}-{1}-auth-{2}")
|
|
|
public static Collection<Object[]> params() {
|
|
|
return Arrays.asList(new Object[][]{
|
|
|
// make sure it works with invalid config
|
|
|
- {"bogus", "bogus", true, true},
|
|
|
+ {"bogus", "bogus",
|
|
|
+ true,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.READ,
|
|
|
+ InteractionType.READ_AFTER_DELETE,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_READ,
|
|
|
+ InteractionType.COPY,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_COPY,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_METADATA,
|
|
|
+ InteractionType.SELECT,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_SELECT)},
|
|
|
|
|
|
// test with etag
|
|
|
- {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_SERVER, true, true},
|
|
|
- {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_CLIENT, true, true},
|
|
|
- {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_WARN, false, true},
|
|
|
- {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_NONE, false, true},
|
|
|
+ {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_SERVER,
|
|
|
+ true,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.READ,
|
|
|
+ InteractionType.READ_AFTER_DELETE,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_READ,
|
|
|
+ InteractionType.COPY,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_COPY,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_METADATA,
|
|
|
+ InteractionType.SELECT,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_SELECT)},
|
|
|
+ {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_CLIENT,
|
|
|
+ false,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.READ,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_READ,
|
|
|
+ InteractionType.READ_AFTER_DELETE,
|
|
|
+ InteractionType.COPY,
|
|
|
+ // not InteractionType.EVENTUALLY_CONSISTENT_COPY as copy change
|
|
|
+ // detection can't really occur client-side. The eTag of
|
|
|
+ // the new object can't be expected to match.
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_METADATA,
|
|
|
+ InteractionType.SELECT,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_SELECT)},
|
|
|
+ {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_WARN,
|
|
|
+ false,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.READ_AFTER_DELETE)},
|
|
|
+ {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_NONE,
|
|
|
+ false,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.READ_AFTER_DELETE)},
|
|
|
|
|
|
// test with versionId
|
|
|
- // when using server-side versionId, the exceptions shouldn't happen
|
|
|
- // since the previous version will still be available
|
|
|
- {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_SERVER, false,
|
|
|
- false},
|
|
|
+ // when using server-side versionId, the exceptions
|
|
|
+ // shouldn't happen since the previous version will still be available
|
|
|
+ {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_SERVER,
|
|
|
+ true,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_READ,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_COPY,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_METADATA,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_SELECT)},
|
|
|
|
|
|
// with client-side versionId it will behave similar to client-side eTag
|
|
|
- {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_CLIENT, true,
|
|
|
- true},
|
|
|
+ {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_CLIENT,
|
|
|
+ false,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.READ,
|
|
|
+ InteractionType.READ_AFTER_DELETE,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_READ,
|
|
|
+ InteractionType.COPY,
|
|
|
+ // not InteractionType.EVENTUALLY_CONSISTENT_COPY as copy change
|
|
|
+ // detection can't really occur client-side. The versionId of
|
|
|
+ // the new object can't be expected to match.
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_METADATA,
|
|
|
+ InteractionType.SELECT,
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_SELECT)},
|
|
|
|
|
|
- {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_WARN, false, true},
|
|
|
- {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_NONE, false, true}
|
|
|
+ {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_WARN,
|
|
|
+ true,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.READ_AFTER_DELETE)},
|
|
|
+ {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_NONE,
|
|
|
+ false,
|
|
|
+ Arrays.asList(
|
|
|
+ InteractionType.READ_AFTER_DELETE)}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
public ITestS3ARemoteFileChanged(String changeDetectionSource,
|
|
|
String changeDetectionMode,
|
|
|
- boolean expectException,
|
|
|
- boolean expectFileNotFoundException) {
|
|
|
+ boolean authMode,
|
|
|
+ Collection<InteractionType> expectedExceptionInteractions) {
|
|
|
this.changeDetectionSource = changeDetectionSource;
|
|
|
this.changeDetectionMode = changeDetectionMode;
|
|
|
- this.expectChangeException = expectException;
|
|
|
- this.expectFileNotFoundException = expectFileNotFoundException;
|
|
|
+ this.authMode = authMode;
|
|
|
+ this.expectedExceptionInteractions = expectedExceptionInteractions;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setup() throws Exception {
|
|
|
+ super.setup();
|
|
|
+ // skip all versioned checks if the remote FS doesn't do
|
|
|
+ // versions.
|
|
|
+ fs = getFileSystem();
|
|
|
+ skipIfVersionPolicyAndNoVersionId();
|
|
|
+ // cache the original S3 client for teardown.
|
|
|
+ originalS3Client = Optional.of(
|
|
|
+ fs.getAmazonS3ClientForTesting("caching"));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void teardown() throws Exception {
|
|
|
+ // restore the s3 client so there's no mocking interfering with the teardown
|
|
|
+ originalS3Client.ifPresent(fs::setAmazonS3Client);
|
|
|
+ super.teardown();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -98,33 +279,65 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|
|
String bucketName = getTestBucketName(conf);
|
|
|
removeBucketOverrides(bucketName, conf,
|
|
|
CHANGE_DETECT_SOURCE,
|
|
|
- CHANGE_DETECT_MODE);
|
|
|
+ CHANGE_DETECT_MODE,
|
|
|
+ RETRY_LIMIT,
|
|
|
+ RETRY_INTERVAL,
|
|
|
+ METADATASTORE_AUTHORITATIVE);
|
|
|
conf.set(CHANGE_DETECT_SOURCE, changeDetectionSource);
|
|
|
conf.set(CHANGE_DETECT_MODE, changeDetectionMode);
|
|
|
+ conf.setBoolean(METADATASTORE_AUTHORITATIVE, authMode);
|
|
|
+
|
|
|
+ // reduce retry limit so FileNotFoundException cases timeout faster,
|
|
|
+ // speeding up the tests
|
|
|
+ conf.setInt(RETRY_LIMIT, TEST_MAX_RETRIES);
|
|
|
+ conf.set(RETRY_INTERVAL, TEST_RETRY_INTERVAL);
|
|
|
+
|
|
|
+ if (conf.getClass(S3_METADATA_STORE_IMPL, MetadataStore.class) ==
|
|
|
+ NullMetadataStore.class) {
|
|
|
+ LOG.debug("Enabling local S3Guard metadata store");
|
|
|
+ // favor LocalMetadataStore over NullMetadataStore
|
|
|
+ conf.setClass(S3_METADATA_STORE_IMPL,
|
|
|
+ LocalMetadataStore.class, MetadataStore.class);
|
|
|
+ }
|
|
|
S3ATestUtils.disableFilesystemCaching(conf);
|
|
|
return conf;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the path of this method, including parameterized values.
|
|
|
+ * @return a path unique to this method and parameters
|
|
|
+ * @throws IOException failure.
|
|
|
+ */
|
|
|
+ protected Path path() throws IOException {
|
|
|
+ return super.path(getMethodName());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * How many HEAD requests are made in a call to
|
|
|
+ * {@link S3AFileSystem#getFileStatus(Path)}?
|
|
|
+ * @return 0 if auth mode is enabled, 1 otherwise.
|
|
|
+ */
|
|
|
+ private int getFileStatusHeadCount() {
|
|
|
+ return authMode ? 0 : 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests reading a file that is changed while the reader's InputStream is
|
|
|
+ * open.
|
|
|
+ */
|
|
|
@Test
|
|
|
- public void testReadFileChanged() throws Throwable {
|
|
|
+ public void testReadFileChangedStreamOpen() throws Throwable {
|
|
|
+ describe("Tests reading a file that is changed while the reader's "
|
|
|
+ + "InputStream is open.");
|
|
|
final int originalLength = 8192;
|
|
|
final byte[] originalDataset = dataset(originalLength, 'a', 32);
|
|
|
final int newLength = originalLength + 1;
|
|
|
final byte[] newDataset = dataset(newLength, 'A', 32);
|
|
|
- final S3AFileSystem fs = getFileSystem();
|
|
|
final Path testpath = path("readFileToChange.txt");
|
|
|
// initial write
|
|
|
writeDataset(fs, testpath, originalDataset, originalDataset.length,
|
|
|
1024, false);
|
|
|
|
|
|
- if (fs.getChangeDetectionPolicy().getSource() == Source.VersionId) {
|
|
|
- // skip versionId tests if the bucket doesn't have object versioning
|
|
|
- // enabled
|
|
|
- Assume.assumeTrue(
|
|
|
- "Target filesystem does not support versioning",
|
|
|
- fs.getObjectMetadata(fs.pathToKey(testpath)).getVersionId() != null);
|
|
|
- }
|
|
|
-
|
|
|
try(FSDataInputStream instream = fs.open(testpath)) {
|
|
|
// seek forward and read successfully
|
|
|
instream.seek(1024);
|
|
@@ -152,9 +365,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|
|
// now check seek backward
|
|
|
instream.seek(instream.getPos() - 100);
|
|
|
|
|
|
- if (expectChangeException) {
|
|
|
- intercept(RemoteFileChangedException.class, "", "read",
|
|
|
- () -> instream.read());
|
|
|
+ if (expectedExceptionInteractions.contains(InteractionType.READ)) {
|
|
|
+ expectReadFailure(instream);
|
|
|
} else {
|
|
|
instream.read();
|
|
|
}
|
|
@@ -164,9 +376,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|
|
// seek backward
|
|
|
instream.seek(0);
|
|
|
|
|
|
- if (expectChangeException) {
|
|
|
- intercept(RemoteFileChangedException.class, "", "read",
|
|
|
- () -> instream.read(buf));
|
|
|
+ if (expectedExceptionInteractions.contains(InteractionType.READ)) {
|
|
|
+ expectReadFailure(instream);
|
|
|
intercept(RemoteFileChangedException.class, "", "read",
|
|
|
() -> instream.read(0, buf, 0, buf.length));
|
|
|
intercept(RemoteFileChangedException.class, "", "readfully",
|
|
@@ -183,7 +394,8 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|
|
// seek backward
|
|
|
instream.seek(0);
|
|
|
|
|
|
- if (expectFileNotFoundException) {
|
|
|
+ if (expectedExceptionInteractions.contains(
|
|
|
+ InteractionType.READ_AFTER_DELETE)) {
|
|
|
intercept(FileNotFoundException.class, "", "read()",
|
|
|
() -> instream.read());
|
|
|
intercept(FileNotFoundException.class, "", "readfully",
|
|
@@ -194,4 +406,890 @@ public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests reading a file where the version visible in S3 does not match the
|
|
|
+ * version tracked in the metadata store.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testReadFileChangedOutOfSyncMetadata() throws Throwable {
|
|
|
+ final Path testpath = writeOutOfSyncFileVersion("fileChangedOutOfSync.dat");
|
|
|
+
|
|
|
+ try (FSDataInputStream instream = fs.open(testpath)) {
|
|
|
+ if (expectedExceptionInteractions.contains(InteractionType.READ)) {
|
|
|
+ expectReadFailure(instream);
|
|
|
+ } else {
|
|
|
+ instream.read();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Ensures a file can be read when there is no version metadata
|
|
|
+ * (ETag, versionId).
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testReadWithNoVersionMetadata() throws Throwable {
|
|
|
+ final Path testpath = writeFileWithNoVersionMetadata("readnoversion.dat");
|
|
|
+
|
|
|
+ assertEquals("Contents of " + testpath,
|
|
|
+ TEST_DATA,
|
|
|
+ readUTF8(fs, testpath, -1));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests using S3 Select on a file where the version visible in S3 does not
|
|
|
+ * match the version tracked in the metadata store.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSelectChangedFile() throws Throwable {
|
|
|
+ requireS3Select();
|
|
|
+ final Path testpath = writeOutOfSyncFileVersion("select.dat");
|
|
|
+
|
|
|
+ if (expectedExceptionInteractions.contains(InteractionType.SELECT)) {
|
|
|
+ interceptFuture(RemoteFileChangedException.class, "select",
|
|
|
+ fs.openFile(testpath)
|
|
|
+ .must(SELECT_SQL, "SELECT * FROM S3OBJECT").build());
|
|
|
+ } else {
|
|
|
+ fs.openFile(testpath)
|
|
|
+ .must(SELECT_SQL, "SELECT * FROM S3OBJECT")
|
|
|
+ .build()
|
|
|
+ .get()
|
|
|
+ .close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests using S3 Select on a file where the version visible in S3 does not
|
|
|
+ * initially match the version tracked in the metadata store, but eventually
|
|
|
+ * (after retries) does.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSelectEventuallyConsistentFile() throws Throwable {
|
|
|
+ describe("Eventually Consistent S3 Select");
|
|
|
+ requireS3Guard();
|
|
|
+ requireS3Select();
|
|
|
+ AmazonS3 s3ClientSpy = spyOnFilesystem();
|
|
|
+
|
|
|
+ final Path testpath1 = writeEventuallyConsistentFileVersion(
|
|
|
+ "select1.dat", s3ClientSpy, 0, TEST_MAX_RETRIES, 0);
|
|
|
+
|
|
|
+ // should succeed since the inconsistency doesn't last longer than the
|
|
|
+ // configured retry limit
|
|
|
+ fs.openFile(testpath1)
|
|
|
+ .must(SELECT_SQL, "SELECT * FROM S3OBJECT")
|
|
|
+ .build()
|
|
|
+ .get()
|
|
|
+ .close();
|
|
|
+
|
|
|
+ // select() makes a getFileStatus() call before the consistency checking
|
|
|
+ // that will match the stub. As such, we need an extra inconsistency here
|
|
|
+ // to cross the threshold
|
|
|
+ int getMetadataInconsistencyCount = TEST_MAX_RETRIES + 2;
|
|
|
+ final Path testpath2 = writeEventuallyConsistentFileVersion(
|
|
|
+ "select2.dat", s3ClientSpy, 0, getMetadataInconsistencyCount, 0);
|
|
|
+
|
|
|
+ if (expectedExceptionInteractions.contains(
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_SELECT)) {
|
|
|
+ // should fail since the inconsistency lasts longer than the configured
|
|
|
+ // retry limit
|
|
|
+ interceptFuture(RemoteFileChangedException.class, "select",
|
|
|
+ fs.openFile(testpath2)
|
|
|
+ .must(SELECT_SQL, "SELECT * FROM S3OBJECT").build());
|
|
|
+ } else {
|
|
|
+ fs.openFile(testpath2)
|
|
|
+ .must(SELECT_SQL, "SELECT * FROM S3OBJECT")
|
|
|
+ .build()
|
|
|
+ .get()
|
|
|
+ .close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Ensures a file can be read via S3 Select when there is no version metadata
|
|
|
+ * (ETag, versionId).
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSelectWithNoVersionMetadata() throws Throwable {
|
|
|
+ requireS3Select();
|
|
|
+ final Path testpath =
|
|
|
+ writeFileWithNoVersionMetadata("selectnoversion.dat");
|
|
|
+
|
|
|
+ try (FSDataInputStream instream = fs.openFile(testpath)
|
|
|
+ .must(SELECT_SQL, "SELECT * FROM S3OBJECT").build().get()) {
|
|
|
+ assertEquals(QUOTED_TEST_DATA,
|
|
|
+ IOUtils.toString(instream, Charset.forName("UTF-8")).trim());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests doing a rename() on a file where the version visible in S3 does not
|
|
|
+ * match the version tracked in the metadata store.
|
|
|
+ * @throws Throwable failure
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameChangedFile() throws Throwable {
|
|
|
+ final Path testpath = writeOutOfSyncFileVersion("rename.dat");
|
|
|
+
|
|
|
+ final Path dest = path("dest.dat");
|
|
|
+ if (expectedExceptionInteractions.contains(InteractionType.COPY)) {
|
|
|
+ intercept(RemoteFileChangedException.class, "",
|
|
|
+ "expected copy() failure",
|
|
|
+ () -> fs.rename(testpath, dest));
|
|
|
+ } else {
|
|
|
+ fs.rename(testpath, dest);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Inconsistent response counts for getObjectMetadata() and
|
|
|
+ * copyObject() for a rename.
|
|
|
+ * @param metadataCallsExpectedBeforeRetryLoop number of getObjectMetadata
|
|
|
+ * calls expected before the consistency checking retry loop
|
|
|
+ * @return the inconsistencies for (metadata, copy)
|
|
|
+ */
|
|
|
+ private Pair<Integer, Integer> renameInconsistencyCounts(
|
|
|
+ int metadataCallsExpectedBeforeRetryLoop) {
|
|
|
+ int metadataInconsistencyCount = TEST_MAX_RETRIES
|
|
|
+ + metadataCallsExpectedBeforeRetryLoop;
|
|
|
+ int copyInconsistencyCount =
|
|
|
+ versionCheckingIsOnServer() ? TEST_MAX_RETRIES : 0;
|
|
|
+
|
|
|
+ return Pair.of(metadataInconsistencyCount, copyInconsistencyCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests doing a rename() on a file where the version visible in S3 does not
|
|
|
+ * match the version in the metadata store until a certain number of retries
|
|
|
+ * has been met.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameEventuallyConsistentFile() throws Throwable {
|
|
|
+ requireS3Guard();
|
|
|
+ AmazonS3 s3ClientSpy = spyOnFilesystem();
|
|
|
+
|
|
|
+ // Total inconsistent response count across getObjectMetadata() and
|
|
|
+ // copyObject().
|
|
|
+ // The split of inconsistent responses between getObjectMetadata() and
|
|
|
+ // copyObject() is arbitrary.
|
|
|
+ Pair<Integer, Integer> counts = renameInconsistencyCounts(
|
|
|
+ getFileStatusHeadCount());
|
|
|
+ int metadataInconsistencyCount = counts.getLeft();
|
|
|
+ int copyInconsistencyCount = counts.getRight();
|
|
|
+ final Path testpath1 =
|
|
|
+ writeEventuallyConsistentFileVersion("rename-eventually1.dat",
|
|
|
+ s3ClientSpy,
|
|
|
+ 0,
|
|
|
+ metadataInconsistencyCount,
|
|
|
+ copyInconsistencyCount);
|
|
|
+
|
|
|
+ final Path dest1 = path("dest1.dat");
|
|
|
+ // shouldn't fail since the inconsistency doesn't last through the
|
|
|
+ // configured retry limit
|
|
|
+ fs.rename(testpath1, dest1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests doing a rename() on a file where the version visible in S3 does not
|
|
|
+ * match the version in the metadata store until a certain number of retries
|
|
|
+ * has been met.
|
|
|
+ * The test expects failure by AWSClientIOException caused by NPE due to
|
|
|
+ * https://github.com/aws/aws-sdk-java/issues/1644
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameEventuallyConsistentFileNPE() throws Throwable {
|
|
|
+ requireS3Guard();
|
|
|
+ skipIfVersionPolicyAndNoVersionId();
|
|
|
+ AmazonS3 s3ClientSpy = spyOnFilesystem();
|
|
|
+
|
|
|
+ Pair<Integer, Integer> counts = renameInconsistencyCounts(
|
|
|
+ getFileStatusHeadCount());
|
|
|
+ int metadataInconsistencyCount = counts.getLeft();
|
|
|
+ int copyInconsistencyCount = counts.getRight();
|
|
|
+ // giving copyInconsistencyCount + 1 here should trigger the failure,
|
|
|
+ // exceeding the retry limit
|
|
|
+ final Path testpath2 =
|
|
|
+ writeEventuallyConsistentFileVersion("rename-eventuallyNPE.dat",
|
|
|
+ s3ClientSpy,
|
|
|
+ 0,
|
|
|
+ metadataInconsistencyCount,
|
|
|
+ copyInconsistencyCount + 1);
|
|
|
+ final Path dest2 = path("destNPE.dat");
|
|
|
+ if (expectedExceptionInteractions.contains(
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_COPY)) {
|
|
|
+ // should fail since the inconsistency is set up to persist longer than
|
|
|
+ // the configured retry limit
|
|
|
+ // the expected exception is not RemoteFileChangedException due to
|
|
|
+ // https://github.com/aws/aws-sdk-java/issues/1644
|
|
|
+ // If this test is failing after an AWS SDK update,
|
|
|
+ // then it means the SDK bug is fixed.
|
|
|
+ // Please update this test to match the new behavior.
|
|
|
+ AWSClientIOException exception =
|
|
|
+ intercept(AWSClientIOException.class,
|
|
|
+ "Unable to complete transfer: null",
|
|
|
+ "expected copy() failure",
|
|
|
+ () -> fs.rename(testpath2, dest2));
|
|
|
+ AmazonClientException cause = exception.getCause();
|
|
|
+ if (cause == null) {
|
|
|
+ // no cause; something else went wrong: throw.
|
|
|
+ throw new AssertionError("No inner cause",
|
|
|
+ exception);
|
|
|
+ }
|
|
|
+ Throwable causeCause = cause.getCause();
|
|
|
+ if (!(causeCause instanceof NullPointerException)) {
|
|
|
+ // null causeCause or it is the wrong type: throw
|
|
|
+ throw new AssertionError("Innermost cause is not NPE",
|
|
|
+ exception);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ fs.rename(testpath2, dest2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests doing a rename() on a file where the version visible in S3 does not
|
|
|
+ * match the version in the metadata store until a certain number of retries
|
|
|
+ * has been met.
|
|
|
+ * The test expects failure by RemoteFileChangedException.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameEventuallyConsistentFileRFCE() throws Throwable {
|
|
|
+ requireS3Guard();
|
|
|
+ skipIfVersionPolicyAndNoVersionId();
|
|
|
+ AmazonS3 s3ClientSpy = spyOnFilesystem();
|
|
|
+
|
|
|
+ Pair<Integer, Integer> counts = renameInconsistencyCounts(
|
|
|
+ getFileStatusHeadCount());
|
|
|
+ int metadataInconsistencyCount = counts.getLeft();
|
|
|
+ int copyInconsistencyCount = counts.getRight();
|
|
|
+ // giving metadataInconsistencyCount + 1 here should trigger the failure,
|
|
|
+ // exceeding the retry limit
|
|
|
+ final Path testpath2 =
|
|
|
+ writeEventuallyConsistentFileVersion("rename-eventuallyRFCE.dat",
|
|
|
+ s3ClientSpy,
|
|
|
+ 0,
|
|
|
+ metadataInconsistencyCount + 1,
|
|
|
+ copyInconsistencyCount);
|
|
|
+ final Path dest2 = path("destRFCE.dat");
|
|
|
+ if (expectedExceptionInteractions.contains(
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_METADATA)) {
|
|
|
+ // should fail since the inconsistency is set up to persist longer than
|
|
|
+ // the configured retry limit
|
|
|
+ intercept(RemoteFileChangedException.class,
|
|
|
+ CHANGE_DETECTED,
|
|
|
+ "expected copy() failure",
|
|
|
+ () -> fs.rename(testpath2, dest2));
|
|
|
+ } else {
|
|
|
+ fs.rename(testpath2, dest2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests doing a rename() on a directory containing
|
|
|
+ * a file which is eventually consistent.
|
|
|
+ * There is no call to getFileStatus on the source file whose
|
|
|
+ * inconsistency is simulated; the state of S3Guard auth mode is not
|
|
|
+ * relevant.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameEventuallyConsistentDirectory() throws Throwable {
|
|
|
+ requireS3Guard();
|
|
|
+ AmazonS3 s3ClientSpy = spyOnFilesystem();
|
|
|
+ Path basedir = path();
|
|
|
+ Path sourcedir = new Path(basedir, "sourcedir");
|
|
|
+ fs.mkdirs(sourcedir);
|
|
|
+ Path destdir = new Path(basedir, "destdir");
|
|
|
+ String inconsistent = "inconsistent";
|
|
|
+ String consistent = "consistent";
|
|
|
+ Path inconsistentFile = new Path(sourcedir, inconsistent);
|
|
|
+ Path consistentFile = new Path(sourcedir, consistent);
|
|
|
+
|
|
|
+ // write the consistent data
|
|
|
+ writeDataset(fs, consistentFile, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
|
|
|
+ 1024, true, true);
|
|
|
+
|
|
|
+ Pair<Integer, Integer> counts = renameInconsistencyCounts(0);
|
|
|
+ int metadataInconsistencyCount = counts.getLeft();
|
|
|
+ int copyInconsistencyCount = counts.getRight();
|
|
|
+
|
|
|
+ writeEventuallyConsistentData(
|
|
|
+ s3ClientSpy,
|
|
|
+ inconsistentFile,
|
|
|
+ TEST_DATA_BYTES,
|
|
|
+ 0,
|
|
|
+ metadataInconsistencyCount,
|
|
|
+ copyInconsistencyCount);
|
|
|
+
|
|
|
+ // must not fail since the inconsistency doesn't last through the
|
|
|
+ // configured retry limit
|
|
|
+ fs.rename(sourcedir, destdir);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Ensures a file can be renamed when there is no version metadata
|
|
|
+ * (ETag, versionId).
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testRenameWithNoVersionMetadata() throws Throwable {
|
|
|
+ final Path testpath =
|
|
|
+ writeFileWithNoVersionMetadata("renamenoversion.dat");
|
|
|
+
|
|
|
+ final Path dest = path("noversiondest.dat");
|
|
|
+ fs.rename(testpath, dest);
|
|
|
+ assertEquals("Contents of " + dest,
|
|
|
+ TEST_DATA,
|
|
|
+ readUTF8(fs, dest, -1));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Ensures S3Guard and retries allow an eventually consistent read.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testReadAfterEventuallyConsistentWrite() throws Throwable {
|
|
|
+ requireS3Guard();
|
|
|
+ AmazonS3 s3ClientSpy = spyOnFilesystem();
|
|
|
+ final Path testpath1 =
|
|
|
+ writeEventuallyConsistentFileVersion("eventually1.dat",
|
|
|
+ s3ClientSpy, TEST_MAX_RETRIES, 0 , 0);
|
|
|
+
|
|
|
+ try (FSDataInputStream instream1 = fs.open(testpath1)) {
|
|
|
+ // succeeds on the last retry
|
|
|
+ instream1.read();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verifies read behavior when inconsistency outlasts the retry limit.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testReadAfterEventuallyConsistentWrite2() throws Throwable {
|
|
|
+ requireS3Guard();
|
|
|
+ AmazonS3 s3ClientSpy = spyOnFilesystem();
|
|
|
+ final Path testpath2 =
|
|
|
+ writeEventuallyConsistentFileVersion("eventually2.dat",
|
|
|
+ s3ClientSpy, TEST_MAX_RETRIES + 1, 0, 0);
|
|
|
+
|
|
|
+ try (FSDataInputStream instream2 = fs.open(testpath2)) {
|
|
|
+ if (expectedExceptionInteractions.contains(
|
|
|
+ InteractionType.EVENTUALLY_CONSISTENT_READ)) {
|
|
|
+ // keeps retrying and eventually gives up with RemoteFileChangedException
|
|
|
+ expectReadFailure(instream2);
|
|
|
+ } else {
|
|
|
+ instream2.read();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Ensures read on re-open (after seek backwards) when S3 does not return the
|
|
|
+ * version of the file tracked in the metadata store fails immediately. No
|
|
|
+ * retries should happen since a retry is not expected to recover.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testEventuallyConsistentReadOnReopen() throws Throwable {
|
|
|
+ requireS3Guard();
|
|
|
+ AmazonS3 s3ClientSpy = spyOnFilesystem();
|
|
|
+ String filename = "eventually-reopen.dat";
|
|
|
+ final Path testpath =
|
|
|
+ writeEventuallyConsistentFileVersion(filename,
|
|
|
+ s3ClientSpy, 0, 0, 0);
|
|
|
+
|
|
|
+ try (FSDataInputStream instream = fs.open(testpath)) {
|
|
|
+ instream.read();
|
|
|
+ // overwrite the file, returning inconsistent version for
|
|
|
+ // (effectively) infinite retries
|
|
|
+ writeEventuallyConsistentFileVersion(filename, s3ClientSpy,
|
|
|
+ Integer.MAX_VALUE, 0, 0);
|
|
|
+ instream.seek(0);
|
|
|
+ if (expectedExceptionInteractions.contains(InteractionType.READ)) {
|
|
|
+ // if it retries at all, it will retry forever, which should fail
|
|
|
+ // the test. The expected behavior is immediate
|
|
|
+ // RemoteFileChangedException.
|
|
|
+ expectReadFailure(instream);
|
|
|
+ } else {
|
|
|
+ instream.read();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Writes a file with old ETag and versionId in the metadata store such
|
|
|
+ * that the metadata is out of sync with S3. Attempts to read such a file
|
|
|
+ * should result in {@link RemoteFileChangedException}.
|
|
|
+ */
|
|
|
+ private Path writeOutOfSyncFileVersion(String filename) throws IOException {
|
|
|
+ final Path testpath = path(filename);
|
|
|
+ final byte[] dataset = TEST_DATA_BYTES;
|
|
|
+ writeDataset(fs, testpath, dataset, dataset.length,
|
|
|
+ 1024, false);
|
|
|
+ S3AFileStatus originalStatus = (S3AFileStatus) fs.getFileStatus(testpath);
|
|
|
+
|
|
|
+ // overwrite with half the content
|
|
|
+ writeDataset(fs, testpath, dataset, dataset.length / 2,
|
|
|
+ 1024, true);
|
|
|
+
|
|
|
+ S3AFileStatus newStatus = (S3AFileStatus) fs.getFileStatus(testpath);
|
|
|
+
|
|
|
+ // put back the original etag, versionId
|
|
|
+ S3AFileStatus forgedStatus =
|
|
|
+ S3AFileStatus.fromFileStatus(newStatus, Tristate.FALSE,
|
|
|
+ originalStatus.getETag(), originalStatus.getVersionId());
|
|
|
+ fs.getMetadataStore().put(
|
|
|
+ new PathMetadata(forgedStatus, Tristate.FALSE, false));
|
|
|
+
|
|
|
+ return testpath;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Writes {@link #TEST_DATA} to a file where the file will be inconsistent
|
|
|
+ * in S3 for a set of operations.
|
|
|
+ * The duration of the inconsistency is controlled by the
|
|
|
+ * getObjectInconsistencyCount, getMetadataInconsistencyCount, and
|
|
|
+ * copyInconsistentCallCount parameters.
|
|
|
+ * The inconsistency manifests in AmazonS3#getObject,
|
|
|
+ * AmazonS3#getObjectMetadata, and AmazonS3#copyObject.
|
|
|
+ * This method sets up the provided s3ClientSpy to return a response to each
|
|
|
+ * of these methods indicating an inconsistency where the requested object
|
|
|
+ * version (eTag or versionId) is not available until a certain retry
|
|
|
+ * threshold is met.
|
|
|
+ * Providing inconsistent call count values above or
|
|
|
+ * below the overall retry limit allows a test to simulate a condition that
|
|
|
+ * either should or should not result in an overall failure from retry
|
|
|
+ * exhaustion.
|
|
|
+ * @param filename name of file (will be under test path)
|
|
|
+ * @param s3ClientSpy s3 client to patch
|
|
|
+ * @param getObjectInconsistencyCount number of GET inconsistencies
|
|
|
+ * @param getMetadataInconsistencyCount number of HEAD inconsistencies
|
|
|
+ * @param copyInconsistencyCount number of COPY inconsistencies.
|
|
|
+ * @return the path written
|
|
|
+ * @throws IOException failure to write the test data.
|
|
|
+ */
|
|
|
+ private Path writeEventuallyConsistentFileVersion(String filename,
|
|
|
+ AmazonS3 s3ClientSpy,
|
|
|
+ int getObjectInconsistencyCount,
|
|
|
+ int getMetadataInconsistencyCount,
|
|
|
+ int copyInconsistencyCount)
|
|
|
+ throws IOException {
|
|
|
+ return writeEventuallyConsistentData(s3ClientSpy,
|
|
|
+ path(filename),
|
|
|
+ TEST_DATA_BYTES,
|
|
|
+ getObjectInconsistencyCount,
|
|
|
+ getMetadataInconsistencyCount,
|
|
|
+ copyInconsistencyCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Writes data to a path and configures the S3 client for inconsistent
|
|
|
+ * HEAD, GET or COPY operations.
|
|
|
+ * @param testpath absolute path of file
|
|
|
+ * @param s3ClientSpy s3 client to patch
|
|
|
+ * @param dataset bytes to write.
|
|
|
+ * @param getObjectInconsistencyCount number of GET inconsistencies
|
|
|
+ * @param getMetadataInconsistencyCount number of HEAD inconsistencies
|
|
|
+ * @param copyInconsistencyCount number of COPY inconsistencies.
|
|
|
+ * @return the path written
|
|
|
+ * @throws IOException failure to write the test data.
|
|
|
+ */
|
|
|
+ private Path writeEventuallyConsistentData(final AmazonS3 s3ClientSpy,
|
|
|
+ final Path testpath,
|
|
|
+ final byte[] dataset,
|
|
|
+ final int getObjectInconsistencyCount,
|
|
|
+ final int getMetadataInconsistencyCount,
|
|
|
+ final int copyInconsistencyCount)
|
|
|
+ throws IOException {
|
|
|
+ writeDataset(fs, testpath, dataset, dataset.length,
|
|
|
+ 1024, true);
|
|
|
+ S3AFileStatus originalStatus = (S3AFileStatus) fs.getFileStatus(testpath);
|
|
|
+
|
|
|
+ // overwrite with half the content
|
|
|
+ writeDataset(fs, testpath, dataset, dataset.length / 2,
|
|
|
+ 1024, true);
|
|
|
+
|
|
|
+ LOG.debug("Original file info: {}: version={}, etag={}", testpath,
|
|
|
+ originalStatus.getVersionId(), originalStatus.getETag());
|
|
|
+
|
|
|
+ S3AFileStatus newStatus = (S3AFileStatus) fs.getFileStatus(testpath);
|
|
|
+ LOG.debug("Updated file info: {}: version={}, etag={}", testpath,
|
|
|
+ newStatus.getVersionId(), newStatus.getETag());
|
|
|
+
|
|
|
+ stubTemporaryUnavailable(s3ClientSpy, getObjectInconsistencyCount,
|
|
|
+ testpath, newStatus);
|
|
|
+
|
|
|
+ stubTemporaryWrongVersion(s3ClientSpy, getObjectInconsistencyCount,
|
|
|
+ testpath, originalStatus);
|
|
|
+
|
|
|
+ if (versionCheckingIsOnServer()) {
|
|
|
+ // only stub inconsistency when mode is server since no constraints that
|
|
|
+ // should trigger inconsistency are passed in any other mode
|
|
|
+ stubTemporaryCopyInconsistency(s3ClientSpy, testpath, newStatus,
|
|
|
+ copyInconsistencyCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ stubTemporaryMetadataInconsistency(s3ClientSpy, testpath, originalStatus,
|
|
|
+ newStatus, getMetadataInconsistencyCount);
|
|
|
+
|
|
|
+ return testpath;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Log the call hierarchy at debug level, helps track down
|
|
|
+ * where calls to operations are coming from.
|
|
|
+ */
|
|
|
+ private void logLocationAtDebug() {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Call hierarchy", new Exception("here"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stubs {@link AmazonS3#getObject(GetObjectRequest)}
|
|
|
+ * within s3ClientSpy to return null until inconsistentCallCount calls have
|
|
|
+ * been made. The null response simulates what occurs when an object
|
|
|
+ * matching the specified ETag or versionId is not available.
|
|
|
+ * @param s3ClientSpy the spy to stub
|
|
|
+ * @param inconsistentCallCount the number of calls that should return the
|
|
|
+ * null response
|
|
|
+ * @param testpath the path of the object the stub should apply to
|
|
|
+ */
|
|
|
+ private void stubTemporaryUnavailable(AmazonS3 s3ClientSpy,
|
|
|
+ int inconsistentCallCount, Path testpath,
|
|
|
+ S3AFileStatus newStatus) {
|
|
|
+ Answer<S3Object> temporarilyUnavailableAnswer = new Answer<S3Object>() {
|
|
|
+ private int callCount = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public S3Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ // simulates ETag or versionId constraint not met until
|
|
|
+ // inconsistentCallCount surpassed
|
|
|
+ callCount++;
|
|
|
+ if (callCount <= inconsistentCallCount) {
|
|
|
+ LOG.info("Temporarily unavailable {} count {} of {}",
|
|
|
+ testpath, callCount, inconsistentCallCount);
|
|
|
+ logLocationAtDebug();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return (S3Object) invocation.callRealMethod();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // match the requests that would be made in either server-side change
|
|
|
+ // detection mode
|
|
|
+ doAnswer(temporarilyUnavailableAnswer).when(s3ClientSpy)
|
|
|
+ .getObject(
|
|
|
+ matchingGetObjectRequest(
|
|
|
+ testpath, newStatus.getETag(), null));
|
|
|
+ doAnswer(temporarilyUnavailableAnswer).when(s3ClientSpy)
|
|
|
+ .getObject(
|
|
|
+ matchingGetObjectRequest(
|
|
|
+ testpath, null, newStatus.getVersionId()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stubs {@link AmazonS3#getObject(GetObjectRequest)}
|
|
|
+ * within s3ClientSpy to return an object modified to contain metadata
|
|
|
+ * from originalStatus until inconsistentCallCount calls have been made.
|
|
|
+ * @param s3ClientSpy the spy to stub
|
|
|
+ * @param testpath the path of the object the stub should apply to
|
|
|
+ * @param inconsistentCallCount the number of calls that should return the
|
|
|
+ * null response
|
|
|
+ * @param originalStatus the status metadata to inject into the
|
|
|
+ * inconsistentCallCount responses
|
|
|
+ */
|
|
|
+ private void stubTemporaryWrongVersion(AmazonS3 s3ClientSpy,
|
|
|
+ int inconsistentCallCount, Path testpath,
|
|
|
+ S3AFileStatus originalStatus) {
|
|
|
+ Answer<S3Object> temporarilyWrongVersionAnswer = new Answer<S3Object>() {
|
|
|
+ private int callCount = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public S3Object answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ // simulates old ETag or versionId until inconsistentCallCount surpassed
|
|
|
+ callCount++;
|
|
|
+ S3Object s3Object = (S3Object) invocation.callRealMethod();
|
|
|
+ if (callCount <= inconsistentCallCount) {
|
|
|
+ LOG.info("Temporary Wrong Version {} count {} of {}",
|
|
|
+ testpath, callCount, inconsistentCallCount);
|
|
|
+ logLocationAtDebug();
|
|
|
+ S3Object objectSpy = Mockito.spy(s3Object);
|
|
|
+ ObjectMetadata metadataSpy =
|
|
|
+ Mockito.spy(s3Object.getObjectMetadata());
|
|
|
+ when(objectSpy.getObjectMetadata()).thenReturn(metadataSpy);
|
|
|
+ when(metadataSpy.getETag()).thenReturn(originalStatus.getETag());
|
|
|
+ when(metadataSpy.getVersionId())
|
|
|
+ .thenReturn(originalStatus.getVersionId());
|
|
|
+ return objectSpy;
|
|
|
+ }
|
|
|
+ return s3Object;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // match requests that would be made in client-side change detection
|
|
|
+ doAnswer(temporarilyWrongVersionAnswer).when(s3ClientSpy).getObject(
|
|
|
+ matchingGetObjectRequest(testpath, null, null));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stubs {@link AmazonS3#copyObject(CopyObjectRequest)}
|
|
|
+ * within s3ClientSpy to return null (indicating preconditions not met) until
|
|
|
+ * copyInconsistentCallCount calls have been made.
|
|
|
+ * @param s3ClientSpy the spy to stub
|
|
|
+ * @param testpath the path of the object the stub should apply to
|
|
|
+ * @param newStatus the status metadata containing the ETag and versionId
|
|
|
+ * that should be matched in order for the stub to apply
|
|
|
+ * @param copyInconsistentCallCount how many times to return the
|
|
|
+ * precondition failed error
|
|
|
+ */
|
|
|
+ private void stubTemporaryCopyInconsistency(AmazonS3 s3ClientSpy,
|
|
|
+ Path testpath, S3AFileStatus newStatus,
|
|
|
+ int copyInconsistentCallCount) {
|
|
|
+ Answer<CopyObjectResult> temporarilyPreconditionsNotMetAnswer =
|
|
|
+ new Answer<CopyObjectResult>() {
|
|
|
+ private int callCount = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public CopyObjectResult answer(InvocationOnMock invocation)
|
|
|
+ throws Throwable {
|
|
|
+ callCount++;
|
|
|
+ if (callCount <= copyInconsistentCallCount) {
|
|
|
+ String message = "preconditions not met on call " + callCount
|
|
|
+ + " of " + copyInconsistentCallCount;
|
|
|
+ LOG.info("Copying {}: {}", testpath, message);
|
|
|
+ logLocationAtDebug();
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return (CopyObjectResult) invocation.callRealMethod();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // match requests made during copy
|
|
|
+ doAnswer(temporarilyPreconditionsNotMetAnswer).when(s3ClientSpy).copyObject(
|
|
|
+ matchingCopyObjectRequest(testpath, newStatus.getETag(), null));
|
|
|
+ doAnswer(temporarilyPreconditionsNotMetAnswer).when(s3ClientSpy).copyObject(
|
|
|
+ matchingCopyObjectRequest(testpath, null, newStatus.getVersionId()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stubs {@link AmazonS3#getObjectMetadata(GetObjectMetadataRequest)}
|
|
|
+ * within s3ClientSpy to return metadata from originalStatus until
|
|
|
+ * metadataInconsistentCallCount calls have been made.
|
|
|
+ * @param s3ClientSpy the spy to stub
|
|
|
+ * @param testpath the path of the object the stub should apply to
|
|
|
+ * @param originalStatus the inconsistent status metadata to return
|
|
|
+ * @param newStatus the status metadata to return after
|
|
|
+ * metadataInconsistentCallCount is met
|
|
|
+ * @param metadataInconsistentCallCount how many times to return the
|
|
|
+ * inconsistent metadata
|
|
|
+ */
|
|
|
+ private void stubTemporaryMetadataInconsistency(AmazonS3 s3ClientSpy,
|
|
|
+ Path testpath, S3AFileStatus originalStatus,
|
|
|
+ S3AFileStatus newStatus, int metadataInconsistentCallCount) {
|
|
|
+ Answer<ObjectMetadata> temporarilyOldMetadataAnswer =
|
|
|
+ new Answer<ObjectMetadata>() {
|
|
|
+ private int callCount = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ObjectMetadata answer(InvocationOnMock invocation)
|
|
|
+ throws Throwable {
|
|
|
+ ObjectMetadata objectMetadata =
|
|
|
+ (ObjectMetadata) invocation.callRealMethod();
|
|
|
+ callCount++;
|
|
|
+ if (callCount <= metadataInconsistentCallCount) {
|
|
|
+ LOG.info("Inconsistent metadata {} count {} of {}",
|
|
|
+ testpath, callCount, metadataInconsistentCallCount);
|
|
|
+ logLocationAtDebug();
|
|
|
+ ObjectMetadata metadataSpy =
|
|
|
+ Mockito.spy(objectMetadata);
|
|
|
+ when(metadataSpy.getETag()).thenReturn(originalStatus.getETag());
|
|
|
+ when(metadataSpy.getVersionId())
|
|
|
+ .thenReturn(originalStatus.getVersionId());
|
|
|
+ return metadataSpy;
|
|
|
+ }
|
|
|
+ return objectMetadata;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // match requests made during select
|
|
|
+ doAnswer(temporarilyOldMetadataAnswer).when(s3ClientSpy).getObjectMetadata(
|
|
|
+ matchingMetadataRequest(testpath, null));
|
|
|
+ doAnswer(temporarilyOldMetadataAnswer).when(s3ClientSpy).getObjectMetadata(
|
|
|
+ matchingMetadataRequest(testpath, newStatus.getVersionId()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Writes a file with null ETag and versionId in the metadata store.
|
|
|
+ */
|
|
|
+ private Path writeFileWithNoVersionMetadata(String filename)
|
|
|
+ throws IOException {
|
|
|
+ final Path testpath = path(filename);
|
|
|
+ writeDataset(fs, testpath, TEST_DATA_BYTES, TEST_DATA_BYTES.length,
|
|
|
+ 1024, false);
|
|
|
+ S3AFileStatus originalStatus = (S3AFileStatus) fs.getFileStatus(testpath);
|
|
|
+
|
|
|
+ // remove ETag and versionId
|
|
|
+ S3AFileStatus newStatus = S3AFileStatus.fromFileStatus(originalStatus,
|
|
|
+ Tristate.FALSE, null, null);
|
|
|
+ fs.getMetadataStore().put(new PathMetadata(newStatus, Tristate.FALSE,
|
|
|
+ false));
|
|
|
+
|
|
|
+ return testpath;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The test is invalid if the policy uses versionId but the bucket doesn't
|
|
|
+ * have versioning enabled.
|
|
|
+ *
|
|
|
+ * Tests the given file for a versionId to detect whether bucket versioning
|
|
|
+ * is enabled.
|
|
|
+ */
|
|
|
+ private void skipIfVersionPolicyAndNoVersionId(Path testpath)
|
|
|
+ throws IOException {
|
|
|
+ if (fs.getChangeDetectionPolicy().getSource() == Source.VersionId) {
|
|
|
+ // skip versionId tests if the bucket doesn't have object versioning
|
|
|
+ // enabled
|
|
|
+ Assume.assumeTrue(
|
|
|
+ "Target filesystem does not support versioning",
|
|
|
+ fs.getObjectMetadata(fs.pathToKey(testpath)).getVersionId() != null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Like {@link #skipIfVersionPolicyAndNoVersionId(Path)} but generates a new
|
|
|
+ * file to test versionId against.
|
|
|
+ */
|
|
|
+ private void skipIfVersionPolicyAndNoVersionId() throws IOException {
|
|
|
+ if (fs.getChangeDetectionPolicy().getSource() == Source.VersionId) {
|
|
|
+ Path versionIdFeatureTestFile = path("versionIdTest");
|
|
|
+ writeDataset(fs, versionIdFeatureTestFile, TEST_DATA_BYTES,
|
|
|
+ TEST_DATA_BYTES.length, 1024, true, true);
|
|
|
+ skipIfVersionPolicyAndNoVersionId(versionIdFeatureTestFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private GetObjectRequest matchingGetObjectRequest(Path path, String eTag,
|
|
|
+ String versionId) {
|
|
|
+ return ArgumentMatchers.argThat(request -> {
|
|
|
+ if (request.getBucketName().equals(fs.getBucket())
|
|
|
+ && request.getKey().equals(fs.pathToKey(path))) {
|
|
|
+ if (eTag == null && !request.getMatchingETagConstraints().isEmpty()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (eTag != null &&
|
|
|
+ !request.getMatchingETagConstraints().contains(eTag)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (versionId == null && request.getVersionId() != null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (versionId != null && !versionId.equals(request.getVersionId())) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private CopyObjectRequest matchingCopyObjectRequest(Path path, String eTag,
|
|
|
+ String versionId) {
|
|
|
+ return ArgumentMatchers.argThat(request -> {
|
|
|
+ if (request.getSourceBucketName().equals(fs.getBucket())
|
|
|
+ && request.getSourceKey().equals(fs.pathToKey(path))) {
|
|
|
+ if (eTag == null && !request.getMatchingETagConstraints().isEmpty()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (eTag != null &&
|
|
|
+ !request.getMatchingETagConstraints().contains(eTag)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (versionId == null && request.getSourceVersionId() != null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (versionId != null &&
|
|
|
+ !versionId.equals(request.getSourceVersionId())) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private GetObjectMetadataRequest matchingMetadataRequest(Path path,
|
|
|
+ String versionId) {
|
|
|
+ return ArgumentMatchers.argThat(request -> {
|
|
|
+ if (request.getBucketName().equals(fs.getBucket())
|
|
|
+ && request.getKey().equals(fs.pathToKey(path))) {
|
|
|
+ if (versionId == null && request.getVersionId() != null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (versionId != null &&
|
|
|
+ !versionId.equals(request.getVersionId())) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Skip a test case if it needs S3Guard and the filesystem does
|
|
|
+ * not have it.
|
|
|
+ */
|
|
|
+ private void requireS3Guard() {
|
|
|
+ Assume.assumeTrue("S3Guard must be enabled", fs.hasMetadataStore());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Skip a test case if S3 Select is not supported on this store.
|
|
|
+ */
|
|
|
+ private void requireS3Select() {
|
|
|
+ Assume.assumeTrue("S3 Select is not enabled",
|
|
|
+ getFileSystem().hasCapability(S3_SELECT_CAPABILITY));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Spy on the filesystem at the S3 client level.
|
|
|
+ * @return a mocked S3 client to which the test FS is bonded.
|
|
|
+ */
|
|
|
+ private AmazonS3 spyOnFilesystem() {
|
|
|
+ AmazonS3 s3ClientSpy = Mockito.spy(
|
|
|
+ fs.getAmazonS3ClientForTesting("mocking"));
|
|
|
+ fs.setAmazonS3Client(s3ClientSpy);
|
|
|
+ return s3ClientSpy;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Expect reading this stream to fail.
|
|
|
+ * @param instream input stream.
|
|
|
+ * @return the caught exception.
|
|
|
+ * @throws Exception an other exception
|
|
|
+ */
|
|
|
+
|
|
|
+ private RemoteFileChangedException expectReadFailure(
|
|
|
+ final FSDataInputStream instream)
|
|
|
+ throws Exception {
|
|
|
+ return intercept(RemoteFileChangedException.class, "",
|
|
|
+ "read() returned",
|
|
|
+ () -> readToText(instream.read()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Convert the result of a read to a text string for errors.
|
|
|
+ * @param r result of the read() call.
|
|
|
+ * @return a string for exception text.
|
|
|
+ */
|
|
|
+ private String readToText(int r) {
|
|
|
+ return r < 32
|
|
|
+ ? (String.format("%02d", r))
|
|
|
+ : (String.format("%c", (char) r));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Is the version checking on the server?
|
|
|
+ * @return true if the server returns 412 errors.
|
|
|
+ */
|
|
|
+ private boolean versionCheckingIsOnServer() {
|
|
|
+ return fs.getChangeDetectionPolicy().getMode() == Mode.Server;
|
|
|
+ }
|
|
|
}
|