|
@@ -162,10 +162,10 @@ public class TestConsistentReadsObserver {
|
|
|
assertEquals(1, readStatus.get());
|
|
|
}
|
|
|
|
|
|
- private void testMsync(boolean autoMsync, long autoMsyncPeriodMs)
|
|
|
+ private void testMsync(final boolean autoMsync, final long autoMsyncPeriodMs)
|
|
|
throws Exception {
|
|
|
// 0 == not completed, 1 == succeeded, -1 == failed
|
|
|
- AtomicInteger readStatus = new AtomicInteger(0);
|
|
|
+ final AtomicInteger readStatus = new AtomicInteger(0);
|
|
|
Configuration conf2 = new Configuration(conf);
|
|
|
|
|
|
// Disable FS cache so two different DFS clients will be used.
|
|
@@ -176,7 +176,8 @@ public class TestConsistentReadsObserver {
|
|
|
+ "." + dfs.getUri().getHost(),
|
|
|
autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
- DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
|
|
|
+ final DistributedFileSystem dfs2 =
|
|
|
+ (DistributedFileSystem) FileSystem.get(conf2);
|
|
|
|
|
|
// Initialize the proxies for Observer Node.
|
|
|
dfs.getClient().getHAServiceState();
|
|
@@ -191,25 +192,28 @@ public class TestConsistentReadsObserver {
|
|
|
dfs.mkdir(testPath, FsPermission.getDefault());
|
|
|
assertSentTo(0);
|
|
|
|
|
|
- Thread reader = new Thread(() -> {
|
|
|
- try {
|
|
|
- // After msync, client should have the latest state ID from active.
|
|
|
- // Therefore, the subsequent getFileStatus call should succeed.
|
|
|
- if (!autoMsync) {
|
|
|
- // If not testing auto-msync, perform an explicit one here
|
|
|
- dfs2.getClient().msync();
|
|
|
- } else if (autoMsyncPeriodMs > 0) {
|
|
|
- Thread.sleep(autoMsyncPeriodMs);
|
|
|
- }
|
|
|
- dfs2.getFileStatus(testPath);
|
|
|
- if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
|
|
|
- readStatus.set(1);
|
|
|
- } else {
|
|
|
+ Thread reader = new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ // After msync, client should have the latest state ID from active.
|
|
|
+ // Therefore, the subsequent getFileStatus call should succeed.
|
|
|
+ if (!autoMsync) {
|
|
|
+ // If not testing auto-msync, perform an explicit one here
|
|
|
+ dfs2.getClient().msync();
|
|
|
+ } else if (autoMsyncPeriodMs > 0) {
|
|
|
+ Thread.sleep(autoMsyncPeriodMs);
|
|
|
+ }
|
|
|
+ dfs2.getFileStatus(testPath);
|
|
|
+ if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
|
|
|
+ readStatus.set(1);
|
|
|
+ } else {
|
|
|
+ readStatus.set(-1);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
readStatus.set(-1);
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- readStatus.set(-1);
|
|
|
}
|
|
|
});
|
|
|
|
|
@@ -220,7 +224,12 @@ public class TestConsistentReadsObserver {
|
|
|
|
|
|
dfsCluster.rollEditLogAndTail(0);
|
|
|
|
|
|
- GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000);
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return readStatus.get() != 0;
|
|
|
+ }
|
|
|
+ }, 100, 3000);
|
|
|
assertEquals(1, readStatus.get());
|
|
|
}
|
|
|
|
|
@@ -258,7 +267,7 @@ public class TestConsistentReadsObserver {
|
|
|
dfsCluster.transitionToActive(2);
|
|
|
try {
|
|
|
// 0 == not completed, 1 == succeeded, -1 == failed
|
|
|
- AtomicInteger readStatus = new AtomicInteger(0);
|
|
|
+ final AtomicInteger readStatus = new AtomicInteger(0);
|
|
|
|
|
|
// Initialize the proxies for Observer Node.
|
|
|
dfs.getClient().getHAServiceState();
|
|
@@ -276,17 +285,20 @@ public class TestConsistentReadsObserver {
|
|
|
|
|
|
// Disable FS cache so two different DFS clients will be used.
|
|
|
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
|
|
|
- DistributedFileSystem dfs2 =
|
|
|
+ final DistributedFileSystem dfs2 =
|
|
|
(DistributedFileSystem) FileSystem.get(conf2);
|
|
|
dfs2.getClient().getHAServiceState();
|
|
|
|
|
|
- Thread reader = new Thread(() -> {
|
|
|
- try {
|
|
|
- dfs2.getFileStatus(testPath);
|
|
|
- readStatus.set(1);
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- readStatus.set(-1);
|
|
|
+ Thread reader = new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ dfs2.getFileStatus(testPath);
|
|
|
+ readStatus.set(1);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ readStatus.set(-1);
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
|
|
@@ -299,7 +311,12 @@ public class TestConsistentReadsObserver {
|
|
|
dfsCluster.getNameNode(0)
|
|
|
.getNamesystem().getEditLogTailer().doTailEdits();
|
|
|
|
|
|
- GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return readStatus.get() != 0;
|
|
|
+ }
|
|
|
+ }, 100, 10000);
|
|
|
assertEquals(1, readStatus.get());
|
|
|
} finally {
|
|
|
// Put the cluster back the way it was when the test started
|