|
@@ -175,12 +175,14 @@ final class S3ADataBlocks {
|
|
|
/**
|
|
|
* Create a block.
|
|
|
*
|
|
|
+ * @param spanId id of the audit span
|
|
|
+ * @param key key of s3 object being written to
|
|
|
* @param index index of block
|
|
|
* @param limit limit of the block.
|
|
|
* @param statistics stats to work with
|
|
|
* @return a new block.
|
|
|
*/
|
|
|
- abstract DataBlock create(long index, long limit,
|
|
|
+ abstract DataBlock create(String spanId, String key, long index, long limit,
|
|
|
BlockOutputStreamStatistics statistics)
|
|
|
throws IOException;
|
|
|
|
|
@@ -391,11 +393,11 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- DataBlock create(long index, long limit,
|
|
|
+ DataBlock create(String spanId, String key, long index, long limit,
|
|
|
BlockOutputStreamStatistics statistics)
|
|
|
throws IOException {
|
|
|
Preconditions.checkArgument(limit > 0,
|
|
|
- "Invalid block size: %d", limit);
|
|
|
+ "Invalid block size: %d [%s]", limit, key);
|
|
|
return new ByteArrayBlock(0, limit, statistics);
|
|
|
}
|
|
|
|
|
@@ -516,11 +518,11 @@ final class S3ADataBlocks {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- ByteBufferBlock create(long index, long limit,
|
|
|
+ ByteBufferBlock create(String spanId, String key, long index, long limit,
|
|
|
BlockOutputStreamStatistics statistics)
|
|
|
throws IOException {
|
|
|
Preconditions.checkArgument(limit > 0,
|
|
|
- "Invalid block size: %d", limit);
|
|
|
+ "Invalid block size: %d [%s]", limit, key);
|
|
|
return new ByteBufferBlock(index, limit, statistics);
|
|
|
}
|
|
|
|
|
@@ -798,6 +800,8 @@ final class S3ADataBlocks {
|
|
|
* Buffer blocks to disk.
|
|
|
*/
|
|
|
static class DiskBlockFactory extends BlockFactory {
|
|
|
+ private static final String ESCAPED_FORWARD_SLASH = "EFS";
|
|
|
+ private static final String ESCAPED_BACKSLASH = "EBS";
|
|
|
|
|
|
DiskBlockFactory(S3AFileSystem owner) {
|
|
|
super(owner);
|
|
@@ -806,6 +810,8 @@ final class S3ADataBlocks {
|
|
|
/**
|
|
|
* Create a temp file and a {@link DiskBlock} instance to manage it.
|
|
|
*
|
|
|
+ * @param spanId id of the audit span
|
|
|
+ * @param key of the s3 object being written
|
|
|
* @param index block index
|
|
|
* @param limit limit of the block. -1 means "no limit"
|
|
|
* @param statistics statistics to update
|
|
@@ -813,17 +819,22 @@ final class S3ADataBlocks {
|
|
|
* @throws IOException IO problems
|
|
|
*/
|
|
|
@Override
|
|
|
- DataBlock create(long index,
|
|
|
+ DataBlock create(String spanId, String key, long index,
|
|
|
long limit,
|
|
|
BlockOutputStreamStatistics statistics)
|
|
|
throws IOException {
|
|
|
Preconditions.checkArgument(limit != 0,
|
|
|
- "Invalid block size: %d", limit);
|
|
|
- File destFile = getOwner()
|
|
|
- .createTmpFileForWrite(String.format("s3ablock-%04d-", index),
|
|
|
- limit, getOwner().getConf());
|
|
|
+ "Invalid block size: %d [%s]", limit, key);
|
|
|
+ String prefix = String.format("s3ablock-%04d-%s-%s-", index, spanId, escapeS3Key(key));
|
|
|
+ File destFile = getOwner().createTmpFileForWrite(prefix, limit, getOwner().getConf());
|
|
|
return new DiskBlock(destFile, limit, index, statistics);
|
|
|
}
|
|
|
+
|
|
|
+ protected static String escapeS3Key(String key) {
|
|
|
+ return key
|
|
|
+ .replace("\\", ESCAPED_BACKSLASH)
|
|
|
+ .replace("/", ESCAPED_FORWARD_SLASH);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|