|
@@ -834,23 +834,27 @@ public class TestRPC {
|
|
|
TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
|
|
|
);
|
|
|
server.start();
|
|
|
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
-
|
|
|
- final TestProtocol proxy = (TestProtocol) RPC.getProxy(
|
|
|
- TestProtocol.class, TestProtocol.versionID, addr, conf);
|
|
|
- // Connect to the server
|
|
|
- proxy.ping();
|
|
|
- // Interrupt self, try another call
|
|
|
- Thread.currentThread().interrupt();
|
|
|
try {
|
|
|
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
+
|
|
|
+ final TestProtocol proxy = (TestProtocol) RPC.getProxy(
|
|
|
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
|
|
|
+ // Connect to the server
|
|
|
proxy.ping();
|
|
|
- fail("Interruption did not cause IPC to fail");
|
|
|
- } catch (IOException ioe) {
|
|
|
- if (!ioe.toString().contains("InterruptedException")) {
|
|
|
- throw ioe;
|
|
|
+ // Interrupt self, try another call
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ try {
|
|
|
+ proxy.ping();
|
|
|
+ fail("Interruption did not cause IPC to fail");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ if (!ioe.toString().contains("InterruptedException")) {
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
+ // clear interrupt status for future tests
|
|
|
+ Thread.interrupted();
|
|
|
}
|
|
|
- // clear interrupt status for future tests
|
|
|
- Thread.interrupted();
|
|
|
+ } finally {
|
|
|
+ server.stop();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -862,59 +866,62 @@ public class TestRPC {
|
|
|
);
|
|
|
|
|
|
server.start();
|
|
|
-
|
|
|
- int numConcurrentRPC = 200;
|
|
|
- InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
- final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
|
|
|
- final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
|
|
|
- final AtomicBoolean leaderRunning = new AtomicBoolean(true);
|
|
|
- final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
|
|
- Thread leaderThread = null;
|
|
|
-
|
|
|
- for (int i = 0; i < numConcurrentRPC; i++) {
|
|
|
- final int num = i;
|
|
|
- final TestProtocol proxy = (TestProtocol) RPC.getProxy(
|
|
|
- TestProtocol.class, TestProtocol.versionID, addr, conf);
|
|
|
- Thread rpcThread = new Thread(new Runnable() {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- barrier.await();
|
|
|
- while (num == 0 || leaderRunning.get()) {
|
|
|
+ try {
|
|
|
+ int numConcurrentRPC = 200;
|
|
|
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
+ final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
|
|
|
+ final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
|
|
|
+ final AtomicBoolean leaderRunning = new AtomicBoolean(true);
|
|
|
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
|
|
|
+ Thread leaderThread = null;
|
|
|
+
|
|
|
+ for (int i = 0; i < numConcurrentRPC; i++) {
|
|
|
+ final int num = i;
|
|
|
+ final TestProtocol proxy = (TestProtocol) RPC.getProxy(
|
|
|
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
|
|
|
+ Thread rpcThread = new Thread(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ barrier.await();
|
|
|
+ while (num == 0 || leaderRunning.get()) {
|
|
|
+ proxy.slowPing(false);
|
|
|
+ }
|
|
|
+
|
|
|
proxy.slowPing(false);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (num == 0) {
|
|
|
+ leaderRunning.set(false);
|
|
|
+ } else {
|
|
|
+ error.set(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.error(e);
|
|
|
+ } finally {
|
|
|
+ latch.countDown();
|
|
|
}
|
|
|
-
|
|
|
- proxy.slowPing(false);
|
|
|
- } catch (Exception e) {
|
|
|
- if (num == 0) {
|
|
|
- leaderRunning.set(false);
|
|
|
- } else {
|
|
|
- error.set(e);
|
|
|
- }
|
|
|
-
|
|
|
- LOG.error(e);
|
|
|
- } finally {
|
|
|
- latch.countDown();
|
|
|
}
|
|
|
+ });
|
|
|
+ rpcThread.start();
|
|
|
+
|
|
|
+ if (leaderThread == null) {
|
|
|
+ leaderThread = rpcThread;
|
|
|
}
|
|
|
- });
|
|
|
- rpcThread.start();
|
|
|
-
|
|
|
- if (leaderThread == null) {
|
|
|
- leaderThread = rpcThread;
|
|
|
}
|
|
|
+ // let threads get past the barrier
|
|
|
+ Thread.sleep(1000);
|
|
|
+ // stop a single thread
|
|
|
+ while (leaderRunning.get()) {
|
|
|
+ leaderThread.interrupt();
|
|
|
+ }
|
|
|
+
|
|
|
+ latch.await();
|
|
|
+
|
|
|
+ // should not cause any other thread to get an error
|
|
|
+ assertTrue("rpc got exception " + error.get(), error.get() == null);
|
|
|
+ } finally {
|
|
|
+ server.stop();
|
|
|
}
|
|
|
- // let threads get past the barrier
|
|
|
- Thread.sleep(1000);
|
|
|
- // stop a single thread
|
|
|
- while (leaderRunning.get()) {
|
|
|
- leaderThread.interrupt();
|
|
|
- }
|
|
|
-
|
|
|
- latch.await();
|
|
|
-
|
|
|
- // should not cause any other thread to get an error
|
|
|
- assertTrue("rpc got exception " + error.get(), error.get() == null);
|
|
|
}
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|