|
@@ -31,17 +31,14 @@ import java.util.Set;
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
+import org.junit.jupiter.params.ParameterizedTest;
|
|
|
|
+import org.junit.jupiter.params.provider.MethodSource;
|
|
import software.amazon.awssdk.services.s3.S3Client;
|
|
import software.amazon.awssdk.services.s3.S3Client;
|
|
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
|
|
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
|
|
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
|
|
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
|
|
|
|
|
|
import org.apache.hadoop.util.Sets;
|
|
import org.apache.hadoop.util.Sets;
|
|
-import org.assertj.core.api.Assertions;
|
|
|
|
-import org.junit.After;
|
|
|
|
-import org.junit.Before;
|
|
|
|
-import org.junit.Test;
|
|
|
|
-import org.junit.runner.RunWith;
|
|
|
|
-import org.junit.runners.Parameterized;
|
|
|
|
|
|
+import org.junit.jupiter.api.AfterEach;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@@ -78,12 +75,12 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
|
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.*;
|
|
import static org.apache.hadoop.fs.s3a.commit.staging.Paths.*;
|
|
import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
|
|
import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
|
|
import static org.apache.hadoop.test.LambdaTestUtils.*;
|
|
import static org.apache.hadoop.test.LambdaTestUtils.*;
|
|
|
|
+import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
|
|
/**
|
|
/**
|
|
* The main unit test suite of the staging committer.
|
|
* The main unit test suite of the staging committer.
|
|
* Parameterized on thread count and unique filename policy.
|
|
* Parameterized on thread count and unique filename policy.
|
|
*/
|
|
*/
|
|
-@RunWith(Parameterized.class)
|
|
|
|
public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
|
|
|
|
private static final JobID JOB_ID = new JobID("job", 1);
|
|
private static final JobID JOB_ID = new JobID("job", 1);
|
|
@@ -97,8 +94,8 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(TestStagingCommitter.class);
|
|
LoggerFactory.getLogger(TestStagingCommitter.class);
|
|
|
|
|
|
- private final int numThreads;
|
|
|
|
- private final boolean uniqueFilenames;
|
|
|
|
|
|
+ private int numThreads;
|
|
|
|
+ private boolean uniqueFilenames;
|
|
private JobContext job = null;
|
|
private JobContext job = null;
|
|
private TaskAttemptContext tac = null;
|
|
private TaskAttemptContext tac = null;
|
|
private Configuration conf = null;
|
|
private Configuration conf = null;
|
|
@@ -129,7 +126,6 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
* whether or not filenames are unique.
|
|
* whether or not filenames are unique.
|
|
* @return a list of parameter tuples.
|
|
* @return a list of parameter tuples.
|
|
*/
|
|
*/
|
|
- @Parameterized.Parameters(name="threads-{0}-unique-{1}")
|
|
|
|
public static Collection<Object[]> params() {
|
|
public static Collection<Object[]> params() {
|
|
return Arrays.asList(new Object[][] {
|
|
return Arrays.asList(new Object[][] {
|
|
{0, false},
|
|
{0, false},
|
|
@@ -138,12 +134,12 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
- public TestStagingCommitter(int numThreads, boolean uniqueFilenames) {
|
|
|
|
- this.numThreads = numThreads;
|
|
|
|
- this.uniqueFilenames = uniqueFilenames;
|
|
|
|
|
|
+ public void initTestStagingCommitter(int pNumThreads, boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ this.numThreads = pNumThreads;
|
|
|
|
+ this.uniqueFilenames = pNniqueFilenames;
|
|
|
|
+ setupCommitter();
|
|
}
|
|
}
|
|
|
|
|
|
- @Before
|
|
|
|
public void setupCommitter() throws Exception {
|
|
public void setupCommitter() throws Exception {
|
|
JobConf jobConf = getConfiguration();
|
|
JobConf jobConf = getConfiguration();
|
|
jobConf.setInt(FS_S3A_COMMITTER_THREADS, numThreads);
|
|
jobConf.setInt(FS_S3A_COMMITTER_THREADS, numThreads);
|
|
@@ -187,7 +183,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
Paths.resetTempFolderCache();
|
|
Paths.resetTempFolderCache();
|
|
}
|
|
}
|
|
|
|
|
|
- @After
|
|
|
|
|
|
+ @AfterEach
|
|
public void cleanup() {
|
|
public void cleanup() {
|
|
try {
|
|
try {
|
|
if (tmpDir != null) {
|
|
if (tmpDir != null) {
|
|
@@ -202,35 +198,40 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
return new Configuration(false);
|
|
return new Configuration(false);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testMockFSclientWiredUp() throws Throwable {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testMockFSclientWiredUp(int pNumThreads, boolean pNniqueFilenames) throws Throwable {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
final S3Client client = mockFS.getS3AInternals().getAmazonS3Client("test");
|
|
final S3Client client = mockFS.getS3AInternals().getAmazonS3Client("test");
|
|
- Assertions.assertThat(client)
|
|
|
|
|
|
+ assertThat(client)
|
|
.describedAs("S3Client from FS")
|
|
.describedAs("S3Client from FS")
|
|
.isNotNull()
|
|
.isNotNull()
|
|
.isSameAs(mockClient);
|
|
.isSameAs(mockClient);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testUUIDPropagation() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testUUIDPropagation(int pNumThreads, boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
String uuid = uuid();
|
|
String uuid = uuid();
|
|
config.set(SPARK_WRITE_UUID, uuid);
|
|
config.set(SPARK_WRITE_UUID, uuid);
|
|
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
|
|
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
|
|
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
|
|
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
|
|
.buildJobUUID(config, JOB_ID);
|
|
.buildJobUUID(config, JOB_ID);
|
|
- assertEquals("Job UUID", uuid, t3.getLeft());
|
|
|
|
- assertEquals("Job UUID source: " + t3,
|
|
|
|
- AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID,
|
|
|
|
- t3.getRight());
|
|
|
|
|
|
+ assertEquals(uuid, t3.getLeft(), "Job UUID");
|
|
|
|
+ assertEquals(AbstractS3ACommitter.JobUUIDSource.SparkWriteUUID,
|
|
|
|
+ t3.getRight(), "Job UUID source: " + t3);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* If the Spark UUID is required, then binding will fail
|
|
* If the Spark UUID is required, then binding will fail
|
|
* if a UUID did not get passed in.
|
|
* if a UUID did not get passed in.
|
|
*/
|
|
*/
|
|
- @Test
|
|
|
|
- public void testUUIDValidation() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testUUIDValidation(int pNumThreads, boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
|
|
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
|
|
intercept(PathCommitException.class, E_NO_SPARK_UUID, () ->
|
|
intercept(PathCommitException.class, E_NO_SPARK_UUID, () ->
|
|
@@ -240,8 +241,10 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
/**
|
|
/**
|
|
* Validate ordering of UUID retrieval.
|
|
* Validate ordering of UUID retrieval.
|
|
*/
|
|
*/
|
|
- @Test
|
|
|
|
- public void testUUIDLoadOrdering() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testUUIDLoadOrdering(int pNumThreads, boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
|
|
config.setBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, true);
|
|
String uuid = uuid();
|
|
String uuid = uuid();
|
|
@@ -250,24 +253,24 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
config.set(SPARK_WRITE_UUID, "something");
|
|
config.set(SPARK_WRITE_UUID, "something");
|
|
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
|
|
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
|
|
.buildJobUUID(config, JOB_ID);
|
|
.buildJobUUID(config, JOB_ID);
|
|
- assertEquals("Job UUID", uuid, t3.getLeft());
|
|
|
|
- assertEquals("Job UUID source: " + t3,
|
|
|
|
- AbstractS3ACommitter.JobUUIDSource.CommitterUUIDProperty,
|
|
|
|
- t3.getRight());
|
|
|
|
|
|
+ assertEquals(uuid, t3.getLeft(), "Job UUID");
|
|
|
|
+ assertEquals(AbstractS3ACommitter.JobUUIDSource.CommitterUUIDProperty,
|
|
|
|
+ t3.getRight(), "Job UUID source: " + t3);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Verify that unless the config enables self-generation, JobIDs
|
|
* Verify that unless the config enables self-generation, JobIDs
|
|
* are used.
|
|
* are used.
|
|
*/
|
|
*/
|
|
- @Test
|
|
|
|
- public void testJobIDIsUUID() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testJobIDIsUUID(int pNumThreads, boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
|
|
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
|
|
.buildJobUUID(config, JOB_ID);
|
|
.buildJobUUID(config, JOB_ID);
|
|
- assertEquals("Job UUID source: " + t3,
|
|
|
|
- AbstractS3ACommitter.JobUUIDSource.JobID,
|
|
|
|
- t3.getRight());
|
|
|
|
|
|
+ assertEquals(AbstractS3ACommitter.JobUUIDSource.JobID,
|
|
|
|
+ t3.getRight(), "Job UUID source: " + t3);
|
|
// parse it as a JobID
|
|
// parse it as a JobID
|
|
JobID.forName(t3.getLeft());
|
|
JobID.forName(t3.getLeft());
|
|
}
|
|
}
|
|
@@ -276,15 +279,16 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
* Verify self-generated UUIDs are supported when enabled,
|
|
* Verify self-generated UUIDs are supported when enabled,
|
|
* and come before JobID.
|
|
* and come before JobID.
|
|
*/
|
|
*/
|
|
- @Test
|
|
|
|
- public void testSelfGeneratedUUID() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testSelfGeneratedUUID(int pNumThreads, boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
config.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
|
|
config.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
|
|
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
|
|
Pair<String, AbstractS3ACommitter.JobUUIDSource> t3 = AbstractS3ACommitter
|
|
.buildJobUUID(config, JOB_ID);
|
|
.buildJobUUID(config, JOB_ID);
|
|
- assertEquals("Job UUID source: " + t3,
|
|
|
|
- AbstractS3ACommitter.JobUUIDSource.GeneratedLocally,
|
|
|
|
- t3.getRight());
|
|
|
|
|
|
+ assertEquals(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally,
|
|
|
|
+ t3.getRight(), "Job UUID source: " + t3);
|
|
// parse it
|
|
// parse it
|
|
UUID.fromString(t3.getLeft());
|
|
UUID.fromString(t3.getLeft());
|
|
}
|
|
}
|
|
@@ -308,21 +312,27 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
return UUID.randomUUID().toString();
|
|
return UUID.randomUUID().toString();
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testAttemptPathConstructionNoSchema() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testAttemptPathConstructionNoSchema(int pNumThreads, boolean pNniqueFilenames)
|
|
|
|
+ throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
final String jobUUID = addUUID(config);
|
|
final String jobUUID = addUUID(config);
|
|
config.set(BUFFER_DIR, "/tmp/mr-local-0,/tmp/mr-local-1");
|
|
config.set(BUFFER_DIR, "/tmp/mr-local-0,/tmp/mr-local-1");
|
|
String commonPath = "file:/tmp/mr-local-";
|
|
String commonPath = "file:/tmp/mr-local-";
|
|
- Assertions.assertThat(getLocalTaskAttemptTempDir(config,
|
|
|
|
|
|
+ assertThat(getLocalTaskAttemptTempDir(config,
|
|
jobUUID, tac.getTaskAttemptID()).toString())
|
|
jobUUID, tac.getTaskAttemptID()).toString())
|
|
.describedAs("Missing scheme should produce local file paths")
|
|
.describedAs("Missing scheme should produce local file paths")
|
|
.startsWith(commonPath)
|
|
.startsWith(commonPath)
|
|
.contains(jobUUID);
|
|
.contains(jobUUID);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testAttemptPathsDifferentByTaskAttempt() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testAttemptPathsDifferentByTaskAttempt(int pNumThreads, boolean pNniqueFilenames)
|
|
|
|
+ throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
final String jobUUID = addUUID(config);
|
|
final String jobUUID = addUUID(config);
|
|
config.set(BUFFER_DIR, "file:/tmp/mr-local-0");
|
|
config.set(BUFFER_DIR, "file:/tmp/mr-local-0");
|
|
@@ -330,13 +340,16 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
jobUUID, AID).toString();
|
|
jobUUID, AID).toString();
|
|
String attempt2Path = getLocalTaskAttemptTempDir(config,
|
|
String attempt2Path = getLocalTaskAttemptTempDir(config,
|
|
jobUUID, AID2).toString();
|
|
jobUUID, AID2).toString();
|
|
- Assertions.assertThat(attempt2Path)
|
|
|
|
|
|
+ assertThat(attempt2Path)
|
|
.describedAs("local task attempt dir of TA1 must not match that of TA2")
|
|
.describedAs("local task attempt dir of TA1 must not match that of TA2")
|
|
.isNotEqualTo(attempt1Path);
|
|
.isNotEqualTo(attempt1Path);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testAttemptPathConstructionWithSchema() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testAttemptPathConstructionWithSchema(int pNumThreads, boolean pNniqueFilenames)
|
|
|
|
+ throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
final String jobUUID = addUUID(config);
|
|
final String jobUUID = addUUID(config);
|
|
String commonPath = "file:/tmp/mr-local-";
|
|
String commonPath = "file:/tmp/mr-local-";
|
|
@@ -344,58 +357,66 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
config.set(BUFFER_DIR,
|
|
config.set(BUFFER_DIR,
|
|
"file:/tmp/mr-local-0,file:/tmp/mr-local-1");
|
|
"file:/tmp/mr-local-0,file:/tmp/mr-local-1");
|
|
|
|
|
|
- Assertions.assertThat(
|
|
|
|
|
|
+ assertThat(
|
|
getLocalTaskAttemptTempDir(config,
|
|
getLocalTaskAttemptTempDir(config,
|
|
jobUUID, tac.getTaskAttemptID()).toString())
|
|
jobUUID, tac.getTaskAttemptID()).toString())
|
|
.describedAs("Path should be the same with file scheme")
|
|
.describedAs("Path should be the same with file scheme")
|
|
.startsWith(commonPath);
|
|
.startsWith(commonPath);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testAttemptPathConstructionWrongSchema() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testAttemptPathConstructionWrongSchema(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Configuration config = newConfig();
|
|
Configuration config = newConfig();
|
|
final String jobUUID = addUUID(config);
|
|
final String jobUUID = addUUID(config);
|
|
config.set(BUFFER_DIR,
|
|
config.set(BUFFER_DIR,
|
|
"hdfs://nn:8020/tmp/mr-local-0,hdfs://nn:8020/tmp/mr-local-1");
|
|
"hdfs://nn:8020/tmp/mr-local-0,hdfs://nn:8020/tmp/mr-local-1");
|
|
intercept(IllegalArgumentException.class, "Wrong FS",
|
|
intercept(IllegalArgumentException.class, "Wrong FS",
|
|
() -> getLocalTaskAttemptTempDir(config, jobUUID,
|
|
() -> getLocalTaskAttemptTempDir(config, jobUUID,
|
|
- tac.getTaskAttemptID()));
|
|
|
|
|
|
+ tac.getTaskAttemptID()));
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testCommitPathConstruction() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testCommitPathConstruction(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Path committedTaskPath = committer.getCommittedTaskPath(tac);
|
|
Path committedTaskPath = committer.getCommittedTaskPath(tac);
|
|
- assertEquals("Path should be in HDFS: " + committedTaskPath,
|
|
|
|
- "hdfs", committedTaskPath.toUri().getScheme());
|
|
|
|
|
|
+ assertEquals("hdfs", committedTaskPath.toUri().getScheme(),
|
|
|
|
+ "Path should be in HDFS: " + committedTaskPath);
|
|
String ending = STAGING_UPLOADS + "/_temporary/0/task_job_0001_r_000002";
|
|
String ending = STAGING_UPLOADS + "/_temporary/0/task_job_0001_r_000002";
|
|
- assertTrue("Did not end with \"" + ending +"\" :" + committedTaskPath,
|
|
|
|
- committedTaskPath.toString().endsWith(ending));
|
|
|
|
|
|
+ assertTrue(committedTaskPath.toString().endsWith(ending),
|
|
|
|
+ "Did not end with \"" + ending +"\" :" + committedTaskPath);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testSingleTaskCommit() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testSingleTaskCommit(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Path file = new Path(commitTask(committer, tac, 1).iterator().next());
|
|
Path file = new Path(commitTask(committer, tac, 1).iterator().next());
|
|
|
|
|
|
List<String> uploads = results.getUploads();
|
|
List<String> uploads = results.getUploads();
|
|
- assertEquals("Should initiate one upload: " + results, 1, uploads.size());
|
|
|
|
|
|
+ assertEquals(1, uploads.size(), "Should initiate one upload: " + results);
|
|
|
|
|
|
Path committedPath = committer.getCommittedTaskPath(tac);
|
|
Path committedPath = committer.getCommittedTaskPath(tac);
|
|
FileSystem dfs = committedPath.getFileSystem(conf);
|
|
FileSystem dfs = committedPath.getFileSystem(conf);
|
|
|
|
|
|
- assertEquals("Should commit to HDFS: "+ committer, getDFS(), dfs);
|
|
|
|
|
|
+ assertEquals(getDFS(), dfs, "Should commit to HDFS: "+ committer);
|
|
|
|
|
|
FileStatus[] stats = dfs.listStatus(committedPath);
|
|
FileStatus[] stats = dfs.listStatus(committedPath);
|
|
- assertEquals("Should produce one commit file: " + results, 1, stats.length);
|
|
|
|
- assertEquals("Should name the commits file with the task ID: " + results,
|
|
|
|
- "task_job_0001_r_000002", stats[0].getPath().getName());
|
|
|
|
|
|
+ assertEquals(1, stats.length, "Should produce one commit file: " + results);
|
|
|
|
+ assertEquals("task_job_0001_r_000002", stats[0].getPath().getName(),
|
|
|
|
+ "Should name the commits file with the task ID: " + results);
|
|
|
|
|
|
PendingSet pending = PersistentCommitData.load(dfs, stats[0], PendingSet.serializer());
|
|
PendingSet pending = PersistentCommitData.load(dfs, stats[0], PendingSet.serializer());
|
|
- assertEquals("Should have one pending commit", 1, pending.size());
|
|
|
|
|
|
+ assertEquals(1, pending.size(), "Should have one pending commit");
|
|
SinglePendingCommit commit = pending.getCommits().get(0);
|
|
SinglePendingCommit commit = pending.getCommits().get(0);
|
|
- assertEquals("Should write to the correct bucket:" + results,
|
|
|
|
- BUCKET, commit.getBucket());
|
|
|
|
- assertEquals("Should write to the correct key: " + results,
|
|
|
|
- OUTPUT_PREFIX + "/" + file.getName(), commit.getDestinationKey());
|
|
|
|
|
|
+ assertEquals(BUCKET, commit.getBucket(), "Should write to the correct bucket:" + results);
|
|
|
|
+ assertEquals(OUTPUT_PREFIX + "/" + file.getName(), commit.getDestinationKey(),
|
|
|
|
+ "Should write to the correct key: " + results);
|
|
|
|
|
|
assertValidUpload(results.getTagsByUpload(), commit);
|
|
assertValidUpload(results.getTagsByUpload(), commit);
|
|
}
|
|
}
|
|
@@ -404,8 +425,11 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
* This originally verified that empty files weren't PUT. They are now.
|
|
* This originally verified that empty files weren't PUT. They are now.
|
|
* @throws Exception on a failure
|
|
* @throws Exception on a failure
|
|
*/
|
|
*/
|
|
- @Test
|
|
|
|
- public void testSingleTaskEmptyFileCommit() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testSingleTaskEmptyFileCommit(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
committer.setupTask(tac);
|
|
committer.setupTask(tac);
|
|
|
|
|
|
Path attemptPath = committer.getTaskAttemptPath(tac);
|
|
Path attemptPath = committer.getTaskAttemptPath(tac);
|
|
@@ -416,60 +440,65 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
committer.commitTask(tac);
|
|
committer.commitTask(tac);
|
|
|
|
|
|
List<String> uploads = results.getUploads();
|
|
List<String> uploads = results.getUploads();
|
|
- assertEquals("Should initiate one upload", 1, uploads.size());
|
|
|
|
|
|
+ assertEquals(1, uploads.size(), "Should initiate one upload");
|
|
|
|
|
|
Path committedPath = committer.getCommittedTaskPath(tac);
|
|
Path committedPath = committer.getCommittedTaskPath(tac);
|
|
FileSystem dfs = committedPath.getFileSystem(conf);
|
|
FileSystem dfs = committedPath.getFileSystem(conf);
|
|
|
|
|
|
- assertEquals("Should commit to HDFS", getDFS(), dfs);
|
|
|
|
|
|
+ assertEquals(getDFS(), dfs, "Should commit to HDFS");
|
|
|
|
|
|
assertIsFile(dfs, committedPath);
|
|
assertIsFile(dfs, committedPath);
|
|
FileStatus[] stats = dfs.listStatus(committedPath);
|
|
FileStatus[] stats = dfs.listStatus(committedPath);
|
|
- assertEquals("Should produce one commit file", 1, stats.length);
|
|
|
|
- assertEquals("Should name the commits file with the task ID",
|
|
|
|
- "task_job_0001_r_000002", stats[0].getPath().getName());
|
|
|
|
|
|
+ assertEquals(1, stats.length, "Should produce one commit file");
|
|
|
|
+ assertEquals("task_job_0001_r_000002", stats[0].getPath().getName(),
|
|
|
|
+ "Should name the commits file with the task ID");
|
|
|
|
|
|
PendingSet pending = PersistentCommitData.load(dfs, stats[0], PendingSet.serializer());
|
|
PendingSet pending = PersistentCommitData.load(dfs, stats[0], PendingSet.serializer());
|
|
- assertEquals("Should have one pending commit", 1, pending.size());
|
|
|
|
|
|
+ assertEquals(1, pending.size(), "Should have one pending commit");
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testSingleTaskMultiFileCommit() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testSingleTaskMultiFileCommit(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
int numFiles = 3;
|
|
int numFiles = 3;
|
|
Set<String> files = commitTask(committer, tac, numFiles);
|
|
Set<String> files = commitTask(committer, tac, numFiles);
|
|
|
|
|
|
List<String> uploads = results.getUploads();
|
|
List<String> uploads = results.getUploads();
|
|
- assertEquals("Should initiate multiple uploads", numFiles, uploads.size());
|
|
|
|
|
|
+ assertEquals(numFiles, uploads.size(), "Should initiate multiple uploads");
|
|
|
|
|
|
Path committedPath = committer.getCommittedTaskPath(tac);
|
|
Path committedPath = committer.getCommittedTaskPath(tac);
|
|
FileSystem dfs = committedPath.getFileSystem(conf);
|
|
FileSystem dfs = committedPath.getFileSystem(conf);
|
|
|
|
|
|
- assertEquals("Should commit to HDFS", getDFS(), dfs);
|
|
|
|
|
|
+ assertEquals(getDFS(), dfs, "Should commit to HDFS");
|
|
assertIsFile(dfs, committedPath);
|
|
assertIsFile(dfs, committedPath);
|
|
FileStatus[] stats = dfs.listStatus(committedPath);
|
|
FileStatus[] stats = dfs.listStatus(committedPath);
|
|
- assertEquals("Should produce one commit file", 1, stats.length);
|
|
|
|
- assertEquals("Should name the commits file with the task ID",
|
|
|
|
- "task_job_0001_r_000002", stats[0].getPath().getName());
|
|
|
|
|
|
+ assertEquals(1, stats.length, "Should produce one commit file");
|
|
|
|
+ assertEquals("task_job_0001_r_000002", stats[0].getPath().getName(),
|
|
|
|
+ "Should name the commits file with the task ID");
|
|
|
|
|
|
List<SinglePendingCommit> pending =
|
|
List<SinglePendingCommit> pending =
|
|
PersistentCommitData.load(dfs, stats[0], PendingSet.serializer()).getCommits();
|
|
PersistentCommitData.load(dfs, stats[0], PendingSet.serializer()).getCommits();
|
|
- assertEquals("Should have correct number of pending commits",
|
|
|
|
- files.size(), pending.size());
|
|
|
|
|
|
+ assertEquals(files.size(), pending.size(),
|
|
|
|
+ "Should have correct number of pending commits");
|
|
|
|
|
|
Set<String> keys = Sets.newHashSet();
|
|
Set<String> keys = Sets.newHashSet();
|
|
for (SinglePendingCommit commit : pending) {
|
|
for (SinglePendingCommit commit : pending) {
|
|
- assertEquals("Should write to the correct bucket: " + commit,
|
|
|
|
- BUCKET, commit.getBucket());
|
|
|
|
|
|
+ assertEquals(BUCKET, commit.getBucket(),
|
|
|
|
+ "Should write to the correct bucket: " + commit);
|
|
assertValidUpload(results.getTagsByUpload(), commit);
|
|
assertValidUpload(results.getTagsByUpload(), commit);
|
|
keys.add(commit.getDestinationKey());
|
|
keys.add(commit.getDestinationKey());
|
|
}
|
|
}
|
|
|
|
|
|
- assertEquals("Should write to the correct key",
|
|
|
|
- files, keys);
|
|
|
|
|
|
+ assertEquals(files, keys, "Should write to the correct key");
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testTaskInitializeFailure() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testTaskInitializeFailure(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
committer.setupTask(tac);
|
|
committer.setupTask(tac);
|
|
|
|
|
|
errors.failOnInit(1);
|
|
errors.failOnInit(1);
|
|
@@ -487,18 +516,20 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
"Should fail during init",
|
|
"Should fail during init",
|
|
() -> committer.commitTask(tac));
|
|
() -> committer.commitTask(tac));
|
|
|
|
|
|
- assertEquals("Should have initialized one file upload",
|
|
|
|
- 1, results.getUploads().size());
|
|
|
|
- assertEquals("Should abort the upload",
|
|
|
|
- new HashSet<>(results.getUploads()),
|
|
|
|
- getAbortedIds(results.getAborts()));
|
|
|
|
|
|
+ assertEquals(1, results.getUploads().size(),
|
|
|
|
+ "Should have initialized one file upload");
|
|
|
|
+ assertEquals(new HashSet<>(results.getUploads()),
|
|
|
|
+ getAbortedIds(results.getAborts()), "Should abort the upload");
|
|
assertPathDoesNotExist(fs,
|
|
assertPathDoesNotExist(fs,
|
|
"Should remove the attempt path",
|
|
"Should remove the attempt path",
|
|
attemptPath);
|
|
attemptPath);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testTaskSingleFileUploadFailure() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testTaskSingleFileUploadFailure(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
describe("Set up a single file upload to fail on upload 2");
|
|
describe("Set up a single file upload to fail on upload 2");
|
|
committer.setupTask(tac);
|
|
committer.setupTask(tac);
|
|
|
|
|
|
@@ -518,17 +549,19 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
return committer.toString();
|
|
return committer.toString();
|
|
});
|
|
});
|
|
|
|
|
|
- assertEquals("Should have attempted one file upload",
|
|
|
|
- 1, results.getUploads().size());
|
|
|
|
- assertEquals("Should abort the upload",
|
|
|
|
- results.getUploads().get(0),
|
|
|
|
- results.getAborts().get(0).uploadId());
|
|
|
|
|
|
+ assertEquals(1, results.getUploads().size(),
|
|
|
|
+ "Should have attempted one file upload");
|
|
|
|
+ assertEquals(results.getUploads().get(0),
|
|
|
|
+ results.getAborts().get(0).uploadId(), "Should abort the upload");
|
|
assertPathDoesNotExist(fs, "Should remove the attempt path",
|
|
assertPathDoesNotExist(fs, "Should remove the attempt path",
|
|
attemptPath);
|
|
attemptPath);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testTaskMultiFileUploadFailure() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testTaskMultiFileUploadFailure(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
committer.setupTask(tac);
|
|
committer.setupTask(tac);
|
|
|
|
|
|
errors.failOnUpload(5);
|
|
errors.failOnUpload(5);
|
|
@@ -549,17 +582,19 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
return committer.toString();
|
|
return committer.toString();
|
|
});
|
|
});
|
|
|
|
|
|
- assertEquals("Should have attempted two file uploads",
|
|
|
|
- 2, results.getUploads().size());
|
|
|
|
- assertEquals("Should abort the upload",
|
|
|
|
- new HashSet<>(results.getUploads()),
|
|
|
|
- getAbortedIds(results.getAborts()));
|
|
|
|
|
|
+ assertEquals(2, results.getUploads().size(),
|
|
|
|
+ "Should have attempted two file uploads");
|
|
|
|
+ assertEquals(new HashSet<>(results.getUploads()),
|
|
|
|
+ getAbortedIds(results.getAborts()), "Should abort the upload");
|
|
assertPathDoesNotExist(fs, "Should remove the attempt path",
|
|
assertPathDoesNotExist(fs, "Should remove the attempt path",
|
|
attemptPath);
|
|
attemptPath);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testTaskUploadAndAbortFailure() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testTaskUploadAndAbortFailure(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
committer.setupTask(tac);
|
|
committer.setupTask(tac);
|
|
|
|
|
|
errors.failOnUpload(5);
|
|
errors.failOnUpload(5);
|
|
@@ -581,16 +616,18 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
return committer.toString();
|
|
return committer.toString();
|
|
});
|
|
});
|
|
|
|
|
|
- assertEquals("Should have attempted two file uploads",
|
|
|
|
- 2, results.getUploads().size());
|
|
|
|
- assertEquals("Should not have succeeded with any aborts",
|
|
|
|
- new HashSet<>(),
|
|
|
|
- getAbortedIds(results.getAborts()));
|
|
|
|
|
|
+ assertEquals(2, results.getUploads().size(),
|
|
|
|
+ "Should have attempted two file uploads");
|
|
|
|
+ assertEquals(new HashSet<>(), getAbortedIds(results.getAborts()),
|
|
|
|
+ "Should not have succeeded with any aborts");
|
|
assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath);
|
|
assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testSingleTaskAbort() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testSingleTaskAbort(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
committer.setupTask(tac);
|
|
committer.setupTask(tac);
|
|
|
|
|
|
Path attemptPath = committer.getTaskAttemptPath(tac);
|
|
Path attemptPath = committer.getTaskAttemptPath(tac);
|
|
@@ -601,17 +638,19 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
|
|
|
|
committer.abortTask(tac);
|
|
committer.abortTask(tac);
|
|
|
|
|
|
- assertEquals("Should not upload anything",
|
|
|
|
- 0, results.getUploads().size());
|
|
|
|
- assertEquals("Should not upload anything",
|
|
|
|
- 0, results.getParts().size());
|
|
|
|
|
|
+ assertEquals(0, results.getUploads().size(),
|
|
|
|
+ "Should not upload anything");
|
|
|
|
+ assertEquals(0, results.getParts().size(), "Should not upload anything");
|
|
assertPathDoesNotExist(fs, "Should remove all attempt data", outPath);
|
|
assertPathDoesNotExist(fs, "Should remove all attempt data", outPath);
|
|
assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath);
|
|
assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath);
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testJobCommit() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testJobCommit(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
|
|
Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
|
|
FileSystem fs = jobAttemptPath.getFileSystem(conf);
|
|
FileSystem fs = jobAttemptPath.getFileSystem(conf);
|
|
|
|
|
|
@@ -621,21 +660,24 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
assertPathExists(fs, "No job attempt path", jobAttemptPath);
|
|
assertPathExists(fs, "No job attempt path", jobAttemptPath);
|
|
|
|
|
|
jobCommitter.commitJob(job);
|
|
jobCommitter.commitJob(job);
|
|
- assertEquals("Should have aborted no uploads",
|
|
|
|
- 0, results.getAborts().size());
|
|
|
|
|
|
+ assertEquals(0, results.getAborts().size(),
|
|
|
|
+ "Should have aborted no uploads");
|
|
|
|
|
|
- assertEquals("Should have deleted no uploads",
|
|
|
|
- 0, results.getDeletes().size());
|
|
|
|
|
|
+ assertEquals(0, results.getDeletes().size(),
|
|
|
|
+ "Should have deleted no uploads");
|
|
|
|
|
|
- assertEquals("Should have committed all uploads",
|
|
|
|
- uploads, getCommittedIds(results.getCommits()));
|
|
|
|
|
|
+ assertEquals(uploads, getCommittedIds(results.getCommits()),
|
|
|
|
+ "Should have committed all uploads");
|
|
|
|
|
|
assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
|
|
assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testJobCommitFailure() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testJobCommitFailure(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
|
|
Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
|
|
FileSystem fs = jobAttemptPath.getFileSystem(conf);
|
|
FileSystem fs = jobAttemptPath.getFileSystem(conf);
|
|
|
|
|
|
@@ -666,24 +708,27 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
"s3a://" + delete.bucket() + "/" + delete.key())
|
|
"s3a://" + delete.bucket() + "/" + delete.key())
|
|
.collect(Collectors.toSet());
|
|
.collect(Collectors.toSet());
|
|
|
|
|
|
- Assertions.assertThat(commits)
|
|
|
|
|
|
+ assertThat(commits)
|
|
.describedAs("Committed objects compared to deleted paths %s", results)
|
|
.describedAs("Committed objects compared to deleted paths %s", results)
|
|
.containsExactlyInAnyOrderElementsOf(deletes);
|
|
.containsExactlyInAnyOrderElementsOf(deletes);
|
|
|
|
|
|
- Assertions.assertThat(results.getAborts())
|
|
|
|
|
|
+ assertThat(results.getAborts())
|
|
.describedAs("aborted count in %s", results)
|
|
.describedAs("aborted count in %s", results)
|
|
.hasSize(7);
|
|
.hasSize(7);
|
|
Set<String> uploadIds = getCommittedIds(results.getCommits());
|
|
Set<String> uploadIds = getCommittedIds(results.getCommits());
|
|
uploadIds.addAll(getAbortedIds(results.getAborts()));
|
|
uploadIds.addAll(getAbortedIds(results.getAborts()));
|
|
- Assertions.assertThat(uploadIds)
|
|
|
|
|
|
+ assertThat(uploadIds)
|
|
.describedAs("Combined commit/delete and aborted upload IDs")
|
|
.describedAs("Combined commit/delete and aborted upload IDs")
|
|
.containsExactlyInAnyOrderElementsOf(uploads);
|
|
.containsExactlyInAnyOrderElementsOf(uploads);
|
|
|
|
|
|
assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
|
|
assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testJobAbort() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "threads-{0}-unique-{1}")
|
|
|
|
+ @MethodSource("params")
|
|
|
|
+ public void testJobAbort(int pNumThreads,
|
|
|
|
+ boolean pNniqueFilenames) throws Exception {
|
|
|
|
+ initTestStagingCommitter(pNumThreads, pNniqueFilenames);
|
|
Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
|
|
Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
|
|
FileSystem fs = jobAttemptPath.getFileSystem(conf);
|
|
FileSystem fs = jobAttemptPath.getFileSystem(conf);
|
|
|
|
|
|
@@ -691,14 +736,14 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
|
|
|
|
assertPathExists(fs, "No job attempt path", jobAttemptPath);
|
|
assertPathExists(fs, "No job attempt path", jobAttemptPath);
|
|
jobCommitter.abortJob(job, JobStatus.State.KILLED);
|
|
jobCommitter.abortJob(job, JobStatus.State.KILLED);
|
|
- assertEquals("Should have committed no uploads: " + jobCommitter,
|
|
|
|
- 0, results.getCommits().size());
|
|
|
|
|
|
+ assertEquals(0, results.getCommits().size(),
|
|
|
|
+ "Should have committed no uploads: " + jobCommitter);
|
|
|
|
|
|
- assertEquals("Should have deleted no uploads: " + jobCommitter,
|
|
|
|
- 0, results.getDeletes().size());
|
|
|
|
|
|
+ assertEquals(0, results.getDeletes().size(),
|
|
|
|
+ "Should have deleted no uploads: " + jobCommitter);
|
|
|
|
|
|
- assertEquals("Should have aborted all uploads: " + jobCommitter,
|
|
|
|
- uploads, getAbortedIds(results.getAborts()));
|
|
|
|
|
|
+ assertEquals(uploads, getAbortedIds(results.getAborts()),
|
|
|
|
+ "Should have aborted all uploads: " + jobCommitter);
|
|
|
|
|
|
assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
|
|
assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
|
|
}
|
|
}
|
|
@@ -771,16 +816,16 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
|
|
|
|
|
|
private static void assertValidUpload(Map<String, List<String>> parts,
|
|
private static void assertValidUpload(Map<String, List<String>> parts,
|
|
SinglePendingCommit commit) {
|
|
SinglePendingCommit commit) {
|
|
- assertTrue("Should commit a valid uploadId",
|
|
|
|
- parts.containsKey(commit.getUploadId()));
|
|
|
|
|
|
+ assertTrue(parts.containsKey(commit.getUploadId()),
|
|
|
|
+ "Should commit a valid uploadId");
|
|
|
|
|
|
List<String> tags = parts.get(commit.getUploadId());
|
|
List<String> tags = parts.get(commit.getUploadId());
|
|
- assertEquals("Should commit the correct number of file parts",
|
|
|
|
- tags.size(), commit.getPartCount());
|
|
|
|
|
|
+ assertEquals(tags.size(), commit.getPartCount(),
|
|
|
|
+ "Should commit the correct number of file parts");
|
|
|
|
|
|
for (int i = 0; i < tags.size(); i += 1) {
|
|
for (int i = 0; i < tags.size(); i += 1) {
|
|
- assertEquals("Should commit the correct part tags",
|
|
|
|
- tags.get(i), commit.getEtags().get(i).getEtag());
|
|
|
|
|
|
+ assertEquals(tags.get(i), commit.getEtags().get(i).getEtag(),
|
|
|
|
+ "Should commit the correct part tags");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|