|
@@ -23,15 +23,19 @@ import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.security.MessageDigest;
|
|
import java.security.MessageDigest;
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.List;
|
|
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.Random;
|
|
|
|
|
|
import com.google.common.base.Charsets;
|
|
import com.google.common.base.Charsets;
|
|
|
|
+import org.junit.Assume;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import org.apache.commons.codec.digest.DigestUtils;
|
|
import org.apache.commons.codec.digest.DigestUtils;
|
|
import org.apache.commons.io.IOUtils;
|
|
import org.apache.commons.io.IOUtils;
|
|
-import org.apache.commons.lang3.tuple.Pair;
|
|
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.BBUploadHandle;
|
|
import org.apache.hadoop.fs.BBUploadHandle;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -43,11 +47,64 @@ import org.apache.hadoop.fs.PathHandle;
|
|
import org.apache.hadoop.fs.UploadHandle;
|
|
import org.apache.hadoop.fs.UploadHandle;
|
|
|
|
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
|
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
|
|
|
|
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
|
|
|
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
|
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
|
|
|
|
public abstract class AbstractContractMultipartUploaderTest extends
|
|
public abstract class AbstractContractMultipartUploaderTest extends
|
|
AbstractFSContractTestBase {
|
|
AbstractFSContractTestBase {
|
|
|
|
|
|
|
|
+ protected static final Logger LOG =
|
|
|
|
+ LoggerFactory.getLogger(AbstractContractMultipartUploaderTest.class);
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Size of very small uploads.
|
|
|
|
+ * Enough to be non empty, not big enough to cause delays on uploads.
|
|
|
|
+ */
|
|
|
|
+ protected static final int SMALL_FILE = 100;
|
|
|
|
+
|
|
|
|
+ private MultipartUploader mpu;
|
|
|
|
+ private MultipartUploader mpu2;
|
|
|
|
+ private final Random random = new Random();
|
|
|
|
+ private UploadHandle activeUpload;
|
|
|
|
+ private Path activeUploadPath;
|
|
|
|
+
|
|
|
|
+ protected String getMethodName() {
|
|
|
|
+ return methodName.getMethodName();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setup() throws Exception {
|
|
|
|
+ super.setup();
|
|
|
|
+ Configuration conf = getContract().getConf();
|
|
|
|
+ mpu = MultipartUploaderFactory.get(getFileSystem(), conf);
|
|
|
|
+ mpu2 = MultipartUploaderFactory.get(getFileSystem(), conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void teardown() throws Exception {
|
|
|
|
+ if (mpu!= null && activeUpload != null) {
|
|
|
|
+ try {
|
|
|
|
+ mpu.abort(activeUploadPath, activeUpload);
|
|
|
|
+ } catch (FileNotFoundException ignored) {
|
|
|
|
+ /* this is fine */
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.info("in teardown", e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ cleanupWithLogger(LOG, mpu, mpu2);
|
|
|
|
+ super.teardown();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Get a test path based on the method name.
|
|
|
|
+ * @return a path to use in the test
|
|
|
|
+ * @throws IOException failure to build the path name up.
|
|
|
|
+ */
|
|
|
|
+ protected Path methodPath() throws IOException {
|
|
|
|
+ return path(getMethodName());
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* The payload is the part number repeated for the length of the part.
|
|
* The payload is the part number repeated for the length of the part.
|
|
* This makes checking the correctness of the upload straightforward.
|
|
* This makes checking the correctness of the upload straightforward.
|
|
@@ -55,9 +112,19 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
* @return the bytes to upload.
|
|
* @return the bytes to upload.
|
|
*/
|
|
*/
|
|
private byte[] generatePayload(int partNumber) {
|
|
private byte[] generatePayload(int partNumber) {
|
|
- int sizeInBytes = partSizeInBytes();
|
|
|
|
- ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
|
|
|
|
- for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) {
|
|
|
|
|
|
+ return generatePayload(partNumber, partSizeInBytes());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Generate a payload of a given size; part number is used
|
|
|
|
+ * for all the values.
|
|
|
|
+ * @param partNumber part number
|
|
|
|
+ * @param size size in bytes
|
|
|
|
+ * @return the bytes to upload.
|
|
|
|
+ */
|
|
|
|
+ private byte[] generatePayload(final int partNumber, final int size) {
|
|
|
|
+ ByteBuffer buffer = ByteBuffer.allocate(size);
|
|
|
|
+ for (int i=0; i < size /(Integer.SIZE/Byte.SIZE); ++i) {
|
|
buffer.putInt(partNumber);
|
|
buffer.putInt(partNumber);
|
|
}
|
|
}
|
|
return buffer.array();
|
|
return buffer.array();
|
|
@@ -70,11 +137,14 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
* @throws IOException failure to read or digest the file.
|
|
* @throws IOException failure to read or digest the file.
|
|
*/
|
|
*/
|
|
protected byte[] digest(Path path) throws IOException {
|
|
protected byte[] digest(Path path) throws IOException {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- try (InputStream in = fs.open(path)) {
|
|
|
|
|
|
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
|
|
|
+ try (InputStream in = getFileSystem().open(path)) {
|
|
byte[] fdData = IOUtils.toByteArray(in);
|
|
byte[] fdData = IOUtils.toByteArray(in);
|
|
MessageDigest newDigest = DigestUtils.getMd5Digest();
|
|
MessageDigest newDigest = DigestUtils.getMd5Digest();
|
|
- return newDigest.digest(fdData);
|
|
|
|
|
|
+ byte[] digest = newDigest.digest(fdData);
|
|
|
|
+ return digest;
|
|
|
|
+ } finally {
|
|
|
|
+ timer.end("Download and digest of path %s", path);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -92,75 +162,231 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
return 10;
|
|
return 10;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * How long in milliseconds for propagation of
|
|
|
|
+ * store changes, including update/delete/list
|
|
|
|
+ * to be everywhere.
|
|
|
|
+ * If 0: the FS is consistent.
|
|
|
|
+ * @return a time in milliseconds.
|
|
|
|
+ */
|
|
|
|
+ protected int timeToBecomeConsistentMillis() {
|
|
|
|
+ return 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Does a call to finalize an upload (either complete or abort) consume the
|
|
|
|
+ * uploadID immediately or is it reaped at a later point in time?
|
|
|
|
+ * @return true if the uploadID will be consumed immediately (and no longer
|
|
|
|
+ * resuable).
|
|
|
|
+ */
|
|
|
|
+ protected abstract boolean finalizeConsumesUploadIdImmediately();
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Does the store support concurrent uploads to the same destination path?
|
|
|
|
+ * @return true if concurrent uploads are supported.
|
|
|
|
+ */
|
|
|
|
+ protected abstract boolean supportsConcurrentUploadsToSamePath();
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Pick a multipart uploader from the index value.
|
|
|
|
+ * @param index index of upload
|
|
|
|
+ * @return an uploader
|
|
|
|
+ */
|
|
|
|
+ protected MultipartUploader mpu(int index) {
|
|
|
|
+ return (index % 2 == 0) ? mpu : mpu2;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Pick a multipart uploader at random.
|
|
|
|
+ * @return an uploader
|
|
|
|
+ */
|
|
|
|
+ protected MultipartUploader randomMpu() {
|
|
|
|
+ return mpu(random.nextInt(10));
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Assert that a multipart upload is successful.
|
|
* Assert that a multipart upload is successful.
|
|
* @throws Exception failure
|
|
* @throws Exception failure
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testSingleUpload() throws Exception {
|
|
public void testSingleUpload() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path file = path("testSingleUpload");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- UploadHandle uploadHandle = mpu.initialize(file);
|
|
|
|
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
|
|
|
|
|
|
+ Path file = methodPath();
|
|
|
|
+ UploadHandle uploadHandle = initializeUpload(file);
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
- byte[] payload = generatePayload(1);
|
|
|
|
|
|
+ int size = SMALL_FILE;
|
|
|
|
+ byte[] payload = generatePayload(1, size);
|
|
origDigest.update(payload);
|
|
origDigest.update(payload);
|
|
- InputStream is = new ByteArrayInputStream(payload);
|
|
|
|
- PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle,
|
|
|
|
- payload.length);
|
|
|
|
- partHandles.add(Pair.of(1, partHandle));
|
|
|
|
- PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles,
|
|
|
|
|
|
+ PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
|
|
|
|
+ partHandles.put(1, partHandle);
|
|
|
|
+ PathHandle fd = completeUpload(file, uploadHandle, partHandles,
|
|
origDigest,
|
|
origDigest,
|
|
- payload.length);
|
|
|
|
|
|
+ size);
|
|
|
|
+
|
|
|
|
+ if (finalizeConsumesUploadIdImmediately()) {
|
|
|
|
+ intercept(FileNotFoundException.class,
|
|
|
|
+ () -> mpu.complete(file, partHandles, uploadHandle));
|
|
|
|
+ } else {
|
|
|
|
+ PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
|
|
|
|
+ assertArrayEquals("Path handles differ", fd.toByteArray(),
|
|
|
|
+ fd2.toByteArray());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Initialize an upload.
|
|
|
|
+ * This saves the path and upload handle as the active
|
|
|
|
+ * upload, for aborting in teardown
|
|
|
|
+ * @param dest destination
|
|
|
|
+ * @return the handle
|
|
|
|
+ * @throws IOException failure to initialize
|
|
|
|
+ */
|
|
|
|
+ protected UploadHandle initializeUpload(final Path dest) throws IOException {
|
|
|
|
+ activeUploadPath = dest;
|
|
|
|
+ activeUpload = randomMpu().initialize(dest);
|
|
|
|
+ return activeUpload;
|
|
|
|
+ }
|
|
|
|
|
|
- // Complete is idempotent
|
|
|
|
- PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
|
|
|
|
- assertArrayEquals("Path handles differ", fd.toByteArray(),
|
|
|
|
- fd2.toByteArray());
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Generate then upload a part.
|
|
|
|
+ * @param file destination
|
|
|
|
+ * @param uploadHandle handle
|
|
|
|
+ * @param index index of part
|
|
|
|
+ * @param origDigest digest to build up. May be null
|
|
|
|
+ * @return the part handle
|
|
|
|
+ * @throws IOException IO failure.
|
|
|
|
+ */
|
|
|
|
+ protected PartHandle buildAndPutPart(
|
|
|
|
+ final Path file,
|
|
|
|
+ final UploadHandle uploadHandle,
|
|
|
|
+ final int index,
|
|
|
|
+ final MessageDigest origDigest) throws IOException {
|
|
|
|
+ byte[] payload = generatePayload(index);
|
|
|
|
+ if (origDigest != null) {
|
|
|
|
+ origDigest.update(payload);
|
|
|
|
+ }
|
|
|
|
+ return putPart(file, uploadHandle, index, payload);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Put a part.
|
|
|
|
+ * The entire byte array is uploaded.
|
|
|
|
+ * @param file destination
|
|
|
|
+ * @param uploadHandle handle
|
|
|
|
+ * @param index index of part
|
|
|
|
+ * @param payload byte array of payload
|
|
|
|
+ * @return the part handle
|
|
|
|
+ * @throws IOException IO failure.
|
|
|
|
+ */
|
|
|
|
+ protected PartHandle putPart(final Path file,
|
|
|
|
+ final UploadHandle uploadHandle,
|
|
|
|
+ final int index,
|
|
|
|
+ final byte[] payload) throws IOException {
|
|
|
|
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
|
|
|
+ PartHandle partHandle = mpu(index)
|
|
|
|
+ .putPart(file,
|
|
|
|
+ new ByteArrayInputStream(payload),
|
|
|
|
+ index,
|
|
|
|
+ uploadHandle,
|
|
|
|
+ payload.length);
|
|
|
|
+ timer.end("Uploaded part %s", index);
|
|
|
|
+ LOG.info("Upload bandwidth {} MB/s",
|
|
|
|
+ timer.bandwidthDescription(payload.length));
|
|
|
|
+ return partHandle;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Complete an upload with the active MPU instance.
|
|
|
|
+ * @param file destination
|
|
|
|
+ * @param uploadHandle handle
|
|
|
|
+ * @param partHandles map of handles
|
|
|
|
+ * @param origDigest digest of source data (may be null)
|
|
|
|
+ * @param expectedLength expected length of result.
|
|
|
|
+ * @return the path handle from the upload.
|
|
|
|
+ * @throws IOException IO failure
|
|
|
|
+ */
|
|
private PathHandle completeUpload(final Path file,
|
|
private PathHandle completeUpload(final Path file,
|
|
- final MultipartUploader mpu,
|
|
|
|
final UploadHandle uploadHandle,
|
|
final UploadHandle uploadHandle,
|
|
- final List<Pair<Integer, PartHandle>> partHandles,
|
|
|
|
|
|
+ final Map<Integer, PartHandle> partHandles,
|
|
final MessageDigest origDigest,
|
|
final MessageDigest origDigest,
|
|
final int expectedLength) throws IOException {
|
|
final int expectedLength) throws IOException {
|
|
- PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
|
|
|
|
|
|
+ PathHandle fd = complete(file, uploadHandle, partHandles);
|
|
|
|
|
|
FileStatus status = verifyPathExists(getFileSystem(),
|
|
FileStatus status = verifyPathExists(getFileSystem(),
|
|
"Completed file", file);
|
|
"Completed file", file);
|
|
assertEquals("length of " + status,
|
|
assertEquals("length of " + status,
|
|
expectedLength, status.getLen());
|
|
expectedLength, status.getLen());
|
|
|
|
|
|
|
|
+ if (origDigest != null) {
|
|
|
|
+ verifyContents(file, origDigest, expectedLength);
|
|
|
|
+ }
|
|
|
|
+ return fd;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Verify the contents of a file.
|
|
|
|
+ * @param file path
|
|
|
|
+ * @param origDigest digest
|
|
|
|
+ * @param expectedLength expected length (for logging B/W)
|
|
|
|
+ * @throws IOException IO failure
|
|
|
|
+ */
|
|
|
|
+ protected void verifyContents(final Path file,
|
|
|
|
+ final MessageDigest origDigest,
|
|
|
|
+ final int expectedLength) throws IOException {
|
|
|
|
+ ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
|
|
assertArrayEquals("digest of source and " + file
|
|
assertArrayEquals("digest of source and " + file
|
|
+ " differ",
|
|
+ " differ",
|
|
origDigest.digest(), digest(file));
|
|
origDigest.digest(), digest(file));
|
|
|
|
+ timer2.end("Completed digest", file);
|
|
|
|
+ LOG.info("Download bandwidth {} MB/s",
|
|
|
|
+ timer2.bandwidthDescription(expectedLength));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Perform the inner complete without verification.
|
|
|
|
+ * @param file destination path
|
|
|
|
+ * @param uploadHandle upload handle
|
|
|
|
+ * @param partHandles map of parts
|
|
|
|
+ * @return the path handle from the upload.
|
|
|
|
+ * @throws IOException IO failure
|
|
|
|
+ */
|
|
|
|
+ private PathHandle complete(final Path file,
|
|
|
|
+ final UploadHandle uploadHandle,
|
|
|
|
+ final Map<Integer, PartHandle> partHandles) throws IOException {
|
|
|
|
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
|
|
|
+ PathHandle fd = randomMpu().complete(file, partHandles, uploadHandle);
|
|
|
|
+ timer.end("Completed upload to %s", file);
|
|
return fd;
|
|
return fd;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Abort an upload.
|
|
|
|
+ * @param file path
|
|
|
|
+ * @param uploadHandle handle
|
|
|
|
+ * @throws IOException failure
|
|
|
|
+ */
|
|
|
|
+ private void abortUpload(final Path file, UploadHandle uploadHandle)
|
|
|
|
+ throws IOException {
|
|
|
|
+ randomMpu().abort(file, uploadHandle);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Assert that a multipart upload is successful.
|
|
* Assert that a multipart upload is successful.
|
|
* @throws Exception failure
|
|
* @throws Exception failure
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testMultipartUpload() throws Exception {
|
|
public void testMultipartUpload() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path file = path("testMultipartUpload");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- UploadHandle uploadHandle = mpu.initialize(file);
|
|
|
|
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
|
|
|
|
|
|
+ Path file = methodPath();
|
|
|
|
+ UploadHandle uploadHandle = initializeUpload(file);
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
final int payloadCount = getTestPayloadCount();
|
|
final int payloadCount = getTestPayloadCount();
|
|
for (int i = 1; i <= payloadCount; ++i) {
|
|
for (int i = 1; i <= payloadCount; ++i) {
|
|
- byte[] payload = generatePayload(i);
|
|
|
|
- origDigest.update(payload);
|
|
|
|
- InputStream is = new ByteArrayInputStream(payload);
|
|
|
|
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
|
|
|
|
- payload.length);
|
|
|
|
- partHandles.add(Pair.of(i, partHandle));
|
|
|
|
|
|
+ PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
|
|
|
|
+ origDigest);
|
|
|
|
+ partHandles.put(i, partHandle);
|
|
}
|
|
}
|
|
- completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
|
|
|
|
|
|
+ completeUpload(file, uploadHandle, partHandles, origDigest,
|
|
payloadCount * partSizeInBytes());
|
|
payloadCount * partSizeInBytes());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -173,17 +399,33 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
public void testMultipartUploadEmptyPart() throws Exception {
|
|
public void testMultipartUploadEmptyPart() throws Exception {
|
|
FileSystem fs = getFileSystem();
|
|
FileSystem fs = getFileSystem();
|
|
Path file = path("testMultipartUpload");
|
|
Path file = path("testMultipartUpload");
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- UploadHandle uploadHandle = mpu.initialize(file);
|
|
|
|
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
|
|
|
|
- MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
|
|
- byte[] payload = new byte[0];
|
|
|
|
- origDigest.update(payload);
|
|
|
|
- InputStream is = new ByteArrayInputStream(payload);
|
|
|
|
- PartHandle partHandle = mpu.putPart(file, is, 0, uploadHandle,
|
|
|
|
- payload.length);
|
|
|
|
- partHandles.add(Pair.of(0, partHandle));
|
|
|
|
- completeUpload(file, mpu, uploadHandle, partHandles, origDigest, 0);
|
|
|
|
|
|
+ try (MultipartUploader uploader =
|
|
|
|
+ MultipartUploaderFactory.get(fs, null)) {
|
|
|
|
+ UploadHandle uploadHandle = uploader.initialize(file);
|
|
|
|
+
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
|
|
+ MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
|
|
+ byte[] payload = new byte[0];
|
|
|
|
+ origDigest.update(payload);
|
|
|
|
+ InputStream is = new ByteArrayInputStream(payload);
|
|
|
|
+ PartHandle partHandle = uploader.putPart(file, is, 1, uploadHandle,
|
|
|
|
+ payload.length);
|
|
|
|
+ partHandles.put(1, partHandle);
|
|
|
|
+ completeUpload(file, uploadHandle, partHandles, origDigest, 0);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Assert that a multipart upload is successful.
|
|
|
|
+ * @throws Exception failure
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testUploadEmptyBlock() throws Exception {
|
|
|
|
+ Path file = methodPath();
|
|
|
|
+ UploadHandle uploadHandle = initializeUpload(file);
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
|
|
+ partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
|
|
|
|
+ completeUpload(file, uploadHandle, partHandles, null, 0);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -192,11 +434,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testMultipartUploadReverseOrder() throws Exception {
|
|
public void testMultipartUploadReverseOrder() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path file = path("testMultipartUploadReverseOrder");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- UploadHandle uploadHandle = mpu.initialize(file);
|
|
|
|
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
|
|
|
|
|
|
+ Path file = methodPath();
|
|
|
|
+ UploadHandle uploadHandle = initializeUpload(file);
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
final int payloadCount = getTestPayloadCount();
|
|
final int payloadCount = getTestPayloadCount();
|
|
for (int i = 1; i <= payloadCount; ++i) {
|
|
for (int i = 1; i <= payloadCount; ++i) {
|
|
@@ -204,13 +444,9 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
origDigest.update(payload);
|
|
origDigest.update(payload);
|
|
}
|
|
}
|
|
for (int i = payloadCount; i > 0; --i) {
|
|
for (int i = payloadCount; i > 0; --i) {
|
|
- byte[] payload = generatePayload(i);
|
|
|
|
- InputStream is = new ByteArrayInputStream(payload);
|
|
|
|
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
|
|
|
|
- payload.length);
|
|
|
|
- partHandles.add(Pair.of(i, partHandle));
|
|
|
|
|
|
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
|
|
}
|
|
}
|
|
- completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
|
|
|
|
|
|
+ completeUpload(file, uploadHandle, partHandles, origDigest,
|
|
payloadCount * partSizeInBytes());
|
|
payloadCount * partSizeInBytes());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -222,25 +458,19 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
|
|
public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
|
|
throws Exception {
|
|
throws Exception {
|
|
describe("Upload in reverse order and the part numbers are not contiguous");
|
|
describe("Upload in reverse order and the part numbers are not contiguous");
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- UploadHandle uploadHandle = mpu.initialize(file);
|
|
|
|
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
|
|
|
|
|
|
+ Path file = methodPath();
|
|
|
|
+ UploadHandle uploadHandle = initializeUpload(file);
|
|
MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
MessageDigest origDigest = DigestUtils.getMd5Digest();
|
|
int payloadCount = 2 * getTestPayloadCount();
|
|
int payloadCount = 2 * getTestPayloadCount();
|
|
for (int i = 2; i <= payloadCount; i += 2) {
|
|
for (int i = 2; i <= payloadCount; i += 2) {
|
|
byte[] payload = generatePayload(i);
|
|
byte[] payload = generatePayload(i);
|
|
origDigest.update(payload);
|
|
origDigest.update(payload);
|
|
}
|
|
}
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
for (int i = payloadCount; i > 0; i -= 2) {
|
|
for (int i = payloadCount; i > 0; i -= 2) {
|
|
- byte[] payload = generatePayload(i);
|
|
|
|
- InputStream is = new ByteArrayInputStream(payload);
|
|
|
|
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
|
|
|
|
- payload.length);
|
|
|
|
- partHandles.add(Pair.of(i, partHandle));
|
|
|
|
|
|
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
|
|
}
|
|
}
|
|
- completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
|
|
|
|
|
|
+ completeUpload(file, uploadHandle, partHandles, origDigest,
|
|
getTestPayloadCount() * partSizeInBytes());
|
|
getTestPayloadCount() * partSizeInBytes());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -251,19 +481,14 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
@Test
|
|
@Test
|
|
public void testMultipartUploadAbort() throws Exception {
|
|
public void testMultipartUploadAbort() throws Exception {
|
|
describe("Upload and then abort it before completing");
|
|
describe("Upload and then abort it before completing");
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path file = path("testMultipartUploadAbort");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- UploadHandle uploadHandle = mpu.initialize(file);
|
|
|
|
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
|
|
|
|
- for (int i = 20; i >= 10; --i) {
|
|
|
|
- byte[] payload = generatePayload(i);
|
|
|
|
- InputStream is = new ByteArrayInputStream(payload);
|
|
|
|
- PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
|
|
|
|
- payload.length);
|
|
|
|
- partHandles.add(Pair.of(i, partHandle));
|
|
|
|
|
|
+ Path file = methodPath();
|
|
|
|
+ UploadHandle uploadHandle = initializeUpload(file);
|
|
|
|
+ int end = 10;
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
|
|
+ for (int i = 12; i > 10; i--) {
|
|
|
|
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
|
|
}
|
|
}
|
|
- mpu.abort(file, uploadHandle);
|
|
|
|
|
|
+ abortUpload(file, uploadHandle);
|
|
|
|
|
|
String contents = "ThisIsPart49\n";
|
|
String contents = "ThisIsPart49\n";
|
|
int len = contents.getBytes(Charsets.UTF_8).length;
|
|
int len = contents.getBytes(Charsets.UTF_8).length;
|
|
@@ -275,6 +500,15 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
() -> mpu.complete(file, partHandles, uploadHandle));
|
|
() -> mpu.complete(file, partHandles, uploadHandle));
|
|
|
|
|
|
assertPathDoesNotExist("Uploaded file should not exist", file);
|
|
assertPathDoesNotExist("Uploaded file should not exist", file);
|
|
|
|
+
|
|
|
|
+ // A second abort should be an FileNotFoundException if the UploadHandle is
|
|
|
|
+ // consumed by finalization operations (complete, abort).
|
|
|
|
+ if (finalizeConsumesUploadIdImmediately()) {
|
|
|
|
+ intercept(FileNotFoundException.class,
|
|
|
|
+ () -> abortUpload(file, uploadHandle));
|
|
|
|
+ } else {
|
|
|
|
+ abortUpload(file, uploadHandle);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -282,13 +516,23 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testAbortUnknownUpload() throws Exception {
|
|
public void testAbortUnknownUpload() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path file = path("testAbortUnknownUpload");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
|
|
+ Path file = methodPath();
|
|
ByteBuffer byteBuffer = ByteBuffer.wrap(
|
|
ByteBuffer byteBuffer = ByteBuffer.wrap(
|
|
"invalid-handle".getBytes(Charsets.UTF_8));
|
|
"invalid-handle".getBytes(Charsets.UTF_8));
|
|
UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
|
|
UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
|
|
- intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle));
|
|
|
|
|
|
+ intercept(FileNotFoundException.class,
|
|
|
|
+ () -> abortUpload(file, uploadHandle));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Trying to abort with a handle of size 0 must fail.
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testAbortEmptyUpload() throws Exception {
|
|
|
|
+ describe("initialize upload and abort before uploading data");
|
|
|
|
+ Path file = methodPath();
|
|
|
|
+ abortUpload(file, initializeUpload(file));
|
|
|
|
+ assertPathDoesNotExist("Uploaded file should not exist", file);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -296,13 +540,10 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testAbortEmptyUploadHandle() throws Exception {
|
|
public void testAbortEmptyUploadHandle() throws Exception {
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path file = path("testAbortEmptyUpload");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
|
|
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
|
|
UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
|
|
UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
|
|
intercept(IllegalArgumentException.class,
|
|
intercept(IllegalArgumentException.class,
|
|
- () -> mpu.abort(file, uploadHandle));
|
|
|
|
|
|
+ () -> abortUpload(methodPath(), uploadHandle));
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -311,26 +552,20 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
@Test
|
|
@Test
|
|
public void testCompleteEmptyUpload() throws Exception {
|
|
public void testCompleteEmptyUpload() throws Exception {
|
|
describe("Expect an empty MPU to fail, but still be abortable");
|
|
describe("Expect an empty MPU to fail, but still be abortable");
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path dest = path("testCompleteEmptyUpload");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- UploadHandle handle = mpu.initialize(dest);
|
|
|
|
- intercept(IOException.class,
|
|
|
|
- () -> mpu.complete(dest, new ArrayList<>(), handle));
|
|
|
|
- mpu.abort(dest, handle);
|
|
|
|
|
|
+ Path dest = methodPath();
|
|
|
|
+ UploadHandle handle = initializeUpload(dest);
|
|
|
|
+ intercept(IllegalArgumentException.class,
|
|
|
|
+ () -> mpu.complete(dest, new HashMap<>(), handle));
|
|
|
|
+ abortUpload(dest, handle);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* When we pass empty uploadID, putPart throws IllegalArgumentException.
|
|
* When we pass empty uploadID, putPart throws IllegalArgumentException.
|
|
- * @throws Exception
|
|
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testPutPartEmptyUploadID() throws Exception {
|
|
public void testPutPartEmptyUploadID() throws Exception {
|
|
describe("Expect IllegalArgumentException when putPart uploadID is empty");
|
|
describe("Expect IllegalArgumentException when putPart uploadID is empty");
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path dest = path("testCompleteEmptyUpload");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- mpu.initialize(dest);
|
|
|
|
|
|
+ Path dest = methodPath();
|
|
UploadHandle emptyHandle =
|
|
UploadHandle emptyHandle =
|
|
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
|
|
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
|
|
byte[] payload = generatePayload(1);
|
|
byte[] payload = generatePayload(1);
|
|
@@ -341,25 +576,123 @@ public abstract class AbstractContractMultipartUploaderTest extends
|
|
|
|
|
|
/**
|
|
/**
|
|
* When we pass empty uploadID, complete throws IllegalArgumentException.
|
|
* When we pass empty uploadID, complete throws IllegalArgumentException.
|
|
- * @throws Exception
|
|
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testCompleteEmptyUploadID() throws Exception {
|
|
public void testCompleteEmptyUploadID() throws Exception {
|
|
describe("Expect IllegalArgumentException when complete uploadID is empty");
|
|
describe("Expect IllegalArgumentException when complete uploadID is empty");
|
|
- FileSystem fs = getFileSystem();
|
|
|
|
- Path dest = path("testCompleteEmptyUpload");
|
|
|
|
- MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
|
|
|
|
- UploadHandle realHandle = mpu.initialize(dest);
|
|
|
|
|
|
+ Path dest = methodPath();
|
|
|
|
+ UploadHandle realHandle = initializeUpload(dest);
|
|
UploadHandle emptyHandle =
|
|
UploadHandle emptyHandle =
|
|
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
|
|
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
|
|
- List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
|
|
|
|
- byte[] payload = generatePayload(1);
|
|
|
|
- InputStream is = new ByteArrayInputStream(payload);
|
|
|
|
- PartHandle partHandle = mpu.putPart(dest, is, 1, realHandle,
|
|
|
|
- payload.length);
|
|
|
|
- partHandles.add(Pair.of(1, partHandle));
|
|
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
|
|
+ PartHandle partHandle = putPart(dest, realHandle, 1,
|
|
|
|
+ generatePayload(1, SMALL_FILE));
|
|
|
|
+ partHandles.put(1, partHandle);
|
|
|
|
|
|
intercept(IllegalArgumentException.class,
|
|
intercept(IllegalArgumentException.class,
|
|
() -> mpu.complete(dest, partHandles, emptyHandle));
|
|
() -> mpu.complete(dest, partHandles, emptyHandle));
|
|
|
|
+
|
|
|
|
+ // and, while things are setup, attempt to complete with
|
|
|
|
+ // a part index of 0
|
|
|
|
+ partHandles.clear();
|
|
|
|
+ partHandles.put(0, partHandle);
|
|
|
|
+ intercept(IllegalArgumentException.class,
|
|
|
|
+ () -> mpu.complete(dest, partHandles, realHandle));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Assert that upon completion, a directory in the way of the file will
|
|
|
|
+ * result in a failure. This test only applies to backing stores with a
|
|
|
|
+ * concept of directories.
|
|
|
|
+ * @throws Exception failure
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testDirectoryInTheWay() throws Exception {
|
|
|
|
+ FileSystem fs = getFileSystem();
|
|
|
|
+ Path file = methodPath();
|
|
|
|
+ UploadHandle uploadHandle = initializeUpload(file);
|
|
|
|
+ Map<Integer, PartHandle> partHandles = new HashMap<>();
|
|
|
|
+ int size = SMALL_FILE;
|
|
|
|
+ PartHandle partHandle = putPart(file, uploadHandle, 1,
|
|
|
|
+ generatePayload(1, size));
|
|
|
|
+ partHandles.put(1, partHandle);
|
|
|
|
+
|
|
|
|
+ fs.mkdirs(file);
|
|
|
|
+ intercept(IOException.class,
|
|
|
|
+ () -> completeUpload(file, uploadHandle, partHandles, null,
|
|
|
|
+ size));
|
|
|
|
+ // abort should still work
|
|
|
|
+ abortUpload(file, uploadHandle);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testConcurrentUploads() throws Throwable {
|
|
|
|
+
|
|
|
|
+ // if the FS doesn't support concurrent uploads, this test is
|
|
|
|
+ // required to fail during the second initialization.
|
|
|
|
+ final boolean concurrent = supportsConcurrentUploadsToSamePath();
|
|
|
|
+
|
|
|
|
+ describe("testing concurrent uploads, MPU support for this is "
|
|
|
|
+ + concurrent);
|
|
|
|
+ final FileSystem fs = getFileSystem();
|
|
|
|
+ final Path file = methodPath();
|
|
|
|
+ final int size1 = SMALL_FILE;
|
|
|
|
+ final int partId1 = 1;
|
|
|
|
+ final byte[] payload1 = generatePayload(partId1, size1);
|
|
|
|
+ final MessageDigest digest1 = DigestUtils.getMd5Digest();
|
|
|
|
+ digest1.update(payload1);
|
|
|
|
+ final UploadHandle upload1 = initializeUpload(file);
|
|
|
|
+ final Map<Integer, PartHandle> partHandles1 = new HashMap<>();
|
|
|
|
+
|
|
|
|
+ // initiate part 2
|
|
|
|
+ // by using a different size, it's straightforward to see which
|
|
|
|
+ // version is visible, before reading/digesting the contents
|
|
|
|
+ final int size2 = size1 * 2;
|
|
|
|
+ final int partId2 = 2;
|
|
|
|
+ final byte[] payload2 = generatePayload(partId1, size2);
|
|
|
|
+ final MessageDigest digest2 = DigestUtils.getMd5Digest();
|
|
|
|
+ digest2.update(payload2);
|
|
|
|
+
|
|
|
|
+ final UploadHandle upload2;
|
|
|
|
+ try {
|
|
|
|
+ upload2 = initializeUpload(file);
|
|
|
|
+ Assume.assumeTrue(
|
|
|
|
+ "The Filesystem is unexpectedly supporting concurrent uploads",
|
|
|
|
+ concurrent);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ if (!concurrent) {
|
|
|
|
+ // this is expected, so end the test
|
|
|
|
+ LOG.debug("Expected exception raised on concurrent uploads {}", e);
|
|
|
|
+ return;
|
|
|
|
+ } else {
|
|
|
|
+ throw e;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ final Map<Integer, PartHandle> partHandles2 = new HashMap<>();
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ assertNotEquals("Upload handles match", upload1, upload2);
|
|
|
|
+
|
|
|
|
+ // put part 1
|
|
|
|
+ partHandles1.put(partId1, putPart(file, upload1, partId1, payload1));
|
|
|
|
+
|
|
|
|
+ // put part2
|
|
|
|
+ partHandles2.put(partId2, putPart(file, upload2, partId2, payload2));
|
|
|
|
+
|
|
|
|
+ // complete part u1. expect its size and digest to
|
|
|
|
+ // be as expected.
|
|
|
|
+ completeUpload(file, upload1, partHandles1, digest1, size1);
|
|
|
|
+
|
|
|
|
+ // now upload part 2.
|
|
|
|
+ complete(file, upload2, partHandles2);
|
|
|
|
+ // and await the visible length to match
|
|
|
|
+ eventually(timeToBecomeConsistentMillis(), 500,
|
|
|
|
+ () -> {
|
|
|
|
+ FileStatus status = fs.getFileStatus(file);
|
|
|
|
+ assertEquals("File length in " + status,
|
|
|
|
+ size2, status.getLen());
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ verifyContents(file, digest2, size2);
|
|
}
|
|
}
|
|
}
|
|
}
|