|
@@ -496,132 +496,6 @@ public class TestOpenFilesWithSnapshot {
|
|
flumeOutputStream.close();
|
|
flumeOutputStream.close();
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Test snapshot capturing open files when an open file with active lease
|
|
|
|
- * is deleted by the client.
|
|
|
|
- */
|
|
|
|
- @Test (timeout = 120000)
|
|
|
|
- public void testSnapshotsForOpenFilesAndDeletion() throws Exception {
|
|
|
|
- // Construct the directory tree
|
|
|
|
- final Path snapRootDir = new Path("/level_0_A");
|
|
|
|
- final String flumeFileName = "flume.log";
|
|
|
|
- final String hbaseFileName = "hbase.log";
|
|
|
|
- final String snap1Name = "snap_1";
|
|
|
|
- final String snap2Name = "snap_2";
|
|
|
|
- final String snap3Name = "snap_3";
|
|
|
|
-
|
|
|
|
- // Create files and open streams
|
|
|
|
- final Path flumeFile = new Path(snapRootDir, flumeFileName);
|
|
|
|
- createFile(flumeFile);
|
|
|
|
- final Path hbaseFile = new Path(snapRootDir, hbaseFileName);
|
|
|
|
- createFile(hbaseFile);
|
|
|
|
- FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
|
|
|
|
- FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile);
|
|
|
|
-
|
|
|
|
- // Create Snapshot S1
|
|
|
|
- final Path snap1Dir = SnapshotTestHelper.createSnapshot(
|
|
|
|
- fs, snapRootDir, snap1Name);
|
|
|
|
- final Path flumeS1Path = new Path(snap1Dir, flumeFileName);
|
|
|
|
- final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen();
|
|
|
|
- final Path hbaseS1Path = new Path(snap1Dir, hbaseFileName);
|
|
|
|
- final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen();
|
|
|
|
-
|
|
|
|
- // Verify if Snap S1 file length is same as the the current versions
|
|
|
|
- Assert.assertEquals(flumeFileLengthAfterS1,
|
|
|
|
- fs.getFileStatus(flumeS1Path).getLen());
|
|
|
|
- Assert.assertEquals(hbaseFileLengthAfterS1,
|
|
|
|
- fs.getFileStatus(hbaseS1Path).getLen());
|
|
|
|
-
|
|
|
|
- long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
|
|
|
|
- long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1;
|
|
|
|
- int newWriteLength = (int) (BLOCKSIZE * 1.5);
|
|
|
|
- byte[] buf = new byte[newWriteLength];
|
|
|
|
- Random random = new Random();
|
|
|
|
- random.nextBytes(buf);
|
|
|
|
-
|
|
|
|
- // Write more data to files
|
|
|
|
- flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
|
|
|
|
- hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
|
|
|
-
|
|
|
|
- // Create Snapshot S2
|
|
|
|
- final Path snap2Dir = SnapshotTestHelper.createSnapshot(
|
|
|
|
- fs, snapRootDir, snap2Name);
|
|
|
|
- final Path flumeS2Path = new Path(snap2Dir, flumeFileName);
|
|
|
|
- final Path hbaseS2Path = new Path(snap2Dir, hbaseFileName);
|
|
|
|
-
|
|
|
|
- // Verify current files length are same as all data written till now
|
|
|
|
- final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen();
|
|
|
|
- Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
|
|
|
|
- final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen();
|
|
|
|
- Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2);
|
|
|
|
-
|
|
|
|
- // Verify if Snap S2 file length is same as the current versions
|
|
|
|
- Assert.assertEquals(flumeFileLengthAfterS2,
|
|
|
|
- fs.getFileStatus(flumeS2Path).getLen());
|
|
|
|
- Assert.assertEquals(hbaseFileLengthAfterS2,
|
|
|
|
- fs.getFileStatus(hbaseS2Path).getLen());
|
|
|
|
-
|
|
|
|
- // Write more data to open files
|
|
|
|
- writeToStream(flumeOutputStream, buf);
|
|
|
|
- hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
|
|
|
-
|
|
|
|
- // Verify old snapshots have point-in-time/frozen file
|
|
|
|
- // lengths even after the current versions have moved forward.
|
|
|
|
- Assert.assertEquals(flumeFileLengthAfterS1,
|
|
|
|
- fs.getFileStatus(flumeS1Path).getLen());
|
|
|
|
- Assert.assertEquals(flumeFileLengthAfterS2,
|
|
|
|
- fs.getFileStatus(flumeS2Path).getLen());
|
|
|
|
- Assert.assertEquals(hbaseFileLengthAfterS1,
|
|
|
|
- fs.getFileStatus(hbaseS1Path).getLen());
|
|
|
|
- Assert.assertEquals(hbaseFileLengthAfterS2,
|
|
|
|
- fs.getFileStatus(hbaseS2Path).getLen());
|
|
|
|
-
|
|
|
|
- // Delete flume current file. Snapshots should
|
|
|
|
- // still have references to flume file.
|
|
|
|
- boolean flumeFileDeleted = fs.delete(flumeFile, true);
|
|
|
|
- Assert.assertTrue(flumeFileDeleted);
|
|
|
|
- Assert.assertFalse(fs.exists(flumeFile));
|
|
|
|
- Assert.assertTrue(fs.exists(flumeS1Path));
|
|
|
|
- Assert.assertTrue(fs.exists(flumeS2Path));
|
|
|
|
-
|
|
|
|
- SnapshotTestHelper.createSnapshot(fs, snapRootDir, "tmp_snap");
|
|
|
|
- fs.deleteSnapshot(snapRootDir, "tmp_snap");
|
|
|
|
-
|
|
|
|
- // Delete snap_2. snap_1 still has reference to
|
|
|
|
- // the flume file.
|
|
|
|
- fs.deleteSnapshot(snapRootDir, snap2Name);
|
|
|
|
- Assert.assertFalse(fs.exists(flumeS2Path));
|
|
|
|
- Assert.assertTrue(fs.exists(flumeS1Path));
|
|
|
|
-
|
|
|
|
- // Delete snap_1. Now all traces of flume file
|
|
|
|
- // is gone.
|
|
|
|
- fs.deleteSnapshot(snapRootDir, snap1Name);
|
|
|
|
- Assert.assertFalse(fs.exists(flumeS2Path));
|
|
|
|
- Assert.assertFalse(fs.exists(flumeS1Path));
|
|
|
|
-
|
|
|
|
- // Create Snapshot S3
|
|
|
|
- final Path snap3Dir = SnapshotTestHelper.createSnapshot(
|
|
|
|
- fs, snapRootDir, snap3Name);
|
|
|
|
- final Path hbaseS3Path = new Path(snap3Dir, hbaseFileName);
|
|
|
|
-
|
|
|
|
- // Verify live files length is same as all data written till now
|
|
|
|
- final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen();
|
|
|
|
- Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3);
|
|
|
|
-
|
|
|
|
- // Write more data to open files
|
|
|
|
- hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
|
|
|
-
|
|
|
|
- // Verify old snapshots have point-in-time/frozen file
|
|
|
|
- // lengths even after the flume open file is deleted and
|
|
|
|
- // the hbase live file has moved forward.
|
|
|
|
- Assert.assertEquals(hbaseFileLengthAfterS3,
|
|
|
|
- fs.getFileStatus(hbaseS3Path).getLen());
|
|
|
|
- Assert.assertEquals(hbaseFileWrittenDataLength,
|
|
|
|
- fs.getFileStatus(hbaseFile).getLen());
|
|
|
|
-
|
|
|
|
- hbaseOutputStream.close();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private void restartNameNode() throws Exception {
|
|
private void restartNameNode() throws Exception {
|
|
cluster.triggerBlockReports();
|
|
cluster.triggerBlockReports();
|
|
NameNode nameNode = cluster.getNameNode();
|
|
NameNode nameNode = cluster.getNameNode();
|