|
@@ -170,10 +170,10 @@ public class TestZKRMStateStoreZKClientConnections extends
|
|
|
throws Exception {
|
|
|
|
|
|
TestZKClient zkClientTester = new TestZKClient();
|
|
|
- String path = "/test";
|
|
|
+ final String path = "/test";
|
|
|
YarnConfiguration conf = new YarnConfiguration();
|
|
|
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
|
|
- ZKRMStateStore store =
|
|
|
+ final ZKRMStateStore store =
|
|
|
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
|
|
|
TestDispatcher dispatcher = new TestDispatcher();
|
|
|
store.setRMDispatcher(dispatcher);
|
|
@@ -185,14 +185,20 @@ public class TestZKRMStateStoreZKClientConnections extends
|
|
|
store.setDataWithRetries(path, "newBytes".getBytes(), 0);
|
|
|
|
|
|
stopServer();
|
|
|
+ final AtomicBoolean isSucceeded = new AtomicBoolean(false);
|
|
|
zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
|
|
|
- try {
|
|
|
- store.getDataWithRetries(path, true);
|
|
|
- fail("Expected ZKClient time out exception");
|
|
|
- } catch (Exception e) {
|
|
|
- assertTrue(e.getMessage().contains(
|
|
|
- "Wait for ZKClient creation timed out"));
|
|
|
- }
|
|
|
+ Thread thread = new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ store.getDataWithRetries(path, true);
|
|
|
+ isSucceeded.set(true);
|
|
|
+ } catch (Exception e) {
|
|
|
+ isSucceeded.set(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ thread.start();
|
|
|
|
|
|
// ZKRMStateStore Session restored
|
|
|
startServer();
|
|
@@ -206,6 +212,8 @@ public class TestZKRMStateStoreZKClientConnections extends
|
|
|
fail(error);
|
|
|
}
|
|
|
assertEquals("newBytes", new String(ret));
|
|
|
+ thread.join();
|
|
|
+ assertTrue(isSucceeded.get());
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 20000)
|