|
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import java.io.IOException;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
@@ -121,7 +122,7 @@ public class TestConsistentReadsObserver {
|
|
|
@Test
|
|
|
public void testMsyncSimple() throws Exception {
|
|
|
// 0 == not completed, 1 == succeeded, -1 == failed
|
|
|
- AtomicInteger readStatus = new AtomicInteger(0);
|
|
|
+ final AtomicInteger readStatus = new AtomicInteger(0);
|
|
|
|
|
|
// Making an uncoordinated call, which initialize the proxy
|
|
|
// to Observer node.
|
|
@@ -129,14 +130,17 @@ public class TestConsistentReadsObserver {
|
|
|
dfs.mkdir(testPath, FsPermission.getDefault());
|
|
|
assertSentTo(0);
|
|
|
|
|
|
- Thread reader = new Thread(() -> {
|
|
|
- try {
|
|
|
- // this read will block until roll and tail edits happen.
|
|
|
- dfs.getFileStatus(testPath);
|
|
|
- readStatus.set(1);
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- readStatus.set(-1);
|
|
|
+ Thread reader = new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ // this read will block until roll and tail edits happen.
|
|
|
+ dfs.getFileStatus(testPath);
|
|
|
+ readStatus.set(1);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ readStatus.set(-1);
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
|
|
@@ -145,7 +149,12 @@ public class TestConsistentReadsObserver {
|
|
|
assertEquals(0, readStatus.get());
|
|
|
dfsCluster.rollEditLogAndTail(0);
|
|
|
// wait a while for all the change to be done
|
|
|
- GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ return readStatus.get() != 0;
|
|
|
+ }
|
|
|
+ }, 100, 10000);
|
|
|
// the reader should have succeed.
|
|
|
assertEquals(1, readStatus.get());
|
|
|
}
|
|
@@ -159,19 +168,22 @@ public class TestConsistentReadsObserver {
|
|
|
|
|
|
// a status flag, initialized to 0, after reader finished, this will be
|
|
|
// updated to 1, -1 on error
|
|
|
- AtomicInteger readStatus = new AtomicInteger(0);
|
|
|
+ final AtomicInteger readStatus = new AtomicInteger(0);
|
|
|
|
|
|
// create a separate thread to make a blocking read.
|
|
|
- Thread reader = new Thread(() -> {
|
|
|
- try {
|
|
|
- // this read call will block until server state catches up. But due to
|
|
|
- // configuration, this will take a very long time.
|
|
|
- dfs.getClient().getFileInfo("/");
|
|
|
- readStatus.set(1);
|
|
|
- fail("Should have been interrupted before getting here.");
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- readStatus.set(-1);
|
|
|
+ Thread reader = new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ // this read call will block until server state catches up. But due to
|
|
|
+ // configuration, this will take a very long time.
|
|
|
+ dfs.getClient().getFileInfo("/");
|
|
|
+ readStatus.set(1);
|
|
|
+ fail("Should have been interrupted before getting here.");
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ readStatus.set(-1);
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
reader.start();
|