|
@@ -843,6 +843,123 @@ public class TestOpenFilesWithSnapshot {
|
|
|
hbaseFileCksumBeforeTruncate, hbaseFileCksumS3);
|
|
|
}
|
|
|
|
|
|
+ private Path createSnapshot(Path snapRootDir, String snapName,
|
|
|
+ String fileName) throws Exception {
|
|
|
+ final Path snap1Dir = SnapshotTestHelper.createSnapshot(
|
|
|
+ fs, snapRootDir, snapName);
|
|
|
+ return new Path(snap1Dir, fileName);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void verifyFileSize(long fileSize, Path... filePaths) throws
|
|
|
+ IOException {
|
|
|
+ for (Path filePath : filePaths) {
|
|
|
+ Assert.assertEquals(fileSize, fs.getFileStatus(filePath).getLen());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verify open files captured in the snapshots across config disable
|
|
|
+ * and enable.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testOpenFilesWithMixedConfig() throws Exception {
|
|
|
+ final Path snapRootDir = new Path("/level_0_A");
|
|
|
+ final String flumeFileName = "flume.log";
|
|
|
+ final String snap1Name = "s1";
|
|
|
+ final String snap2Name = "s2";
|
|
|
+ final String snap3Name = "s3";
|
|
|
+ final String snap4Name = "s4";
|
|
|
+ final String snap5Name = "s5";
|
|
|
+
|
|
|
+ // Create files and open streams
|
|
|
+ final Path flumeFile = new Path(snapRootDir, flumeFileName);
|
|
|
+ createFile(flumeFile);
|
|
|
+ FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
|
|
|
+
|
|
|
+ // 1. Disable capture open files
|
|
|
+ cluster.getNameNode().getNamesystem()
|
|
|
+ .getSnapshotManager().setCaptureOpenFiles(false);
|
|
|
+
|
|
|
+ // Create Snapshot S1
|
|
|
+ final Path flumeS1Path = createSnapshot(snapRootDir,
|
|
|
+ snap1Name, flumeFileName);
|
|
|
+
|
|
|
+ // Verify if Snap S1 file length is same as the the current versions
|
|
|
+ verifyFileSize(FILELEN, flumeS1Path);
|
|
|
+
|
|
|
+ // Write more data to files
|
|
|
+ long flumeFileWrittenDataLength = FILELEN;
|
|
|
+ int newWriteLength = (int) (BLOCKSIZE * 1.5);
|
|
|
+ byte[] buf = new byte[newWriteLength];
|
|
|
+ Random random = new Random();
|
|
|
+ random.nextBytes(buf);
|
|
|
+ flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
|
|
|
+
|
|
|
+ // Create Snapshot S2
|
|
|
+ final Path flumeS2Path = createSnapshot(snapRootDir,
|
|
|
+ snap2Name, flumeFileName);
|
|
|
+
|
|
|
+ // Since capture open files was disabled, all snapshots paths
|
|
|
+ // and the current version should have same file lengths.
|
|
|
+ verifyFileSize(flumeFileWrittenDataLength,
|
|
|
+ flumeFile, flumeS2Path, flumeS1Path);
|
|
|
+
|
|
|
+ // 2. Enable capture open files
|
|
|
+ cluster.getNameNode().getNamesystem()
|
|
|
+ .getSnapshotManager() .setCaptureOpenFiles(true);
|
|
|
+
|
|
|
+ // Write more data to files
|
|
|
+ flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
|
|
|
+ long flumeFileLengthAfterS3 = flumeFileWrittenDataLength;
|
|
|
+
|
|
|
+ // Create Snapshot S3
|
|
|
+ final Path flumeS3Path = createSnapshot(snapRootDir,
|
|
|
+ snap3Name, flumeFileName);
|
|
|
+
|
|
|
+ // Since open files captured in the previous snapshots were with config
|
|
|
+ // disabled, their file lengths are now same as the current version.
|
|
|
+ // With the config turned on, any new data written to the open files
|
|
|
+ // will no more reflect in the current version or old snapshot paths.
|
|
|
+ verifyFileSize(flumeFileWrittenDataLength, flumeFile, flumeS3Path,
|
|
|
+ flumeS2Path, flumeS1Path);
|
|
|
+
|
|
|
+ // Write more data to files
|
|
|
+ flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
|
|
|
+
|
|
|
+ // Create Snapshot S4
|
|
|
+ final Path flumeS4Path = createSnapshot(snapRootDir,
|
|
|
+ snap4Name, flumeFileName);
|
|
|
+
|
|
|
+ // Verify S4 has the latest data
|
|
|
+ verifyFileSize(flumeFileWrittenDataLength, flumeFile, flumeS4Path);
|
|
|
+
|
|
|
+ // But, open files captured as of Snapshot S3 and before should
|
|
|
+ // have their old file lengths intact.
|
|
|
+ verifyFileSize(flumeFileLengthAfterS3, flumeS3Path,
|
|
|
+ flumeS2Path, flumeS1Path);
|
|
|
+
|
|
|
+ long flumeFileLengthAfterS4 = flumeFileWrittenDataLength;
|
|
|
+
|
|
|
+ // 3. Disable capture open files
|
|
|
+ cluster.getNameNode().getNamesystem()
|
|
|
+ .getSnapshotManager() .setCaptureOpenFiles(false);
|
|
|
+
|
|
|
+ // Create Snapshot S5
|
|
|
+ final Path flumeS5Path = createSnapshot(snapRootDir,
|
|
|
+ snap5Name, flumeFileName);
|
|
|
+
|
|
|
+ flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
|
|
|
+
|
|
|
+ // Since capture open files was disabled, any snapshots taken after the
|
|
|
+ // config change and the current version should have same file lengths
|
|
|
+ // for the open files.
|
|
|
+ verifyFileSize(flumeFileWrittenDataLength, flumeFile, flumeS5Path);
|
|
|
+
|
|
|
+ // But, the old snapshots taken before the config disable should
|
|
|
+ // continue to be consistent.
|
|
|
+ verifyFileSize(flumeFileLengthAfterS4, flumeS4Path);
|
|
|
+ }
|
|
|
+
|
|
|
private void restartNameNode() throws Exception {
|
|
|
cluster.triggerBlockReports();
|
|
|
NameNode nameNode = cluster.getNameNode();
|