|
@@ -94,17 +94,17 @@ public class TestShortCircuitLocalRead {
|
|
|
public void before() {
|
|
|
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
static final long seed = 0xDEADBEEFL;
|
|
|
static final int blockSize = 5120;
|
|
|
final boolean simulatedStorage = false;
|
|
|
-
|
|
|
+
|
|
|
// creates a file but does not close it
|
|
|
static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
|
|
|
throws IOException {
|
|
|
FSDataOutputStream stm = fileSys.create(name, true,
|
|
|
- fileSys.getConf().getInt("io.file.buffer.size", 4096),
|
|
|
- (short)repl, blockSize);
|
|
|
+ fileSys.getConf().getInt("io.file.buffer.size", 4096),
|
|
|
+ (short)repl, blockSize);
|
|
|
return stm;
|
|
|
}
|
|
|
|
|
@@ -112,19 +112,20 @@ public class TestShortCircuitLocalRead {
|
|
|
String message) {
|
|
|
checkData(actual, from, expected, actual.length, message);
|
|
|
}
|
|
|
-
|
|
|
- static private void checkData(byte[] actual, int from, byte[] expected, int len,
|
|
|
- String message) {
|
|
|
+
|
|
|
+ static private void checkData(byte[] actual, int from, byte[] expected,
|
|
|
+ int len, String message) {
|
|
|
for (int idx = 0; idx < len; idx++) {
|
|
|
if (expected[from + idx] != actual[idx]) {
|
|
|
- Assert.fail(message + " byte " + (from + idx) + " differs. expected "
|
|
|
- + expected[from + idx] + " actual " + actual[idx] +
|
|
|
- "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
|
|
|
+ Assert.fail(message + " byte " + (from + idx) + " differs. expected " +
|
|
|
+ expected[from + idx] + " actual " + actual[idx] +
|
|
|
+ "\nexpected: " +
|
|
|
+ StringUtils.byteToHexString(expected, from, from + len) +
|
|
|
"\nactual: " + StringUtils.byteToHexString(actual, 0, len));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private static String getCurrentUser() throws IOException {
|
|
|
return UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
}
|
|
@@ -140,7 +141,7 @@ public class TestShortCircuitLocalRead {
|
|
|
if (legacyShortCircuitFails) {
|
|
|
assertFalse(getClientContext.getDisableLegacyBlockReaderLocal());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
FSDataInputStream stm = fs.open(name);
|
|
|
byte[] actual = new byte[expected.length-readOffset];
|
|
|
stm.readFully(readOffset, actual);
|
|
@@ -165,7 +166,7 @@ public class TestShortCircuitLocalRead {
|
|
|
nread += nbytes;
|
|
|
}
|
|
|
checkData(actual, readOffset, expected, "Read 3");
|
|
|
-
|
|
|
+
|
|
|
if (legacyShortCircuitFails) {
|
|
|
assertTrue(getClientContext.getDisableLegacyBlockReaderLocal());
|
|
|
}
|
|
@@ -179,7 +180,7 @@ public class TestShortCircuitLocalRead {
|
|
|
alt.get(arr);
|
|
|
return arr;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/** Check the file content, reading as user {@code readingUser} */
|
|
|
static void checkFileContentDirect(URI uri, Path name, byte[] expected,
|
|
|
int readOffset, String readingUser, Configuration conf,
|
|
@@ -191,7 +192,7 @@ public class TestShortCircuitLocalRead {
|
|
|
if (legacyShortCircuitFails) {
|
|
|
assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
|
|
|
|
|
|
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
|
@@ -239,7 +240,7 @@ public class TestShortCircuitLocalRead {
|
|
|
doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
|
|
|
null, getCurrentUser(), false);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Test that file data can be read by reading the block file
|
|
|
* directly from the local store.
|
|
@@ -272,15 +273,15 @@ public class TestShortCircuitLocalRead {
|
|
|
try {
|
|
|
// check that / exists
|
|
|
Path path = new Path("/");
|
|
|
- assertTrue("/ should be a directory", fs.getFileStatus(path)
|
|
|
- .isDirectory() == true);
|
|
|
-
|
|
|
+ assertTrue("/ should be a directory",
|
|
|
+ fs.getFileStatus(path).isDirectory());
|
|
|
+
|
|
|
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
|
|
Path file1 = fs.makeQualified(new Path("filelocal.dat"));
|
|
|
FSDataOutputStream stm = createFile(fs, file1, 1);
|
|
|
stm.write(fileData);
|
|
|
stm.close();
|
|
|
-
|
|
|
+
|
|
|
URI uri = cluster.getURI();
|
|
|
checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
|
|
|
legacyShortCircuitFails);
|
|
@@ -301,7 +302,7 @@ public class TestShortCircuitLocalRead {
|
|
|
public void testFileLocalReadChecksum() throws Exception {
|
|
|
doTestShortCircuitRead(false, 3*blockSize+100, 0);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test(timeout=60000)
|
|
|
public void testSmallFileLocalRead() throws Exception {
|
|
|
doTestShortCircuitRead(false, 13, 0);
|
|
@@ -309,7 +310,7 @@ public class TestShortCircuitLocalRead {
|
|
|
doTestShortCircuitRead(true, 13, 0);
|
|
|
doTestShortCircuitRead(true, 13, 5);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test(timeout=60000)
|
|
|
public void testLocalReadLegacy() throws Exception {
|
|
|
doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(),
|
|
@@ -325,13 +326,13 @@ public class TestShortCircuitLocalRead {
|
|
|
public void testLocalReadFallback() throws Exception {
|
|
|
doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test(timeout=60000)
|
|
|
public void testReadFromAnOffset() throws Exception {
|
|
|
doTestShortCircuitRead(false, 3*blockSize+100, 777);
|
|
|
doTestShortCircuitRead(true, 3*blockSize+100, 777);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test(timeout=60000)
|
|
|
public void testLongFile() throws Exception {
|
|
|
doTestShortCircuitRead(false, 10*blockSize+100, 777);
|
|
@@ -348,7 +349,7 @@ public class TestShortCircuitLocalRead {
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test(timeout=10000)
|
|
|
public void testDeprecatedGetBlockLocalPathInfoRpc() throws IOException {
|
|
|
final Configuration conf = new Configuration();
|
|
@@ -399,18 +400,18 @@ public class TestShortCircuitLocalRead {
|
|
|
try {
|
|
|
// check that / exists
|
|
|
Path path = new Path("/");
|
|
|
- assertTrue("/ should be a directory", fs.getFileStatus(path)
|
|
|
- .isDirectory() == true);
|
|
|
-
|
|
|
+ assertTrue("/ should be a directory",
|
|
|
+ fs.getFileStatus(path).isDirectory());
|
|
|
+
|
|
|
byte[] fileData = AppendTestUtil.randomBytes(seed, size*3);
|
|
|
// create a new file in home directory. Do not close it.
|
|
|
Path file1 = new Path("filelocal.dat");
|
|
|
FSDataOutputStream stm = createFile(fs, file1, 1);
|
|
|
-
|
|
|
+
|
|
|
// write to file
|
|
|
stm.write(fileData);
|
|
|
stm.close();
|
|
|
-
|
|
|
+
|
|
|
// now test the skip function
|
|
|
FSDataInputStream instm = fs.open(file1);
|
|
|
byte[] actual = new byte[fileData.length];
|
|
@@ -421,7 +422,6 @@ public class TestShortCircuitLocalRead {
|
|
|
instm.seek(skipped);
|
|
|
nread = instm.read(actual, (int)(skipped + nread), 3);
|
|
|
instm.close();
|
|
|
-
|
|
|
} finally {
|
|
|
fs.close();
|
|
|
cluster.shutdown();
|
|
@@ -443,7 +443,7 @@ public class TestShortCircuitLocalRead {
|
|
|
final long RANDOM_SEED2 = 4568L;
|
|
|
FSDataInputStream fsIn = null;
|
|
|
final int TEST_LENGTH = 3456;
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
cluster.waitActive();
|
|
@@ -470,14 +470,11 @@ public class TestShortCircuitLocalRead {
|
|
|
File dataFile = cluster.getBlockFile(0, block);
|
|
|
cluster.shutdown();
|
|
|
cluster = null;
|
|
|
- RandomAccessFile raf = null;
|
|
|
- try {
|
|
|
- raf = new RandomAccessFile(dataFile, "rw");
|
|
|
+ try (RandomAccessFile raf = new RandomAccessFile(dataFile, "rw")) {
|
|
|
raf.setLength(0);
|
|
|
- } finally {
|
|
|
- if (raf != null) raf.close();
|
|
|
}
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false)
|
|
|
+ .build();
|
|
|
cluster.waitActive();
|
|
|
fs = cluster.getFileSystem();
|
|
|
fsIn = fs.open(TEST_PATH);
|
|
@@ -509,7 +506,7 @@ public class TestShortCircuitLocalRead {
|
|
|
if (cluster != null) cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Test to run benchmarks between short circuit read vs regular read with
|
|
|
* specified number of threads simultaneously reading.
|
|
@@ -535,16 +532,16 @@ public class TestShortCircuitLocalRead {
|
|
|
"/tmp/TestShortCircuitLocalRead._PORT");
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
|
|
checksum);
|
|
|
-
|
|
|
- //Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
|
|
|
+
|
|
|
+ // Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
|
|
|
int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
|
|
|
final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
|
|
|
-
|
|
|
+
|
|
|
// create a new file in home directory. Do not close it.
|
|
|
final Path file1 = new Path("filelocal.dat");
|
|
|
final FileSystem fs = FileSystem.get(conf);
|
|
|
FSDataOutputStream stm = createFile(fs, file1, 1);
|
|
|
-
|
|
|
+
|
|
|
stm.write(dataToWrite);
|
|
|
stm.close();
|
|
|
|
|
@@ -580,8 +577,10 @@ public class TestShortCircuitLocalRead {
|
|
|
}
|
|
|
|
|
|
@Test(timeout=60000)
|
|
|
- public void testReadWithRemoteBlockReader() throws IOException, InterruptedException {
|
|
|
- doTestShortCircuitReadWithRemoteBlockReader(true, 3*blockSize+100, getCurrentUser(), 0, false);
|
|
|
+ public void testReadWithRemoteBlockReader()
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ doTestShortCircuitReadWithRemoteBlockReader(true, 3 * blockSize + 100,
|
|
|
+ getCurrentUser(), 0, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -589,8 +588,9 @@ public class TestShortCircuitLocalRead {
|
|
|
* through RemoteBlockReader
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum, int size, String shortCircuitUser,
|
|
|
- int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException {
|
|
|
+ public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum,
|
|
|
+ int size, String shortCircuitUser, int readOffset,
|
|
|
+ boolean shortCircuitFails) throws IOException, InterruptedException {
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
|
@@ -601,8 +601,7 @@ public class TestShortCircuitLocalRead {
|
|
|
// check that / exists
|
|
|
Path path = new Path("/");
|
|
|
URI uri = cluster.getURI();
|
|
|
- assertTrue("/ should be a directory", fs.getFileStatus(path)
|
|
|
- .isDirectory() == true);
|
|
|
+ assertTrue("/ should be a directory", fs.getFileStatus(path).isDirectory());
|
|
|
|
|
|
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
|
|
Path file1 = new Path("filelocal.dat");
|
|
@@ -627,7 +626,7 @@ public class TestShortCircuitLocalRead {
|
|
|
}
|
|
|
|
|
|
private boolean checkUnsupportedMethod(FileSystem fs, Path file,
|
|
|
- byte[] expected, int readOffset) throws IOException {
|
|
|
+ byte[] expected, int readOffset) throws IOException {
|
|
|
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
|
|
|
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
|
|
IOUtils.skipFully(stm, readOffset);
|
|
@@ -639,5 +638,4 @@ public class TestShortCircuitLocalRead {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
}
|