|
@@ -75,7 +75,7 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
|
//create a file
|
|
|
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
|
|
|
int size = AppendTestUtil.nextInt(FILE_SIZE);
|
|
|
- Path filepath = createFile(dfs, size);
|
|
|
+ Path filepath = createFile(dfs, size, true);
|
|
|
|
|
|
// set the soft limit to be 1 second so that the
|
|
|
// namenode triggers lease recovery on next attempt to write-for-open.
|
|
@@ -85,60 +85,61 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
|
verifyFile(dfs, filepath, actual, size);
|
|
|
|
|
|
//test recoverLease
|
|
|
+ // set the soft limit to be 1 hour but recoverLease should
|
|
|
+ // close the file immediately
|
|
|
+ cluster.setLeasePeriod(hardLease, hardLease);
|
|
|
size = AppendTestUtil.nextInt(FILE_SIZE);
|
|
|
- filepath = createFile(dfs, size);
|
|
|
+ filepath = createFile(dfs, size, false);
|
|
|
|
|
|
- // set the soft limit to be 1 second so that the
|
|
|
- // namenode triggers lease recovery on next attempt to write-for-open.
|
|
|
- cluster.setLeasePeriod(softLease, hardLease);
|
|
|
+ // test recoverLese from a different client
|
|
|
+ recoverLease(filepath, null);
|
|
|
+ verifyFile(dfs, filepath, actual, size);
|
|
|
|
|
|
- recoverLease(filepath);
|
|
|
+ // test recoverlease from the same client
|
|
|
+ size = AppendTestUtil.nextInt(FILE_SIZE);
|
|
|
+ filepath = createFile(dfs, size, false);
|
|
|
+
|
|
|
+ // create another file using the same client
|
|
|
+ Path filepath1 = new Path("/foo" + AppendTestUtil.nextInt());
|
|
|
+ FSDataOutputStream stm = dfs.create(filepath1, true,
|
|
|
+ bufferSize, REPLICATION_NUM, BLOCK_SIZE);
|
|
|
+
|
|
|
+ // recover the first file
|
|
|
+ recoverLease(filepath, dfs);
|
|
|
verifyFile(dfs, filepath, actual, size);
|
|
|
|
|
|
+ // continue to write to the second file
|
|
|
+ stm.write(buffer, 0, size);
|
|
|
+ stm.close();
|
|
|
+ verifyFile(dfs, filepath1, actual, size);
|
|
|
}
|
|
|
finally {
|
|
|
try {
|
|
|
- if (cluster != null) {cluster.shutdown();}
|
|
|
+ if (cluster != null) {cluster.getFileSystem().close();cluster.shutdown();}
|
|
|
} catch (Exception e) {
|
|
|
// ignore
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void recoverLease(Path filepath) throws IOException, InterruptedException {
|
|
|
- UserGroupInformation ugi =
|
|
|
- UserGroupInformation.createUserForTesting(fakeUsername,
|
|
|
- new String [] { fakeGroup});
|
|
|
-
|
|
|
- DistributedFileSystem dfs2 = (DistributedFileSystem)
|
|
|
- DFSTestUtil.getFileSystemAs(ugi, conf);
|
|
|
-
|
|
|
- boolean done = false;
|
|
|
- while (!done) {
|
|
|
- try {
|
|
|
- dfs2.recoverLease(filepath);
|
|
|
- done = true;
|
|
|
- } catch (IOException ioe) {
|
|
|
- final String message = ioe.getMessage();
|
|
|
- if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
|
|
|
- AppendTestUtil.LOG.info("GOOD! got " + message);
|
|
|
- }
|
|
|
- else {
|
|
|
- AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (!done) {
|
|
|
- AppendTestUtil.LOG.info("sleep " + 1000 + "ms");
|
|
|
- try {Thread.sleep(5000);} catch (InterruptedException e) {}
|
|
|
- }
|
|
|
+ private void recoverLease(Path filepath, DistributedFileSystem dfs2) throws Exception {
|
|
|
+ if (dfs2==null) {
|
|
|
+ UserGroupInformation ugi =
|
|
|
+ UserGroupInformation.createUserForTesting(fakeUsername,
|
|
|
+ new String [] { fakeGroup});
|
|
|
+ dfs2 = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(ugi, conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ while (!dfs2.recoverLease(filepath)) {
|
|
|
+ AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
|
|
|
+ Thread.sleep(5000);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// try to re-open the file before closing the previous handle. This
|
|
|
// should fail but will trigger lease recovery.
|
|
|
- private Path createFile(DistributedFileSystem dfs, int size)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ private Path createFile(DistributedFileSystem dfs, int size,
|
|
|
+ boolean triggerSoftLease) throws IOException, InterruptedException {
|
|
|
// create a random file name
|
|
|
String filestr = "/foo" + AppendTestUtil.nextInt();
|
|
|
System.out.println("filestr=" + filestr);
|
|
@@ -154,8 +155,10 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
|
// sync file
|
|
|
AppendTestUtil.LOG.info("sync");
|
|
|
stm.sync();
|
|
|
- AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
|
|
|
- dfs.dfs.leasechecker.interruptAndJoin();
|
|
|
+ if (triggerSoftLease) {
|
|
|
+ AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
|
|
|
+ dfs.dfs.leasechecker.interruptAndJoin();
|
|
|
+ }
|
|
|
return filepath;
|
|
|
}
|
|
|
|