@@ -32,15 +32,21 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Proxy;
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+
 // Main unit tests for ObserverNode
 public class TestObserverNode {
   private Configuration conf;
@@ -58,7 +64,9 @@ public class TestObserverNode {
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
-    setUpCluster(1);
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    conf.setTimeDuration(
+        DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
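+    // The cluster itself is now started by each test case via setUpCluster(),
+    // so individual tests can adjust the configuration above before the
+    // NameNodes start (see testMsyncSimple).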
 
     testPath = new Path("/test");
     testPath2 = new Path("/test2");
@@ -74,18 +82,12 @@ public class TestObserverNode {
 
   @Test
   public void testSimpleRead() throws Exception {
+    setUpCluster(1);
     setObserverRead(true);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
 
-    try {
-      dfs.getFileStatus(testPath);
-      fail("Should throw FileNotFoundException");
-    } catch (FileNotFoundException e) {
-      // Pass
-    }
-
     rollEditLogAndTail(0);
     dfs.getFileStatus(testPath);
     assertSentTo(2);
@@ -96,6 +98,7 @@ public class TestObserverNode {
 
   @Test
   public void testFailover() throws Exception {
+    setUpCluster(1);
     setObserverRead(false);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
@@ -115,6 +118,7 @@ public class TestObserverNode {
 
   @Test
   public void testDoubleFailover() throws Exception {
+    setUpCluster(1);
     setObserverRead(true);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
@@ -180,6 +184,7 @@ public class TestObserverNode {
 
   @Test
   public void testObserverShutdown() throws Exception {
+    setUpCluster(1);
     setObserverRead(true);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
@@ -201,6 +206,7 @@ public class TestObserverNode {
 
   @Test
   public void testObserverFailOverAndShutdown() throws Exception {
+    setUpCluster(1);
     // Test the case when there is a failover before ONN shutdown
     setObserverRead(true);
 
@@ -273,6 +279,7 @@ public class TestObserverNode {
 
   @Test
   public void testBootstrap() throws Exception {
+    setUpCluster(1);
     for (URI u : dfsCluster.getNameDirs(2)) {
       File dir = new File(u.getPath());
       assertTrue(FileUtil.fullyDelete(dir));
@@ -284,6 +291,44 @@ public class TestObserverNode {
     assertEquals(0, rc);
   }
 
+  @Test
+  public void testMsyncSimple() throws Exception {
+    // Disable the fast path (in-progress edit tailing) because this test's
+    // assertions depend on the timing of the explicit rollEditLogAndTail
+    // calls, although this makes the test take longer to run.
+    // TODO: revisit if there is a better way.
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
+    conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 60, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        DFS_HA_TAILEDITS_PERIOD_KEY, 30, TimeUnit.SECONDS);
+    setUpCluster(1);
+    setObserverRead(true);
+
+    AtomicBoolean readSucceed = new AtomicBoolean(false);
+
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    assertSentTo(0);
+
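+    // The mkdir above went to the active NN and advanced this client's
+    // last seen state id; the observer will hold the read below until it
+    // has tailed edits up to that state id.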
+    Thread reader = new Thread(() -> {
+      try {
+        // this read will block until the edit log is rolled and tailed.
+        dfs.getFileStatus(testPath);
+        readSucceed.set(true);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    reader.start();
+    // the reader should still be blocked; it has not succeeded yet.
+    assertFalse(readSucceed.get());
+    rollEditLogAndTail(0);
+    // wait a while for the blocked read to finish and set the flag
+    Thread.sleep(100);
+    // the reader should have succeeded by now.
+    assertTrue(readSucceed.get());
+  }
+
   private void setUpCluster(int numObservers) throws Exception {
     qjmhaCluster = new MiniQJMHACluster.Builder(conf)
         .setNumNameNodes(2 + numObservers)