|
@@ -0,0 +1,1951 @@
|
|
|
+<!---
|
|
|
+ Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
+ you may not use this file except in compliance with the License.
|
|
|
+ You may obtain a copy of the License at
|
|
|
+
|
|
|
+ http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+
|
|
|
+ Unless required by applicable law or agreed to in writing, software
|
|
|
+ distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ See the License for the specific language governing permissions and
|
|
|
+ limitations under the License. See accompanying LICENSE file.
|
|
|
+-->
|
|
|
+
|
|
|
+# S3A Committers: Architecture and Implementation
|
|
|
+
|
|
|
+<!-- DISABLEDMACRO{toc|fromDepth=0|toDepth=5} -->
|
|
|
+
|
|
|
+This document covers the architecture and implementation details of the S3A committers.
|
|
|
+
|
|
|
+For information on using the committers, see [the S3A Committers](./committer.html).
|
|
|
+
|
|
|
+
|
|
|
+## Problem: Efficient, reliable commits of work to consistent S3 buckets
|
|
|
+
|
|
|
+
|
|
|
+The standard commit algorithms (the `FileOutputCommitter` and its v1 and v2 algorithms)
|
|
|
+rely on directory rename being an `O(1)` atomic operation: callers output their
|
|
|
+work to temporary directories in the destination filesystem, then
|
|
|
+rename these directories to the final destination as way of committing work.
|
|
|
+This is the perfect solution for committing work against any filesystem with
|
|
|
+consistent listing operations and where the `FileSystem.rename()` command
|
|
|
+is an atomic `O(1)` operation.
|
|
|
+
|
|
|
+Using rename allows individual tasks to work in temporary directories, with the
+rename acting as the atomic operation by which tasks, and ultimately the entire
+job, are committed. Because the cost of the rename is low, it can be
|
|
|
+performed during task and job commits with minimal delays. Note that HDFS
|
|
|
+will lock the namenode metadata during the rename operation, so all rename() calls
|
|
|
+will be serialized. However, as they only update the metadata of two directory
|
|
|
+entries, the duration of the lock is low.
|
|
|
+
|
|
|
+In contrast to a "real" filesystem, Amazon's S3 object store, similar to
|
|
|
+most others, does not support `rename()` at all. A hash operation on the filename
|
|
|
+determines the location of the data: there is no separate metadata to change.
|
|
|
+To mimic renaming, the Hadoop S3A client has to copy the data to a new object
|
|
|
+with the destination filename, then delete the original entry. This copy
|
|
|
+can be executed server-side, but as it does not complete until the in-cluster
|
|
|
+copy has completed, it takes time proportional to the amount of data.
|
|
|
+
|
|
|
+The rename overhead is the most visible issue, but it is not the most dangerous.
|
|
|
+That is the fact that path listings have no consistency guarantees, and may
|
|
|
+lag the addition or deletion of files.
|
|
|
+If files are not listed, the commit operation will *not* copy them, and
|
|
|
+so they will not appear in the final output.
|
|
|
+
|
|
|
+The solution to this problem is closely coupled to the S3 protocol itself:
|
|
|
+delayed completion of multi-part PUT operations
|
|
|
+
|
|
|
+That is: tasks write all data as multipart uploads, *but delay the final
|
|
|
+commit action until the final, single job commit action.* Only that
|
|
|
+data committed in the job commit action will be made visible; work from speculative
|
|
|
+and failed tasks will not be instantiated. As there is no rename, there is no
|
|
|
+delay while data is copied from a temporary directory to the final directory.
|
|
|
+The duration of the commit will be the time needed to determine which commit operations
|
|
|
+to construct, and to execute them.
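+
+A minimal sketch of where the time goes in each approach; the `store.completeUpload()`
+call stands in for the multipart-completion POST described later in this document,
+and is not an actual S3A API:
+
+```python
+# rename-based committers: the job commit copies the data a second time
+def commitJobByRename(fs, taskCommittedPaths, dest):
+    for path in taskCommittedPaths:
+        fs.rename(path, dest)            # O(data) per task on S3
+
+# delayed-completion committers: the data is already uploaded, just not visible
+def commitJobByCompletion(store, pendingUploads):
+    for upload in pendingUploads:
+        store.completeUpload(upload)     # one small POST per file, no data copy
+```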
|
|
|
+
|
|
|
+
|
|
|
+## Terminology
|
|
|
+
|
|
|
+* *Job*: a potentially parallelized query/operation to execute. The execution
|
|
|
+of a job (the division of work into tasks and the management of their completion)
+is generally managed from a single process.
|
|
|
+
|
|
|
+The output of a Job is made visible to other stages in a larger operation
|
|
|
+sequence or other applications if the job *completes successfully*.
|
|
|
+
|
|
|
+* *Job Driver*. An imprecise term for whatever process schedules
+task execution, tracks success/failures and determines when all the work has been
|
|
|
+processed and then commits the output. It may also determine that a job
|
|
|
+has failed and cannot be recovered, in which case the job is aborted.
|
|
|
+In MR and Tez, this is inside the YARN application master.
|
|
|
+In Spark it is the driver, which can run in the AM, the YARN client, or other
|
|
|
+places (e.g. Livy?).
|
|
|
+
|
|
|
+* *Final directory*: the directory into which the output of a job is placed
|
|
|
+so as to be visible.
|
|
|
+
|
|
|
+* *Task* a single operation within a job, on a single process, one which generates
|
|
|
+one or more files.
|
|
|
+After a successful job completion, the data MUST be visible in the final directory.
|
|
|
+A task completes successfully if it generates all the output it expects to without
|
|
|
+failing in some way (error in processing; network/process failure).
|
|
|
+
|
|
|
+* *Job Context* an instance of the class `org.apache.hadoop.mapreduce.JobContext`,
|
|
|
+which provides a read-only view of the Job for the Job Driver and tasks.
|
|
|
+
|
|
|
+* *Task Attempt Context* an instance of the class
|
|
|
+`org.apache.hadoop.mapreduce.TaskAttemptContext extends JobContext, Progressable`,
|
|
|
+which provides operations for tasks, such as getting and setting status,
|
|
|
+progress and counter values.
|
|
|
+
|
|
|
+* *Task Working Directory*: a directory for exclusive access by a single task,
|
|
|
+into which uncommitted work may be placed.
|
|
|
+
|
|
|
+* *Task Commit* The act of taking the output of a task, as found in the
|
|
|
+Task Working Directory, and making it visible in the final directory.
|
|
|
+This is traditionally implemented via a `FileSystem.rename()` call.
|
|
|
+
|
|
|
+ It is useful to differentiate between a *task-side commit*: an operation performed
|
|
|
+ in the task process after its work, and a *driver-side task commit*, in which
|
|
|
+ the Job driver performs the commit operation. Any task-side commit work will
+ be performed across the cluster, and may take place off the critical path for
|
|
|
+ job execution. However, unless the commit protocol requires all tasks to await
|
|
|
+ a signal from the job driver, task-side commits cannot instantiate their output
|
|
|
+ in the final directory. They may be used to promote the output of a successful
|
|
|
+ task into a state ready for the job commit, addressing speculative execution
|
|
|
+ and failures.
|
|
|
+
|
|
|
+* *Job Commit* The act of taking all successfully completed tasks of a job,
|
|
|
+and committing them. This process is generally non-atomic; as it is often
|
|
|
+a serialized operation at the end of a job, its performance can be a bottleneck.
|
|
|
+
|
|
|
+* *Task Abort* To cancel a task such that its data is not committed.
|
|
|
+
|
|
|
+* *Job Abort* To cancel all work in a job: no task's work is committed.
|
|
|
+
|
|
|
+* *Speculative Task Execution/ "Speculation"* Running multiple tasks against the same
|
|
|
+input dataset in parallel, with the first task which completes being the one
|
|
|
+which is considered successful. Its output SHALL be committed; the other task
|
|
|
+SHALL be aborted. There's a requirement that a task can be executed in parallel,
|
|
|
+and that the output of a task MUST NOT BE visible until the job is committed,
|
|
|
+at the behest of the Job driver. There is the expectation that the output
|
|
|
+SHOULD BE the same on each task, though that MAY NOT be the case. What matters
|
|
|
+is if any instance of a speculative task is committed, the output MUST BE
|
|
|
+considered valid.
|
|
|
+
|
|
|
+There is an expectation that the Job Driver and tasks can communicate: if a task
|
|
|
+performs any operations itself during the task commit phase, it shall only do
|
|
|
+this when instructed by the Job Driver. Similarly, if a task is unable to
|
|
|
+communicate its final status to the Job Driver, it MUST NOT commit its work.
|
|
|
+This is very important when working with S3, as some network partitions could
|
|
|
+isolate a task from the Job Driver, while the task retains access to S3.
|
|
|
+
|
|
|
+## The execution workflow
|
|
|
+
|
|
|
+
|
|
|
+**setup**:
|
|
|
+
|
|
|
+* A job is created, assigned a Job ID (YARN?).
|
|
|
+* For each attempt, an attempt ID is created, to build the job attempt ID.
|
|
|
+* `Driver`: a `JobContext` is created/configured
|
|
|
+* A committer instance is instantiated with the `JobContext`; `setupJob()` is invoked. The subsequent committer interactions are sketched below.
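+
+A sketch of how the rest of the workflow drives the committer, using the
+`OutputCommitter` method names; error handling, task abort and speculative
+execution are omitted, and the `jobContext`/`outputFormat` names are illustrative:
+
+```python
+def runJob(jobCommitter, jobContext, outputFormat, tasks):
+    jobCommitter.setupJob(jobContext)
+    for task in tasks:                    # normally executed in parallel processes
+        committer = outputFormat.getOutputCommitter(task.context)
+        committer.setupTask(task.context)
+        task.execute()                    # writes into the task working directory
+        if committer.needsTaskCommit(task.context):
+            # only performed once the job driver grants permission
+            committer.commitTask(task.context)
+    jobCommitter.commitJob(jobContext)    # single, job-wide commit of all task output
+```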
|
|
|
+
|
|
|
+
|
|
|
+## The `FileOutputCommitter`
|
|
|
+
|
|
|
+The standard commit protocols are implemented in
|
|
|
+ `org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter`.
|
|
|
+
|
|
|
+There are two algorithms: the "v1" algorithm, designed to address failures and restarts
|
|
|
+of the MapReduce application master. The V2 algorithm cannot recover from failure
|
|
|
+except by re-executing the entire job. It does, however, propagate all its work
|
|
|
+to the output directory in the task commit. When working with object stores which
|
|
|
+mimic `rename()` by copy and delete, it is more efficient due to the reduced
|
|
|
+number of listings, copies and deletes, and, because these are executed in
|
|
|
+task commits, eliminates the final `O(data)` pause at the end of all work.
|
|
|
+It is still highly inefficient, but the inefficiencies are less visible as
|
|
|
+large pauses in execution.
|
|
|
+
|
|
|
+
|
|
|
+Notes
|
|
|
+
|
|
|
+* The v1 algorithm was implemented to handle MapReduce AM restarts; it was
|
|
|
+ not used in Hadoop 1.x, whose JobTracker could not recover from failures.
|
|
|
+ Historically then, it is the second version of a file commit algorithm.
|
|
|
+* Because the renames are considered to be fast, there is no logging
|
|
|
+of renames being in progress, or their duration.
|
|
|
+* Speculative execution is supported by having every task attempt write its
|
|
|
+uncommitted data to a task attempt directory. When a task is ready to commit,
|
|
|
+it must notify the job driver that it is ready to commit; the job driver
|
|
|
+will then commit or abort the task.
|
|
|
+
|
|
|
+## Hadoop MR Commit algorithm "1"
|
|
|
+
|
|
|
+
|
|
|
+The "v1" commit algorithm is the default commit algorithm in Hadoop 2.x;
|
|
|
+it was implemented as part of [MAPREDUCE-2702](https://issues.apache.org/jira/browse/MAPREDUCE-2702).
|
|
|
+
|
|
|
+This algorithm is designed to handle a failure and restart of the Job driver,
|
|
|
+with the restarted job driver only rerunning the incomplete tasks; the
|
|
|
+output of the completed tasks is recovered for commitment when the restarted
|
|
|
+job completes.
|
|
|
+
|
|
|
+There is a cost: the time to commit, which is spent recursively listing all files
+in all committed task directories, and renaming them.
|
|
|
+
|
|
|
+As this is performed sequentially, time to commit is `O(files)`, which is
|
|
|
+generally `O(tasks)`.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+```python
|
|
|
+#Job Attempt Path is `$dest/_temporary/$appAttemptId/`
|
|
|
+
|
|
|
+jobAttemptPath = '$dest/_temporary/$appAttemptId/'
|
|
|
+
|
|
|
+# Task Attempt Path is `$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID`
|
|
|
+taskAttemptPath = '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
|
|
|
+
|
|
|
+#Task committed path is `$dest/_temporary/$appAttemptId/$taskAttemptID`
|
|
|
+taskCommittedPath = '$dest/_temporary/$appAttemptId/$taskAttemptID'
|
|
|
+```
|
|
|
+
|
|
|
+Tasks write in/under the task attempt path.
|
|
|
+
|
|
|
+### Job Setup
|
|
|
+
|
|
|
+```python
|
|
|
+fs.mkdir(jobAttemptPath)
|
|
|
+```
|
|
|
+
|
|
|
+### Task Setup
|
|
|
+
|
|
|
+None: directories are created on demand.
|
|
|
+
|
|
|
+
|
|
|
+### Task Commit
|
|
|
+
|
|
|
+Rename task attempt path to task committed path.
|
|
|
+
|
|
|
+```python
|
|
|
+
|
|
|
+def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest):
|
|
|
+ return fs.exists(taskAttemptPath)
|
|
|
+
|
|
|
+
|
|
|
+def commitTask(fs, jobAttemptPath, taskAttemptPath, dest):
|
|
|
+ if fs.exists(taskAttemptPath) :
|
|
|
+ fs.delete(taskCommittedPath, recursive=True)
|
|
|
+ fs.rename(taskAttemptPath, taskCommittedPath)
|
|
|
+```
|
|
|
+
|
|
|
+On a genuine filesystem this is an `O(1)` directory rename.
|
|
|
+
|
|
|
+On an object store with a mimicked rename, it is `O(data)` for the copy,
+along with overhead for listing and deleting all files (for S3, that's
+`(1 + files/500)` list calls, and the same number of bulk delete calls).
|
|
|
+
|
|
|
+
|
|
|
+### Task Abort
|
|
|
+
|
|
|
+Delete task attempt path.
|
|
|
+
|
|
|
+```python
|
|
|
+def abortTask(fs, jobAttemptPath, taskAttemptPath, dest):
|
|
|
+ fs.delete(taskAttemptPath, recursive=True)
|
|
|
+```
|
|
|
+
|
|
|
+On a genuine filesystem this is an `O(1)` operation. On an object store,
|
|
|
+proportional to the time to list and delete files, usually in batches.
|
|
|
+
|
|
|
+
|
|
|
+### Job Commit
|
|
|
+
|
|
|
+Merge all files/directories in all task committed paths into the final destination path.
|
|
|
+Optionally; create 0-byte `_SUCCESS` file in destination path.
|
|
|
+
|
|
|
+```python
|
|
|
+def commitJob(fs, jobAttemptDir, dest):
|
|
|
+ for committedTask in fs.listFiles(jobAttemptDir):
|
|
|
+ mergePathsV1(fs, committedTask, dest)
|
|
|
+ fs.touch("$dest/_SUCCESS")
|
|
|
+```
|
|
|
+
|
|
|
+(See below for details on `mergePaths()`)
|
|
|
+
|
|
|
+
|
|
|
+A failure during job commit cannot be recovered from except by re-executing
|
|
|
+the entire query:
|
|
|
+
|
|
|
+```python
|
|
|
+def isCommitJobRepeatable() :
|
|
|
+  return False
|
|
|
+```
|
|
|
+
|
|
|
+Accordingly, it is a failure point in the protocol. With a low number of files
|
|
|
+and fast rename/list algorithms, the window of vulnerability is low. At
|
|
|
+scale, the vulnerability increases. It could actually be reduced through
|
|
|
+parallel execution of the renaming of committed tasks, as sketched below.
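+
+A hypothetical parallelized variant, which is not what `FileOutputCommitter` does
+today, but illustrates the reduction; it assumes the per-task merges do not
+conflict with each other:
+
+```python
+from concurrent.futures import ThreadPoolExecutor
+
+def commitJobParallel(fs, jobAttemptDir, dest, threads=16):
+    with ThreadPoolExecutor(max_workers=threads) as pool:
+        futures = [pool.submit(mergePathsV1, fs, committedTask, dest)
+                   for committedTask in fs.listFiles(jobAttemptDir)]
+        for future in futures:
+            future.result()              # propagate the first failure, if any
+    fs.touch("$dest/_SUCCESS")
+```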
|
|
|
+
|
|
|
+
|
|
|
+### Job Abort
|
|
|
+
|
|
|
+Delete all data under job attempt path.
|
|
|
+
|
|
|
+```python
|
|
|
+def abortJob(fs, jobAttemptDir, dest):
|
|
|
+ fs.delete(jobAttemptDir, recursive = True)
|
|
|
+```
|
|
|
+
|
|
|
+### Job Cleanup
|
|
|
+
|
|
|
+```python
|
|
|
+def cleanupJob(fs, dest):
|
|
|
+ fs.delete('$dest/_temporary', recursive = True)
|
|
|
+```
|
|
|
+
|
|
|
+
|
|
|
+### Job Recovery
|
|
|
+
|
|
|
+1. Data under task committed paths is retained
|
|
|
+1. All directories under `$dest/_temporary/$appAttemptId/_temporary/` are deleted.
|
|
|
+
|
|
|
+Uncommitted/unexecuted tasks are (re)executed.
|
|
|
+
|
|
|
+This significantly improves time to recover from Job driver (here MR AM) failure.
|
|
|
+The only lost work is that of all tasks in progress: those which had generated
|
|
|
+data but were not yet committed.
|
|
|
+
|
|
|
+Only the failure of the job driver requires a job restart, not an individual
|
|
|
+task. Therefore the probability of this happening is independent of the number
|
|
|
+of tasks executed in parallel, instead simply due to the duration of the query.
|
|
|
+
|
|
|
+The longer the task, the higher the risk of failure, the more value there is
|
|
|
+in recovering the work in progress.
|
|
|
+
|
|
|
+Fast queries not only have a lower risk of failure, they can recover from
|
|
|
+failure simply by rerunning the entire job. This is implicitly the strategy
|
|
|
+in Spark, which does not attempt to recover any in-progress jobs. The faster
|
|
|
+your queries, the simpler your recovery strategy needs to be.
|
|
|
+
|
|
|
+### `mergePaths(FileSystem fs, FileStatus src, Path dest)` Algorithm
|
|
|
+
|
|
|
+`mergePaths()` is the core algorithm to merge data; it is somewhat confusing
|
|
|
+as the implementation mixes the strategies for both algorithms across
|
|
|
+two co-recursive routines, `mergePaths()` and `renameOrMerge()`.
|
|
|
+
|
|
|
+
|
|
|
+Here the two algorithms have been split, and one of the co-recursive methods
|
|
|
+inlined.
|
|
|
+
|
|
|
+```python
|
|
|
+def mergePathsV1(fs, src, dest) :
|
|
|
+ if fs.exists(dest) :
|
|
|
+ toStat = fs.getFileStatus(dest)
|
|
|
+ else:
|
|
|
+ toStat = None
|
|
|
+
|
|
|
+ if src.isFile :
|
|
|
+ if not toStat is None :
|
|
|
+ fs.delete(dest, recursive = True)
|
|
|
+ fs.rename(src.getPath, dest)
|
|
|
+ else :
|
|
|
+ # destination is directory, choose action on source type
|
|
|
+ if src.isDirectory :
|
|
|
+ if not toStat is None :
|
|
|
+ if not toStat.isDirectory :
|
|
|
+ # Destination exists and is not a directory
|
|
|
+ fs.delete(dest)
|
|
|
+ fs.rename(src.getPath(), dest)
|
|
|
+ else :
|
|
|
+ # Destination exists and is a directory
|
|
|
+ # merge all children under destination directory
|
|
|
+ for child in fs.listStatus(src.getPath) :
|
|
|
+ mergePathsV1(fs, child, dest + child.getName)
|
|
|
+ else :
|
|
|
+ # destination does not exist
|
|
|
+ fs.rename(src.getPath(), dest)
|
|
|
+```
|
|
|
+
|
|
|
+## v2 commit algorithm
|
|
|
+
|
|
|
+
|
|
|
+The v2 algorithm directly commits task output into the destination directory.
|
|
|
+It is essentially a re-implementation of the Hadoop 1.x commit algorithm.
|
|
|
+
|
|
|
+1. During execution, intermediate data becomes visible.
|
|
|
+1. On a failure, all output must be deleted and the job restarted.
|
|
|
+
|
|
|
+It implements `mergePaths` differently, as shown below.
|
|
|
+
|
|
|
+```python
|
|
|
+def mergePathsV2(fs, src, dest) :
|
|
|
+ if fs.exists(dest) :
|
|
|
+ toStat = fs.getFileStatus(dest)
|
|
|
+ else:
|
|
|
+ toStat = None
|
|
|
+
|
|
|
+ if src.isFile :
|
|
|
+ if not toStat is None :
|
|
|
+ fs.delete(dest, recursive = True)
|
|
|
+ fs.rename(src.getPath, dest)
|
|
|
+ else :
|
|
|
+ # destination is directory, choose action on source type
|
|
|
+ if src.isDirectory :
|
|
|
+ if not toStat is None :
|
|
|
+ if not toStat.isDirectory :
|
|
|
+ # Destination exists and is not a directory
|
|
|
+ fs.delete(dest)
|
|
|
+ fs.mkdirs(dest) #
|
|
|
+ for child in fs.listStatus(src.getPath) : # HERE
|
|
|
+ mergePathsV2(fs, child, dest + child.getName) #
|
|
|
+
|
|
|
+ else :
|
|
|
+ # Destination exists and is a directory
|
|
|
+ # merge all children under destination directory
|
|
|
+ for child in fs.listStatus(src.getPath) :
|
|
|
+ mergePathsV2(fs, child, dest + child.getName)
|
|
|
+ else :
|
|
|
+ # destination does not exist
|
|
|
+ fs.mkdirs(dest) #
|
|
|
+ for child in fs.listStatus(src.getPath) : # HERE
|
|
|
+ mergePathsV2(fs, child, dest + child.getName) #
|
|
|
+```
|
|
|
+
|
|
|
+Both recurse down any source directory tree, and commit single files
|
|
|
+by renaming the files.
|
|
|
+
|
|
|
+A key difference is that the v1 algorithm commits a source directory
+via a directory rename, which is traditionally an `O(1)` operation.
|
|
|
+
|
|
|
+In contrast, the v2 algorithm lists all direct children of a source directory
|
|
|
+and recursively calls `mergePath()` on them, ultimately renaming the individual
|
|
|
+files. As such, the number of renames it performs equals the number of source
|
|
|
+*files*, rather than the number of source *directories*; the number of directory
|
|
|
+listings being `O(depth(src))`, where `depth(path)` is a function returning the
|
|
|
+depth of directories under the given path.
|
|
|
+
|
|
|
+On a normal filesystem, the v2 merge algorithm is potentially more expensive
|
|
|
+than the v1 algorithm. However, as the merging only takes place in task commit,
|
|
|
+it is potentially less of a bottleneck in the entire execution process.
|
|
|
+
|
|
|
+On an object store, it is suboptimal not just from its expectation that `rename()`
|
|
|
+is an `O(1)` operation, but from its expectation that a recursive tree walk is
|
|
|
+an efficient way to enumerate and act on a tree of data. If the algorithm was
|
|
|
+switched to using `FileSystem.listFiles(path, recursive)` for a single call to
|
|
|
+enumerate all children under a path, then the listing operation would be significantly
|
|
|
+faster, at least on a deep or wide tree. However, for any realistic dataset,
|
|
|
+the size of the output files is likely to be the main cause of delays. That
|
|
|
+is, if the cost of `mergePathsV2` is `O(depth(src)) + O(data)`, then
|
|
|
+generally the `O(data)` value will be more significant than the `depth(src)`.
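+
+A sketch of that hypothetical flat-listing variant; `listFiles(src, recursive=True)`
+corresponds to the real `FileSystem.listFiles(Path, boolean)` call, while
+`relativeTo()` and `parent()` are illustrative helpers:
+
+```python
+def mergePathsV2Flat(fs, srcDir, destDir):
+    # one deep listing replaces the tree walk of listStatus()/getFileStatus() calls
+    for file in fs.listFiles(srcDir, recursive=True):
+        relative = file.getPath().relativeTo(srcDir)
+        target = destDir + "/" + relative
+        fs.mkdirs(parent(target))
+        fs.rename(file.getPath(), target)   # still one rename per file
+```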
|
|
|
+
|
|
|
+There is one key exception: tests which work on small amounts of data yet try
|
|
|
+to generate realistic output directory structures. In these tests the cost
|
|
|
+of listing directories and calling `getFileStatus()` could exceed that of the copy
|
|
|
+calls. This is why small-scale tests of the commit algorithms against object stores
|
|
|
+must be considered significantly misleading.
|
|
|
+
|
|
|
+### v2 Task Commit
|
|
|
+
|
|
|
+Merge the files under the task attempt path directly into the destination directory.
|
|
|
+
|
|
|
+```python
|
|
|
+def needsTaskCommit(fs, jobAttemptPath, taskAttemptPath, dest):
|
|
|
+ return fs.exists(taskAttemptPath)
|
|
|
+
|
|
|
+def commitTask(fs, jobAttemptPath, taskAttemptPath, dest):
|
|
|
+ if fs.exists(taskAttemptPath) :
|
|
|
+    mergePathsV2(fs, taskAttemptPath, dest)
|
|
|
+```
|
|
|
+
|
|
|
+### v2 Task Abort
|
|
|
+
|
|
|
+Delete task attempt path.
|
|
|
+
|
|
|
+```python
|
|
|
+def abortTask(fs, jobAttemptPath, taskAttemptPath, dest):
|
|
|
+ fs.delete(taskAttemptPath, recursive=True)
|
|
|
+```
|
|
|
+
|
|
|
+Cost: `O(1)` for normal filesystems, `O(files)` for object stores.
|
|
|
+
|
|
|
+
|
|
|
+### v2 Job Commit
|
|
|
+
|
|
|
+As all the task output is already completed, all that is needed is
|
|
|
+to touch the `_SUCCESS` marker.
|
|
|
+
|
|
|
+```python
|
|
|
+def commitJob(fs, jobAttemptDir, dest):
|
|
|
+ fs.touch("$dest/_SUCCESS")
|
|
|
+```
|
|
|
+Cost: `O(1)`
|
|
|
+
|
|
|
+A failure during job commit is implicitly repeatable:
|
|
|
+
|
|
|
+```python
|
|
|
+def isCommitJobRepeatable() :
|
|
|
+ return True
|
|
|
+```
|
|
|
+
|
|
|
+### v2 Job Abort
|
|
|
+
|
|
|
+Delete all data under job attempt path.
|
|
|
+
|
|
|
+```python
|
|
|
+def abortJob(fs, jobAttemptDir, dest):
|
|
|
+ fs.delete(jobAttemptDir, recursive=True)
|
|
|
+```
|
|
|
+
|
|
|
+Cost: `O(1)` for normal filesystems, `O(files)` for object stores.
|
|
|
+
|
|
|
+### v2 Task Recovery
|
|
|
+
|
|
|
+As no data is written to the destination directory, a task can be cleaned up
|
|
|
+by deleting the task attempt directory.
|
|
|
+
|
|
|
+### v2 Job Recovery
|
|
|
+
|
|
|
+Because the data has been renamed into the destination directory, it is nominally
|
|
|
+recoverable. However, this assumes that the number and name of generated
|
|
|
+files are constant on retried tasks.
|
|
|
+
|
|
|
+
|
|
|
+## How MapReduce uses the committer in a task
|
|
|
+
|
|
|
+MapReduce runs each Mapper or Reducer in its own container; it
|
|
|
+gets its own process. Implicitly, this also means that each task gets
|
|
|
+its own instance of every filesystem.
|
|
|
+
|
|
|
+The work is choreographed in `org.apache.hadoop.mapred.Task`, for which there
|
|
|
+are specific subclasses for the map (`MapTask`) and reduce (`ReduceTask`) phases.
|
|
|
+There are also cleanup tasks whose role is simpler: clean things up,
|
|
|
+and a Job setup task which runs `OutputCommitter.setupJob`. That is: even
|
|
|
+the job setup phase is run in a Task.
|
|
|
+
|
|
|
+MapTask uses the committer to write the output of all the mappers into
|
|
|
+the filesystem, ready for the reducers. Each partition writes its data
|
|
|
+as a `MapFile`, which is actually two `SequenceFile` files in a directory:
|
|
|
+the `data` file of all Key-Value
|
|
|
+output, and `index` which contains an index of some of the keys in the file.
|
|
|
+
|
|
|
+This is all written to the local filesystem.
|
|
|
+
|
|
|
+The `ReduceTask` does the final write to the destination filesystem.
|
|
|
+
|
|
|
+Because the Map phase uses a committer to commit intermediate work,
|
|
|
+any plug-in committer supplied to a process
|
|
|
+through any extension mechanism *must* work with the output generated by a mapper.
|
|
|
+The staging committers do this only if unique filenames are disabled, but as they
|
|
|
+and the magic committers are only meant to be used for the final output of work,
|
|
|
+it is somewhat moot. What is important is to be able to use different committers
|
|
|
+for the map phase and the final reduce. This is implicit if separate committers
|
|
|
+are defined for different filesystems: the new committer can be defined for
|
|
|
+the final destination FS, while `file://` can retain the default
|
|
|
+`FileOutputCommitter`.
|
|
|
+
|
|
|
+### Task Setup
|
|
|
+
|
|
|
+`Task.initialize()`: read in the configuration, instantiate the `JobContextImpl`
+and `TaskAttemptContextImpl` instances bound to the current job & task.
|
|
|
+
|
|
|
+### Task Commit
|
|
|
+
|
|
|
+After the work is completed, `Task.done()` is invoked, which is essentially the following
|
|
|
+codepath:
|
|
|
+
|
|
|
+```java
|
|
|
+if (committer.needsTaskCommit(taskContext)) {
|
|
|
+ // get permission to commit from AM
|
|
|
+ int retries = MAX_RETRIES;
|
|
|
+ while(true) {
|
|
|
+ try {
|
|
|
+ umbilical.commitPending(taskId, taskStatus);
|
|
|
+ break;
|
|
|
+ } catch(IOException ie) {
|
|
|
+ if (--retries == 0) {
|
|
|
+ // FAIL WITHOUT CALLING ABORT
|
|
|
+ System.exit(67);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // commit the work
|
|
|
+ try {
|
|
|
+ committer.commitTask(taskContext);
|
|
|
+ } catch (IOException iee) {
|
|
|
+ // failure: abort
|
|
|
+ try {
|
|
|
+ committer.abortTask(taskContext);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.warn("Failure cleaning up: " +
|
|
|
+ StringUtils.stringifyException(ioe));
|
|
|
+ }
|
|
|
+ throw iee;
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+That is: if and only if there is data to write, the Task requests clearance
|
|
|
+to do so from the AM. This ensures that speculative work is not committed if
|
|
|
+another task has written it, *and that even non-speculative work is not committed
|
|
|
+if the task has lost contact with the AM*.
|
|
|
+
|
|
|
+That is: task output is not committed if the Job's AM has failed or
|
|
|
+is in a separate network partition from the Task.
|
|
|
+
|
|
|
+If permission is granted, the commit call is invoked.
|
|
|
+If this doesn't work, `abortTask()` is called; all
|
|
|
+failures there are logged and swallowed.
|
|
|
+
|
|
|
+
|
|
|
+This method actually appears to be limited in that it does not guarantee
|
|
|
+that `committer.abortTask()` is always called: if the `umbilical.commitPending()` calls
|
|
|
+fail repeatedly and the Task process aborts, `committer.abortTask()` is not called.
|
|
|
+If this is due to a network partition, we can hope that the AM invokes
|
|
|
+`committer.abortTask()`, and that if it is an AM failure then a restarted AM can
|
|
|
+clean up its previous attempts. For the classic FileOutputCommitter, listing and
|
|
|
+deleting the previous attempt's data is straightforward. However, for S3 committers
|
|
|
+using Multipart Upload as the means of uploading uncommitted data, it is critical
|
|
|
+to ensure that pending uploads are always aborted. This can be done by
|
|
|
+
|
|
|
+* Making sure that all task-side failure branches in `Task.done()` call `committer.abortTask()`.
|
|
|
+* Having job commit & abort cleaning up all pending multipart writes to the same directory
|
|
|
+tree. That is: require that no other jobs are writing to the same tree, and so
|
|
|
+list all pending operations and cancel them.
|
|
|
+* Add a CLI command to verify that there are no commits under a path; returning
|
|
|
+a non-zero exit code if there are. This can be used in testing.
|
|
|
+* Add a CLI command to purge commits under a path. This can be used in workflows,
|
|
|
+if felt necessary, and in test runs to isolate the output of different tests.
|
|
|
+* Document a recommended purge-pending-commit timeout for the system
|
|
|
+* Add probes in the integration tests to verify that pending uploads are never
|
|
|
+present after a write.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+### How MapReduce uses the committer in the Application Master
|
|
|
+
|
|
|
+The AM uses a committer to set up and commit the job. To support failure
|
|
|
+and recovery of the AM, `OutputCommitter.isRecoverySupported()` is used
|
|
|
+to declare whether all the output of successful tasks can be used in the final
|
|
|
+job, or the entire job needs to be reset and repeated.
|
|
|
+`OutputCommitter.isCommitJobRepeatable()` addresses the other question:
|
|
|
+can a committer recover from a failure of the commit process itself.
|
|
|
+
|
|
|
+A committer is created in the Application Master, and handed to an instance
|
|
|
+of `org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler`,
|
|
|
+which then manages the job-level lifecycle.
|
|
|
+
|
|
|
+If the MRv1 API is used, the committer is chosen from the value of
|
|
|
+`"mapred.output.committer.class"`; in the MRv2 API the output format
|
|
|
+is instantiated, then asked for a committer using a task and task attempt ID of
|
|
|
+0. A committer is obtained from the output format via a call to
|
|
|
+`Committer.getOutputCommitter(taskContext)`, again using the task attempt context
|
|
|
+with the (job, 0, 0) task and attempt IDs. That is: even for the job committers,
|
|
|
+a task context is always passed in to the `OutputFormat` when requesting a committer.
|
|
|
+*It is critical for all implementations of `OutputCommitter.abortTask()` to
|
|
|
+be able to execute from the AM, rather than the container and host running
|
|
|
+the task. Furthermore, all information needed for the abort (paths, filesystem instances
|
|
|
+&c) *must* be retrieved from the `TaskAttemptContext` passed to the method,
|
|
|
+rather than relying on fields initiated from the context passed to the constructor.
|
|
|
+
|
|
|
+
|
|
|
+#### AM: Job setup: `OutputCommitter.setupJob()`
|
|
|
+
|
|
|
+This is initiated in `org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.StartTransition`.
|
|
|
+It is queued for asynchronous execution in `org.apache.hadoop.mapreduce.v2.app.MRAppMaster.startJobs()`,
|
|
|
+which is invoked when the service is started. Thus: the job is set up when the
|
|
|
+AM is started.
|
|
|
+
|
|
|
+
|
|
|
+#### AM: Job Commit: `OutputCommitter.commitJob()`
|
|
|
+
|
|
|
+In the "default"/cluster filesystem, the `CommitterEventHandler` uses data in
|
|
|
+the staging area defined in `yarn.app.mapreduce.am.staging-dir`
|
|
|
+(default `/tmp/hadoop-yarn/staging/$user/.staging`), in a subdirectory
|
|
|
+named from the job ID.
|
|
|
+
|
|
|
+Three paths are built up for using the filesystem for tracking the state
|
|
|
+of the commit process, with a goal of making the commit operation recoverable,
|
|
|
+where supported.
|
|
|
+
|
|
|
+
|
|
|
+| name | role |
|
|
|
+|------|--------|
|
|
|
+| `COMMIT_STARTED` | mark the commencement of the job commit |
|
|
|
+| `COMMIT_SUCCESS` | mark the successful completion of the job commit |
|
|
|
+| `COMMIT_FAIL` | mark the failure of the job commit |
|
|
|
+
|
|
|
+These markers are used to manage job restart/failure if they happen during
|
|
|
+the job commit itself.
|
|
|
+
|
|
|
+When an AM starts up, it looks in its staging area for these files as a way
|
|
|
+to determine the previous state of the job. If there are no `COMMIT_` marker files,
|
|
|
+the job is considered not to have attempted to commit itself yet.
|
|
|
+
|
|
|
+
|
|
|
+The presence of `COMMIT_SUCCESS` or `COMMIT_FAIL` are taken as evidence
|
|
|
+that the previous job completed successfully or unsuccessfully; the AM
|
|
|
+then completes with a success/failure error code, without attempting to rerun
|
|
|
+the job.
|
|
|
+
|
|
|
+If `COMMIT_STARTED` exists but not either of the completion markers, then,
|
|
|
+if the committer declares that its job commit operation is repeatable
|
|
|
+(`Committer.isCommitJobRepeatable(jobContext) == true`), then an attempt
|
|
|
+is made to recommit the job, deleting the `COMMIT_STARTED` and commencing
|
|
|
+the commit process again.
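+
+A sketch of that startup decision logic; the real checks live in the MR AM's
+recovery code, so the structure below is illustrative only:
+
+```python
+def previousCommitOutcome(fs, stagingDir, committer, jobContext):
+    if fs.exists(stagingDir + "/COMMIT_SUCCESS"):
+        return "succeeded"            # report success; do not rerun the job
+    if fs.exists(stagingDir + "/COMMIT_FAIL"):
+        return "failed"               # report failure; do not rerun the job
+    if fs.exists(stagingDir + "/COMMIT_STARTED"):
+        if committer.isCommitJobRepeatable(jobContext):
+            fs.delete(stagingDir + "/COMMIT_STARTED")
+            return "recommit"         # delete the marker and commit again
+        else:
+            return "failed"           # outcome unknown and commit is unrepeatable
+    return "not committed"            # normal recovery: rerun incomplete tasks
+```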
|
|
|
+
|
|
|
+These `COMMIT_STARTED` files are simply 0-byte files, but are created
|
|
|
+with the overwrite bit only set to true if the job commit is considered
|
|
|
+repeatable:
|
|
|
+
|
|
|
+```java
|
|
|
+private void touchz(Path p, boolean overwrite) throws IOException {
|
|
|
+ fs.create(p, overwrite).close();
|
|
|
+}
|
|
|
+```
|
|
|
+That is: the atomicity of the `create(path, overwrite=false)` on the cluster
|
|
|
+filesystem is used to guarantee that only one process will attempt to commit
|
|
|
+a specific job.
|
|
|
+
|
|
|
+```java
|
|
|
+boolean commitJobIsRepeatable = committer.isCommitJobRepeatable(
|
|
|
+ event.getJobContext());
|
|
|
+try {
|
|
|
+ touchz(startCommitFile, commitJobIsRepeatable);
|
|
|
+ waitForValidCommitWindow();
|
|
|
+ committer.commitJob(event.getJobContext());
|
|
|
+ touchz(endCommitSuccessFile, commitJobIsRepeatable);
|
|
|
+} catch (Exception e) {
|
|
|
+ touchz(endCommitFailureFile, commitJobIsRepeatable);
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+The `waitForValidCommitWindow()` operation is important: it declares that
|
|
|
+the committer must not commit unless there has been communication with the YARN
|
|
|
+Resource Manager within `yarn.app.mapreduce.am.job.committer.commit-window` milliseconds
|
|
|
+(default: 10,000). It does this by waiting until the next heartbeat is received.
|
|
|
+There's a possible bug here: if the interval is set too small the thread may
|
|
|
+permanently spin waiting for a callback within the window. Ignoring that, this algorithm
|
|
|
+guarantees that
|
|
|
+
|
|
|
+1. As only one call can create a file with `overwrite=false`,
|
|
|
+ only one process's attempt to commit a non-repeatable job will proceed
|
|
|
+
|
|
|
+1. Only a process with contact with the YARN within the configured window
|
|
|
+ may commit a job.
|
|
|
+
|
|
|
+1. If the AM is partitioned from the rest of the network, provided that its clock
|
|
|
+is monotonically increasing at the same rate as the rest of the cluster, then
|
|
|
+the rest of the cluster can be confident that
|
|
|
+`yarn.app.mapreduce.am.job.committer.commit-window` milliseconds after the AM
|
|
|
+successfully heartbeated to the YARN RM, then the output of this job attempt
|
|
|
+will *never* be committed. This permits more than one job attempt to run simultaneously, but
|
|
|
+helps ensure that only one of them will attempt to commit.
|
|
|
+
|
|
|
+1. Provided YARN heartbeats are only sent to the AM which successfully created
|
|
|
+the `COMMIT_STARTED` file, it will initiate the commit operation.
|
|
|
+
|
|
|
+Possible issues with this algorithm:
|
|
|
+
|
|
|
+* The `COMMIT_STARTED` file is created before waiting to get a heartbeat. It
|
|
|
+may be that this AM has lost contact with YARN, but doesn't know it yet. When the
|
|
|
+YARN liveness protocols eventually time out, the AM will correctly terminate,
|
|
|
+but as the `COMMIT_STARTED` file has been created at this point, no other launched
|
|
|
+AM will be able to commit.
|
|
|
+
|
|
|
+* If two committers attempt to create `COMMIT_STARTED` files on a non-repeatable
|
|
|
+commit, one will succeed, wait for a heartbeat then attempt a (possibly slow) commit.
|
|
|
+The second committer will fail, and will *immediately* create a `COMMIT_FAILED` file.
|
|
|
+As a result, the state of the staging area will imply that the commit has failed,
|
|
|
+when really it is in progress, and that only the second process failed.
|
|
|
+
|
|
|
+It would seem safer to address this through:
|
|
|
+1. Waiting for a heartbeat before creating the `COMMIT_STARTED` file.
|
|
|
+1. Maybe: not creating the `COMMIT_FAILED` file if the failure happens when
|
|
|
+trying to create the `COMMIT_STARTED` file. That is: only a process which
|
|
|
+successfully created the `COMMIT_STARTED` file may indicate that a commit has failed.
|
|
|
+
|
|
|
+
|
|
|
+### AM: Cancelling job commit
|
|
|
+
|
|
|
+The thread performing the commit is interrupted; the `CommitterEventHandler`
|
|
|
+waits for it to finish, up to the timeout set in `yarn.app.mapreduce.am.job.committer.cancel-timeout`
+(in milliseconds).
|
|
|
+
|
|
|
+### AM: Task Abort
|
|
|
+
|
|
|
+The AM may call the `OutputCommitter.taskAbort()` with a task attempt context,
|
|
|
+when handling the failure/loss of a container. That is: on container failure,
|
|
|
+the task abort operation is executed in the AM, using the AM's committer.
|
|
|
+This avoids the need to create a new container, and means that the "best-effort"
|
|
|
+task abort does cope with container failures.
|
|
|
+
|
|
|
+A partition between the AM and the task container means that this AM-executed
|
|
|
+task abort may take place while a task in the partitioned container is still
|
|
|
+executing. Unless output writing operations will fail after the abort operation,
|
|
|
+the partitioned task may not become aware of the partition until its own task
+commit sequence in `Task.done()`, when `talkToAMTGetPermissionToCommit()` fails.
|
|
|
+
|
|
|
+# Requirements of an S3A Committer
|
|
|
+
|
|
|
+1. Support an eventually consistent S3 object store as a reliable direct
|
|
|
+destination of work through the S3A filesystem client.
|
|
|
+1. Efficient: implies no rename, and a minimal amount of delay in the job driver's
|
|
|
+task and job commit phases,
|
|
|
+1. Support task failure and speculation.
|
|
|
+1. Can be used by existing code: Hadoop MapReduce, Spark, Hive.
|
|
|
+1. Retrofittable to existing subclasses of FileOutputFormat and/or compatible
|
|
|
+with committers which expect a specific FileOutputFormat.
|
|
|
+1. Clean up uncommitted data from all task attempts, all previous attempts of
|
|
|
+the job, and any previous incomplete jobs.
|
|
|
+1. Security: not to permit privilege escalation from other users with
|
|
|
+write access to the same file system(s).
|
|
|
+
|
|
|
+
|
|
|
+## Features of S3 and the S3A Client
|
|
|
+
|
|
|
+
|
|
|
+A core problem is that
|
|
|
+[object stores are not filesystems](../../../hadoop-project-dist/hadoop-common/filesystem/introduction.html);
|
|
|
+how `rename()` has been emulated in the S3A client means that both the existing
|
|
|
+MR committer algorithms have significant performance problems.
|
|
|
+
|
|
|
+1. Single-object renames are implemented as a copy and delete sequence.
|
|
|
+1. COPY is atomic, but overwrites cannot be prevented.
|
|
|
+1. Amazon S3 is eventually consistent on listings, deletes and updates.
|
|
|
+1. Amazon S3 has create consistency, however, the negative response of a HEAD/GET
|
|
|
+performed on a path before an object was created can be cached, unintentionally
|
|
|
+creating a create inconsistency. The S3A client library does perform such a check
+on `create()` and `rename()` to determine the state of the destination path, and
+so whether the operation is permitted.
|
|
|
+1. Multi-object renames are sequential or parallel single object COPY+DELETE operations:
|
|
|
+non atomic, `O(data)` and, on failure, can leave the filesystem in an unknown
|
|
|
+state.
|
|
|
+1. There is a PUT operation, capable of uploading 5GB of data in one HTTP request.
|
|
|
+1. The PUT operation is atomic, but there is no PUT-no-overwrite option.
|
|
|
+1. There is a multipart POST/PUT sequence for uploading larger amounts of data
|
|
|
+in a sequence of PUT requests.
|
|
|
+
|
|
|
+
|
|
|
+The Hadoop S3A Filesystem client supports PUT and multipart PUT for uploading
|
|
|
+data, with the `S3ABlockOutputStream` of HADOOP-13560 uploading written data
|
|
|
+as parts of a multipart PUT once the threshold set in the configuration
|
|
|
+parameter `fs.s3a.multipart.size` (default: 100MB) is reached.
|
|
|
+
|
|
|
+[S3Guard](./s3guard.html) adds an option of consistent view of the filesystem
|
|
|
+to all processes using the shared DynamoDB table as the authoritative store of
|
|
|
+metadata. Some S3-compatible object stores are fully consistent; the
|
|
|
+proposed algorithm is designed to work with such object stores without the
|
|
|
+need for any DynamoDB tables.
|
|
|
+
|
|
|
+## Related work: Spark's `DirectOutputCommitter`
|
|
|
+
|
|
|
+One implementation to look at is the
|
|
|
+[`DirectOutputCommitter` of Spark 1.6](https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala).
|
|
|
+
|
|
|
+This implements a zero rename commit by subclassing the `ParquetOutputCommitter`
|
|
|
+and then
|
|
|
+
|
|
|
+1. Returning the destination directory as the task working directory.
|
|
|
+1. Subclassing all the task commit/abort operations to be no-ops.
|
|
|
+
|
|
|
+With the working directory as the destination directory, there is no need
|
|
|
+to move/rename the task output on a successful commit. However, it is flawed.
|
|
|
+There is no notion of "committing" or "aborting" a task, hence no ability to
|
|
|
+handle speculative execution or failures. This is why the committer
|
|
|
+was removed from Spark 2, [SPARK-10063](https://issues.apache.org/jira/browse/SPARK-10063).
|
|
|
+
|
|
|
+There is also the issue that work-in-progress data is visible; this may or may
|
|
|
+not be a problem.
|
|
|
+
|
|
|
+## Related work: IBM's "Stocator" committer
|
|
|
+
|
|
|
+IBM's [Stocator](https://github.com/SparkTC/stocator) can transform indirect
|
|
|
+writes of V1/V2 committers into direct writes to the destination directory.
|
|
|
+
|
|
|
+How does it do this? It's a special Hadoop `FileSystem` implementation which
+recognizes writes to `_temporary` paths and translates them to writes to the
|
|
|
+base directory. As well as translating the write operation, it also supports
|
|
|
+a `getFileStatus()` call on the original path, returning details on the file
|
|
|
+at the final destination. This allows for committing applications to verify
|
|
|
+the creation/existence/size of the written files (in contrast to the magic
|
|
|
+committer covered below).
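+
+A minimal sketch of the path translation idea; the real Stocator logic also
+handles job/task attempt IDs, filename disambiguation and more, so this is
+illustrative only:
+
+```python
+def translate(path):
+    # dest/_temporary/$jobAttempt/_temporary/$taskAttempt/part-0000
+    # is rewritten to dest/part-0000
+    segments = [s for s in path.split("/") if s]
+    if "_temporary" in segments:
+        first = segments.index("_temporary")
+        return "/".join(segments[:first] + [segments[-1]])
+    return path
+```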
|
|
|
+
|
|
|
+The FS targets Openstack Swift, though other object stores are supportable through
|
|
|
+different backends.
|
|
|
+
|
|
|
+This solution is innovative in that it appears to deliver the same semantics
|
|
|
+(and hence failure modes) as the Spark Direct OutputCommitter, but which
|
|
|
+does not need any changes in either Spark *or* the Hadoop committers. In contrast,
|
|
|
+the committers proposed here combine changing the Hadoop MR committers for
+ease of pluggability, and offer a new committer exclusively for S3, one
|
|
|
+strongly dependent upon and tightly integrated with the S3A Filesystem.
|
|
|
+
|
|
|
+The simplicity of the Stocator committer is something to appreciate.
|
|
|
+
|
|
|
+## Background: The S3 multi-part PUT mechanism
|
|
|
+
|
|
|
+In the [S3 REST API](http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html),
|
|
|
+multipart uploads allow clients to upload a series of "Parts" of a file,
|
|
|
+then commit the upload with a final call.
|
|
|
+
|
|
|
+1. Caller initiates a multipart request, including the destination bucket, key
|
|
|
+and metadata.
|
|
|
+
|
|
|
+ POST bucket.s3.aws.com/path?uploads
|
|
|
+
|
|
|
+ An UploadId is returned
|
|
|
+
|
|
|
+1. Caller uploads one or more parts.
|
|
|
+
|
|
|
+ PUT bucket.s3.aws.com/path?partNumber=PartNumber&uploadId=UploadId
|
|
|
+
|
|
|
+ The part number is used to declare the ordering of the PUTs; they
|
|
|
+ can be uploaded in parallel and out of order.
|
|
|
+ All parts *excluding the final part* must be 5MB or larger.
|
|
|
+ Every upload completes with an etag returned
|
|
|
+
|
|
|
+1. Caller completes the operation
|
|
|
+
|
|
|
+ POST /ObjectName?uploadId=UploadId
|
|
|
+ <CompleteMultipartUpload>
|
|
|
+      <Part><PartNumber>(number)</PartNumber><ETag>(Tag)</ETag></Part>
|
|
|
+ ...
|
|
|
+ </CompleteMultipartUpload>
|
|
|
+
|
|
|
+ This final call lists the etags of all uploaded parts and the actual ordering
|
|
|
+ of the parts within the object.
|
|
|
+
|
|
|
+The completion operation is apparently `O(1)`; presumably the PUT requests
|
|
|
+have already uploaded the data to the server(s) which will eventually be
|
|
|
+serving up the data for the final path. All that is needed to complete
|
|
|
+the upload is to construct an object by linking together the files in
|
|
|
+the server's local filesystem and update an entry in the index table of the
|
|
|
+object store.
|
|
|
+
|
|
|
+In the S3A client, all PUT calls in the sequence and the final commit are
|
|
|
+initiated by the same process. *This does not have to be the case*.
|
|
|
+It is that fact, that a different process may perform different parts
|
|
|
+of the upload, which makes this algorithm viable.
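+
+A sketch of that split across processes; the `s3.*` calls map onto the three REST
+operations above and are not the S3A or AWS SDK method names:
+
+```python
+# In the task process: upload all the data, but do not complete the upload.
+def uploadPending(s3, bucket, key, blocks):
+    uploadId = s3.initiateMultipartUpload(bucket, key)
+    etags = [s3.uploadPart(bucket, key, uploadId, partNumber, block)
+             for (partNumber, block) in enumerate(blocks, 1)]
+    return {"bucket": bucket, "key": key, "uploadId": uploadId, "etags": etags}
+
+# In the job driver, potentially on a different host and much later in time:
+def completePending(s3, pending):
+    s3.completeMultipartUpload(pending["bucket"], pending["key"],
+                               pending["uploadId"], pending["etags"])
+```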
|
|
|
+
|
|
|
+
|
|
|
+## The Netflix "Staging" committer
|
|
|
+
|
|
|
+Ryan Blue, of Netflix, has submitted an alternate committer, one which has a
+number of appealing features
+
+* Doesn't have any requirements of the destination object store, not even
+a need for a consistency layer.
+* Overall a simpler design.
+* Known to work.
+
+The final point is not to be underestimated, especially given the need to
+be resilient to the various failure modes which may arise.
|
|
|
+
|
|
|
+
|
|
|
+The committer writes task outputs to a temporary directory on the local FS.
|
|
|
+Task outputs are directed to the local FS by `getTaskAttemptPath` and `getWorkPath`.
|
|
|
+On task commit, the committer enumerates files in the task attempt directory (ignoring hidden files).
|
|
|
+Each file is uploaded to S3 using the [multi-part upload API](http://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html).
|
|
|
+
|
|
|
+The information needed to commit the upload is saved into HDFS and committed
|
|
|
+via that protocol: when the job commits, the pending uploads of the successful
|
|
|
+tasks are all committed.
|
|
|
+
|
|
|
+
|
|
|
+### Commit logic
|
|
|
+
|
|
|
+The core algorithm is as follows:
|
|
|
+
|
|
|
+1. The destination directory for output (e.g. `FileOutputFormat` and subclasses)
|
|
|
+is a local `file://` reference.
|
|
|
+1. Task commit initiates the multipart PUT to the destination object store.
|
|
|
+1. A list of every pending PUT for task is persisted to a single file
|
|
|
+within a consistent, cluster-wide filesystem. For Netflix, that is HDFS.
|
|
|
+1. The Standard `FileOutputCommitter` (algorithm 1) is used to manage the commit/abort of these
|
|
|
+files. That is: it copies only those lists of files to commit from successful tasks
|
|
|
+into a (transient) job commit directory.
|
|
|
+1. The S3 job committer reads the pending file list for every task committed
|
|
|
+in HDFS, and completes those put requests.
|
|
|
+
|
|
|
+By using `FileOutputCommitter` to manage the propagation of the lists of files
|
|
|
+to commit, the existing commit algorithm implicitly becomes that defining which
|
|
|
+files will be committed at the end of the job.
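+
+A sketch of the two halves of that algorithm; the `.pendingset` filename and the
+`store.*`/`hdfs.*` operations are illustrative, not the actual committer API:
+
+```python
+# Task commit: upload the locally staged files as incomplete multipart PUTs,
+# then let the wrapped FileOutputCommitter "commit" the list of pending uploads.
+def stagingCommitTask(localFs, hdfs, store, taskAttemptDir, pendingsetPath):
+    pending = [store.uploadPendingParts(f) for f in localFs.listFiles(taskAttemptDir)]
+    hdfs.save(pendingsetPath, pending)     # path managed by the wrapped v1 committer
+
+# Job commit: read the pendingsets of the successfully committed tasks only,
+# then complete every upload listed in them.
+def stagingCommitJob(hdfs, store, committedPendingsetPaths):
+    for path in committedPendingsetPaths:
+        for upload in hdfs.load(path):
+            store.completeUpload(upload)
+```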
|
|
|
+
|
|
|
+
|
|
|
+The Netflix contribution has Hadoop `OutputCommitter` implementations for S3.
|
|
|
+
|
|
|
+There are 3 main classes:
|
|
|
+* `S3MultipartOutputCommitter` is a base committer class that handles commit logic. This should not be used directly.
|
|
|
+* `S3DirectoryOutputCommitter` for writing unpartitioned data to S3 with conflict resolution.
|
|
|
+* `S3PartitionedOutputCommitter` for writing partitioned data to S3 with conflict resolution.
|
|
|
+
|
|
|
+Callers should use `S3DirectoryOutputCommitter` for single-directory outputs,
|
|
|
+or `S3PartitionedOutputCommitter` for partitioned data.
|
|
|
+
|
|
|
+
|
|
|
+These S3 committers work by writing task outputs to a temporary directory on the local FS.
|
|
|
+Task outputs are directed to the local FS by `getTaskAttemptPath` and `getWorkPath`.
|
|
|
+
|
|
|
+
|
|
|
+### Conflict Resolution
|
|
|
+
|
|
|
+The single-directory and partitioned committers handle conflict resolution by
|
|
|
+checking whether target paths exist in S3 before uploading any data.
|
|
|
+There are 3 conflict resolution modes, controlled by setting `fs.s3a.committer.staging.conflict-mode`:
|
|
|
+
|
|
|
+* `fail`: Fail a task if an output directory or partition already exists. (Default)
|
|
|
+* `append`: Upload data files without checking whether directories or partitions already exist.
|
|
|
+* `replace`: If an output directory exists, delete it so the new data replaces the current content.
|
|
|
+
|
|
|
+The partitioned committer enforces the conflict mode when a conflict is detected with output data, not before the job runs.
|
|
|
+Conflict resolution differs from an output mode because it does not enforce the mode when there is no conflict.
|
|
|
+For example, overwriting a partition should remove all sub-partitions and data it contains, whether or not new output is created.
|
|
|
+Conflict resolution will only replace partitions that have output data.
|
|
|
+
|
|
|
+When the conflict mode is `replace`, conflicting directories are removed during
|
|
|
+job commit. Data is only deleted if all tasks have completed successfully.
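+
+A sketch of how a conflict mode could be applied to a destination directory or
+partition; the helper names are illustrative, and the existence probe is the
+`getFileStatus()` check noted below:
+
+```python
+def resolveConflict(s3a, mode, destPath):
+    if not s3a.exists(destPath):
+        return                                # no conflict: every mode proceeds
+    if mode == "fail":
+        raise IOError("destination exists: " + destPath)   # fail the task/job
+    elif mode == "append":
+        pass                                  # keep existing files, add new ones
+    elif mode == "replace":
+        s3a.delete(destPath, recursive=True)  # performed during job commit
+```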
|
|
|
+
|
|
|
+A UUID that identifies a write is added to filenames that are uploaded to S3.
|
|
|
+This allows rolling back data from jobs that fail during job commit (see failure cases below) and avoids
|
|
|
+file-level conflicts when appending data to existing directories.
|
|
|
+
|
|
|
+
|
|
|
+*Note* the checks for existence are made via `S3AFileSystem.getFileStatus()` requests of the destination paths.
|
|
|
+Unless the view of the S3 store is consistent, it may be that a newly-deleted object
|
|
|
+is still discovered in the probe, causing a commit to fail even when there is no longer any actual conflict.
|
|
|
+
|
|
|
+### Performance
|
|
|
+
|
|
|
+Compared to the previous proposal, henceforth the "magic" committer, this
|
|
|
+committer, the "staging committer", adds the extra overhead of uploading
|
|
|
+each file at the end of every task. This is an `O(data)` operation; it can be
|
|
|
+parallelized, but is bounded by the bandwidth from compute node to S3, as
|
|
|
+well as the write/IOP capacity of the destination shard of S3. If many tasks
|
|
|
+complete at or near the same time, there may be a peak of bandwidth load
|
|
|
+slowing down the upload.
|
|
|
+
|
|
|
+Time to commit will be the same, and, given the Netflix committer has already
|
|
|
+implemented the parallelization logic here, a time of `O(files/threads)`.
|
|
|
+
|
|
|
+### Resilience
|
|
|
+
|
|
|
+There's already a lot of code in the task and job commits to handle failure.
|
|
|
+
|
|
|
+Any failure in a commit triggers a best-effort abort/revert of the commit
|
|
|
+actions for a task/job.
|
|
|
+
|
|
|
+Task commits delegate to the `FileOutputCommitter` to ensure that only one task's
|
|
|
+output reaches the job commit.
|
|
|
+
|
|
|
+Similarly, if a task is aborted, temporary output on the local FS is removed.
|
|
|
+
|
|
|
+If a task dies while the committer is running, it is possible for data to be
|
|
|
+left on the local FS or as unfinished parts in S3.
|
|
|
+Unfinished upload parts in S3 are not visible to table readers and are cleaned
|
|
|
+up following the rules in the target bucket's life-cycle policy.
|
|
|
+
|
|
|
+Failures during job commit are handled by deleting any files that have already
|
|
|
+been completed and aborting the remaining uploads.
|
|
|
+Because uploads are completed individually, the files that are deleted were visible to readers.
|
|
|
+
|
|
|
+If the process dies while the job committer is running, there are two possible failures:
|
|
|
+
|
|
|
+1. Some directories that would be replaced have been deleted, but no new data is visible.
|
|
|
+2. Some new data is visible but is not complete, and all replaced directories have been removed.
|
|
|
+ Only complete files are visible.
|
|
|
+
|
|
|
+If the process dies during job commit, cleaning up is a manual process.
|
|
|
+File names include a UUID for each write so that files can be identified and removed.
|
|
|
+
|
|
|
+
|
|
|
+**Failure during task execution**
|
|
|
+
|
|
|
+All data is written to local temporary files; these need to be cleaned up.
|
|
|
+
|
|
|
+The job must ensure that the local (pending) data is purged.
|
|
|
+
|
|
|
+**Failure during task commit**
|
|
|
+
|
|
|
+
|
|
|
+A process failure during the upload process will result in the
|
|
|
+list of pending multipart PUTs to *not* be persisted to the cluster filesystem.
|
|
|
+This window is smaller than the entire task execution, but still potentially
|
|
|
+significant, at least for large uploads.
|
|
|
+
|
|
|
+Per-file persistence, or incremental overwrites of the upload list may
|
|
|
+reduce the problems here, but there would still be a small risk of
|
|
|
+an outstanding multipart upload not being recorded.
|
|
|
+
|
|
|
+**Explicit Task abort before task commit**
|
|
|
+
|
|
|
+Task will delete all local data; no uploads will be initiated.
|
|
|
+
|
|
|
+**Failure to communicate with S3 during data upload**
|
|
|
+
|
|
|
+If an upload fails, tasks will:
|
|
|
+
|
|
|
+* Retry using the retry policies implemented in the S3AFileSystem classes
|
|
|
+and the AWS libraries.
|
|
|
+* Eventually: attempt to abort outstanding multipart uploads already initiated.
|
|
|
+* Remove temporary files on the local FS.
|
|
|
+
|
|
|
+
|
|
|
+**Explicit Job Abort**
|
|
|
+
|
|
|
+All in-progress tasks are aborted and cleaned up. The pending commit data
|
|
|
+of all completed tasks can be loaded, outstanding multipart PUT requests aborted.
|
|
|
+
|
|
|
+
|
|
|
+This is done by
|
|
|
+
|
|
|
+1. Listing all local files, with a best effort read attempt followed by
|
|
|
+an abort of all successfully read files.
|
|
|
+1. List and abort all pending multipart uploads.
|
|
|
+
|
|
|
+Because of action #2, action #1 is superfluous. It is retained so as to leave
+open the option of making action #2 a configurable option, which would be
+required to handle the use case of >1 partitioned commit running simultaneously.
|
|
|
+
|
|
|
+**Job Driver failure before Job Commit**
|
|
|
+
|
|
|
+Because the local data is managed with the v1 commit algorithm, the
|
|
|
+second attempt of the job will recover all the outstanding commit data
|
|
|
+of the first attempt; those tasks will not be rerun.
|
|
|
+
|
|
|
+This also ensures that on a job abort, the individual tasks' `.pendingset`
|
|
|
+files can be read and used to initiate the abort of those uploads.
|
|
|
+That is: a recovered job can clean up the pending writes of the previous job.
|
|
|
+
|
|
|
+If the query engine does not support multiple job attempts, then the
|
|
|
+pending commit data will not be recovered; an explicit abort operation will
|
|
|
+need to be initiated (we will add a CLI command for this), or the S3 bucket
|
|
|
+must be configured to automatically delete the pending request.
|
|
|
+
|
|
|
+**Job Driver failure during Job Commit**
|
|
|
+
|
|
|
+Those uploads already executed by a failed job commit will persist; those
|
|
|
+yet to execute will remain outstanding.
|
|
|
+
|
|
|
+The committer currently declares itself as non-recoverable, but that
|
|
|
+may not actually hold, as the recovery process could be one of:
|
|
|
+
|
|
|
+1. Enumerate all job commits from the .pendingset files (*:= Commits*).
|
|
|
+1. List all outstanding uploads under the destination path (*:= Outstanding*).
|
|
|
+1. List all written files (for better consistency, via a GET call on the known
|
|
|
+filenames)
|
|
|
+1. Identify all files which are yet to be completed (*Commits - Written*).
|
|
|
+1. Verify that the set of pending uploads matches (*Outstanding = (Commits - Written)*)
|
|
|
+
|
|
|
+The main problem here is the progress of the job-commit-time conflict resolution
|
|
|
+process: how to determine if it completed, as the only way to be confident
|
|
|
+that all files in the destination directory are to be retained is by knowing
|
|
|
+that the pre-commit phase completed. This could be implicitly determined
|
|
|
+based on the rule "no uploads are committed until precommit is completed".
|
|
|
+If it can be determined that 1+ upload has completed, then it could be inferred
|
|
|
+that precommit had completed and so the job could be repeated.
|
|
|
+
|
|
|
+This is dangerous territory to delve into. For now, the committer declares
|
|
|
+itself as unrecoverable.
|
|
|
+
|
|
|
+**Entire application failure before any task commit**
|
|
|
+
|
|
|
+Data is left on local systems, in the temporary directories. This may
|
|
|
+not be cleaned up.
|
|
|
+
|
|
|
+**Entire application failure after one or more task commits, before job commit**
|
|
|
+
|
|
|
+* A multipart PUT request will be outstanding for every pending write.
|
|
|
+* A temporary directory in HDFS will list all known pending requests.
|
|
|
+
|
|
|
+**Job complete/abort after >1 task failure**
|
|
|
+
|
|
|
+1. All pending PUT data listed in the job completion directory needs to be loaded
|
|
|
+and then cancelled.
|
|
|
+1. Any other pending writes to the dest dir need to be enumerated and aborted.
|
|
|
+This catches the situation of a task failure before the output is written.
|
|
|
+1. All pending data in local dirs needs to be deleted.
|
|
|
+
|
|
|
+Issue: what about the destination directory: overwrite or not? It could well
|
|
|
+depend upon the merge policy.
|
|
|
+
|
|
|
+
|
|
|
+#### Overall Resilience
|
|
|
+
|
|
|
+1. The only time that incomplete work will appear in the destination directory
|
|
|
+is if the job commit operation fails partway through.
|
|
|
+1. There's a risk of leakage of local filesystem data; this will need to
|
|
|
+be managed in the response to a task failure.
|
|
|
+1. There's a risk of uncommitted multipart PUT operations remaining outstanding,
|
|
|
+operations which will run up bills until cancelled (a risk shared with the Magic Committer).
|
|
|
+
|
|
|
+
|
|
|
+For cleaning up PUT commits, as well as scheduled GC of uncommitted writes, we
|
|
|
+may want to consider having job setup list and cancel all pending commits
|
|
|
+to the destination directory, on the assumption that these are from a previous
|
|
|
+incomplete operation.
|
|
|
+
|
|
|
+We should add a "commit" command to the S3guard CLI to probe for, list and abort pending requests under
|
|
|
+a path, e.g. `--has-pending <path>`, `--list-pending <path>`, `--abort-pending <path>`.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+## The "Magic" Committer
|
|
|
+
|
|
|
+Development on this committer began before Netflix donated their committer.
|
|
|
+
|
|
|
+By making changes to the `S3AFileSystem` and the `S3ABlockOutputStream`, this committer
|
|
|
+manages to postpone the completion of writes of all files written to special
|
|
|
+("magic") directories; the final destination of the write being altered to
|
|
|
+that of the final job destination. When the job is committed, the pending
|
|
|
+writes are instantiated.
|
|
|
+
|
|
|
+With the addition of the Netflix Staging committer, the actual committer
|
|
|
+code now shares common formats for the persistent metadata and shared routines
|
|
|
+for parallel committing of work, including all the error handling based on
|
|
|
+the Netflix experience.
|
|
|
+
|
|
|
+It differs in that it directly streams data to S3 (there is no staging),
|
|
|
+and it also stores the lists of pending commits in S3. That mandates
|
|
|
+consistent metadata on S3, which S3Guard provides.
|
|
|
+
|
|
|
+
|
|
|
+### Core concept: A new/modified output stream for delayed PUT commits
|
|
|
+
|
|
|
+
|
|
|
+This algorithm uses a modified `S3ABlockOutputStream` output stream which, rather
|
|
|
+than committing any active multipart upload in the final `close()` operation,
|
|
|
+instead saves enough information into the S3 repository for an independent
|
|
|
+process to be able to complete or abort the upload.
|
|
|
+
|
|
|
+Originally, in `OutputStream.close()`, the stream chose whether to perform a single PUT or to
|
|
|
+complete an ongoing multipart write.
|
|
|
+
|
|
|
+If a multipart PUT is in progress, then the stream waits for the ongoing uploads
|
|
|
+to complete (including any final block submitted), and then builds and PUTs
|
|
|
+the final multipart commit operation. The list of parts (and their ordering)
|
|
|
+has been built up during the write.
|
|
|
+
|
|
|
+In contrast, when writing to a delayed-commit file:
|
|
|
+
|
|
|
+1. A multipart write MUST always be initiated, even for small writes. This write
|
|
|
+MAY be initiated during the creation of the stream.
|
|
|
+
|
|
|
+1. Instead of committing the write in the `close()` call, perform a PUT to
|
|
|
+a path in the S3A repository with all the information needed to commit the operation.
|
|
|
+That is: the final path, the multipart upload ID, and the ordered list of etags
|
|
|
+for the uploaded parts.
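+
+To make this concrete, here is a minimal sketch, written directly against the
+AWS SDK for Java (v1) rather than the S3A code itself, of how completion of a
+multipart upload can be deferred by persisting the upload ID and the ordered
+etags instead of calling `completeMultipartUpload()`. The bucket, key and data
+here are illustrative only.
+
+```java
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+
+public class DeferredCommitSketch {
+  public static void main(String[] args) {
+    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
+    String bucket = "example-bucket";                   // illustrative
+    String finalKey = "results/latest/latest.orc.lzo";  // final destination
+
+    // 1. Always initiate a multipart upload, even for a small write.
+    String uploadId = s3.initiateMultipartUpload(
+        new InitiateMultipartUploadRequest(bucket, finalKey)).getUploadId();
+
+    // 2. Upload blocks as parts as data is generated, recording etags in order.
+    byte[] block = "example data".getBytes(StandardCharsets.UTF_8);
+    List<PartETag> partETags = new ArrayList<>();
+    partETags.add(s3.uploadPart(new UploadPartRequest()
+        .withBucketName(bucket)
+        .withKey(finalKey)
+        .withUploadId(uploadId)
+        .withPartNumber(1)
+        .withInputStream(new ByteArrayInputStream(block))
+        .withPartSize(block.length)).getPartETag());
+
+    // 3. Do NOT complete the upload in close(). Instead, persist
+    //    (finalKey, uploadId, ordered etags) so that a later, separate
+    //    commit phase can complete or abort the upload.
+    System.out.println("pending: " + finalKey + " " + uploadId + " " + partETags);
+  }
+}
+```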
|
|
|
+
|
|
|
+
|
|
|
+Recognising when a file is "special" is problematic; the normal `create(Path, Boolean)`
|
|
|
+call must recognize when the file being created is to be a delayed-commit file,
|
|
|
+and return the special new stream.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+This is done with a "magic" temporary directory name, `__magic`, to indicate that all files
|
|
|
+created under this path are not to be completed during the stream write process.
|
|
|
+Directories created under the path will still be created —this allows job- and
|
|
|
+task-specific directories to be created for individual job and task attempts.
|
|
|
+
|
|
|
+For example, the pattern `__magic/${jobID}/${taskId}` could be used to
|
|
|
+store pending commits to the final directory for that specific task. If that
|
|
|
+task is committed, all pending commit files stored in that path will be loaded
|
|
|
+and used to commit the final uploads.
|
|
|
+
|
|
|
+Consider a job with the final directory `/results/latest`
|
|
|
+
|
|
|
+ The intermediate directory for the task 01 attempt 01 of job `job_400_1` would be
|
|
|
+
|
|
|
+ /results/latest/__magic/job_400_1/_task_01_01
|
|
|
+
|
|
|
+This would be returned as the temp directory.
|
|
|
+
|
|
|
+When a client attempted to create the file
|
|
|
+`/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo`, the S3A FS would initiate
|
|
|
+a multipart request with the final destination of `/results/latest/latest.orc.lzo`.
|
|
|
+
|
|
|
+As data was written to the output stream, it would be incrementally uploaded as
|
|
|
+individual multipart PUT operations.
|
|
|
+
|
|
|
+On `close()`, summary data would be written to the file
|
|
|
+`/results/latest/__magic/job_400_1/task_01_01/latest.orc.lzo.pending`.
|
|
|
+This would contain the upload ID and all the parts and etags of uploaded data.
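+
+The exact layout of that summary file is an implementation detail; as a rough
+sketch, with purely illustrative field names and values, it holds something like:
+
+```
+{
+  "name": "latest.orc.lzo",
+  "destination": "results/latest/latest.orc.lzo",
+  "uploadId": "aBcD1234exampleUploadId",
+  "etags": ["etag-0001", "etag-0002", "etag-0003"]
+}
+```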
|
|
|
+
|
|
|
+
|
|
|
+#### Task commit
|
|
|
+
|
|
|
+The information needed to commit a task is moved from the task attempt
|
|
|
+to the job attempt.
|
|
|
+
|
|
|
+1. The task commit operation lists all `.pending` files in its attempt directory.
|
|
|
+1. The contents are loaded into a list of single pending uploads.
|
|
|
+1. These are merged into a single `Pendingset` structure.
|
|
|
+1. This structure is saved to a `.pendingset` file in the job attempt directory; in the example, `/results/latest/__magic/job_400_1/task_01_01.pendingset`.
|
|
|
+1. Finally, the task attempt directory is deleted.
|
|
|
+
|
|
|
+
|
|
|
+A failure to load any of the single pending upload files (i.e. the file
|
|
|
+could not be loaded or was considered invalid) means the task is considered to
|
|
|
+have failed. All successfully loaded pending commits will be aborted, then
|
|
|
+the failure reported.
|
|
|
+
|
|
|
+Similarly, a failure to save the `.pendingset` file will trigger an
|
|
|
+abort of all its pending uploads.
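+
+In outline, and using purely illustrative class and method names rather than the
+real implementation, the task commit sequence behaves something like this sketch:
+
+```java
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/** Illustrative only: the shape of the task commit, not the real classes. */
+public abstract class TaskCommitSketch {
+
+  /** A single pending multipart upload (destination, upload ID, etags). */
+  interface PendingUpload { }
+
+  void commitTask(FileSystem fs, Path taskAttemptDir, Path jobAttemptDir,
+      String taskId) throws IOException {
+    List<PendingUpload> pending = new ArrayList<>();
+    try {
+      // 1. List the .pending files in the task attempt directory
+      //    (the real code also walks subdirectories).
+      for (FileStatus st : fs.listStatus(taskAttemptDir)) {
+        if (st.getPath().getName().endsWith(".pending")) {
+          // 2. Load each single pending upload; any load failure fails the task.
+          pending.add(loadPending(fs, st.getPath()));
+        }
+      }
+      // 3 & 4. Merge into a single pendingset file in the job attempt directory.
+      savePendingSet(fs, new Path(jobAttemptDir, taskId + ".pendingset"), pending);
+    } catch (IOException e) {
+      // Abort every upload which was successfully loaded, then report the failure.
+      abortAll(pending);
+      throw e;
+    }
+    // 5. Finally, delete the task attempt directory.
+    fs.delete(taskAttemptDir, true);
+  }
+
+  abstract PendingUpload loadPending(FileSystem fs, Path file) throws IOException;
+  abstract void savePendingSet(FileSystem fs, Path dest,
+      List<PendingUpload> uploads) throws IOException;
+  abstract void abortAll(List<PendingUpload> uploads);
+}
+```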
|
|
|
+
|
|
|
+
|
|
|
+#### Job Commit
|
|
|
+
|
|
|
+The job committer loads all `.pendingset` files in its job attempt directory.
|
|
|
+
|
|
|
+A failure to load any of these files is considered a job failure; all
|
|
|
+pendingsets which could be loaded will be aborted.
|
|
|
+
|
|
|
+If all pendingsets were loaded, then every
|
|
|
+pending commit in the job will be committed. If any one of these commits
|
|
|
+failed, then all successful commits will be reverted by deleting the destination
|
|
|
+file.
|
|
|
+
|
|
|
+#### Supporting directory trees
|
|
|
+
|
|
|
+To allow tasks to generate data in subdirectories, a special filename `__base`
|
|
|
+will be used to provide an extra cue as to the final path. When mapping an output
|
|
|
+path `/results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo.pending`
|
|
|
+to a final destination path, the path will become `/results/latest/2017/2017-01-01.orc.lzo`.
|
|
|
+That is: all directories between `__magic` and `__base` inclusive will be ignored.
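+
+A sketch of that path mapping, with illustrative class and method names (the real
+logic lives in the S3A commit support code):
+
+```java
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** Illustrative mapping of a "magic" path to its final destination. */
+public final class MagicPathSketch {
+
+  /** Map e.g. "results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo"
+   *  to "results/latest/2017/2017-01-01.orc.lzo". */
+  public static String finalDestination(String magicPath) {
+    List<String> elements = new ArrayList<>(Arrays.asList(magicPath.split("/")));
+    int magic = elements.indexOf("__magic");
+    if (magic < 0) {
+      return magicPath;                      // not a magic path: unchanged
+    }
+    int base = elements.lastIndexOf("__base");
+    // Drop everything from __magic to __base inclusive; with no __base, fall
+    // back to the simple case shown earlier, where the file lands directly
+    // in the destination directory.
+    int cutEnd = base > magic ? base : elements.size() - 2;
+    List<String> result = new ArrayList<>(elements.subList(0, magic));
+    result.addAll(elements.subList(cutEnd + 1, elements.size()));
+    return String.join("/", result);
+  }
+
+  public static void main(String[] args) {
+    System.out.println(finalDestination(
+        "results/latest/__magic/job_400/task_01_01/__base/2017/2017-01-01.orc.lzo"));
+  }
+}
+```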
|
|
|
+
|
|
|
+
|
|
|
+**Issues**
|
|
|
+
|
|
|
+Q. What if there are some non-`.pending` files in the task attempt directory?
|
|
|
+
|
|
|
+A. This can only happen if the magic committer is being used in an S3A client
|
|
|
+which does not have the "magic path" feature enabled. This will be checked for
|
|
|
+during job and task committer initialization.
|
|
|
+
|
|
|
+
|
|
|
+### Failure cases
|
|
|
+
|
|
|
+#### Network Partitioning
|
|
|
+
|
|
|
+The job/task commit protocol is expected to handle this with the task
|
|
|
+only committing work when the job driver tells it to. A network partition
|
|
|
+should trigger the task committer's cancellation of the work (this is a protocol
|
|
|
+above the committers).
|
|
|
+
|
|
|
+#### Job Driver failure
|
|
|
+
|
|
|
+The job will be restarted. When it completes it will delete all
|
|
|
+outstanding requests to the destination directory which it has not
|
|
|
+committed itself.
|
|
|
+
|
|
|
+#### Task failure
|
|
|
+
|
|
|
+The task will be restarted. Pending work of the task will not be committed;
|
|
|
+when the job driver cleans up it will cancel pending writes under the directory.
|
|
|
+
|
|
|
+#### Multiple jobs targeting the same destination directory
|
|
|
+
|
|
|
+This leaves things in an indeterminate state.
|
|
|
+
|
|
|
+
|
|
|
+#### Failure during task commit
|
|
|
+
|
|
|
+Pending uploads will remain, but no changes will be visible.
|
|
|
+
|
|
|
+If the `.pendingset` file has been saved to the job attempt directory, the
|
|
|
+task has effectively committed, it has just failed to report to the
|
|
|
+controller. This will cause complications during job commit, as there
|
|
|
+may be two task PendingSets committing the same files, or committing
|
|
|
+files with the same names but different contents.
|
|
|
+
|
|
|
+*Proposed*: track task ID in pendingsets, recognise duplicates on load
|
|
|
+and then respond by cancelling one set and committing the other. (or fail?)
|
|
|
+
|
|
|
+#### Failure during job commit
|
|
|
+
|
|
|
+The destination will be left in an unknown state.
|
|
|
+
|
|
|
+#### Failure during task/job abort
|
|
|
+
|
|
|
+Failures in the abort process are not well handled in either the committers
|
|
|
+or indeed in the applications which use these committers. If an abort
|
|
|
+operation fails, what can be done?
|
|
|
+
|
|
|
+While somewhat hypothetical for the use case of a task being aborted due
|
|
|
+to the protocol (e.g. speculative jobs being aborted), the abort task/abort job
|
|
|
+calls may be made as part of the exception handling logic on a failure to commit.
|
|
|
+As such, the caller may assume that the abort does not fail: if it does,
|
|
|
+the newly thrown exception may hide the original problem.
|
|
|
+
|
|
|
+Two options present themselves:
|
|
|
+
|
|
|
+1. Catch, log and swallow failures in the `abort()`
|
|
|
+1. Throw the exceptions, and expect the callers to handle them: review, fix
|
|
|
+and test that code as appropriate.
|
|
|
+
|
|
|
+Fixing the calling code does seem to be the best strategy, as it allows the
|
|
|
+failure to be explicitly handled in the commit protocol, rather than hidden
|
|
|
+in the committer.
|
|
|
+
|
|
|
+#### Preemption
|
|
|
+
|
|
|
+Preemption is the explicit termination of work at the behest of the cluster
|
|
|
+scheduler. It's a failure, but a special one: pre-empted tasks must not be counted
|
|
|
+as a failure in any code which only allows a limited number of trackers, and the
|
|
|
+Job driver can assume that the task was successfully terminated.
|
|
|
+
|
|
|
+Job drivers themselves may be preempted.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+#### Cleaning up after complete job failure
|
|
|
+
|
|
|
+One failure case is that the entire execution framework failed; a new process
|
|
|
+must identify outstanding jobs with pending work, and abort them, then delete
|
|
|
+the appropriate `__magic` directories.
|
|
|
+
|
|
|
+This can be done either by scanning the directory tree for `__magic` directories
|
|
|
+and scanning underneath them, or by using the `listMultipartUploads()` call to
|
|
|
+list multipart uploads under a path, then cancel them. The most efficient solution
|
|
|
+may be to use `listMultipartUploads` to identify all outstanding requests, and use that
|
|
|
+to identify which requests to cancel, and where to scan for `__magic` directories.
|
|
|
+This strategy should address scalability problems when working with repositories
|
|
|
+with many millions of objects —rather than list all keys searching for those
|
|
|
+with `/__magic/**/*.pending` in their name, work backwards from the active uploads to
|
|
|
+the directories with the data.
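+
+As a rough illustration of that second approach, outstanding uploads under a
+destination prefix can be enumerated and aborted with the AWS SDK for Java (v1)
+directly; the bucket and prefix here are examples only.
+
+```java
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.MultipartUpload;
+import com.amazonaws.services.s3.model.MultipartUploadListing;
+
+public class AbortPendingUploadsSketch {
+  public static void main(String[] args) {
+    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
+    String bucket = "example-bucket";                    // illustrative
+    ListMultipartUploadsRequest request =
+        new ListMultipartUploadsRequest(bucket).withPrefix("results/latest/");
+    MultipartUploadListing listing;
+    do {
+      listing = s3.listMultipartUploads(request);
+      for (MultipartUpload upload : listing.getMultipartUploads()) {
+        // Each entry identifies the destination key and the upload ID,
+        // which is also where to look for the matching __magic directory.
+        s3.abortMultipartUpload(new AbortMultipartUploadRequest(
+            bucket, upload.getKey(), upload.getUploadId()));
+      }
+      // Continue paging until the listing is no longer truncated.
+      request.setKeyMarker(listing.getNextKeyMarker());
+      request.setUploadIdMarker(listing.getNextUploadIdMarker());
+    } while (listing.isTruncated());
+  }
+}
+```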
|
|
|
+
|
|
|
+We may also want to consider having a cleanup operation in the S3 CLI to
|
|
|
+do the full tree scan and purge of pending items; give some statistics on
|
|
|
+what was found. This will keep costs down and help us identify problems
|
|
|
+related to cleanup.
|
|
|
+
|
|
|
+### Performance
|
|
|
+
|
|
|
+The time to upload is that of today's block upload (`s3a.fast.upload=true`)
|
|
|
+output stream: uploads proceed during the write, and the `close()` operation adds
|
|
|
+a delay to upload any pending data and await completion of all outstanding uploads.
|
|
|
+There wouldn't be any overhead of the final completion request. If no
|
|
|
+data had yet been uploaded, the `close()` time would be that of the initiate
|
|
|
+multipart request and the final put. This could perhaps be simplified by always
|
|
|
+requesting a multipart ID on stream creation.
|
|
|
+
|
|
|
+The time to commit each task is `O(files)`: all `.pending` files in and under the task attempt
|
|
|
+directory will be listed, their contents read and then an aggregate `.pendingset`
|
|
|
+file PUT to the job attempt directory. The `.pending` files are then deleted.
|
|
|
+
|
|
|
+The time to commit a job will be `O(files/threads)`
|
|
|
+
|
|
|
+Every `.pendingset` file in the job attempt directory must be loaded, and a PUT
|
|
|
+request issued for every incomplete upload listed in the files.
|
|
|
+
|
|
|
+Note that it is the bulk listing of all children which is where full consistency
|
|
|
+is required. If instead, the list of files to commit could be returned from
|
|
|
+tasks to the job committer, as the Spark commit protocol allows, it would be
|
|
|
+possible to commit data to an inconsistent object store.
|
|
|
+
|
|
|
+### Cost
|
|
|
+
|
|
|
+Uncommitted data in an incomplete multipart upload is billed at the storage
|
|
|
+cost of the S3 bucket. To keep costs down, outstanding data from
|
|
|
+failed jobs must be deleted. This can be done through S3 bucket lifecycle policies,
|
|
|
+or command-line tools which we would need to write.
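+
+For example, a bucket lifecycle rule can abort any multipart upload which has
+been open for longer than a chosen number of days. A sketch using the AWS SDK
+for Java (v1), with an illustrative bucket name and retention period:
+
+```java
+import java.util.Arrays;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AbortIncompleteMultipartUpload;
+import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
+import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;
+import com.amazonaws.services.s3.model.lifecycle.LifecyclePrefixPredicate;
+
+public class AbortIncompleteUploadsRuleSketch {
+  public static void main(String[] args) {
+    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
+    // Abort any multipart upload which has been open for more than 7 days.
+    BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
+        .withId("abort-incomplete-multipart-uploads")
+        .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate("")))
+        .withAbortIncompleteMultipartUpload(
+            new AbortIncompleteMultipartUpload().withDaysAfterInitiation(7))
+        .withStatus(BucketLifecycleConfiguration.ENABLED);
+    s3.setBucketLifecycleConfiguration("example-bucket",
+        new BucketLifecycleConfiguration().withRules(Arrays.asList(rule)));
+  }
+}
+```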
|
|
|
+
|
|
|
+### Limitations of this algorithm
|
|
|
+
|
|
|
+1. Files will not be visible after the `close()` call, as they will not exist.
|
|
|
+Any code which expected pending-commit files to be visible will fail.
|
|
|
+
|
|
|
+1. Failures of tasks and jobs will leave outstanding multipart uploads. These
|
|
|
+will need to be garbage collected. S3 now supports automated cleanup; S3A has
|
|
|
+the option to do it on startup, and we plan for the `hadoop s3` command to
|
|
|
+allow callers to explicitly do it. If tasks were to explicitly write the upload
|
|
|
+ID of writes as a write commenced, cleanup by the job committer may be possible.
|
|
|
+
|
|
|
+1. The time to write very small files may be higher than that of PUT and COPY.
|
|
|
+We are ignoring this problem as not relevant in production; any attempt at optimizing
|
|
|
+small file operations will only complicate development, maintenance and testing.
|
|
|
+
|
|
|
+1. The files containing temporary information could be mistaken for actual
|
|
|
+data.
|
|
|
+
|
|
|
+1. It could potentially be harder to diagnose what is causing problems. Lots of
|
|
|
+logging can help, especially with debug-level listing of the directory structure
|
|
|
+of the temporary directories.
|
|
|
+
|
|
|
+1. To reliably list all PUT requests outstanding, we need list consistency.
|
|
|
+In the absence of a means to reliably identify when an S3 endpoint is consistent, people
|
|
|
+may still use eventually consistent stores, with the consequent loss of data.
|
|
|
+
|
|
|
+1. If there is more than one job simultaneously writing to the same destination
|
|
|
+directories, the output may get confused. This appears to hold today with the current
|
|
|
+commit algorithms.
|
|
|
+
|
|
|
+1. It is possible to create more than one writer writing to the
|
|
|
+same destination file within the same S3A client/task, either sequentially or in parallel.
|
|
|
+
|
|
|
+1. Even with a consistent metadata store, if a job overwrites existing
|
|
|
+files, then old data may still be visible to clients reading the data, until
|
|
|
+the update has propagated to all replicas of the data.
|
|
|
+
|
|
|
+1. If the operation is attempting to completely overwrite the contents of
|
|
|
+a directory, then it is not going to work: the existing data will not be cleaned
|
|
|
+up. A cleanup operation would need to be included in the job commit, deleting
|
|
|
+all files in the destination directory which were not being overwritten.
|
|
|
+
|
|
|
+1. It requires a path element, such as `__magic` which cannot be used
|
|
|
+for any purpose other than for the storage of pending commit data.
|
|
|
+
|
|
|
+1. Unless extra code is added to every FS operation, it will still be possible
|
|
|
+to manipulate files under the `__magic` tree. That's not bad, it just potentially
|
|
|
+confusing.
|
|
|
+
|
|
|
+1. As written data is not materialized until the commit, it will not be possible
|
|
|
+for any process to read or manipulate a file which it has just created.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+### Changes to `S3ABlockOutputStream`
|
|
|
+
|
|
|
+To avoid having to copy and paste the `S3ABlockOutputStream`, it has
|
|
|
+been modified to be constructed with a `PutTracker` class to
|
|
|
+manage the immediate/delayed completion of uploads.
|
|
|
+It will be called at appropriate points.
|
|
|
+
|
|
|
+* Initialization, returning a marker to indicate whether or not multipart
|
|
|
+upload is to commence immediately.
|
|
|
+* Multipart PUT init.
|
|
|
+* Single put init (not used in this algorithm, but useful for completeness).
|
|
|
+* Block upload init, failure and completion (from the relevant thread).
|
|
|
+* `close()` entered; all blocks completed —returning a marker to indicate
|
|
|
+whether any outstanding multipart should be committed.
|
|
|
+* Multipart abort in `abort()` call (maybe: move core logic elsewhere).
|
|
|
+
|
|
|
+The base implementation would do nothing
|
|
|
+except declare that the MPU must be executed in the `close()` call.
|
|
|
+
|
|
|
+The S3A Committer version would:
|
|
|
+1. Always initiate a multipart upload during initialization.
|
|
|
+1. In the `close()` operation, save all the data required to commit later.
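+
+As a rough illustration of those callback points (this is not the real
+`PutTracker` API, merely a sketch of its shape):
+
+```java
+import java.io.IOException;
+import java.util.List;
+
+/** Illustrative shape of a tracker plugged into the block output stream. */
+public interface UploadTrackerSketch {
+
+  /** Stream creation: return true if a multipart upload should start immediately. */
+  boolean initialize() throws IOException;
+
+  /** Called when the stream initiates a multipart upload. */
+  void multipartInitiated(String uploadId) throws IOException;
+
+  /** Called as each block upload completes, with its ordered etag. */
+  void partCompleted(int partNumber, String etag) throws IOException;
+
+  /**
+   * Called in close() once all blocks have been uploaded.
+   * @return true if the stream itself should complete the multipart upload,
+   *         false if completion is deferred because the data needed to commit
+   *         later has been persisted instead.
+   */
+  boolean aboutToComplete(String uploadId, List<String> etags) throws IOException;
+
+  /** Called when the write is aborted. */
+  void aborted(String uploadId) throws IOException;
+}
+```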
|
|
|
+
|
|
|
+
|
|
|
+## Integrating the Committers with Hadoop MapReduce
|
|
|
+
|
|
|
+
|
|
|
+In order to support the ubiquitous `FileOutputFormat` and subclasses,
|
|
|
+S3A Committers will somehow need to be accepted as a valid committer by the class,
|
|
|
+a class which explicitly expects the output committer to be a `FileOutputCommitter`:
|
|
|
+
|
|
|
+```java
|
|
|
+public Path getDefaultWorkFile(TaskAttemptContext context,
|
|
|
+ String extension) throws IOException{
|
|
|
+ PathOutputCommitter committer =
|
|
|
+ (PathOutputCommitter) getOutputCommitter(context);
|
|
|
+ return new Path(committer.getWorkPath(), getUniqueFile(context,
|
|
|
+ getOutputName(context), extension));
|
|
|
+}
|
|
|
+
|
|
|
+```
|
|
|
+
|
|
|
+Here are some options which have been considered, explored and discarded:
|
|
|
+
|
|
|
+1. Adding more of a factory mechanism to create `FileOutputCommitter` instances;
|
|
|
+subclass this for S3A output and return it. The complexity of `FileOutputCommitter`
|
|
|
+and of supporting more dynamic construction makes this dangerous from an implementation
|
|
|
+and maintenance perspective.
|
|
|
+
|
|
|
+1. Add a new commit algorithm "3", which actually reads in the configured
|
|
|
+classname of a committer which it then instantiates and then relays the commit
|
|
|
+operations, passing in context information. This new committer interface would
|
|
|
+add extra methods and attributes. This is viable, but does still change
|
|
|
+the existing Committer code in a way which may be high-maintenance.
|
|
|
+
|
|
|
+1. Allow the `FileOutputFormat` class to take any task/job context committer
|
|
|
+which implemented the `getWorkPath()` method —that being the sole
|
|
|
+specific feature which it needs from the `FileOutputCommitter`.
|
|
|
+
|
|
|
+
|
|
|
+Option 3, make `FileOutputFormat` support more generic committers, is the
|
|
|
+current design. It relies on the fact that the sole specific method of
|
|
|
+`FileOutputCommitter` which `FileOutputFormat` uses is `getWorkPath()`.
|
|
|
+
|
|
|
+This can be pulled up into a new abstract class, `PathOutputCommitter`, which
|
|
|
+`FileOutputCommitter` and `S3ACommitter` can extend:
|
|
|
+
|
|
|
+```java
|
|
|
+public abstract class PathOutputCommitter extends OutputCommitter {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the directory that the task should write results into.
|
|
|
+ * @return the work directory
|
|
|
+ */
|
|
|
+ public abstract Path getWorkPath() throws IOException;
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+The sole change needed for `FileOutputFormat` is to change what it casts
|
|
|
+the context committer to:
|
|
|
+
|
|
|
+```java
|
|
|
+PathOutputCommitter committer =
|
|
|
+ (PathOutputCommitter) getOutputCommitter(context);
|
|
|
+```
|
|
|
+
|
|
|
+Provided that `getWorkPath()` remains the sole method which `FileOutputFormat`
|
|
|
+uses, these changes will allow an S3A committer to replace the `FileOutputCommitter`,
|
|
|
+with minimal changes to the codebase.
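+
+For illustration, a minimal committer built against the simplified abstract
+class above only needs to supply a work path plus the normal `OutputCommitter`
+callbacks. All names below are hypothetical, and the real `PathOutputCommitter`
+has additional constructors and methods.
+
+```java
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/** Minimal sketch of a committer usable by FileOutputFormat via getWorkPath(). */
+public class SketchPathOutputCommitter extends PathOutputCommitter {
+
+  private final Path workPath;
+
+  public SketchPathOutputCommitter(Path workPath) {
+    this.workPath = workPath;
+  }
+
+  @Override
+  public Path getWorkPath() throws IOException {
+    return workPath;              // where tasks write their output
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException { }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException { }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+    return true;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    // e.g. propagate the task's pending uploads to the job attempt directory
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    // e.g. abort the task's pending uploads
+  }
+}
+```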
|
|
|
+
|
|
|
+
|
|
|
+Update: There is a cost to this: MRv1 API support is lost.
|
|
|
+
|
|
|
+
|
|
|
+### MRv1 support via `org.apache.hadoop.mapred.FileOutputFormat`
|
|
|
+
|
|
|
+A price of not subclassing `FileOutputCommitter` is that the code used
|
|
|
+to wrap and relay the MRv1 API calls protocol to the `FileOutputCommitter`
|
|
|
+will not work: the new committer will not be picked up.
|
|
|
+
|
|
|
+This is visible in Spark, where the V1 API is exported from the `RDD` class
|
|
|
+(`RDD.saveAsHadoopFile()`). The successor code, `PairRDDFunctions.saveAsNewAPIHadoopFile()`
|
|
|
+does work: *To get high performance commits in Object Stores, the MRv2 commit protocol
|
|
|
+must be used, which means: the V2 classes.*
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+#### Resolved issues
|
|
|
+
|
|
|
+
|
|
|
+**Magic Committer: Name of directory**
|
|
|
+
|
|
|
+The design proposes the name `__magic` for the directory. HDFS and
|
|
|
+the various scanning routines always treat files and directories starting with `_`
|
|
|
+as temporary/excluded data.
|
|
|
+
|
|
|
+
|
|
|
+**Magic Committer: Subdirectories**
|
|
|
+
|
|
|
+It is legal to create subdirectories in a task work directory, which
|
|
|
+will then be moved into the destination directory, retaining that directory
|
|
|
+tree.
|
|
|
+
|
|
|
+That is, if the task working dir is `dest/__magic/app1/task1/`, all files
|
|
|
+under `dest/__magic/app1/task1/part-0000/` must end up under the path
|
|
|
+`dest/part-0000/`.
|
|
|
+
|
|
|
+This behavior is relied upon for the writing of intermediate map data in an MR
|
|
|
+job.
|
|
|
+
|
|
|
+This means it is not enough simply to strip off all path elements under `__magic`;
|
|
|
+it is critical to determine the base path.
|
|
|
+
|
|
|
+Proposed: use the special name `__base` as a marker of the base element for
|
|
|
+committing. Under task attempts a `__base` dir is created and turned into the
|
|
|
+working dir. All files created under this path will be committed to the destination
|
|
|
+with a path relative to the base dir.
|
|
|
+
|
|
|
+More formally: the last parent element of a path which is `__base` sets the
|
|
|
+base for relative paths created underneath it.
|
|
|
+
|
|
|
+
|
|
|
+## Testing
|
|
|
+
|
|
|
+The committers can only be tested against an S3-compatible object store.
|
|
|
+
|
|
|
+Although a consistent object store is a requirement for a production deployment
|
|
|
+of the magic committer, an inconsistent one has appeared to work during testing, simply by
|
|
|
+adding some delays to the operations: a task commit does not succeed until
|
|
|
+all the objects which it has PUT are visible in the LIST operation. Assuming
|
|
|
+that further listings from the same process also show the objects, the job
|
|
|
+committer will be able to list and commit the uploads.
|
|
|
+
|
|
|
+
|
|
|
+The committers have some unit tests, and integration tests based on
|
|
|
+the protocol integration test lifted from `org.apache.hadoop.mapreduce.lib.output.TestFileOutputCommitter`
|
|
|
+to test various state transitions of the commit mechanism; this has been extended
|
|
|
+to support the variants of the staging committer.
|
|
|
+
|
|
|
+There is an abstract integration test, `AbstractITCommitMRJob`, which creates
|
|
|
+a MiniYARN cluster bonded to a MiniHDFS cluster, then submits a simple
|
|
|
+MR job using the relevant committer. This verifies that the committer actually
|
|
|
+works, rather than just "appears to follow the protocol".
|
|
|
+
|
|
|
+One feature added during this testing is that the `_SUCCESS` marker file saved is
|
|
|
+no longer a 0-byte file; it is a JSON manifest file, as implemented in
|
|
|
+`org.apache.hadoop.fs.s3a.commit.files.SuccessData`. This file includes
|
|
|
+the committer used, the hostname performing the commit, timestamp data and
|
|
|
+a list of paths committed.
|
|
|
+
|
|
|
+```
|
|
|
+SuccessData{
|
|
|
+ committer='PartitionedStagingCommitter',
|
|
|
+ hostname='devbox.local',
|
|
|
+ description='Task committer attempt_1493832493956_0001_m_000000_0',
|
|
|
+ date='Wed May 03 18:28:41 BST 2017',
|
|
|
+ filenames=[/test/testMRJob/part-m-00000, /test/testMRJob/part-m-00002, /test/testMRJob/part-m-00001]
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+This was useful as a means of verifying that the correct
|
|
|
+committer had in fact been invoked in those forked processes: a 0-byte `_SUCCESS`
|
|
|
+marker implied the classic `FileOutputCommitter` had been used; if it could be read
|
|
|
+then it provided some details on the commit operation which were then used
|
|
|
+in assertions in the test suite.
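+
+For example, a test can assert on the manifest by opening the `_SUCCESS` file and
+parsing it as JSON; a minimal sketch using Jackson directly rather than the
+`SuccessData` class:
+
+```java
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class SuccessMarkerCheckSketch {
+
+  /** Fail if the marker is the classic 0-byte file; otherwise return the manifest. */
+  static JsonNode loadManifest(FileSystem fs, Path outputDir) throws Exception {
+    Path success = new Path(outputDir, "_SUCCESS");
+    FileStatus status = fs.getFileStatus(success);
+    if (status.getLen() == 0) {
+      throw new AssertionError("0-byte _SUCCESS: classic FileOutputCommitter was used");
+    }
+    try (FSDataInputStream in = fs.open(success)) {
+      return new ObjectMapper().readTree(in);
+    }
+  }
+}
+```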
|
|
|
+
|
|
|
+It has since been extended to collect metrics and other values, and has proven
|
|
|
+equally useful in Spark integration testing.
|
|
|
+
|
|
|
+## Integrating the Committers with Apache Spark
|
|
|
+
|
|
|
+
|
|
|
+Spark defines a commit protocol `org.apache.spark.internal.io.FileCommitProtocol`,
|
|
|
+implementing it in `HadoopMapReduceCommitProtocol`, with a subclass `SQLHadoopMapReduceCommitProtocol`
|
|
|
+which supports the configurable declaration of the underlying Hadoop committer class,
|
|
|
+and the `ManifestFileCommitProtocol` for Structured Streaming. The latter
|
|
|
+is best defined as "a complication" —but without support for it, S3 cannot be used
|
|
|
+as a reliable destination of stream checkpoints.
|
|
|
+
|
|
|
+One aspect of the Spark commit protocol is that alongside the Hadoop file committer,
|
|
|
+there's an API to request an absolute path as a target for a commit operation,
|
|
|
+`newTaskTempFileAbsPath(taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String`;
|
|
|
+each task's mapping of temporary to absolute files is passed to the Spark driver
|
|
|
+in the `TaskCommitMessage` returned after a task performs its local
|
|
|
+commit operations (which includes requesting permission to commit from the executor).
|
|
|
+These temporary paths are renamed to the final absolute paths
|
|
|
+in `FileCommitProtocol.commitJob()`. This is currently a serialized rename sequence
|
|
|
+at the end of all other work. Absolute paths are used when writing
|
|
|
+data into a destination directory tree whose directory names are driven by
|
|
|
+partition names (year, month, etc).
|
|
|
+
|
|
|
+Supporting that feature is going to be challenging; either we allow each directory in the partition tree to
|
|
|
+have its own staging directory documenting pending PUT operations, or (better) a staging directory
|
|
|
+tree is built off the base path, with all pending commits tracked in a matching directory
|
|
|
+tree.
|
|
|
+
|
|
|
+Alternatively, the fact that Spark tasks provide data to the job committer on their
|
|
|
+completion means that a list of pending PUT commands could be built up, with the commit
|
|
|
+operations being executed by an S3A-specific implementation of the `FileCommitProtocol`.
|
|
|
+As noted earlier, this may permit the requirement for a consistent list operation
|
|
|
+to be bypassed. It would still be important to list what was being written, as
|
|
|
+it is needed to aid aborting work in failed tasks, but the list of files
|
|
|
+created by successful tasks could be passed directly from the task to the committer,
|
|
|
+avoiding that potentially inconsistent list.
|
|
|
+
|
|
|
+
|
|
|
+#### Spark, Parquet and the Spark SQL Commit mechanism
|
|
|
+
|
|
|
+Spark's `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`
|
|
|
+Parquet output format wants a subclass of `org.apache.parquet.hadoop.ParquetOutputCommitter`,
|
|
|
+the option being defined by the classname in the configuration
|
|
|
+key `spark.sql.parquet.output.committer.class`;
|
|
|
+this is then patched into the option `spark.sql.sources.outputCommitterClass`
|
|
|
+where it is picked up by `SQLHadoopMapReduceCommitProtocol` and instantiated
|
|
|
+as the committer for the work.
|
|
|
+
|
|
|
+This is presumably done so the user has the option of requesting a metadata
|
|
|
+summary file by setting the option `"parquet.enable.summary-metadata"`.
|
|
|
+Creating the summary file requires scanning every single file in the destination
|
|
|
+directory on the job commit, so is *very* expensive, and not something which
|
|
|
+we recommend when working with S3.
|
|
|
+
|
|
|
+
|
|
|
+To use an S3Guard committer, it must also be identified as the Parquet committer.
|
|
|
+The fact that instances are dynamically instantiated somewhat complicates the process.
|
|
|
+
|
|
|
+In early tests, we can switch committers for ORC output without making any changes
|
|
|
+to the Spark code or configuration other than configuring the factory
|
|
|
+for Path output committers. For Parquet support, it may be sufficient to also declare
|
|
|
+the classname of the specific committer (i.e. not the factory).
|
|
|
+
|
|
|
+This is unfortunate as it complicates dynamically selecting a committer protocol
|
|
|
+based on the destination filesystem type or any per-bucket configuration.
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+The solution as implemented in the [initial prototype](https://github.com/hortonworks-spark/cloud-integration)
|
|
|
+consists of the following:
|
|
|
+
|
|
|
+1. A class `PathOutputCommitProtocol extends HadoopMapReduceCommitProtocol`
|
|
|
+which always creates the committer using the `PathOutputCommitterFactory`
|
|
|
+mechanism. This ensures that the output format's own committers are replaced
|
|
|
+with an output factory mechanism.
|
|
|
+
|
|
|
+1. A class `org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter`
|
|
|
+which is a directly instantiable output committer that then creates a
|
|
|
+committer through the factory mechanism and delegates all operations to
|
|
|
+that committer. This allows a committer to be declared in any configuration
|
|
|
+option which takes a committer class, but still use the factory mechanism
|
|
|
+underneath.
|
|
|
+
|
|
|
+1. Add a patch to Spark 2.3 [SPARK-21762], which allows any output committer
|
|
|
+to be used for `ParquetFileOutput`.
|
|
|
+
|
|
|
+Overall, it's a bit convoluted to implement, document and use. Users must
|
|
|
+declare two Spark SQL options as well as three `spark.hadoop` ones:
|
|
|
+
|
|
|
+```properties
|
|
|
+spark.sql.sources.commitProtocolClass=com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol
|
|
|
+spark.sql.parquet.output.committer.class=org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter
|
|
|
+spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
|
|
|
+spark.hadoop.fs.s3a.committer.name=magic
|
|
|
+spark.hadoop.fs.s3a.committer.magic.enabled=true
|
|
|
+```
|
|
|
+
|
|
|
+We could actually simplify this by adding a new algorithm "3" to the existing
|
|
|
+`FileOutputFormat`, telling `FileOutputFormat` itself to use the factory
|
|
|
+to create committers. This would then automatically pick up the factory mechanism.
|
|
|
+Avoiding loops in this situation would be "challenging": if instantiated
|
|
|
+via a factory, the file committer must not attempt to use the factory
|
|
|
+itself.
|
|
|
+
|
|
|
+
|
|
|
+## Security
|
|
|
+
|
|
|
+What are the obvious possible security risks which need to be covered
|
|
|
+and which code reviews should check for?
|
|
|
+
|
|
|
+* Leakage of AWS credentials in jobs/logging.
|
|
|
+* Exposure of data to unauthorized users.
|
|
|
+* Exposure of workflow to unauthorized users (e.g. paths & filenames, sequence
|
|
|
+of queries).
|
|
|
+* Silent tampering of data by unauthorized users.
|
|
|
+* Attacks on jobs which run up (large) bills by leaking pending datasets.
|
|
|
+* DoS attacks with malicious users sabotaging/hindering progress of other
|
|
|
+users' work.
|
|
|
+
|
|
|
+#### Security Risks of the Staging Committers
|
|
|
+
|
|
|
+Staged data is on local FS, in directories listed by `fs.s3a.buffer.dir`,
|
|
|
+falling back to the directory in `hadoop.tmp.dir`.
|
|
|
+These are the same directories already
|
|
|
+used to buffer blocks of data being written in output streams, prior to
|
|
|
+PUT/POST to S3. Therefore: there is more risk than before. We should
|
|
|
+clearly document the security aspects of the temp dirs to ensure this.
|
|
|
+
|
|
|
+As all files written by a task are not uploaded until task commit, more
|
|
|
+local storage is needed. A malicious user executing work on one system
|
|
|
+could potentially fill up the temp disk space. Mitigation: storage
|
|
|
+quotas in local FS, keeping temp dirs on different mounted FS from root.
|
|
|
+
|
|
|
+The intermediate `.pendingset` files are saved in HDFS under the directory in
|
|
|
+`fs.s3a.committer.staging.tmp.path`; defaulting to `/tmp`. This data can
|
|
|
+disclose the workflow (it contains the destination paths & amount of data
|
|
|
+generated), and if deleted, breaks the job. If malicious code were to edit
|
|
|
+the file, by, for example, reordering the ordered etag list, the generated
|
|
|
+data would be committed out of order, creating invalid files. As this is
|
|
|
+the (usually transient) cluster FS, any user in the cluster has the potential
|
|
|
+to do this.
|
|
|
+
|
|
|
+*Locking down the temporary directories is critical to prevent malicious
|
|
|
+cluster users damaging the workflow and output.*
|
|
|
+
|
|
|
+#### Security Risks of the Magic Committer
|
|
|
+
|
|
|
+The directory defined by `fs.s3a.buffer.dir` is used to buffer blocks
|
|
|
+before upload, unless the job is configured to buffer the blocks in memory.
|
|
|
+This is as before: no incremental risk. As blocks are deleted from the filesystem
|
|
|
+after upload, the amount of storage needed is determined by the data generation
|
|
|
+bandwidth and the data upload bandwidth.
|
|
|
+
|
|
|
+No use is made of the cluster filesystem; there are no risks there.
|
|
|
+
|
|
|
+A consistent store is required, which, for Amazon's infrastructure, means S3Guard.
|
|
|
+This is covered below.
|
|
|
+
|
|
|
+A malicious user with write access to the `__magic` directory could manipulate
|
|
|
+or delete the metadata of pending uploads, or potentially inject new work into
|
|
|
+the commit. Having access to the `__magic` directory implies write access
|
|
|
+to the parent destination directory: a malicious user could just as easily
|
|
|
+manipulate the final output, without needing to attack the committer's intermediate
|
|
|
+files.
|
|
|
+
|
|
|
+
|
|
|
+### Security Risks of all committers
|
|
|
+
|
|
|
+
|
|
|
+#### Visibility
|
|
|
+
|
|
|
+* If S3Guard is used for storing metadata, then the metadata is visible to
|
|
|
+all users with read access. A malicious user with write access could delete
|
|
|
+entries of newly generated files, so they would not be visible.
|
|
|
+
|
|
|
+
|
|
|
+#### Malicious Serialized Data
|
|
|
+
|
|
|
+The internal metadata summary files (`.pending` and `.pendingset`)
|
|
|
+could be tampered with by someone malicious, and, when parsed or interpreted by
|
|
|
+the trusted account, used to execute untrusted code, fail, etc.
|
|
|
+The formats are all JSON, parsed with Jackson; we expect invalid JSON
|
|
|
+to result in parse errors, and fail the job. Aborting the job triggers a
|
|
|
+best-effort attempt to load the pending files, ignoring those which cannot
|
|
|
+be loaded or parsed, and aborts all pending uploads identified by the loaded
|
|
|
+files.
|
|
|
+
|
|
|
+* None of the strings in the parsed dataset are passed through any interpreter
|
|
|
+(e.g. used in SQL queries, shell commands or similar). Therefore (SQL) injection
|
|
|
+attacks are not possible.
|
|
|
+
|
|
|
+* Some of the data *may* be printed in the log files generated during
|
|
|
+process execution. For example, commit Id, destination Key, etc. These
|
|
|
+are all treated as plain text, and should be served up as such in any
|
|
|
+browser-hosted view of the logs.
|
|
|
+
|
|
|
+* Some of the data is returned in the `toString()` values of the loaded
|
|
|
+classes. This may also be logged, observed in IDEs.
|
|
|
+
|
|
|
+* None of the content in the serialized data is displayed in any web UI.
|
|
|
+The vulnerability there is what if this happened in the future in some
|
|
|
+downstream application: would it be possible to inject Javascript into
|
|
|
+any of the text fields, script which could then be executed in some XSS
|
|
|
+attack. We may wish to consider sanitizing this data on load.
|
|
|
+
|
|
|
+* Paths in tampered data could be modified in an attempt to commit an upload across
|
|
|
+an existing file, or the MPU ID altered to prematurely commit a different upload.
|
|
|
+These attempts are not going to succeed, because the destination
|
|
|
+path of the upload is declared on the initial POST to initiate the MPU, and
|
|
|
+operations associated with the MPU must also declare the path: if the path and
|
|
|
+ID are inconsistent, the operation will fail. If a valid (path, MPU) tuple
|
|
|
+were inserted in a tampered-with file, the commit or abort could complete or abort
|
|
|
+the upload prematurely. As the malicious party must already have enough access to the
|
|
|
+target S3 store to obtain this information, they are unlikely to need to
|
|
|
+tamper with the JSON files to perform such actions.
|
|
|
+
|
|
|
+
|
|
|
+#### Outstanding uncommitted data and its cost
|
|
|
+
|
|
|
+* Killed jobs will leak uncommitted uploads, which will run up bills.
|
|
|
+A restarted job will automatically purge
|
|
|
+all pending uploads under the destination path on job commit, so if the job
|
|
|
+is rerun it will cancel the pending writes of the previous job attempt.
|
|
|
+
|
|
|
+We shall also provide a CLI tool to list and delete pending uploads under a path.
|
|
|
+
|
|
|
+* Configuring a store to automatically clean pending uploads after a
|
|
|
+time period such as 24h will guarantee that pending upload data will always
|
|
|
+be deleted, even without a rerun of the job or use of the CLI tool.
|
|
|
+
|
|
|
+AWS documents [the permissions for MPU](http://docs.aws.amazon.com/AmazonS3/latest/dev/mpuAndPermissions.html).
|
|
|
+There are special permissions `s3:ListBucketMultipartUploads`,
|
|
|
+`s3:ListMultipartUploadParts` and `s3:AbortMultipartUpload`. By default
|
|
|
+the bucket owner has all these permissions; the MPU initiator will be granted
|
|
|
+the permissions to list the upload, list the parts and abort their own uploads.
|
|
|
+The bucket owner may grant/deny permissions to either (this may permit a user to be
|
|
|
+able to initiate & complete MPU, but not delete and abort).
|
|
|
+
|
|
|
+#### Proposed security settings & recommendations
|
|
|
+
|
|
|
+* Bucket access restricted to specific IAM roles.
|
|
|
+* `fs.s3a.buffer.dir` set to location under `/tmp` with read & write access
|
|
|
+restricted to the active user.
|
|
|
+* `fs.s3a.committer.staging.tmp.path` should be isolated to each
|
|
|
+active user. Proposed: make the default an unqualified path, `tmp/staging`,
|
|
|
+which will be made absolute relative to the current user. In filesystems in
|
|
|
+which access under users' home directories is restricted, this final, absolute
|
|
|
+path will not be visible to untrusted accounts.
|
|
|
+
|
|
|
+* Maybe: define the set of valid characters for text strings, and a regex for
|
|
|
+ validating, e.g. `[a-zA-Z0-9 \.\,\(\) \-\+]+`, and then validate any free text
|
|
|
+ JSON fields on load and save.
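+
+A sketch of such a check in Java, using the example pattern above (the pattern
+and method names are illustrative):
+
+```java
+import java.util.regex.Pattern;
+
+public class FreeTextValidatorSketch {
+  // Example whitelist pattern from above; tune to the actual fields.
+  private static final Pattern VALID =
+      Pattern.compile("[a-zA-Z0-9 \\.\\,\\(\\) \\-\\+]+");
+
+  static void require(String field, String value) {
+    if (value == null || !VALID.matcher(value).matches()) {
+      throw new IllegalArgumentException("Invalid value for " + field);
+    }
+  }
+}
+```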
|