|
@@ -578,4 +578,64 @@ public class TestShortCircuitLocalRead {
|
|
|
System.out.println("Iteration " + iteration + " took " + (end - start));
|
|
|
fs.delete(file1, false);
|
|
|
}
|
|
|
+
|
|
|
+ public void testReadWithRemoteBlockReader() throws IOException, InterruptedException {
|
|
|
+ doTestShortCircuitReadWithRemoteBlockReader(true, 3*blockSize+100, getCurrentUser(), 0, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test that file data can be read by reading the block
|
|
|
+ * through RemoteBlockReader
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum, int size, String shortCircuitUser,
|
|
|
+ int readOffset, boolean shortCircuitFails) throws IOException, InterruptedException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
|
|
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
|
|
+
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
|
|
+ .format(true).build();
|
|
|
+ FileSystem fs = cluster.getFileSystem();
|
|
|
+ // check that / exists
|
|
|
+ Path path = new Path("/");
|
|
|
+ URI uri = cluster.getURI();
|
|
|
+ assertTrue("/ should be a directory", fs.getFileStatus(path)
|
|
|
+ .isDirectory() == true);
|
|
|
+
|
|
|
+ byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
|
|
+ Path file1 = new Path("filelocal.dat");
|
|
|
+ FSDataOutputStream stm = createFile(fs, file1, 1);
|
|
|
+
|
|
|
+ stm.write(fileData);
|
|
|
+ stm.close();
|
|
|
+ try {
|
|
|
+ checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf, shortCircuitFails);
|
|
|
+ //RemoteBlockReader have unsupported method read(ByteBuffer bf)
|
|
|
+ assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error",
|
|
|
+ checkUnsupportedMethod(fs, file1, fileData, readOffset));
|
|
|
+ } catch(IOException e) {
|
|
|
+ throw new IOException("doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
|
|
|
+ } catch(InterruptedException inEx) {
|
|
|
+ throw inEx;
|
|
|
+ } finally {
|
|
|
+ fs.close();
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean checkUnsupportedMethod(FileSystem fs, Path file,
|
|
|
+ byte[] expected, int readOffset) throws IOException {
|
|
|
+ HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
|
|
|
+ ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
|
|
+ IOUtils.skipFully(stm, readOffset);
|
|
|
+ try {
|
|
|
+ stm.read(actual);
|
|
|
+ } catch(UnsupportedOperationException unex) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
}
|