@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONT
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
 import static org.hamcrest.CoreMatchers.equalTo;

 import java.io.File;
@@ -36,9 +38,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
@@ -54,18 +55,29 @@ import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisito
 import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;

 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class TestBlockReaderFactory {
-  static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class);
+  static final Logger LOG =
+      LoggerFactory.getLogger(TestBlockReaderFactory.class);
+
+  @Rule
+  public final Timeout globalTimeout = new Timeout(180000);

   @Before
   public void init() {
@@ -209,7 +221,7 @@ public class TestBlockReaderFactory {
    * occurs); however, the failure result should not be cached. We want
    * to be able to retry later and succeed.
    */
-  @Test(timeout=60000)
+  @Test
   public void testShortCircuitCacheTemporaryFailure()
       throws Exception {
     BlockReaderTestUtil.enableBlockReaderFactoryTracing();
@@ -302,7 +314,96 @@ public class TestBlockReaderFactory {
     Assert.assertFalse(testFailed.get());
   }

-  /**
+  /**
+   * Test that by default, reads after a failure do not go through SCR.
+   */
+  @Test
+  public void testShortCircuitCacheUnbufferDefault() throws Exception {
+    testShortCircuitCacheUnbufferWithDisableInterval(
+        DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT, true);
+  }
+  /**
+   * Test the case where, if we disable the cache in
+   * {@link org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory}, reads
+   * after a failure still go through SCR.
+   */
+  @Test
+  public void testShortCircuitCacheUnbufferDisabled() throws Exception {
+    testShortCircuitCacheUnbufferWithDisableInterval(0, false);
+  }
+
+  private void testShortCircuitCacheUnbufferWithDisableInterval(
+      final long interval, final boolean disabled) throws Exception {
+    final String testName = GenericTestUtils.getMethodName();
+    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+    try (TemporarySocketDirectory sockDir = new TemporarySocketDirectory()) {
+      Configuration conf = createShortCircuitConf(testName, sockDir);
+      conf.set(DFS_CLIENT_CONTEXT, testName + interval + disabled);
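+      // An interval of 0 keeps DomainSocketFactory from leaving the socket
+      // path disabled after a read failure, so later reads can still use
+      // short-circuit; the default interval keeps the failed path disabled.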
+      conf.setLong(DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY, interval);
+      Configuration serverConf = new Configuration(conf);
+      MiniDFSCluster.Builder builder =
+          new MiniDFSCluster.Builder(serverConf).numDataNodes(1);
+      try (MiniDFSCluster cluster = builder.build();
+           DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
+               .get(cluster.getURI(0), conf)) {
+        cluster.waitActive();
+        final Path testFile = new Path("/test_file");
+        final int testFileLen = 4000;
+        final int seed = 0xFADED;
+        DFSTestUtil.createFile(dfs, testFile, testFileLen, (short) 1, seed);
+        final byte[] expected = DFSTestUtil.
+            calculateFileContentsFromSeed(seed, testFileLen);
+
+        try (FSDataInputStream in = dfs.open(testFile)) {
+          Assert.assertEquals(0,
+              dfs.getClient().getClientContext().getShortCircuitCache()
+                  .getReplicaInfoMapSize());
+
+          final byte[] buf = new byte[testFileLen];
+          IOUtils.readFully(in, buf, 0, testFileLen);
+          validateReadResult(dfs, expected, buf, 1);
+
+          // Set cache size to 0 so the replica marked evictable by unbuffer
+          // will be purged immediately.
+          dfs.getClient().getClientContext().getShortCircuitCache()
+              .setMaxTotalSize(0);
+          LOG.info("Unbuffering");
+          in.unbuffer();
+          Assert.assertEquals(0,
+              dfs.getClient().getClientContext().getShortCircuitCache()
+                  .getReplicaInfoMapSize());
+
+          DFSTestUtil.appendFile(dfs, testFile, "append more data");
+
+          // This read will force a new replica read via TCP.
+          Arrays.fill(buf, (byte) 0);
+          in.seek(0);
+          IOUtils.readFully(in, buf, 0, testFileLen);
+          validateReadResult(dfs, expected, buf, 0);
+        }
+
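+        // Re-open and read again. With the default disable interval,
+        // short-circuit stays off after the earlier failure, so the replica
+        // cache stays empty (0); with an interval of 0, short-circuit reads
+        // resume and one replica is cached (1).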
+ LOG.info("Reading {} again.", testFile);
|
|
|
+ try (FSDataInputStream in = dfs.open(testFile)) {
|
|
|
+ final byte[] buf = new byte[testFileLen];
|
|
|
+ Arrays.fill(buf, (byte) 0);
|
|
|
+ IOUtils.readFully(in, buf, 0, testFileLen);
|
|
|
+ final int expectedMapSize = disabled ? 0 : 1;
|
|
|
+ validateReadResult(dfs, expected, buf, expectedMapSize);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
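+  // Checks both the bytes read and the number of entries left in the
+  // client's short-circuit replica cache.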
+  private void validateReadResult(final DistributedFileSystem dfs,
+      final byte[] expected, final byte[] actual,
+      final int expectedScrRepMapSize) {
+    Assert.assertThat(expected, CoreMatchers.is(actual));
+    Assert.assertEquals(expectedScrRepMapSize,
+        dfs.getClient().getClientContext().getShortCircuitCache()
+            .getReplicaInfoMapSize());
+  }
+
+  /**
    * Test that a client which supports short-circuit reads using
    * shared memory can fall back to not using shared memory when
    * the server doesn't support it.