@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test expiration of the cached block locations in DFSInputStream.
+ */
+@RunWith(Parameterized.class)
+public class TestDFSInputStreamBlockLocations {
+  private static final int BLOCK_SIZE = 1024 * 1024;
+  private static final String[] RACKS = new String[] {
+      "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
+  private static final int NUM_DATA_NODES = RACKS.length;
+  private static final short REPLICATION_FACTOR = (short) 4;
+  private final int staleInterval = 8000;
+  private final int numOfBlocks = 24;
+  private final int fileLength = numOfBlocks * BLOCK_SIZE;
+  private final int dfsClientPrefetchSize = fileLength / 2;
+  // locatedBlocks expiration set to 1 hour
+  private final long dfsInputLocationsTimeout = 60 * 60 * 1000L;
+
+  private HdfsConfiguration conf;
+  private MiniDFSCluster dfsCluster;
+  private DFSClient dfsClient;
+  private DistributedFileSystem fs;
+  private Path filePath;
+  private boolean enableBlkExpiration;
+
+  @Parameterized.Parameters(name = "{index}: CacheExpirationConfig(Enable {0})")
+  public static Collection<Object[]> getTestParameters() {
+    return Arrays.asList(new Object[][] {
+        {Boolean.TRUE},
+        {Boolean.FALSE}
+    });
+  }
+
+  public TestDFSInputStreamBlockLocations(Boolean enableExpiration) {
+    enableBlkExpiration = enableExpiration;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    // set the heartbeat intervals and stale considerations
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        staleInterval);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        staleInterval / 2);
+    // disable shortcircuit reading
+    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
+    // set replication factor
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
+    // set block size and other sizes
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
+        dfsClientPrefetchSize);
+    if (enableBlkExpiration) {
+      // set the refresh locations for every dfsInputLocationsTimeout
+      conf.setLong(
+          HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY,
+          dfsInputLocationsTimeout);
+    }
+    // start the cluster and create a DFSClient
+    dfsCluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
+    dfsCluster.waitActive();
+    assertEquals(NUM_DATA_NODES, dfsCluster.getDataNodes().size());
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+        dfsCluster.getNameNodePort());
+    dfsClient = new DFSClient(addr, conf);
+    fs = dfsCluster.getFileSystem();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (dfsClient != null) {
+      dfsClient.close();
+      dfsClient = null;
+    }
+    if (fs != null) {
+      fs.deleteOnExit(filePath);
+      fs.close();
+      fs = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    final String fileName = "/test_cache_locations";
+    filePath = new Path(fileName);
+    DFSInputStream fin = null;
+    FSDataOutputStream fout = null;
+    try {
+      // create a file and write for testing
+      fout = fs.create(filePath, REPLICATION_FACTOR);
+      fout.write(new byte[(fileLength)]);
+      // finalize the file by closing the output stream
+      fout.close();
+      fout = null;
+      // get the located blocks
+      LocatedBlocks referenceLocatedBlocks =
+          dfsClient.getLocatedBlocks(fileName, 0, fileLength);
+      assertEquals(numOfBlocks, referenceLocatedBlocks.locatedBlockCount());
+      String poolId = dfsCluster.getNamesystem().getBlockPoolId();
+      fin = dfsClient.open(fileName);
+      // get the located blocks from fin
+      LocatedBlocks finLocatedBlocks = fin.locatedBlocks;
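+      // open() fetched locations for only dfsClientPrefetchSize bytes, i.e. half the file's blocks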
+      assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
+          finLocatedBlocks.locatedBlockCount());
+      final int chunkReadSize = BLOCK_SIZE / 4;
+      byte[] readBuffer = new byte[chunkReadSize];
+      // read the first block
+      DatanodeInfo prevDNInfo = null;
+      DatanodeInfo currDNInfo = null;
+      int bytesRead = 0;
+      int firstBlockMark = BLOCK_SIZE;
+      // get the first block locations
+      LocatedBlock firstLocatedBlk =
+          fin.locatedBlocks.getLocatedBlocks().get(0);
+      DatanodeInfo[] firstBlkDNInfos = firstLocatedBlk.getLocations();
+      while (fin.getPos() < firstBlockMark) {
+        bytesRead = fin.read(readBuffer);
+        Assert.assertTrue("Unexpected number of read bytes",
+            chunkReadSize >= bytesRead);
+        if (currDNInfo == null) {
+          currDNInfo = fin.getCurrentDatanode();
+          assertNotNull("current FIS datanode is null", currDNInfo);
+          continue;
+        }
+        prevDNInfo = currDNInfo;
+        currDNInfo = fin.getCurrentDatanode();
+        assertEquals("the DFSInput stream does not read from same node",
+            prevDNInfo, currDNInfo);
+      }
+
+      assertEquals("InputStream exceeds expected position",
+          firstBlockMark, fin.getPos());
+      // get the second block locations
+      LocatedBlock secondLocatedBlk =
+          fin.locatedBlocks.getLocatedBlocks().get(1);
+      // get the nodeinfo for that block
+      DatanodeInfo[] secondBlkDNInfos = secondLocatedBlk.getLocations();
+      DatanodeInfo deadNodeInfo = secondBlkDNInfos[0];
+      // locate the datanode hosting the second block's first replica
+      DataNode deadNode = getdataNodeFromHostName(dfsCluster,
+          deadNodeInfo.getHostName());
+      // Shutdown and wait for datanode to be marked dead
+      DatanodeRegistration reg = InternalDataNodeTestUtils.
+          getDNRegistrationForBP(dfsCluster.getDataNodes().get(0), poolId);
+      DataNodeProperties stoppedDNProps =
+          dfsCluster.stopDataNode(deadNodeInfo.getName());
+
+      List<DataNode> datanodesPostStoppage = dfsCluster.getDataNodes();
+      assertEquals(NUM_DATA_NODES - 1, datanodesPostStoppage.size());
+      // get the located blocks
+      LocatedBlocks afterStoppageLocatedBlocks =
+          dfsClient.getLocatedBlocks(fileName, 0, fileLength);
+      // read second block
+      int secondBlockMark = (int) (1.5 * BLOCK_SIZE);
+      boolean firstIteration = true;
+      if (this.enableBlkExpiration) {
+        // set the time stamps to make sure that we do not refresh locations yet
+        fin.setReadTimeStampsForTesting(Time.monotonicNow());
+      }
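+      // reads below should add the stopped DN to deadNodes and switch to another replica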
+      while (fin.getPos() < secondBlockMark) {
+        bytesRead = fin.read(readBuffer);
+        assertTrue("dead node used to read at position: " + fin.getPos(),
+            fin.deadNodesContain(deadNodeInfo));
+        Assert.assertTrue("Unexpected number of read bytes",
+            chunkReadSize >= bytesRead);
+        prevDNInfo = currDNInfo;
+        currDNInfo = fin.getCurrentDatanode();
+        assertNotEquals(deadNodeInfo, currDNInfo);
+        if (firstIteration) {
+          // currDNInfo must change if the first block was read from the now-dead node
+          assertFalse("FSInputStream should pick a different DN",
+              firstBlkDNInfos[0].equals(deadNodeInfo)
+                  && prevDNInfo.equals(currDNInfo));
+          firstIteration = false;
+        }
+      }
+      assertEquals("InputStream exceeds expected position",
+          secondBlockMark, fin.getPos());
+      // restart the dead node with the same port
+      assertTrue(dfsCluster.restartDataNode(stoppedDNProps, true));
+      dfsCluster.waitActive();
+      List<DataNode> datanodesPostRestart = dfsCluster.getDataNodes();
+      assertEquals(NUM_DATA_NODES, datanodesPostRestart.size());
+      // continue reading block 2; with expiration enabled we should eventually read from deadNode again
+      int thirdBlockMark = 2 * BLOCK_SIZE;
+      firstIteration = true;
+      while (fin.getPos() < thirdBlockMark) {
+        bytesRead = fin.read(readBuffer);
+        if (this.enableBlkExpiration) {
+          assertEquals("node is removed from deadNodes after 1st iteration",
+              firstIteration, fin.deadNodesContain(deadNodeInfo));
+        } else {
+          assertTrue(fin.deadNodesContain(deadNodeInfo));
+        }
+        Assert.assertTrue("Unexpected number of read bytes",
+            chunkReadSize >= bytesRead);
+        prevDNInfo = currDNInfo;
+        currDNInfo = fin.getCurrentDatanode();
+        if (!this.enableBlkExpiration) {
+          assertNotEquals(deadNodeInfo, currDNInfo);
+        }
+        if (firstIteration) {
+          assertEquals(prevDNInfo, currDNInfo);
+          firstIteration = false;
+          if (this.enableBlkExpiration) {
+            // reset the time stamps of located blocks to force cache expiration
+            fin.setReadTimeStampsForTesting(
+                Time.monotonicNow() - (dfsInputLocationsTimeout + 1));
+          }
+        }
+      }
+      assertEquals("InputStream exceeds expected position",
+          thirdBlockMark, fin.getPos());
+    } finally {
+      if (fout != null) {
+        fout.close();
+      }
+      if (fin != null) {
+        fin.close();
+      }
+    }
+  }
+
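+  /** Returns the cluster DataNode whose hostname matches, or null if none does. */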
+  private DataNode getdataNodeFromHostName(MiniDFSCluster cluster,
+      String hostName) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getHostName().equals(hostName)) {
+        return dn;
+      }
+    }
+    return null;
+  }
+}