|
@@ -44,7 +44,9 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
static final long BLOCK_SIZE = 1024;
|
|
static final long BLOCK_SIZE = 1024;
|
|
static final int FILE_SIZE = 1024*16;
|
|
static final int FILE_SIZE = 1024*16;
|
|
static final short REPLICATION_NUM = (short)3;
|
|
static final short REPLICATION_NUM = (short)3;
|
|
- static byte[] buffer = new byte[FILE_SIZE];
|
|
|
|
|
|
+ private static byte[] buffer = new byte[FILE_SIZE];
|
|
|
|
+ private final Configuration conf = new Configuration();
|
|
|
|
+ private final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
|
|
|
|
|
|
static private String fakeUsername = "fakeUser1";
|
|
static private String fakeUsername = "fakeUser1";
|
|
static private String fakeGroup = "supergroup";
|
|
static private String fakeGroup = "supergroup";
|
|
@@ -52,9 +54,6 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
public void testBlockSynchronization() throws Exception {
|
|
public void testBlockSynchronization() throws Exception {
|
|
final long softLease = 1000;
|
|
final long softLease = 1000;
|
|
final long hardLease = 60 * 60 *1000;
|
|
final long hardLease = 60 * 60 *1000;
|
|
- final short repl = 3;
|
|
|
|
- final Configuration conf = new Configuration();
|
|
|
|
- final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
|
|
|
|
conf.setLong("dfs.block.size", BLOCK_SIZE);
|
|
conf.setLong("dfs.block.size", BLOCK_SIZE);
|
|
conf.setInt("dfs.heartbeat.interval", 1);
|
|
conf.setInt("dfs.heartbeat.interval", 1);
|
|
// conf.setInt("io.bytes.per.checksum", 16);
|
|
// conf.setInt("io.bytes.per.checksum", 16);
|
|
@@ -75,80 +74,27 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
|
|
|
|
//create a file
|
|
//create a file
|
|
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
|
|
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
|
|
- // create a random file name
|
|
|
|
- String filestr = "/foo" + AppendTestUtil.nextInt();
|
|
|
|
- System.out.println("filestr=" + filestr);
|
|
|
|
- Path filepath = new Path(filestr);
|
|
|
|
- FSDataOutputStream stm = dfs.create(filepath, true,
|
|
|
|
- bufferSize, repl, BLOCK_SIZE);
|
|
|
|
- assertTrue(dfs.dfs.exists(filestr));
|
|
|
|
-
|
|
|
|
- // write random number of bytes into it.
|
|
|
|
int size = AppendTestUtil.nextInt(FILE_SIZE);
|
|
int size = AppendTestUtil.nextInt(FILE_SIZE);
|
|
- System.out.println("size=" + size);
|
|
|
|
- stm.write(buffer, 0, size);
|
|
|
|
-
|
|
|
|
- // sync file
|
|
|
|
- AppendTestUtil.LOG.info("sync");
|
|
|
|
- stm.sync();
|
|
|
|
- AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
|
|
|
|
- dfs.dfs.leasechecker.interruptAndJoin();
|
|
|
|
|
|
+ Path filepath = createFile(dfs, size);
|
|
|
|
|
|
// set the soft limit to be 1 second so that the
|
|
// set the soft limit to be 1 second so that the
|
|
// namenode triggers lease recovery on next attempt to write-for-open.
|
|
// namenode triggers lease recovery on next attempt to write-for-open.
|
|
cluster.setLeasePeriod(softLease, hardLease);
|
|
cluster.setLeasePeriod(softLease, hardLease);
|
|
|
|
|
|
- // try to re-open the file before closing the previous handle. This
|
|
|
|
- // should fail but will trigger lease recovery.
|
|
|
|
- {
|
|
|
|
- UserGroupInformation ugi =
|
|
|
|
- UserGroupInformation.createUserForTesting(fakeUsername,
|
|
|
|
- new String [] { fakeGroup});
|
|
|
|
-
|
|
|
|
- FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
|
|
|
|
-
|
|
|
|
- boolean done = false;
|
|
|
|
- for(int i = 0; i < 10 && !done; i++) {
|
|
|
|
- AppendTestUtil.LOG.info("i=" + i);
|
|
|
|
- try {
|
|
|
|
- dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE);
|
|
|
|
- fail("Creation of an existing file should never succeed.");
|
|
|
|
- } catch (IOException ioe) {
|
|
|
|
- final String message = ioe.getMessage();
|
|
|
|
- if (message.contains("file exists")) {
|
|
|
|
- AppendTestUtil.LOG.info("done", ioe);
|
|
|
|
- done = true;
|
|
|
|
- }
|
|
|
|
- else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
|
|
|
|
- AppendTestUtil.LOG.info("GOOD! got " + message);
|
|
|
|
- }
|
|
|
|
- else {
|
|
|
|
- AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (!done) {
|
|
|
|
- AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
|
|
|
|
- try {Thread.sleep(5000);} catch (InterruptedException e) {}
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- assertTrue(done);
|
|
|
|
- }
|
|
|
|
|
|
+ recoverLeaseUsingCreate(filepath);
|
|
|
|
+ verifyFile(dfs, filepath, actual, size);
|
|
|
|
|
|
- AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. "
|
|
|
|
- + "Validating its contents now...");
|
|
|
|
|
|
+ //test recoverLease
|
|
|
|
+ size = AppendTestUtil.nextInt(FILE_SIZE);
|
|
|
|
+ filepath = createFile(dfs, size);
|
|
|
|
+
|
|
|
|
+ // set the soft limit to be 1 second so that the
|
|
|
|
+ // namenode triggers lease recovery on next attempt to write-for-open.
|
|
|
|
+ cluster.setLeasePeriod(softLease, hardLease);
|
|
|
|
|
|
- // verify that file-size matches
|
|
|
|
- assertTrue("File should be " + size + " bytes, but is actually " +
|
|
|
|
- " found to be " + dfs.getFileStatus(filepath).getLen() +
|
|
|
|
- " bytes",
|
|
|
|
- dfs.getFileStatus(filepath).getLen() == size);
|
|
|
|
|
|
+ recoverLease(filepath);
|
|
|
|
+ verifyFile(dfs, filepath, actual, size);
|
|
|
|
|
|
- // verify that there is enough data to read.
|
|
|
|
- System.out.println("File size is good. Now validating sizes from datanodes...");
|
|
|
|
- FSDataInputStream stmin = dfs.open(filepath);
|
|
|
|
- stmin.readFully(0, actual, 0, size);
|
|
|
|
- stmin.close();
|
|
|
|
}
|
|
}
|
|
finally {
|
|
finally {
|
|
try {
|
|
try {
|
|
@@ -158,4 +104,112 @@ public class TestLeaseRecovery2 extends junit.framework.TestCase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private void recoverLease(Path filepath) throws IOException, InterruptedException {
|
|
|
|
+ UserGroupInformation ugi =
|
|
|
|
+ UserGroupInformation.createUserForTesting(fakeUsername,
|
|
|
|
+ new String [] { fakeGroup});
|
|
|
|
+
|
|
|
|
+ DistributedFileSystem dfs2 = (DistributedFileSystem)
|
|
|
|
+ DFSTestUtil.getFileSystemAs(ugi, conf);
|
|
|
|
+
|
|
|
|
+ boolean done = false;
|
|
|
|
+ while (!done) {
|
|
|
|
+ try {
|
|
|
|
+ dfs2.recoverLease(filepath);
|
|
|
|
+ done = true;
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ final String message = ioe.getMessage();
|
|
|
|
+ if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
|
|
|
|
+ AppendTestUtil.LOG.info("GOOD! got " + message);
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!done) {
|
|
|
|
+ AppendTestUtil.LOG.info("sleep " + 1000 + "ms");
|
|
|
|
+ try {Thread.sleep(5000);} catch (InterruptedException e) {}
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // try to re-open the file before closing the previous handle. This
|
|
|
|
+ // should fail but will trigger lease recovery.
|
|
|
|
+ private Path createFile(DistributedFileSystem dfs, int size)
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ // create a random file name
|
|
|
|
+ String filestr = "/foo" + AppendTestUtil.nextInt();
|
|
|
|
+ System.out.println("filestr=" + filestr);
|
|
|
|
+ Path filepath = new Path(filestr);
|
|
|
|
+ FSDataOutputStream stm = dfs.create(filepath, true,
|
|
|
|
+ bufferSize, REPLICATION_NUM, BLOCK_SIZE);
|
|
|
|
+ assertTrue(dfs.dfs.exists(filestr));
|
|
|
|
+
|
|
|
|
+ // write random number of bytes into it.
|
|
|
|
+ System.out.println("size=" + size);
|
|
|
|
+ stm.write(buffer, 0, size);
|
|
|
|
+
|
|
|
|
+ // sync file
|
|
|
|
+ AppendTestUtil.LOG.info("sync");
|
|
|
|
+ stm.sync();
|
|
|
|
+ AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
|
|
|
|
+ dfs.dfs.leasechecker.interruptAndJoin();
|
|
|
|
+ return filepath;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void recoverLeaseUsingCreate(Path filepath)
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ UserGroupInformation ugi =
|
|
|
|
+ UserGroupInformation.createUserForTesting(fakeUsername,
|
|
|
|
+ new String [] { fakeGroup});
|
|
|
|
+ FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
|
|
|
|
+
|
|
|
|
+ boolean done = false;
|
|
|
|
+ for(int i = 0; i < 10 && !done; i++) {
|
|
|
|
+ AppendTestUtil.LOG.info("i=" + i);
|
|
|
|
+ try {
|
|
|
|
+ dfs2.create(filepath, false, bufferSize, (short)1, BLOCK_SIZE);
|
|
|
|
+ fail("Creation of an existing file should never succeed.");
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ final String message = ioe.getMessage();
|
|
|
|
+ if (message.contains("file exists")) {
|
|
|
|
+ AppendTestUtil.LOG.info("done", ioe);
|
|
|
|
+ done = true;
|
|
|
|
+ }
|
|
|
|
+ else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
|
|
|
|
+ AppendTestUtil.LOG.info("GOOD! got " + message);
|
|
|
|
+ }
|
|
|
|
+ else {
|
|
|
|
+ AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!done) {
|
|
|
|
+ AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
|
|
|
|
+ try {Thread.sleep(5000);} catch (InterruptedException e) {}
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertTrue(done);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void verifyFile(FileSystem dfs, Path filepath, byte[] actual,
|
|
|
|
+ int size) throws IOException {
|
|
|
|
+ AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. "
|
|
|
|
+ + "Validating its contents now...");
|
|
|
|
+
|
|
|
|
+ // verify that file-size matches
|
|
|
|
+ assertTrue("File should be " + size + " bytes, but is actually " +
|
|
|
|
+ " found to be " + dfs.getFileStatus(filepath).getLen() +
|
|
|
|
+ " bytes",
|
|
|
|
+ dfs.getFileStatus(filepath).getLen() == size);
|
|
|
|
+
|
|
|
|
+ // verify that there is enough data to read.
|
|
|
|
+ System.out.println("File size is good. Now validating sizes from datanodes...");
|
|
|
|
+ FSDataInputStream stmin = dfs.open(filepath);
|
|
|
|
+ stmin.readFully(0, actual, 0, size);
|
|
|
|
+ stmin.close();
|
|
|
|
+ }
|
|
}
|
|
}
|