@@ -17,23 +17,24 @@
 */
package org.apache.hadoop.hdfs;

+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
/** Test reading from hdfs while a file is being written. */
public class TestReadWhileWriting {
  {
@@ -44,6 +45,7 @@ public class TestReadWhileWriting {
  private static final String DIR = "/"
      + TestReadWhileWriting.class.getSimpleName() + "/";
  private static final int BLOCK_SIZE = 8192;
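+  //short lease period in milliseconds, used as both the soft and hard limit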
+  private static final long LEASE_LIMIT = 500;

  /** Test reading while writing. */
  @Test
@@ -51,13 +53,13 @@ public class TestReadWhileWriting {
    final Configuration conf = new HdfsConfiguration();
    //enable append
    conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
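+    //make datanodes heartbeat every second so recovery proceeds quickly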
+    conf.setLong("dfs.heartbeat.interval", 1);

    // create cluster
    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
    try {
-      //change the lease soft limit to 1 second.
-      final long leaseSoftLimit = 1000;
-      cluster.setLeasePeriod(leaseSoftLimit, FSConstants.LEASE_HARDLIMIT_PERIOD);
+      //shorten both the soft and hard lease limits so the lease expires quickly.
+      cluster.setLeasePeriod(LEASE_LIMIT, LEASE_LIMIT);

      //wait for the cluster
      cluster.waitActive();
@@ -82,25 +84,41 @@ public class TestReadWhileWriting {
      // of data can be read successfully.
      checkFile(p, half, conf);

-      /* TODO: enable the following when append is done.
      //c. On M1, append another half block of data. Close file on M1.
      {
-        //sleep to make sure the lease is expired the soft limit.
-        Thread.sleep(2*leaseSoftLimit);
-
-        FSDataOutputStream out = fs.append(p);
+        //sleep to let the lease expire.
+        Thread.sleep(2*LEASE_LIMIT);
+
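+        //a fresh, non-cached FileSystem instance acts as a different client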
+        final DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.newInstance(conf);
+        final FSDataOutputStream out = append(dfs, p);
        write(out, 0, half);
        out.close();
      }

      //d. On M2, open file and read 1 block of data from it. Close file.
      checkFile(p, 2*half, conf);
-      */
    } finally {
      cluster.shutdown();
    }
  }

+  /** Try opening a file for append; retry while lease recovery is in progress. */
+  private static FSDataOutputStream append(FileSystem fs, Path p) throws Exception {
+    for(int i = 0; i < 10; i++) {
+      try {
+        return fs.append(p);
+      } catch(RemoteException re) {
+        if (re.getClassName().equals(RecoveryInProgressException.class.getName())) {
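+          //lease recovery of the previous writer has not finished yet; back off and retry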
+          AppendTestUtil.LOG.info("Will sleep and retry, i=" + i + ", p=" + p, re);
+          Thread.sleep(1000);
+        } else {
+          throw re;
+        }
+      }
+    }
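+    //give up after repeated RecoveryInProgressException retries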
+    throw new IOException("Cannot append to " + p);
+  }
+
  static private int userCount = 0;
  //check the file
  static void checkFile(Path p, int expectedsize, Configuration conf
|