|
@@ -204,6 +204,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
|
|
LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
|
|
private LocalDirAllocator directoryAllocator;
|
|
private LocalDirAllocator directoryAllocator;
|
|
private CannedAccessControlList cannedACL;
|
|
private CannedAccessControlList cannedACL;
|
|
|
|
+ private boolean failOnMetadataWriteError;
|
|
|
|
|
|
/**
|
|
/**
|
|
* This must never be null; until initialized it just declares that there
|
|
* This must never be null; until initialized it just declares that there
|
|
@@ -306,6 +307,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
onRetry);
|
|
onRetry);
|
|
writeHelper = new WriteOperationHelper(this, getConf());
|
|
writeHelper = new WriteOperationHelper(this, getConf());
|
|
|
|
|
|
|
|
+ failOnMetadataWriteError = conf.getBoolean(FAIL_ON_METADATA_WRITE_ERROR,
|
|
|
|
+ FAIL_ON_METADATA_WRITE_ERROR_DEFAULT);
|
|
|
|
+
|
|
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
|
|
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
|
|
listing = new Listing(this);
|
|
listing = new Listing(this);
|
|
partSize = getMultipartSizeProperty(conf,
|
|
partSize = getMultipartSizeProperty(conf,
|
|
@@ -1784,10 +1788,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
* @param putObjectRequest the request
|
|
* @param putObjectRequest the request
|
|
* @return the upload initiated
|
|
* @return the upload initiated
|
|
* @throws AmazonClientException on problems
|
|
* @throws AmazonClientException on problems
|
|
|
|
+ * @throws MetadataPersistenceException if metadata about the write could
|
|
|
|
+ * not be saved to the metadata store and
|
|
|
|
+ * fs.s3a.metadatastore.fail.on.write.error=true
|
|
*/
|
|
*/
|
|
- @Retries.OnceRaw("For PUT; post-PUT actions are RetriesExceptionsSwallowed")
|
|
|
|
|
|
+ @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
|
|
PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
|
|
PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
|
|
- throws AmazonClientException {
|
|
|
|
|
|
+ throws AmazonClientException, MetadataPersistenceException {
|
|
long len = getPutRequestLength(putObjectRequest);
|
|
long len = getPutRequestLength(putObjectRequest);
|
|
LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
|
|
LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
|
|
incrementPutStartStatistics(len);
|
|
incrementPutStartStatistics(len);
|
|
@@ -2710,11 +2717,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
* @param progress optional progress callback
|
|
* @param progress optional progress callback
|
|
* @return the upload result
|
|
* @return the upload result
|
|
* @throws InterruptedIOException if the blocking was interrupted.
|
|
* @throws InterruptedIOException if the blocking was interrupted.
|
|
|
|
+ * @throws MetadataPersistenceException if metadata about the write could
|
|
|
|
+ * not be saved to the metadata store and
|
|
|
|
+ * fs.s3a.metadatastore.fail.on.write.error=true
|
|
*/
|
|
*/
|
|
- @Retries.OnceRaw("For PUT; post-PUT actions are RetriesExceptionsSwallowed")
|
|
|
|
|
|
+ @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated")
|
|
UploadResult executePut(PutObjectRequest putObjectRequest,
|
|
UploadResult executePut(PutObjectRequest putObjectRequest,
|
|
Progressable progress)
|
|
Progressable progress)
|
|
- throws InterruptedIOException {
|
|
|
|
|
|
+ throws InterruptedIOException, MetadataPersistenceException {
|
|
String key = putObjectRequest.getKey();
|
|
String key = putObjectRequest.getKey();
|
|
UploadInfo info = putObject(putObjectRequest);
|
|
UploadInfo info = putObject(putObjectRequest);
|
|
Upload upload = info.getUpload();
|
|
Upload upload = info.getUpload();
|
|
@@ -3034,10 +3044,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
* </ol>
|
|
* </ol>
|
|
* @param key key written to
|
|
* @param key key written to
|
|
* @param length total length of file written
|
|
* @param length total length of file written
|
|
|
|
+ * @throws MetadataPersistenceException if metadata about the write could
|
|
|
|
+ * not be saved to the metadata store and
|
|
|
|
+ * fs.s3a.metadatastore.fail.on.write.error=true
|
|
*/
|
|
*/
|
|
@InterfaceAudience.Private
|
|
@InterfaceAudience.Private
|
|
- @Retries.RetryExceptionsSwallowed
|
|
|
|
- void finishedWrite(String key, long length) {
|
|
|
|
|
|
+ @Retries.RetryTranslated("Except if failOnMetadataWriteError=false, in which"
|
|
|
|
+ + " case RetryExceptionsSwallowed")
|
|
|
|
+ void finishedWrite(String key, long length)
|
|
|
|
+ throws MetadataPersistenceException {
|
|
LOG.debug("Finished write to {}, len {}", key, length);
|
|
LOG.debug("Finished write to {}, len {}", key, length);
|
|
Path p = keyToQualifiedPath(key);
|
|
Path p = keyToQualifiedPath(key);
|
|
Preconditions.checkArgument(length >= 0, "content length is negative");
|
|
Preconditions.checkArgument(length >= 0, "content length is negative");
|
|
@@ -3053,8 +3068,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|
S3Guard.putAndReturn(metadataStore, status, instrumentation);
|
|
S3Guard.putAndReturn(metadataStore, status, instrumentation);
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- LOG.error("S3Guard: Error updating MetadataStore for write to {}:",
|
|
|
|
- key, e);
|
|
|
|
|
|
+ if (failOnMetadataWriteError) {
|
|
|
|
+ throw new MetadataPersistenceException(p.toString(), e);
|
|
|
|
+ } else {
|
|
|
|
+ LOG.error("S3Guard: Error updating MetadataStore for write to {}",
|
|
|
|
+ p, e);
|
|
|
|
+ }
|
|
instrumentation.errorIgnored();
|
|
instrumentation.errorIgnored();
|
|
}
|
|
}
|
|
}
|
|
}
|