|
@@ -72,9 +72,9 @@ import org.junit.Test;
|
|
|
/**
|
|
|
* Test for short circuit read functionality using {@link BlockReaderLocal}.
|
|
|
* When a block is being read by a client is on the local datanode, instead of
|
|
|
- * using {@link DataTransferProtocol} and connect to datanode,
|
|
|
- * the short circuit read allows reading the file directly
|
|
|
- * from the files on the local file system.
|
|
|
+ * using {@link DataTransferProtocol} and connect to datanode, the short circuit
|
|
|
+ * read allows reading the file directly from the files on the local file
|
|
|
+ * system.
|
|
|
*/
|
|
|
public class TestShortCircuitLocalRead {
|
|
|
private static TemporarySocketDirectory sockDir;
|
|
@@ -195,8 +195,7 @@ public class TestShortCircuitLocalRead {
|
|
|
|
|
|
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
|
|
|
|
|
|
- ByteBuffer actual =
|
|
|
- ByteBuffer.allocateDirect(expected.length - readOffset);
|
|
|
+ ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
|
|
|
|
|
IOUtils.skipFully(stm, readOffset);
|
|
|
|
|
@@ -231,8 +230,7 @@ public class TestShortCircuitLocalRead {
|
|
|
|
|
|
public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
|
|
|
int readOffset, String shortCircuitUser, String readingUser,
|
|
|
- boolean legacyShortCircuitFails)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ boolean legacyShortCircuitFails) throws IOException, InterruptedException {
|
|
|
doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
|
|
|
shortCircuitUser, readingUser, legacyShortCircuitFails);
|
|
|
}
|
|
@@ -249,8 +247,7 @@ public class TestShortCircuitLocalRead {
|
|
|
*/
|
|
|
public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
|
|
|
int readOffset, String shortCircuitUser, String readingUser,
|
|
|
- boolean legacyShortCircuitFails)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ boolean legacyShortCircuitFails) throws IOException, InterruptedException {
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
|
@@ -265,8 +262,7 @@ public class TestShortCircuitLocalRead {
|
|
|
if (shortCircuitUser != null) {
|
|
|
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
|
|
shortCircuitUser);
|
|
|
- conf.setBoolean(
|
|
|
- HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
|
|
+ conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
|
|
}
|
|
|
if (simulatedStorage) {
|
|
|
SimulatedFSDataset.setFactory(conf);
|
|
@@ -328,8 +324,7 @@ public class TestShortCircuitLocalRead {
|
|
|
*/
|
|
|
@Test(timeout=60000)
|
|
|
public void testLocalReadFallback() throws Exception {
|
|
|
- doTestShortCircuitReadLegacy(
|
|
|
- true, 13, 0, getCurrentUser(), "notallowed", true);
|
|
|
+ doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
|
|
|
}
|
|
|
|
|
|
@Test(timeout=60000)
|
|
@@ -371,9 +366,8 @@ public class TestShortCircuitLocalRead {
|
|
|
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
|
|
|
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
|
|
|
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
|
|
|
- ClientDatanodeProtocol proxy =
|
|
|
- DFSUtilClient.createClientDatanodeProtocolProxy(
|
|
|
- dnInfo, conf, 60000, false);
|
|
|
+ ClientDatanodeProtocol proxy =
|
|
|
+ DFSUtilClient.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
|
|
|
try {
|
|
|
proxy.getBlockLocalPathInfo(blk, token);
|
|
|
Assert.fail("The call should have failed as this user "
|
|
@@ -393,8 +387,7 @@ public class TestShortCircuitLocalRead {
|
|
|
int size = blockSize;
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
|
|
- conf.setBoolean(
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
|
|
+ conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
|
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
|
|
new File(sockDir.getDir(),
|
|
|
"testSkipWithVerifyChecksum._PORT.sock").getAbsolutePath());
|
|
@@ -441,8 +434,7 @@ public class TestShortCircuitLocalRead {
|
|
|
MiniDFSCluster cluster = null;
|
|
|
HdfsConfiguration conf = new HdfsConfiguration();
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
|
|
- conf.setBoolean(
|
|
|
- HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
|
|
+ conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
|
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
|
|
new File(sockDir.getDir(),
|
|
|
"testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath());
|
|
@@ -531,8 +523,8 @@ public class TestShortCircuitLocalRead {
|
|
|
System.out.println("Usage: test shortcircuit checksum threadCount");
|
|
|
System.exit(1);
|
|
|
}
|
|
|
- boolean shortcircuit = Boolean.parseBoolean(args[0]);
|
|
|
- boolean checksum = Boolean.parseBoolean(args[1]);
|
|
|
+ boolean shortcircuit = Boolean.valueOf(args[0]);
|
|
|
+ boolean checksum = Boolean.valueOf(args[1]);
|
|
|
int threadCount = Integer.parseInt(args[2]);
|
|
|
|
|
|
// Setup create a file
|
|
@@ -543,8 +535,7 @@ public class TestShortCircuitLocalRead {
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
|
|
checksum);
|
|
|
|
|
|
- // Override fileSize and DATA_TO_WRITE to
|
|
|
- // much larger values for benchmark test
|
|
|
+ // Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
|
|
|
int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
|
|
|
final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
|
|
|
|
|
@@ -566,8 +557,7 @@ public class TestShortCircuitLocalRead {
|
|
|
for (int i = 0; i < iteration; i++) {
|
|
|
try {
|
|
|
String user = getCurrentUser();
|
|
|
- checkFileContent(
|
|
|
- fs.getUri(), file1, dataToWrite, 0, user, conf, true);
|
|
|
+ checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf, true);
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
} catch (InterruptedException e) {
|
|
@@ -600,13 +590,11 @@ public class TestShortCircuitLocalRead {
|
|
|
* through RemoteBlockReader
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public void doTestShortCircuitReadWithRemoteBlockReader(
|
|
|
- boolean ignoreChecksum, int size, String shortCircuitUser,
|
|
|
- int readOffset, boolean shortCircuitFails)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum,
|
|
|
+ int size, String shortCircuitUser, int readOffset,
|
|
|
+ boolean shortCircuitFails) throws IOException, InterruptedException {
|
|
|
Configuration conf = new Configuration();
|
|
|
- conf.setBoolean(
|
|
|
- HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
|
|
+ conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
|
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
|
|
|
|
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
|
@@ -615,8 +603,7 @@ public class TestShortCircuitLocalRead {
|
|
|
// check that / exists
|
|
|
Path path = new Path("/");
|
|
|
URI uri = cluster.getURI();
|
|
|
- assertTrue(
|
|
|
- "/ should be a directory", fs.getFileStatus(path).isDirectory());
|
|
|
+ assertTrue("/ should be a directory", fs.getFileStatus(path).isDirectory());
|
|
|
|
|
|
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
|
|
Path file1 = new Path("filelocal.dat");
|
|
@@ -628,12 +615,10 @@ public class TestShortCircuitLocalRead {
|
|
|
checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
|
|
|
conf, shortCircuitFails);
|
|
|
//RemoteBlockReader have unsupported method read(ByteBuffer bf)
|
|
|
- assertTrue(
|
|
|
- "RemoteBlockReader unsupported method read(ByteBuffer bf) error",
|
|
|
- checkUnsupportedMethod(fs, file1, fileData, readOffset));
|
|
|
+ assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error",
|
|
|
+ checkUnsupportedMethod(fs, file1, fileData, readOffset));
|
|
|
} catch(IOException e) {
|
|
|
- throw new IOException(
|
|
|
- "doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
|
|
|
+ throw new IOException("doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
|
|
|
} catch(InterruptedException inEx) {
|
|
|
throw inEx;
|
|
|
} finally {
|
|
@@ -645,8 +630,7 @@ public class TestShortCircuitLocalRead {
|
|
|
private boolean checkUnsupportedMethod(FileSystem fs, Path file,
|
|
|
byte[] expected, int readOffset) throws IOException {
|
|
|
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
|
|
|
- ByteBuffer actual =
|
|
|
- ByteBuffer.allocateDirect(expected.length - readOffset);
|
|
|
+ ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
|
|
IOUtils.skipFully(stm, readOffset);
|
|
|
try {
|
|
|
stm.read(actual);
|