|
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
|
|
import static org.hamcrest.CoreMatchers.equalTo;
|
|
|
|
|
|
import java.io.DataOutputStream;
|
|
@@ -28,6 +29,7 @@ import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.nio.channels.ClosedChannelException;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
@@ -910,4 +912,94 @@ public class TestShortCircuitCache {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout = 60000)
|
|
|
+ public void testDomainSocketClosedByDN() throws Exception {
|
|
|
+ TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
|
+ Configuration conf =
|
|
|
+ createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
|
|
|
+ MiniDFSCluster cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ DistributedFileSystem fs = cluster.getFileSystem();
|
|
|
+ final ShortCircuitCache cache =
|
|
|
+ fs.getClient().getClientContext().getShortCircuitCache();
|
|
|
+ DomainPeer peer = getDomainPeerToDn(conf);
|
|
|
+ MutableBoolean usedPeer = new MutableBoolean(false);
|
|
|
+ ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
|
|
|
+ final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
|
|
|
+ .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
|
|
|
+ // Allocating the first shm slot requires using up a peer.
|
|
|
+ Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
|
|
|
+ "testReleaseSlotReuseDomainSocket_client");
|
|
|
+
|
|
|
+ cluster.getDataNodes().get(0).getShortCircuitRegistry()
|
|
|
+ .registerSlot(blockId, slot1.getSlotId(), false);
|
|
|
+
|
|
|
+ Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
|
|
|
+ "testReleaseSlotReuseDomainSocket_client");
|
|
|
+
|
|
|
+ cluster.getDataNodes().get(0).getShortCircuitRegistry()
|
|
|
+ .registerSlot(blockId, slot2.getSlotId(), false);
|
|
|
+
|
|
|
+ cache.scheduleSlotReleaser(slot1);
|
|
|
+
|
|
|
+ Thread.sleep(2000);
|
|
|
+ cache.scheduleSlotReleaser(slot2);
|
|
|
+ Thread.sleep(2000);
|
|
|
+ Assert.assertEquals(0,
|
|
|
+ cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
|
|
|
+ Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 60000)
|
|
|
+ public void testDNRestart() throws Exception {
|
|
|
+ TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
|
|
+ Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
|
|
|
+ MiniDFSCluster cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ DistributedFileSystem fs = cluster.getFileSystem();
|
|
|
+ final ShortCircuitCache cache =
|
|
|
+ fs.getClient().getClientContext().getShortCircuitCache();
|
|
|
+ DomainPeer peer = getDomainPeerToDn(conf);
|
|
|
+ MutableBoolean usedPeer = new MutableBoolean(false);
|
|
|
+ ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
|
|
|
+ final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
|
|
|
+ .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
|
|
|
+ // Allocating the first shm slot requires using up a peer.
|
|
|
+ Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
|
|
|
+ "testReleaseSlotReuseDomainSocket_client");
|
|
|
+
|
|
|
+ cluster.getDataNodes().get(0).getShortCircuitRegistry()
|
|
|
+ .registerSlot(blockId, slot1.getSlotId(), false);
|
|
|
+
|
|
|
+ // restart the datanode to invalidate the cache
|
|
|
+ cluster.restartDataNode(0);
|
|
|
+ Thread.sleep(1000);
|
|
|
+ // after the restart, new allocation and release should not be affect
|
|
|
+ cache.scheduleSlotReleaser(slot1);
|
|
|
+
|
|
|
+ Slot slot2 = null;
|
|
|
+ try {
|
|
|
+ slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
|
|
|
+ "testReleaseSlotReuseDomainSocket_client");
|
|
|
+ } catch (ClosedChannelException ce) {
|
|
|
+
|
|
|
+ }
|
|
|
+ cache.scheduleSlotReleaser(slot2);
|
|
|
+ Thread.sleep(2000);
|
|
|
+ Assert.assertEquals(0,
|
|
|
+ cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
|
|
|
+ Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|