|
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.net.DomainPeer;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
|
|
|
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
|
|
@@ -73,6 +74,9 @@ import org.apache.hadoop.util.Time;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Assume;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.Mockito;
|
|
|
+import org.mockito.invocation.InvocationOnMock;
|
|
|
+import org.mockito.stubbing.Answer;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.base.Supplier;
|
|
@@ -621,6 +625,18 @@ public class TestShortCircuitCache {
|
|
|
sockDir.close();
|
|
|
}
|
|
|
|
|
|
+ static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
|
|
|
+ final int expectedSlots, ShortCircuitRegistry registry) {
|
|
|
+ registry.visit(new ShortCircuitRegistry.Visitor() {
|
|
|
+ @Override
|
|
|
+ public void accept(HashMap<ShmId, RegisteredShm> segments,
|
|
|
+ HashMultimap<ExtendedBlockId, Slot> slots) {
|
|
|
+ Assert.assertEquals(expectedSegments, segments.size());
|
|
|
+ Assert.assertEquals(expectedSlots, slots.size());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
public static class TestCleanupFailureInjector
|
|
|
extends BlockReaderFactory.FailureInjector {
|
|
|
@Override
|
|
@@ -664,16 +680,67 @@ public class TestShortCircuitCache {
|
|
|
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
|
|
|
"testing, but we failed to do a non-TCP read.", t);
|
|
|
}
|
|
|
- ShortCircuitRegistry registry =
|
|
|
- cluster.getDataNodes().get(0).getShortCircuitRegistry();
|
|
|
- registry.visit(new ShortCircuitRegistry.Visitor() {
|
|
|
+ checkNumberOfSegmentsAndSlots(1, 1,
|
|
|
+ cluster.getDataNodes().get(0).getShortCircuitRegistry());
|
|
|
+ cluster.shutdown();
|
|
|
+ sockDir.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Regression test for HADOOP-11802
|
|
|
+ @Test(timeout=60000)
|
|
|
+ public void testDataXceiverHandlesRequestShortCircuitShmFailure()
|
|
|
+ throws Exception {
|
|
|
+ BlockReaderTestUtil.enableShortCircuitShmTracing();
|
|
|
+ TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
|
+ Configuration conf = createShortCircuitConf(
|
|
|
+ "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
|
|
|
+ conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
|
|
|
+ 1000000000L);
|
|
|
+ MiniDFSCluster cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ cluster.waitActive();
|
|
|
+ DistributedFileSystem fs = cluster.getFileSystem();
|
|
|
+ final Path TEST_PATH1 = new Path("/test_file1");
|
|
|
+ DFSTestUtil.createFile(fs, TEST_PATH1, 4096,
|
|
|
+ (short)1, 0xFADE1);
|
|
|
+ LOG.info("Setting failure injector and performing a read which " +
|
|
|
+ "should fail...");
|
|
|
+ DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
|
|
|
+ Mockito.doAnswer(new Answer<Void>() {
|
|
|
@Override
|
|
|
- public void accept(HashMap<ShmId, RegisteredShm> segments,
|
|
|
- HashMultimap<ExtendedBlockId, Slot> slots) {
|
|
|
- Assert.assertEquals(1, segments.size());
|
|
|
- Assert.assertEquals(1, slots.size());
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ throw new IOException("injected error into sendShmResponse");
|
|
|
}
|
|
|
- });
|
|
|
+ }).when(failureInjector).sendShortCircuitShmResponse();
|
|
|
+ DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
|
|
|
+ DataNodeFaultInjector.instance = failureInjector;
|
|
|
+
|
|
|
+ try {
|
|
|
+ // The first read will try to allocate a shared memory segment and slot.
|
|
|
+ // The shared memory segment allocation will fail because of the failure
|
|
|
+ // injector.
|
|
|
+ DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
|
|
|
+ Assert.fail("expected readFileBuffer to fail, but it succeeded.");
|
|
|
+ } catch (Throwable t) {
|
|
|
+ GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
|
|
|
+ "testing, but we failed to do a non-TCP read.", t);
|
|
|
+ }
|
|
|
+
|
|
|
+ checkNumberOfSegmentsAndSlots(0, 0,
|
|
|
+ cluster.getDataNodes().get(0).getShortCircuitRegistry());
|
|
|
+
|
|
|
+ LOG.info("Clearing failure injector and performing another read...");
|
|
|
+ DataNodeFaultInjector.instance = prevInjector;
|
|
|
+
|
|
|
+ fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
|
|
|
+
|
|
|
+ // The second read should succeed.
|
|
|
+ DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
|
|
|
+
|
|
|
+ // We should have added a new short-circuit shared memory segment and slot.
|
|
|
+ checkNumberOfSegmentsAndSlots(1, 1,
|
|
|
+ cluster.getDataNodes().get(0).getShortCircuitRegistry());
|
|
|
+
|
|
|
cluster.shutdown();
|
|
|
sockDir.close();
|
|
|
}
|