|
@@ -0,0 +1,100 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.OutputStream;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
+
|
|
|
+import junit.framework.TestCase;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * These tests make sure that DFSClient retries fetching data from DFS
|
|
|
+ * properly in case of errors.
|
|
|
+ */
|
|
|
+public class TestDFSClientRetries extends TestCase {
|
|
|
+
|
|
|
+ // writes 'len' bytes of data to out.
|
|
|
+ private static void writeData(OutputStream out, int len) throws IOException {
|
|
|
+ byte [] buf = new byte[4096*16];
|
|
|
+ while(len > 0) {
|
|
|
+ int toWrite = Math.min(len, buf.length);
|
|
|
+ out.write(buf, 0, toWrite);
|
|
|
+ len -= toWrite;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This makes sure that when DN closes clients socket after client had
|
|
|
+ * successfully connected earlier, the data can still be fetched.
|
|
|
+ */
|
|
|
+ public void testWriteTimeoutAtDataNode() throws IOException,
|
|
|
+ InterruptedException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+
|
|
|
+ final int writeTimeout = 100; //milliseconds.
|
|
|
+ // set a very short write timeout for datanode, so that tests runs fast.
|
|
|
+ conf.setInt("dfs.datanode.socket.write.timeout", writeTimeout);
|
|
|
+ // set a smaller block size
|
|
|
+ final int blockSize = 10*1024*1024;
|
|
|
+ conf.setInt("dfs.block.size", blockSize);
|
|
|
+ conf.setInt("dfs.client.max.block.acquire.failures", 1);
|
|
|
+ // set a small buffer size
|
|
|
+ final int bufferSize = 4096;
|
|
|
+ conf.setInt("io.file.buffer.size", bufferSize);
|
|
|
+
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
|
|
|
+
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ FileSystem fs = cluster.getFileSystem();
|
|
|
+
|
|
|
+ Path filePath = new Path("/testWriteTimeoutAtDataNode");
|
|
|
+ OutputStream out = fs.create(filePath, true, bufferSize);
|
|
|
+
|
|
|
+ // write a 2 block file.
|
|
|
+ writeData(out, 2*blockSize);
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ byte[] buf = new byte[1024*1024]; // enough to empty TCP buffers.
|
|
|
+
|
|
|
+ InputStream in = fs.open(filePath, bufferSize);
|
|
|
+
|
|
|
+ //first read a few bytes
|
|
|
+ IOUtils.readFully(in, buf, 0, bufferSize/2);
|
|
|
+ //now read few more chunks of data by sleeping in between :
|
|
|
+ for(int i=0; i<10; i++) {
|
|
|
+ Thread.sleep(2*writeTimeout); // force write timeout at the datanode.
|
|
|
+ // read enough to empty out socket buffers.
|
|
|
+ IOUtils.readFully(in, buf, 0, buf.length);
|
|
|
+ }
|
|
|
+ // successfully read with write timeout on datanodes.
|
|
|
+ in.close();
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // more tests related to different failure cases can be added here.
|
|
|
+}
|