|
@@ -19,8 +19,9 @@
|
|
|
package org.apache.hadoop.fs.s3a;
|
|
|
|
|
|
import java.io.File;
|
|
|
-import java.net.URI;
|
|
|
+import java.util.UUID;
|
|
|
|
|
|
+import org.assertj.core.api.Assertions;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
import org.slf4j.Logger;
|
|
@@ -30,15 +31,16 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.LocalFileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
|
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
|
|
|
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
|
|
|
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
|
|
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
|
|
|
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
|
|
|
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
|
|
|
|
|
/**
|
|
@@ -49,11 +51,21 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
|
|
|
|
|
|
+ /** use a small file size so small source files will still work. */
|
|
|
+ public static final int BLOCK_SIZE = 128 * 1024;
|
|
|
+
|
|
|
+ public static final int PREFETCH_OFFSET = 10240;
|
|
|
+
|
|
|
private Path testFile;
|
|
|
+
|
|
|
+ /** The FS with the external file. */
|
|
|
private FileSystem fs;
|
|
|
+
|
|
|
private int prefetchBlockSize;
|
|
|
private Configuration conf;
|
|
|
|
|
|
+ private String bufferDir;
|
|
|
+
|
|
|
public ITestS3APrefetchingCacheFiles() {
|
|
|
super(true);
|
|
|
}
|
|
@@ -63,35 +75,31 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
|
|
|
super.setup();
|
|
|
// Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration
|
|
|
conf = createConfiguration();
|
|
|
- String testFileUri = S3ATestUtils.getCSVTestFile(conf);
|
|
|
|
|
|
- testFile = new Path(testFileUri);
|
|
|
- prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
|
|
|
- fs = getFileSystem();
|
|
|
- fs.initialize(new URI(testFileUri), conf);
|
|
|
+ testFile = getExternalData(conf);
|
|
|
+ prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
|
+ fs = FileSystem.get(testFile.toUri(), conf);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Configuration createConfiguration() {
|
|
|
Configuration configuration = super.createConfiguration();
|
|
|
S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY);
|
|
|
- S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_BLOCK_SIZE_KEY);
|
|
|
configuration.setBoolean(PREFETCH_ENABLED_KEY, true);
|
|
|
+ // use a small block size unless explicitly set in the test config.
|
|
|
+ configuration.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
|
+ // patch buffer dir with a unique path for test isolation.
|
|
|
+ final String bufferDirBase = configuration.get(BUFFER_DIR);
|
|
|
+ bufferDir = bufferDirBase + "/" + UUID.randomUUID();
|
|
|
+ configuration.set(BUFFER_DIR, bufferDir);
|
|
|
return configuration;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public synchronized void teardown() throws Exception {
|
|
|
super.teardown();
|
|
|
- File tmpFileDir = new File(conf.get(BUFFER_DIR));
|
|
|
- File[] tmpFiles = tmpFileDir.listFiles();
|
|
|
- if (tmpFiles != null) {
|
|
|
- for (File filePath : tmpFiles) {
|
|
|
- String path = filePath.getPath();
|
|
|
- if (path.endsWith(".bin") && path.contains("fs-cache-")) {
|
|
|
- filePath.delete();
|
|
|
- }
|
|
|
- }
|
|
|
+ if (bufferDir != null) {
|
|
|
+ new File(bufferDir).delete();
|
|
|
}
|
|
|
cleanupWithLogger(LOG, fs);
|
|
|
fs = null;
|
|
@@ -111,34 +119,35 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
|
|
|
try (FSDataInputStream in = fs.open(testFile)) {
|
|
|
byte[] buffer = new byte[prefetchBlockSize];
|
|
|
|
|
|
- in.read(buffer, 0, prefetchBlockSize - 10240);
|
|
|
- in.seek(prefetchBlockSize * 2);
|
|
|
- in.read(buffer, 0, prefetchBlockSize);
|
|
|
+ // read a bit less than a block
|
|
|
+ in.readFully(0, buffer, 0, prefetchBlockSize - PREFETCH_OFFSET);
|
|
|
+ // read at least some of a second block
|
|
|
+ in.read(prefetchBlockSize * 2, buffer, 0, prefetchBlockSize);
|
|
|
+
|
|
|
|
|
|
File tmpFileDir = new File(conf.get(BUFFER_DIR));
|
|
|
- assertTrue("The dir to keep cache files must exist", tmpFileDir.exists());
|
|
|
+ final LocalFileSystem localFs = FileSystem.getLocal(conf);
|
|
|
+ Path bufferDirPath = new Path(tmpFileDir.toURI());
|
|
|
+ ContractTestUtils.assertIsDirectory(localFs, bufferDirPath);
|
|
|
File[] tmpFiles = tmpFileDir
|
|
|
.listFiles((dir, name) -> name.endsWith(".bin") && name.contains("fs-cache-"));
|
|
|
- boolean isCacheFileForBlockFound = tmpFiles != null && tmpFiles.length > 0;
|
|
|
- if (!isCacheFileForBlockFound) {
|
|
|
- LOG.warn("No cache files found under " + tmpFileDir);
|
|
|
- }
|
|
|
- assertTrue("File to cache block data must exist", isCacheFileForBlockFound);
|
|
|
+ Assertions.assertThat(tmpFiles)
|
|
|
+ .describedAs("Cache files not found under %s", tmpFileDir)
|
|
|
+ .isNotEmpty();
|
|
|
+
|
|
|
|
|
|
for (File tmpFile : tmpFiles) {
|
|
|
Path path = new Path(tmpFile.getAbsolutePath());
|
|
|
- try (FileSystem localFs = FileSystem.getLocal(conf)) {
|
|
|
- FileStatus stat = localFs.getFileStatus(path);
|
|
|
- ContractTestUtils.assertIsFile(path, stat);
|
|
|
- assertEquals("File length not matching with prefetchBlockSize", prefetchBlockSize,
|
|
|
- stat.getLen());
|
|
|
- assertEquals("User permissions should be RW", FsAction.READ_WRITE,
|
|
|
- stat.getPermission().getUserAction());
|
|
|
- assertEquals("Group permissions should be NONE", FsAction.NONE,
|
|
|
- stat.getPermission().getGroupAction());
|
|
|
- assertEquals("Other permissions should be NONE", FsAction.NONE,
|
|
|
- stat.getPermission().getOtherAction());
|
|
|
- }
|
|
|
+ FileStatus stat = localFs.getFileStatus(path);
|
|
|
+ ContractTestUtils.assertIsFile(path, stat);
|
|
|
+ assertEquals("File length not matching with prefetchBlockSize", prefetchBlockSize,
|
|
|
+ stat.getLen());
|
|
|
+ assertEquals("User permissions should be RW", FsAction.READ_WRITE,
|
|
|
+ stat.getPermission().getUserAction());
|
|
|
+ assertEquals("Group permissions should be NONE", FsAction.NONE,
|
|
|
+ stat.getPermission().getGroupAction());
|
|
|
+ assertEquals("Other permissions should be NONE", FsAction.NONE,
|
|
|
+ stat.getPermission().getOtherAction());
|
|
|
}
|
|
|
}
|
|
|
}
|