|
@@ -18,9 +18,11 @@
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
+import java.net.URI;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
|
|
@@ -32,6 +34,7 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
|
|
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
@@ -85,9 +88,20 @@ public class TestShortCircuitLocalRead {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private static String getCurrentUser() throws IOException {
|
|
|
+ return UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
+ }
|
|
|
|
|
|
- static void checkFileContent(FileSystem fs, Path name, byte[] expected,
|
|
|
- int readOffset) throws IOException {
|
|
|
+ /** Check file content, reading as user {@code readingUser} */
|
|
|
+ static void checkFileContent(URI uri, Path name, byte[] expected,
|
|
|
+ int readOffset, String readingUser, Configuration conf,
|
|
|
+ boolean shortCircuitFails)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ // Ensure short circuit is enabled
|
|
|
+ DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
|
|
|
+ assertTrue(fs.getClient().getShortCircuitLocalReads());
|
|
|
+
|
|
|
FSDataInputStream stm = fs.open(name);
|
|
|
byte[] actual = new byte[expected.length-readOffset];
|
|
|
stm.readFully(readOffset, actual);
|
|
@@ -112,6 +126,11 @@ public class TestShortCircuitLocalRead {
|
|
|
nread += nbytes;
|
|
|
}
|
|
|
checkData(actual, readOffset, expected, "Read 3");
|
|
|
+
|
|
|
+ if (shortCircuitFails) {
|
|
|
+ // short circuit should be disabled due to failure
|
|
|
+ assertFalse(fs.getClient().getShortCircuitLocalReads());
|
|
|
+ }
|
|
|
stm.close();
|
|
|
}
|
|
|
|
|
@@ -123,11 +142,15 @@ public class TestShortCircuitLocalRead {
|
|
|
return arr;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Verifies that reading a file with the direct read(ByteBuffer) api gives the expected set of bytes.
|
|
|
- */
|
|
|
- static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
|
|
|
- int readOffset) throws IOException {
|
|
|
+ /** Check the file content, reading as user {@code readingUser} */
|
|
|
+ static void checkFileContentDirect(URI uri, Path name, byte[] expected,
|
|
|
+ int readOffset, String readingUser, Configuration conf,
|
|
|
+ boolean shortCircuitFails)
|
|
|
+ throws IOException, InterruptedException {
|
|
|
+ // Ensure short circuit is enabled
|
|
|
+ DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
|
|
|
+ assertTrue(fs.getClient().getShortCircuitLocalReads());
|
|
|
+
|
|
|
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
|
|
|
|
|
|
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
|
@@ -157,21 +180,33 @@ public class TestShortCircuitLocalRead {
|
|
|
nread += nbytes;
|
|
|
}
|
|
|
checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
|
|
|
+ if (shortCircuitFails) {
|
|
|
+ // short circuit should be disabled due to failure
|
|
|
+ assertFalse(fs.getClient().getShortCircuitLocalReads());
|
|
|
+ }
|
|
|
stm.close();
|
|
|
}
|
|
|
|
|
|
+ public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
|
|
|
+ int readOffset) throws IOException, InterruptedException {
|
|
|
+ String shortCircuitUser = getCurrentUser();
|
|
|
+ doTestShortCircuitRead(ignoreChecksum, size, readOffset, shortCircuitUser,
|
|
|
+ shortCircuitUser, false);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test that file data can be read by reading the block file
|
|
|
* directly from the local store.
|
|
|
*/
|
|
|
public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
|
|
|
- int readOffset) throws IOException {
|
|
|
+ int readOffset, String shortCircuitUser, String readingUser,
|
|
|
+ boolean shortCircuitFails) throws IOException, InterruptedException {
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
|
|
ignoreChecksum);
|
|
|
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
+ shortCircuitUser);
|
|
|
if (simulatedStorage) {
|
|
|
SimulatedFSDataset.setFactory(conf);
|
|
|
}
|
|
@@ -184,53 +219,88 @@ public class TestShortCircuitLocalRead {
|
|
|
assertTrue("/ should be a directory", fs.getFileStatus(path)
|
|
|
.isDirectory() == true);
|
|
|
|
|
|
- byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
|
|
// create a new file in home directory. Do not close it.
|
|
|
- Path file1 = new Path("filelocal.dat");
|
|
|
+ byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
|
|
+ Path file1 = fs.makeQualified(new Path("filelocal.dat"));
|
|
|
FSDataOutputStream stm = createFile(fs, file1, 1);
|
|
|
-
|
|
|
- // write to file
|
|
|
stm.write(fileData);
|
|
|
stm.close();
|
|
|
- checkFileContent(fs, file1, fileData, readOffset);
|
|
|
- checkFileContentDirect(fs, file1, fileData, readOffset);
|
|
|
+
|
|
|
+ URI uri = cluster.getURI();
|
|
|
+ checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
|
|
|
+ shortCircuitFails);
|
|
|
+ checkFileContentDirect(uri, file1, fileData, readOffset, readingUser,
|
|
|
+ conf, shortCircuitFails);
|
|
|
} finally {
|
|
|
fs.close();
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testFileLocalReadNoChecksum() throws IOException {
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testFileLocalReadNoChecksum() throws Exception {
|
|
|
doTestShortCircuitRead(true, 3*blockSize+100, 0);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testFileLocalReadChecksum() throws IOException {
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testFileLocalReadChecksum() throws Exception {
|
|
|
doTestShortCircuitRead(false, 3*blockSize+100, 0);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testSmallFileLocalRead() throws IOException {
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testSmallFileLocalRead() throws Exception {
|
|
|
doTestShortCircuitRead(false, 13, 0);
|
|
|
doTestShortCircuitRead(false, 13, 5);
|
|
|
doTestShortCircuitRead(true, 13, 0);
|
|
|
doTestShortCircuitRead(true, 13, 5);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testReadFromAnOffset() throws IOException {
|
|
|
+ /**
|
|
|
+ * Try a short circuit from a reader that is not allowed to
|
|
|
+ * to use short circuit. The test ensures reader falls back to non
|
|
|
+ * shortcircuit reads when shortcircuit is disallowed.
|
|
|
+ */
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testLocalReadFallback() throws Exception {
|
|
|
+ doTestShortCircuitRead(true, 13, 0, getCurrentUser(), "notallowed", true);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testReadFromAnOffset() throws Exception {
|
|
|
doTestShortCircuitRead(false, 3*blockSize+100, 777);
|
|
|
doTestShortCircuitRead(true, 3*blockSize+100, 777);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testLongFile() throws IOException {
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testLongFile() throws Exception {
|
|
|
doTestShortCircuitRead(false, 10*blockSize+100, 777);
|
|
|
doTestShortCircuitRead(true, 10*blockSize+100, 777);
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
|
|
|
+ final DatanodeID dnInfo, final Configuration conf) throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
|
|
|
+ @Override
|
|
|
+ public ClientDatanodeProtocol run() throws Exception {
|
|
|
+ return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000,
|
|
|
+ false);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private static DistributedFileSystem getFileSystem(String user, final URI uri,
|
|
|
+ final Configuration conf) throws InterruptedException, IOException {
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<DistributedFileSystem>() {
|
|
|
+ @Override
|
|
|
+ public DistributedFileSystem run() throws Exception {
|
|
|
+ return (DistributedFileSystem)FileSystem.get(uri, conf);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=10000)
|
|
|
public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
|
|
|
final Configuration conf = new Configuration();
|
|
|
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
|
@@ -253,15 +323,7 @@ public class TestShortCircuitLocalRead {
|
|
|
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
|
|
|
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
|
|
|
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
|
|
|
- ClientDatanodeProtocol proxy = aUgi1
|
|
|
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
|
|
|
- @Override
|
|
|
- public ClientDatanodeProtocol run() throws Exception {
|
|
|
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
|
|
|
- 60000, false);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
+ ClientDatanodeProtocol proxy = getProxy(aUgi1, dnInfo, conf);
|
|
|
// This should succeed
|
|
|
BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
|
|
|
Assert.assertEquals(
|
|
@@ -269,14 +331,7 @@ public class TestShortCircuitLocalRead {
|
|
|
blpi.getBlockPath());
|
|
|
|
|
|
// Try with the other allowed user
|
|
|
- proxy = aUgi2
|
|
|
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
|
|
|
- @Override
|
|
|
- public ClientDatanodeProtocol run() throws Exception {
|
|
|
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
|
|
|
- 60000, false);
|
|
|
- }
|
|
|
- });
|
|
|
+ proxy = getProxy(aUgi2, dnInfo, conf);
|
|
|
|
|
|
// This should succeed as well
|
|
|
blpi = proxy.getBlockLocalPathInfo(blk, token);
|
|
@@ -287,14 +342,7 @@ public class TestShortCircuitLocalRead {
|
|
|
// Now try with a disallowed user
|
|
|
UserGroupInformation bUgi = UserGroupInformation
|
|
|
.createRemoteUser("notalloweduser");
|
|
|
- proxy = bUgi
|
|
|
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
|
|
|
- @Override
|
|
|
- public ClientDatanodeProtocol run() throws Exception {
|
|
|
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
|
|
|
- 60000, false);
|
|
|
- }
|
|
|
- });
|
|
|
+ proxy = getProxy(bUgi, dnInfo, conf);
|
|
|
try {
|
|
|
proxy.getBlockLocalPathInfo(blk, token);
|
|
|
Assert.fail("The call should have failed as " + bUgi.getShortUserName()
|
|
@@ -309,14 +357,14 @@ public class TestShortCircuitLocalRead {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
+ @Test(timeout=10000)
|
|
|
public void testSkipWithVerifyChecksum() throws IOException {
|
|
|
int size = blockSize;
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
|
|
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName());
|
|
|
+ getCurrentUser());
|
|
|
if (simulatedStorage) {
|
|
|
SimulatedFSDataset.setFactory(conf);
|
|
|
}
|
|
@@ -356,7 +404,7 @@ public class TestShortCircuitLocalRead {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Test to run benchmarks between shortcircuit read vs regular read with
|
|
|
+ * Test to run benchmarks between short circuit read vs regular read with
|
|
|
* specified number of threads simultaneously reading.
|
|
|
* <br>
|
|
|
* Run this using the following command:
|
|
@@ -374,7 +422,7 @@ public class TestShortCircuitLocalRead {
|
|
|
int threadCount = Integer.valueOf(args[2]);
|
|
|
|
|
|
// Setup create a file
|
|
|
- Configuration conf = new Configuration();
|
|
|
+ final Configuration conf = new Configuration();
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
|
|
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
|
|
checksum);
|
|
@@ -400,9 +448,13 @@ public class TestShortCircuitLocalRead {
|
|
|
public void run() {
|
|
|
for (int i = 0; i < iteration; i++) {
|
|
|
try {
|
|
|
- checkFileContent(fs, file1, dataToWrite, 0);
|
|
|
+ String user = getCurrentUser();
|
|
|
+ checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf,
|
|
|
+ true);
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
}
|