@@ -20,13 +20,14 @@ package org.apache.hadoop.fs.s3a.commit;

import java.io.FileNotFoundException;
import java.io.IOException;
+import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;

import com.amazonaws.services.s3.model.MultipartUpload;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -35,6 +36,8 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFact
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -45,8 +48,10 @@ import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DurationInfo;
@@ -58,6 +63,10 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;

/**
* Abstract base class for S3A committers; allows for any commonality
@@ -86,11 +95,40 @@ import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
* committer was large enough for all the parallel POST requests.
*/
public abstract class AbstractS3ACommitter extends PathOutputCommitter {
+
private static final Logger LOG =
LoggerFactory.getLogger(AbstractS3ACommitter.class);

public static final String THREAD_PREFIX = "s3a-committer-pool-";

+ /**
+ * Error string when task setup fails.
+ */
+ @VisibleForTesting
+ public static final String E_SELF_GENERATED_JOB_UUID
+ = "has a self-generated job UUID";
+
+ /**
+ * Unique ID for a Job.
+ * In MapReduce Jobs the YARN JobID suffices.
+ * On Spark this can only be the YARN JobID when
+ * it is known to be creating strongly unique IDs
+ * (i.e. SPARK-33402 is on the branch).
+ */
+ private final String uuid;
+
+ /**
+ * Source of the {@link #uuid} value.
+ */
+ private final JobUUIDSource uuidSource;
+
+ /**
+ * Has this instance been used for job setup?
+ * If so then it is safe for a locally generated
+ * UUID to be used for task setup.
+ */
+ private boolean jobSetup;
+
/**
* Thread pool for task execution.
*/
@@ -147,14 +185,19 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
this.jobContext = context;
this.role = "Task committer " + context.getTaskAttemptID();
setConf(context.getConfiguration());
+ Pair<String, JobUUIDSource> id = buildJobUUID(
+ conf, context.getJobID());
+ this.uuid = id.getLeft();
+ this.uuidSource = id.getRight();
+ LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
initOutput(outputPath);
LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
role, jobName(context), jobIdString(context), outputPath);
S3AFileSystem fs = getDestS3AFS();
- createJobMarker = context.getConfiguration().getBoolean(
+ this.createJobMarker = context.getConfiguration().getBoolean(
CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
- commitOperations = new CommitOperations(fs);
+ this.commitOperations = new CommitOperations(fs);
}

/**
@@ -202,7 +245,7 @@
* @return the working path.
*/
@Override
- public Path getWorkPath() {
+ public final Path getWorkPath() {
return workPath;
}

@@ -210,16 +253,16 @@
* Set the work path for this committer.
* @param workPath the work path to use.
*/
- protected void setWorkPath(Path workPath) {
+ protected final void setWorkPath(Path workPath) {
LOG.debug("Setting work path to {}", workPath);
this.workPath = workPath;
}

- public Configuration getConf() {
+ public final Configuration getConf() {
return conf;
}

- protected void setConf(Configuration conf) {
+ protected final void setConf(Configuration conf) {
this.conf = conf;
}

@@ -308,6 +351,24 @@
*/
public abstract String getName();

+ /**
+ * The Job UUID, as passed in or generated.
+ * @return the UUID for the job.
+ */
+ @VisibleForTesting
+ public final String getUUID() {
+ return uuid;
+ }
+
+ /**
+ * Source of the UUID.
+ * @return how the job UUID was retrieved/generated.
+ */
+ @VisibleForTesting
+ public final JobUUIDSource getUUIDSource() {
+ return uuidSource;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
@@ -316,6 +377,8 @@
sb.append(", name=").append(getName());
sb.append(", outputPath=").append(getOutputPath());
sb.append(", workPath=").append(workPath);
+ sb.append(", uuid='").append(getUUID()).append('\'');
+ sb.append(", uuid source=").append(getUUIDSource());
sb.append('}');
return sb.toString();
}
@@ -394,6 +457,8 @@
// create a success data structure and then save it
SuccessData successData = new SuccessData();
successData.setCommitter(getName());
+ successData.setJobId(uuid);
+ successData.setJobIdSource(uuidSource.getText());
successData.setDescription(getRole());
successData.setHostname(NetUtils.getLocalHostname());
Date now = new Date();
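
As an illustrative aside (editorial, not part of the patch): the two setters above mean the job UUID and its source land in the _SUCCESS marker, so a downstream reader can check which ID a completed job ran under. A minimal sketch, assuming the getters matching the setters used here exist and that SuccessData.load is the standard loader:

    // Sketch: inspect the UUID recorded in a job's _SUCCESS marker.
    SuccessData success = SuccessData.load(fs,
        new Path(outputPath, "_SUCCESS"));
    LOG.info("Job {} committed with ID source {}",
        success.getJobId(), success.getJobIdSource());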
@@ -411,26 +476,60 @@
* be deleted; creating it now ensures there is something at the end
* while the job is in progress -and if nothing is created, that
* it is still there.
+ * <p>
+ * The option {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}
+ * is set to the job UUID; if generated locally
+ * {@link InternalCommitterConstants#SPARK_WRITE_UUID} is also patched.
+ * The field {@link #jobSetup} is set to true to note that
+ * this specific committer instance was used to set up a job.
+ * </p>
* @param context context
* @throws IOException IO failure
*/

@Override
public void setupJob(JobContext context) throws IOException {
- try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) {
+ try (DurationInfo d = new DurationInfo(LOG,
+ "Job %s setting up", getUUID())) {
+ // record that the job has been set up
+ jobSetup = true;
+ // patch job conf with the job UUID.
+ Configuration c = context.getConfiguration();
+ c.set(FS_S3A_COMMITTER_UUID, getUUID());
+ c.set(FS_S3A_COMMITTER_UUID_SOURCE, getUUIDSource().getText());
+ Path dest = getOutputPath();
if (createJobMarker){
- commitOperations.deleteSuccessMarker(getOutputPath());
+ commitOperations.deleteSuccessMarker(dest);
}
- getDestFS().mkdirs(getOutputPath());
+ getDestFS().mkdirs(dest);
+ // do a scan for surplus markers
+ warnOnActiveUploads(dest);
}
}
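
A sketch of what this patching enables (editorial, not part of the patch): any later consumer of the job configuration, such as a task-side committer or a test, can recover the UUID and its source through the same constants imported at the top of this file.

    // Sketch: read back the values patched in by setupJob().
    Configuration jobConf = context.getConfiguration();
    String jobUUID = jobConf.get(FS_S3A_COMMITTER_UUID, "");
    String source = jobConf.get(FS_S3A_COMMITTER_UUID_SOURCE, "");
    LOG.info("Committer UUID {} (source: {})", jobUUID, source);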

+ /**
+ * Task setup. Fails if the UUID was generated locally, and
+ * the same committer wasn't used for job setup.
+ * {@inheritDoc}
+ * @throws PathCommitException if the task UUID options are unsatisfied.
+ */
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
+ TaskAttemptID attemptID = context.getTaskAttemptID();
try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s",
- context.getTaskAttemptID())) {
+ attemptID)) {
+ // reject attempts to set up the task where the output won't be
+ // picked up
+ if (!jobSetup
+ && getUUIDSource() == JobUUIDSource.GeneratedLocally) {
+ // on anything other than a test run, the context must not have been
+ // generated locally.
+ throw new PathCommitException(getOutputPath().toString(),
+ "Task attempt " + attemptID
+ + " " + E_SELF_GENERATED_JOB_UUID);
+ }
Path taskAttemptPath = getTaskAttemptPath(context);
- FileSystem fs = getTaskAttemptFilesystem(context);
+ FileSystem fs = taskAttemptPath.getFileSystem(getConf());
fs.mkdirs(taskAttemptPath);
}
}
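
A hedged test-style sketch of the new failure mode (editorial; createCommitter() and taskAttemptContext are hypothetical scaffolding, while intercept() is the existing org.apache.hadoop.test.LambdaTestUtils helper): a committer whose UUID was generated locally must reject task setup when it did not perform job setup itself.

    // Hypothetical test: locally generated UUID + no setupJob() => rejection.
    Configuration conf = new Configuration();
    conf.setBoolean(FS_S3A_COMMITTER_GENERATE_UUID, true);
    AbstractS3ACommitter committer = createCommitter(conf, taskAttemptContext);
    intercept(PathCommitException.class, E_SELF_GENERATED_JOB_UUID,
        () -> committer.setupTask(taskAttemptContext));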
@@ -474,12 +573,12 @@
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildSubmitter(context))
- .abortWith(path ->
- loadAndAbort(commitContext, pending, path, true, false))
- .revertWith(path ->
- loadAndRevert(commitContext, pending, path))
- .run(path ->
- loadAndCommit(commitContext, pending, path));
+ .abortWith(status ->
+ loadAndAbort(commitContext, pending, status, true, false))
+ .revertWith(status ->
+ loadAndRevert(commitContext, pending, status))
+ .run(status ->
+ loadAndCommit(commitContext, pending, status));
}
}

@@ -504,7 +603,7 @@
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildSubmitter(context))
- .run(path -> PendingSet.load(sourceFS, path));
+ .run(status -> PendingSet.load(sourceFS, status));
}
}

@@ -512,17 +611,26 @@
* Load a pendingset file and commit all of its contents.
* @param commitContext context to commit through
* @param activeCommit commit state
- * @param path path to load
+ * @param status file to load
* @throws IOException failure
*/
private void loadAndCommit(
final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit,
- final Path path) throws IOException {
+ final FileStatus status) throws IOException {

+ final Path path = status.getPath();
try (DurationInfo ignored =
- new DurationInfo(LOG, false, "Committing %s", path)) {
- PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
+ new DurationInfo(LOG,
+ "Loading and committing files in pendingset %s", path)) {
+ PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
+ status);
+ String jobId = pendingSet.getJobId();
+ if (!StringUtils.isEmpty(jobId) && !getUUID().equals(jobId)) {
+ throw new PathCommitException(path,
+ String.format("Mismatch in Job ID (%s) and commit job ID (%s)",
+ getUUID(), jobId));
+ }
Tasks.foreach(pendingSet.getCommits())
.stopOnFailure()
.suppressExceptions(false)
@@ -543,17 +651,19 @@
* Load a pendingset file and revert all of its contents.
* @param commitContext context to commit through
* @param activeCommit commit state
- * @param path path to load
+ * @param status status of file to load
* @throws IOException failure
*/
private void loadAndRevert(
final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit,
- final Path path) throws IOException {
+ final FileStatus status) throws IOException {

+ final Path path = status.getPath();
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Committing %s", path)) {
- PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
+ PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
+ status);
Tasks.foreach(pendingSet.getCommits())
.suppressExceptions(true)
.run(commitContext::revertCommit);
@@ -564,21 +674,22 @@
* Load a pendingset file and abort all of its contents.
* @param commitContext context to commit through
* @param activeCommit commit state
- * @param path path to load
+ * @param status status of file to load
* @param deleteRemoteFiles should remote files be deleted?
* @throws IOException failure
*/
private void loadAndAbort(
final CommitOperations.CommitContext commitContext,
final ActiveCommit activeCommit,
- final Path path,
+ final FileStatus status,
final boolean suppressExceptions,
final boolean deleteRemoteFiles) throws IOException {

+ final Path path = status.getPath();
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Aborting %s", path)) {
PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
- path);
+ status);
FileSystem fs = getDestFS();
Tasks.foreach(pendingSet.getCommits())
.executeWith(singleThreadSubmitter())
@@ -659,6 +770,13 @@
*/
protected void abortPendingUploadsInCleanup(
boolean suppressExceptions) throws IOException {
+ // return early if aborting is disabled.
+ if (!shouldAbortUploadsInCleanup()) {
+ LOG.debug("Not cleaning up pending uploads to {} as {} is false",
+ getOutputPath(),
+ FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS);
+ return;
+ }
Path dest = getOutputPath();
try (DurationInfo ignored =
new DurationInfo(LOG, "Aborting all pending commits under %s",
@@ -674,14 +792,27 @@
maybeIgnore(suppressExceptions, "aborting pending uploads", e);
return;
}
- Tasks.foreach(pending)
- .executeWith(buildSubmitter(getJobContext()))
- .suppressExceptions(suppressExceptions)
- .run(u -> commitContext.abortMultipartCommit(
- u.getKey(), u.getUploadId()));
+ if (!pending.isEmpty()) {
+ LOG.warn("{} pending uploads were found - aborting", pending.size());
+ LOG.warn("If other tasks/jobs are writing to {},"
+ + " this action may cause them to fail", dest);
+ Tasks.foreach(pending)
+ .executeWith(buildSubmitter(getJobContext()))
+ .suppressExceptions(suppressExceptions)
+ .run(u -> commitContext.abortMultipartCommit(
+ u.getKey(), u.getUploadId()));
+ } else {
+ LOG.info("No pending uploads were found");
+ }
}
}

+ private boolean shouldAbortUploadsInCleanup() {
+ return getConf()
+ .getBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS,
+ DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS);
+ }
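
An editorial note with a one-line sketch: jobs which intentionally share an output tree can opt out of the cleanup-time abort through the option read here (the constant is assumed to resolve to "fs.s3a.committer.abort.pending.uploads").

    // Sketch: stop cleanup from aborting a concurrent job's uploads.
    conf.setBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS, false);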
+
/**
* Subclass-specific pre-Job-commit actions.
* The staging committers all load the pending files to verify that
@@ -1044,6 +1175,166 @@
}
}

+ /**
+ * Scan for active uploads and list them along with a warning message.
+ * Errors are ignored.
+ * @param path output path of job.
+ */
+ protected void warnOnActiveUploads(final Path path) {
+ List<MultipartUpload> pending;
+ try {
+ pending = getCommitOperations()
+ .listPendingUploadsUnderPath(path);
+ } catch (IOException e) {
+ LOG.debug("Failed to list uploads under {}",
+ path, e);
+ return;
+ }
+ if (!pending.isEmpty()) {
+ // log a warning
+ LOG.warn("{} active upload(s) in progress under {}",
+ pending.size(),
+ path);
+ LOG.warn("Either jobs are running concurrently"
+ + " or failed jobs are not being cleaned up");
+ // and the paths + timestamps
+ DateFormat df = DateFormat.getDateTimeInstance();
+ pending.forEach(u ->
+ LOG.info("[{}] {}",
+ df.format(u.getInitiated()),
+ u.getKey()));
+ if (shouldAbortUploadsInCleanup()) {
+ LOG.warn("This committer will abort these uploads in job cleanup");
+ }
+ }
+ }
+
+ /**
+ * Build the job UUID.
+ *
+ * <p>
+ * In MapReduce jobs, the application ID is issued by YARN, and
+ * unique across all jobs.
+ * </p>
+ * <p>
+ * Spark will use a fake app ID based on the current time.
+ * This can lead to collisions on busy clusters unless
+ * the specific spark release has SPARK-33402 applied.
+ * This appends a random long value to the timestamp, so
+ * is unique enough that the risk of collision is almost
+ * nonexistent.
+ * </p>
+ * <p>
+ * The order of selection of a uuid is
+ * </p>
+ * <ol>
+ * <li>Value of
+ * {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
+ * <li>Value of
+ * {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
+ * <li>If enabled through
+ * {@link CommitConstants#FS_S3A_COMMITTER_GENERATE_UUID}:
+ * Self-generated uuid.</li>
+ * <li>If {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+ * is not set: Application ID</li>
+ * </ol>
+ * The UUID binding takes place during construction;
+ * the staging committers use it to set up their wrapped
+ * committer to a path in the cluster FS which is unique to the
+ * job.
+ * <p>
+ * In {@link #setupJob(JobContext)} the job context's configuration
+ * will be patched with the UUID and its source, so the value will
+ * be valid in all sequences where the job has been set up with the
+ * configuration passed in.
+ * </p>
+ * <p>
+ * If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+ * is set, then an external UUID MUST be passed in.
+ * This can be used to verify that the spark engine is reliably setting
+ * unique IDs for staging.
+ * </p>
+ * @param conf job/task configuration
+ * @param jobId job ID from YARN or spark.
+ * @return Job UUID and source of it.
+ * @throws PathCommitException no UUID was found and it was required
+ */
+ public static Pair<String, JobUUIDSource>
+ buildJobUUID(Configuration conf, JobID jobId)
+ throws PathCommitException {
+
+ String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, "");
+
+ if (!jobUUID.isEmpty()) {
+ return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty);
+ }
+ // there is no job UUID.
+ // look for one from spark
+ jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
+ if (!jobUUID.isEmpty()) {
+ return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
+ }
+
+ // there is no UUID configuration in the job/task config
+
+ // Check the job hasn't declared a requirement for the UUID.
+ // This allows for fail-fast validation of Spark behavior.
+ if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID,
+ DEFAULT_S3A_COMMITTER_REQUIRE_UUID)) {
+ throw new PathCommitException("", E_NO_SPARK_UUID);
+ }
+
+ // see if the job can generate a random UUID
+ if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID,
+ DEFAULT_S3A_COMMITTER_GENERATE_UUID)) {
+ // generate a random UUID. This is OK for a job, for a task
+ // it means that the data may not get picked up.
+ String newId = UUID.randomUUID().toString();
+ LOG.warn("No job ID in configuration; generating a random ID: {}",
+ newId);
+ return Pair.of(newId, JobUUIDSource.GeneratedLocally);
+ }
+ // if no other option was supplied, return the job ID.
+ // This is exactly what MR jobs expect, but is not what
+ // Spark jobs can do as there is a risk of jobID collision.
+ return Pair.of(jobId.toString(), JobUUIDSource.JobID);
+ }
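
An editorial walkthrough of the selection order above, as a sketch (JobID.forName() and Configuration.unset() are existing Hadoop APIs; the UUID literal is arbitrary):

    // Sketch: exercising buildJobUUID()'s resolution order.
    Configuration conf = new Configuration(false);
    JobID jobId = JobID.forName("job_200707121733_0003");

    // With only the Spark property set, it is the source...
    conf.set(SPARK_WRITE_UUID, "d96cd8b5-e224-4c18-9bb1-4c4b9df4bb0a");
    Pair<String, JobUUIDSource> id = buildJobUUID(conf, jobId);
    // id.getRight() == JobUUIDSource.SparkWriteUUID

    // ...but the committer UUID property, when present, wins over it.
    conf.set(FS_S3A_COMMITTER_UUID, UUID.randomUUID().toString());
    // buildJobUUID(conf, jobId).getRight() == JobUUIDSource.CommitterUUIDProperty

    // With neither set, and generation neither required nor enabled,
    // the YARN JobID is used.
    conf.unset(FS_S3A_COMMITTER_UUID);
    conf.unset(SPARK_WRITE_UUID);
    // buildJobUUID(conf, jobId).getRight() == JobUUIDSource.JobID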
+
+ /**
+ * Enumeration of Job UUID source.
+ */
+ public enum JobUUIDSource {
+ SparkWriteUUID(SPARK_WRITE_UUID),
+ CommitterUUIDProperty(FS_S3A_COMMITTER_UUID),
+ JobID("JobID"),
+ GeneratedLocally("Generated Locally");
+
+ private final String text;
+
+ JobUUIDSource(final String text) {
+ this.text = text;
+ }
+
+ /**
+ * Source for messages.
+ * @return text
+ */
+ public String getText() {
+ return text;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "JobUUIDSource{");
+ sb.append("text='").append(text).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
/**
* State of the active commit operation.
*
@@ -1071,7 +1362,7 @@
= new ActiveCommit(null, new ArrayList<>());

/** All pendingset files to iterate through. */
- private final List<Path> sourceFiles;
+ private final List<FileStatus> sourceFiles;

/**
* Filesystem for the source files.
@@ -1101,8 +1392,8 @@
*/
public ActiveCommit(
final FileSystem sourceFS,
- final List<Path> sourceFiles) {
- this.sourceFiles = sourceFiles;
+ final List<? extends FileStatus> sourceFiles) {
+ this.sourceFiles = (List<FileStatus>) sourceFiles;
this.sourceFS = sourceFS;
}

@@ -1115,10 +1406,7 @@
public static ActiveCommit fromStatusList(
final FileSystem pendingFS,
final List<? extends FileStatus> statuses) {
- return new ActiveCommit(pendingFS,
- statuses.stream()
- .map(FileStatus::getPath)
- .collect(Collectors.toList()));
+ return new ActiveCommit(pendingFS, statuses);
}

/**
@@ -1129,7 +1417,7 @@
return EMPTY;
}

- public List<Path> getSourceFiles() {
+ public List<FileStatus> getSourceFiles() {
return sourceFiles;
}

@@ -1174,8 +1462,8 @@
return sourceFiles.isEmpty();
}

- public void add(Path path) {
- sourceFiles.add(path);
+ public void add(FileStatus status) {
+ sourceFiles.add(status);
}
}
}
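
A final editorial sketch: with sourceFiles now carrying FileStatus entries rather than bare paths, a directory listing can feed ActiveCommit directly, keeping the metadata from the listing. The ".pendingset" suffix filter here is illustrative.

    // Sketch: build an ActiveCommit straight from a .pendingset listing.
    FileStatus[] statuses = fs.listStatus(jobAttemptPath,
        p -> p.getName().endsWith(".pendingset"));
    ActiveCommit active = ActiveCommit.fromStatusList(fs,
        java.util.Arrays.asList(statuses));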