|
@@ -18,13 +18,15 @@
|
|
|
package org.apache.hadoop.dfs;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.util.Random;
|
|
|
|
|
|
import org.apache.commons.logging.impl.Log4JLogger;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.security.UnixUserGroupInformation;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.log4j.Level;
|
|
|
|
|
|
public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
@@ -34,36 +36,21 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
|
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
|
|
|
}
|
|
|
|
|
|
- static final int BLOCK_SIZE = 64;
|
|
|
- static final int FILE_SIZE = 1024;
|
|
|
+ static final long BLOCK_SIZE = 1024;
|
|
|
+ static final int FILE_SIZE = 1024*16;
|
|
|
static final short REPLICATION_NUM = (short)3;
|
|
|
- static final Random RANDOM = new Random();
|
|
|
static byte[] buffer = new byte[FILE_SIZE];
|
|
|
|
|
|
- static void checkMetaInfo(Block b, InterDatanodeProtocol idp
|
|
|
- ) throws IOException {
|
|
|
- TestInterDatanodeProtocol.checkMetaInfo(b, idp, null);
|
|
|
- }
|
|
|
-
|
|
|
- static int min(Integer... x) {
|
|
|
- int m = x[0];
|
|
|
- for(int i = 1; i < x.length; i++) {
|
|
|
- if (x[i] < m) {
|
|
|
- m = x[i];
|
|
|
- }
|
|
|
- }
|
|
|
- return m;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- */
|
|
|
public void testBlockSynchronization() throws Exception {
|
|
|
final long softLease = 1000;
|
|
|
final long hardLease = 60 * 60 *1000;
|
|
|
final short repl = 3;
|
|
|
- Configuration conf = new Configuration();
|
|
|
+ final Configuration conf = new Configuration();
|
|
|
+ final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
|
|
|
conf.setLong("dfs.block.size", BLOCK_SIZE);
|
|
|
- conf.setInt("io.bytes.per.checksum", 16);
|
|
|
+ conf.setInt("dfs.heartbeat.interval", 1);
|
|
|
+ // conf.setInt("io.bytes.per.checksum", 16);
|
|
|
+
|
|
|
MiniDFSCluster cluster = null;
|
|
|
byte[] actual = new byte[FILE_SIZE];
|
|
|
|
|
@@ -74,19 +61,23 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
|
//create a file
|
|
|
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
|
|
|
// create a random file name
|
|
|
- String filestr = "/foo" + RANDOM.nextInt();
|
|
|
+ String filestr = "/foo" + AppendTestUtil.nextInt();
|
|
|
+ System.out.println("filestr=" + filestr);
|
|
|
Path filepath = new Path(filestr);
|
|
|
FSDataOutputStream stm = dfs.create(filepath, true,
|
|
|
- dfs.getConf().getInt("io.file.buffer.size", 4096),
|
|
|
- (short)repl, (long)BLOCK_SIZE);
|
|
|
+ bufferSize, repl, BLOCK_SIZE);
|
|
|
assertTrue(dfs.dfs.exists(filestr));
|
|
|
|
|
|
// write random number of bytes into it.
|
|
|
- int size = RANDOM.nextInt(FILE_SIZE);
|
|
|
+ int size = AppendTestUtil.nextInt(FILE_SIZE);
|
|
|
+ System.out.println("size=" + size);
|
|
|
stm.write(buffer, 0, size);
|
|
|
|
|
|
// sync file
|
|
|
+ AppendTestUtil.LOG.info("sync");
|
|
|
stm.sync();
|
|
|
+ AppendTestUtil.LOG.info("leasechecker.interrupt()");
|
|
|
+ dfs.dfs.leaseChecker.interrupt();
|
|
|
|
|
|
// set the soft limit to be 1 second so that the
|
|
|
// namenode triggers lease recovery on next attempt to write-for-open.
|
|
@@ -94,30 +85,44 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
|
|
|
|
// try to re-open the file before closing the previous handle. This
|
|
|
// should fail but will trigger lease recovery.
|
|
|
- String oldClientName = dfs.dfs.clientName;
|
|
|
- dfs.dfs.clientName += "_1";
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- FSDataOutputStream newstm = dfs.create(filepath, false,
|
|
|
- dfs.getConf().getInt("io.file.buffer.size", 4096),
|
|
|
- (short)repl, (long)BLOCK_SIZE);
|
|
|
- assertTrue("Creation of an existing file should never succeed.", false);
|
|
|
- } catch (IOException e) {
|
|
|
- if (e.getMessage().contains("file exists")) {
|
|
|
- break;
|
|
|
+ {
|
|
|
+ Configuration conf2 = new Configuration(conf);
|
|
|
+ String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
|
|
|
+ UnixUserGroupInformation.saveToConf(conf2,
|
|
|
+ UnixUserGroupInformation.UGI_PROPERTY_NAME,
|
|
|
+ new UnixUserGroupInformation(username, new String[]{"supergroup"}));
|
|
|
+ FileSystem dfs2 = FileSystem.get(conf2);
|
|
|
+
|
|
|
+ boolean done = false;
|
|
|
+ for(int i = 0; i < 10 && !done; i++) {
|
|
|
+ AppendTestUtil.LOG.info("i=" + i);
|
|
|
+ try {
|
|
|
+ dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE);
|
|
|
+ fail("Creation of an existing file should never succeed.");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ final String message = ioe.getMessage();
|
|
|
+ if (message.contains("file exists")) {
|
|
|
+ AppendTestUtil.LOG.info("done", ioe);
|
|
|
+ done = true;
|
|
|
+ }
|
|
|
+ else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
|
|
|
+ AppendTestUtil.LOG.info("GOOD! got " + message);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!done) {
|
|
|
+ AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
|
|
|
+ try {Thread.sleep(5000);} catch (InterruptedException e) {}
|
|
|
}
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(1000);
|
|
|
- } catch (InterruptedException e) {
|
|
|
}
|
|
|
+ assertTrue(done);
|
|
|
}
|
|
|
- System.out.println("Lease for file " + filepath + " is recovered. " +
|
|
|
- "validating its contents now...");
|
|
|
|
|
|
- // revert back client identity
|
|
|
- dfs.dfs.clientName = oldClientName;
|
|
|
+ AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. "
|
|
|
+ + "Validating its contents now...");
|
|
|
|
|
|
// verify that file-size matches
|
|
|
assertTrue("File should be " + size + " bytes, but is actually " +
|