|
@@ -23,7 +23,11 @@ import java.util.EnumSet;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -38,12 +42,15 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
|
|
+import org.apache.hadoop.util.Time;
|
|
import org.junit.After;
|
|
import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
public class TestOpenFilesWithSnapshot {
|
|
public class TestOpenFilesWithSnapshot {
|
|
|
|
+ private static final Log LOG =
|
|
|
|
+ LogFactory.getLog(TestOpenFilesWithSnapshot.class.getName());
|
|
private final Configuration conf = new Configuration();
|
|
private final Configuration conf = new Configuration();
|
|
MiniDFSCluster cluster = null;
|
|
MiniDFSCluster cluster = null;
|
|
DistributedFileSystem fs = null;
|
|
DistributedFileSystem fs = null;
|
|
@@ -622,6 +629,108 @@ public class TestOpenFilesWithSnapshot {
|
|
hbaseOutputStream.close();
|
|
hbaseOutputStream.close();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Test client writing to open files are not interrupted when snapshots
|
|
|
|
+ * that captured open files get deleted.
|
|
|
|
+ */
|
|
|
|
+ @Test (timeout = 240000)
|
|
|
|
+ public void testOpenFileWritingAcrossSnapDeletion() throws Exception {
|
|
|
|
+ final Path snapRootDir = new Path("/level_0_A");
|
|
|
|
+ final String flumeFileName = "flume.log";
|
|
|
|
+ final String hbaseFileName = "hbase.log";
|
|
|
|
+ final String snap1Name = "snap_1";
|
|
|
|
+ final String snap2Name = "snap_2";
|
|
|
|
+ final String snap3Name = "snap_3";
|
|
|
|
+
|
|
|
|
+ // Create files and open streams
|
|
|
|
+ final Path flumeFile = new Path(snapRootDir, flumeFileName);
|
|
|
|
+ FSDataOutputStream flumeOut = fs.create(flumeFile, false,
|
|
|
|
+ 8000, (short)3, 1048576);
|
|
|
|
+ flumeOut.close();
|
|
|
|
+ final Path hbaseFile = new Path(snapRootDir, hbaseFileName);
|
|
|
|
+ FSDataOutputStream hbaseOut = fs.create(hbaseFile, false,
|
|
|
|
+ 8000, (short)3, 1048576);
|
|
|
|
+ hbaseOut.close();
|
|
|
|
+
|
|
|
|
+ final AtomicBoolean writerError = new AtomicBoolean(false);
|
|
|
|
+ final CountDownLatch startLatch = new CountDownLatch(1);
|
|
|
|
+ final CountDownLatch deleteLatch = new CountDownLatch(1);
|
|
|
|
+ Thread t = new Thread(new Runnable() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ FSDataOutputStream flumeOutputStream = fs.append(flumeFile, 8000);
|
|
|
|
+ FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile, 8000);
|
|
|
|
+ byte[] bytes = new byte[(int) (1024 * 0.2)];
|
|
|
|
+ Random r = new Random(Time.now());
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < 200000; i++) {
|
|
|
|
+ r.nextBytes(bytes);
|
|
|
|
+ flumeOutputStream.write(bytes);
|
|
|
|
+ if (hbaseOutputStream != null) {
|
|
|
|
+ hbaseOutputStream.write(bytes);
|
|
|
|
+ }
|
|
|
|
+ if (i == 50000) {
|
|
|
|
+ startLatch.countDown();
|
|
|
|
+ } else if (i == 100000) {
|
|
|
|
+ deleteLatch.countDown();
|
|
|
|
+ } else if (i == 150000) {
|
|
|
|
+ hbaseOutputStream.hsync();
|
|
|
|
+ fs.delete(hbaseFile, true);
|
|
|
|
+ try {
|
|
|
|
+ hbaseOutputStream.close();
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ // since the file is deleted before the open stream close,
|
|
|
|
+ // it might throw FileNotFoundException. Ignore the
|
|
|
|
+ // expected exception.
|
|
|
|
+ }
|
|
|
|
+ hbaseOutputStream = null;
|
|
|
|
+ } else if (i % 5000 == 0) {
|
|
|
|
+ LOG.info("Write pos: " + flumeOutputStream.getPos()
|
|
|
|
+ + ", size: " + fs.getFileStatus(flumeFile).getLen()
|
|
|
|
+ + ", loop: " + (i + 1));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOG.warn("Writer error: " + e);
|
|
|
|
+ writerError.set(true);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ t.start();
|
|
|
|
+
|
|
|
|
+ startLatch.await();
|
|
|
|
+ final Path snap1Dir = SnapshotTestHelper.createSnapshot(
|
|
|
|
+ fs, snapRootDir, snap1Name);
|
|
|
|
+ final Path flumeS1Path = new Path(snap1Dir, flumeFileName);
|
|
|
|
+ LOG.info("Snap1 file status: " + fs.getFileStatus(flumeS1Path));
|
|
|
|
+ LOG.info("Current file status: " + fs.getFileStatus(flumeFile));
|
|
|
|
+
|
|
|
|
+ deleteLatch.await();
|
|
|
|
+ LOG.info("Snap1 file status: " + fs.getFileStatus(flumeS1Path));
|
|
|
|
+ LOG.info("Current file status: " + fs.getFileStatus(flumeFile));
|
|
|
|
+
|
|
|
|
+ // Verify deletion of snapshot which had the under construction file
|
|
|
|
+ // captured is not truncating the under construction file and the thread
|
|
|
|
+ // writing to the same file not crashing on newer block allocations.
|
|
|
|
+ LOG.info("Deleting " + snap1Name);
|
|
|
|
+ fs.deleteSnapshot(snapRootDir, snap1Name);
|
|
|
|
+
|
|
|
|
+ // Verify creation and deletion of snapshot newer than the oldest
|
|
|
|
+ // snapshot is not crashing the thread writing to under construction file.
|
|
|
|
+ SnapshotTestHelper.createSnapshot(fs, snapRootDir, snap2Name);
|
|
|
|
+ SnapshotTestHelper.createSnapshot(fs, snapRootDir, snap3Name);
|
|
|
|
+ fs.deleteSnapshot(snapRootDir, snap3Name);
|
|
|
|
+ fs.deleteSnapshot(snapRootDir, snap2Name);
|
|
|
|
+ SnapshotTestHelper.createSnapshot(fs, snapRootDir, "test");
|
|
|
|
+
|
|
|
|
+ t.join();
|
|
|
|
+ Assert.assertFalse("Client encountered writing error!", writerError.get());
|
|
|
|
+
|
|
|
|
+ restartNameNode();
|
|
|
|
+ cluster.waitActive();
|
|
|
|
+ }
|
|
|
|
+
|
|
private void restartNameNode() throws Exception {
|
|
private void restartNameNode() throws Exception {
|
|
cluster.triggerBlockReports();
|
|
cluster.triggerBlockReports();
|
|
NameNode nameNode = cluster.getNameNode();
|
|
NameNode nameNode = cluster.getNameNode();
|