|
@@ -269,6 +269,76 @@ appending data are creating and writing into new partitions.
|
|
|
job to create unique files. This is foundational for
|
|
|
any job to generate correct data.
|
|
|
|
|
|
+# <a name="dynamic"></a> Spark Dynamic Partition overwriting
|
|
|
+
|
|
|
+Spark has a feature called "Dynamic Partition Overwrites".
|
|
|
+
|
|
|
+This can be initiated in SQL:
|
|
|
+```SQL
|
|
|
+INSERT OVERWRITE TABLE ...
|
|
|
+```
|
|
|
+Or through DataSet writes where the mode is `overwrite` and the partitioning matches
|
|
|
+that of the existing table:
|
|
|
+```scala
|
|
|
+sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
|
|
|
+// followed by an overwrite of a Dataset into an existing partitioned table.
|
|
|
+eventData2
|
|
|
+ .write
|
|
|
+ .mode("overwrite")
|
|
|
+ .partitionBy("year", "month")
|
|
|
+ .format("parquet")
|
|
|
+ .save(existingDir)
|
|
|
+```
|
|
|
+
|
|
|
+This feature is implemented in Spark, which:
|
|
|
+1. Directs the job to write its new data to a temporary directory.
|
|
|
+1. After job commit completes, scans the output to identify the leaf directories ("partitions") into which data was written.
|
|
|
+1. Deletes the content of those directories in the destination table.
|
|
|
+1. Renames the new files into the partitions.
|
|
|
+
|
|
|
+This is all done in Spark, which takes over the tasks of scanning
|
|
|
+the intermediate output tree, deleting partitions and
|
|
|
+renaming the new files.
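
The scan, delete and rename steps described above can be sketched on a local filesystem. This is a minimal illustration only, not Spark's actual implementation: it uses `java.nio.file` rather than the Hadoop `FileSystem` API, and the object and method names (`DynamicOverwriteSketch`, `leafPartitions`, `deleteContents`, `promote`) are hypothetical.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

// Hypothetical sketch of the job-commit steps; not Spark's real code.
object DynamicOverwriteSketch {

  /** Relative paths of the directories under `root` which directly contain files. */
  def leafPartitions(root: Path): Seq[Path] = {
    val stream = Files.walk(root)
    try {
      stream.iterator().asScala
        .filter(p => Files.isRegularFile(p))
        .map(p => root.relativize(p.getParent))
        .toList
        .distinct
    } finally {
      stream.close()
    }
  }

  /** Regular files directly inside `dir`; empty if `dir` does not exist. */
  private def filesIn(dir: Path): List[Path] =
    if (!Files.isDirectory(dir)) Nil
    else {
      val stream = Files.list(dir)
      try {
        stream.iterator().asScala.filter(p => Files.isRegularFile(p)).toList
      } finally {
        stream.close()
      }
    }

  /** Deletes the files already present in a destination partition. */
  def deleteContents(dir: Path): Unit =
    filesIn(dir).foreach(p => Files.delete(p))

  /**
   * Scans the staging tree, clears each matching destination partition,
   * then renames the new files into place, one at a time.
   * Returns the relative paths of the partitions which were overwritten.
   */
  def promote(staging: Path, dest: Path): Seq[Path] = {
    val partitions = leafPartitions(staging)
    partitions.foreach { rel =>
      val target = dest.resolve(rel)
      deleteContents(target)
      Files.createDirectories(target)
      filesIn(staging.resolve(rel)).foreach { f =>
        Files.move(f, target.resolve(f.getFileName)) // sequential rename
      }
    }
    partitions
  }
}
```

Partitions of the destination into which the job wrote no data are left untouched; only the partitions found in the staging tree are cleared and repopulated.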
|
|
|
+
|
|
|
+This feature also adds the ability for a job to write data entirely outside
|
|
|
+the destination table, which is done by:
|
|
|
+1. writing new files into the working directory
|
|
|
+1. Spark moving them to the final destination in job commit
|
|
|
+
|
|
|
+
|
|
|
+The manifest committer is compatible with dynamic partition overwrites
|
|
|
+on Azure and Google Cloud Storage as together they meet the core requirements of
|
|
|
+the extension:
|
|
|
+1. The working directory returned in `getWorkPath()` is in the same filesystem
|
|
|
+ as the final output.
|
|
|
+2. `rename()` is an `O(1)` operation which is safe and fast to use when committing a job.
|
|
|
+
|
|
|
+None of the S3A committers support this. Condition (1) is not met by
|
|
|
+the staging committers, while (2) is not met by S3 itself.
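
Condition (1) can be approximated by comparing the scheme and authority of the two paths. The sketch below is a simplified check under that assumption; the names `CommitterChecks` and `sameFileSystem` are hypothetical, the URIs in the usage note are illustrative, and the comparison which Hadoop or Spark actually perform may differ.

```scala
import java.net.URI

// Hypothetical helper; a simplified stand-in for a real filesystem comparison.
object CommitterChecks {

  /** True when both URIs share a scheme and authority, i.e. appear to be in the same store. */
  def sameFileSystem(workPath: URI, destPath: URI): Boolean = {
    // A missing scheme or authority is treated as the empty string.
    def norm(s: String): String = Option(s).getOrElse("").toLowerCase
    norm(workPath.getScheme) == norm(destPath.getScheme) &&
      norm(workPath.getAuthority) == norm(destPath.getAuthority)
  }
}
```

A work path of `abfs://container@account.dfs.core.windows.net/table/_temporary/0` against a destination of `abfs://container@account.dfs.core.windows.net/table` passes this check, while a local `file:///tmp/staging` work path against an `s3a://bucket/table` destination does not.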
|
|
|
+
|
|
|
+To use the manifest committer with dynamic partition overwrites, the
|
|
|
+Spark version must contain
|
|
|
+[SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034)
|
|
|
+_PathOutputCommitters to work with dynamic partition overwrite_.
|
|
|
+
|
|
|
+Be aware that the rename phase of the operation will be slow
|
|
|
+if many files are renamed, as this is done sequentially.
|
|
|
+Parallel renaming would speed this up, *but could trigger the abfs overload
|
|
|
+problems which the manifest committer is designed to minimize the risk of
|
|
|
+and to support recovery from*.
|
|
|
+
|
|
|
+The Spark side of the commit operation will be listing/treewalking
|
|
|
+the temporary output directory (some overhead), followed by
|
|
|
+the file promotion, done with a classic filesystem `rename()`
|
|
|
+call. There will be no explicit rate limiting here.
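
Because the renames are issued one after another, the duration of the rename phase grows linearly with the number of files. A rough model, in which the per-rename latency is an input to be measured rather than a known figure, and where the names `RenameCostSketch` and `sequentialRenameSeconds` are hypothetical:

```scala
// Hypothetical back-of-envelope model; it ignores listing overhead, retries and throttling.
object RenameCostSketch {

  /** Estimated duration, in seconds, of renaming `fileCount` files sequentially. */
  def sequentialRenameSeconds(fileCount: Long, millisPerRename: Double): Double =
    fileCount * millisPerRename / 1000.0
}
```

For example, 50,000 files at an assumed (not measured) 20 milliseconds per rename gives an estimate of 1000 seconds for the rename phase alone.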
|
|
|
+
|
|
|
+*What does this mean?*
|
|
|
+
|
|
|
+It means that _dynamic partitioning should not be used on Azure Storage
|
|
|
+for SQL queries/Spark DataSet operations where many thousands of files are created_.
|
|
|
+The fact that these will suffer from performance problems before
|
|
|
+throttling scale issues surface should be considered a warning.
|
|
|
+
|
|
|
# <a name="SUCCESS"></a> Job Summaries in `_SUCCESS` files
|
|
|
|
|
|
The original hadoop committer creates a zero byte `_SUCCESS` file in the root of the output directory
|
|
@@ -585,7 +655,7 @@ There is no need to alter these values, except when writing new implementations
|
|
|
something which is only needed if the store provides extra integration support for the
|
|
|
committer.
|
|
|
|
|
|
-## <a name="concurrent"></a> Support for concurrent test runs.
|
|
|
+## <a name="concurrent"></a> Support for concurrent jobs to the same directory
|
|
|
|
|
|
It *may* be possible to run multiple jobs targeting the same directory tree.
|
|
|
|
|
@@ -600,6 +670,8 @@ For this to work, a number of conditions must be met:
|
|
|
`mapreduce.fileoutputcommitter.cleanup.skipped` to `true`.
|
|
|
* All jobs/tasks must create files with unique filenames.
|
|
|
* All jobs must create output with the same directory partition structure.
|
|
|
+* The jobs/queries MUST NOT use Spark dynamic partition overwrites ("INSERT OVERWRITE TABLE"); data may be lost.
|
|
|
+ This holds for *all* committers, not just the manifest committer.
|
|
|
* Remember to delete the `_temporary` directory later!
|
|
|
|
|
|
This has *NOT BEEN TESTED*
|