|
@@ -18,20 +18,6 @@
|
|
|
|
|
|
package org.apache.hadoop.ipc;
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
|
-import static org.junit.Assert.assertFalse;
|
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
|
-
|
|
|
|
-import java.io.IOException;
|
|
|
|
-import java.net.InetSocketAddress;
|
|
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.Collections;
|
|
|
|
-import java.util.HashMap;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
-import java.util.concurrent.ExecutionException;
|
|
|
|
-import java.util.concurrent.Future;
|
|
|
|
-
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -48,6 +34,17 @@ import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.net.InetSocketAddress;
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
|
+import java.util.concurrent.Future;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
+
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
|
+
|
|
public class TestAsyncIPC {
|
|
public class TestAsyncIPC {
|
|
|
|
|
|
private static Configuration conf;
|
|
private static Configuration conf;
|
|
@@ -87,26 +84,50 @@ public class TestAsyncIPC {
|
|
try {
|
|
try {
|
|
final long param = TestIPC.RANDOM.nextLong();
|
|
final long param = TestIPC.RANDOM.nextLong();
|
|
TestIPC.call(client, param, server, conf);
|
|
TestIPC.call(client, param, server, conf);
|
|
- Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
|
|
|
|
- returnFutures.put(i, returnFuture);
|
|
|
|
|
|
+ returnFutures.put(i, Client.getAsyncRpcResponse());
|
|
expectedValues.put(i, param);
|
|
expectedValues.put(i, param);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- LOG.fatal("Caught: " + StringUtils.stringifyException(e));
|
|
|
|
failed = true;
|
|
failed = true;
|
|
|
|
+ throw new RuntimeException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void waitForReturnValues() throws InterruptedException,
|
|
|
|
- ExecutionException {
|
|
|
|
|
|
+ void assertReturnValues() throws InterruptedException, ExecutionException {
|
|
for (int i = 0; i < count; i++) {
|
|
for (int i = 0; i < count; i++) {
|
|
LongWritable value = returnFutures.get(i).get();
|
|
LongWritable value = returnFutures.get(i).get();
|
|
- if (expectedValues.get(i) != value.get()) {
|
|
|
|
- LOG.fatal(String.format("Call-%d failed!", i));
|
|
|
|
- failed = true;
|
|
|
|
- break;
|
|
|
|
|
|
+ Assert.assertEquals("call" + i + " failed.",
|
|
|
|
+ expectedValues.get(i).longValue(), value.get());
|
|
|
|
+ }
|
|
|
|
+ Assert.assertFalse(failed);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void assertReturnValues(long timeout, TimeUnit unit)
|
|
|
|
+ throws InterruptedException, ExecutionException {
|
|
|
|
+ final boolean[] checked = new boolean[count];
|
|
|
|
+ for(boolean done = false; !done;) {
|
|
|
|
+ done = true;
|
|
|
|
+ for (int i = 0; i < count; i++) {
|
|
|
|
+ if (checked[i]) {
|
|
|
|
+ continue;
|
|
|
|
+ } else {
|
|
|
|
+ done = false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ final LongWritable value;
|
|
|
|
+ try {
|
|
|
|
+ value = returnFutures.get(i).get(timeout, unit);
|
|
|
|
+ } catch (TimeoutException e) {
|
|
|
|
+ LOG.info("call" + i + " caught ", e);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Assert.assertEquals("call" + i + " failed.",
|
|
|
|
+ expectedValues.get(i).longValue(), value.get());
|
|
|
|
+ checked[i] = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ Assert.assertFalse(failed);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -183,8 +204,7 @@ public class TestAsyncIPC {
|
|
|
|
|
|
private void doCall(final int idx, final long param) throws IOException {
|
|
private void doCall(final int idx, final long param) throws IOException {
|
|
TestIPC.call(client, param, server, conf);
|
|
TestIPC.call(client, param, server, conf);
|
|
- Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
|
|
|
|
- returnFutures.put(idx, returnFuture);
|
|
|
|
|
|
+ returnFutures.put(idx, Client.getAsyncRpcResponse());
|
|
expectedValues.put(idx, param);
|
|
expectedValues.put(idx, param);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -233,10 +253,7 @@ public class TestAsyncIPC {
|
|
}
|
|
}
|
|
for (int i = 0; i < callerCount; i++) {
|
|
for (int i = 0; i < callerCount; i++) {
|
|
callers[i].join();
|
|
callers[i].join();
|
|
- callers[i].waitForReturnValues();
|
|
|
|
- String msg = String.format("Expected not failed for caller-%d: %s.", i,
|
|
|
|
- callers[i]);
|
|
|
|
- assertFalse(msg, callers[i].failed);
|
|
|
|
|
|
+ callers[i].assertReturnValues();
|
|
}
|
|
}
|
|
for (int i = 0; i < clientCount; i++) {
|
|
for (int i = 0; i < clientCount; i++) {
|
|
clients[i].stop();
|
|
clients[i].stop();
|
|
@@ -258,25 +275,37 @@ public class TestAsyncIPC {
|
|
try {
|
|
try {
|
|
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
|
|
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
|
|
caller.run();
|
|
caller.run();
|
|
|
|
+ caller.assertReturnValues();
|
|
|
|
+ caller.assertReturnValues();
|
|
|
|
+ caller.assertReturnValues();
|
|
|
|
+ Assert.assertEquals(asyncCallCount, client.getAsyncCallCount());
|
|
|
|
+ } finally {
|
|
|
|
+ client.stop();
|
|
|
|
+ server.stop();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- caller.waitForReturnValues();
|
|
|
|
- String msg = String.format(
|
|
|
|
- "First time, expected not failed for caller: %s.", caller);
|
|
|
|
- assertFalse(msg, caller.failed);
|
|
|
|
|
|
+ @Test(timeout = 60000)
|
|
|
|
+ public void testFutureGetWithTimeout() throws IOException,
|
|
|
|
+ InterruptedException, ExecutionException {
|
|
|
|
+// GenericTestUtils.setLogLevel(AsyncGetFuture.LOG, Level.ALL);
|
|
|
|
+ final Server server = new TestIPC.TestServer(10, true, conf);
|
|
|
|
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
|
|
|
+ server.start();
|
|
|
|
|
|
- caller.waitForReturnValues();
|
|
|
|
- assertTrue(asyncCallCount == client.getAsyncCallCount());
|
|
|
|
- msg = String.format("Second time, expected not failed for caller: %s.",
|
|
|
|
- caller);
|
|
|
|
- assertFalse(msg, caller.failed);
|
|
|
|
|
|
+ final Client client = new Client(LongWritable.class, conf);
|
|
|
|
|
|
- assertTrue(asyncCallCount == client.getAsyncCallCount());
|
|
|
|
|
|
+ try {
|
|
|
|
+ final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
|
|
|
+ caller.run();
|
|
|
|
+ caller.assertReturnValues(10, TimeUnit.MILLISECONDS);
|
|
} finally {
|
|
} finally {
|
|
client.stop();
|
|
client.stop();
|
|
server.stop();
|
|
server.stop();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
|
|
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
|
|
int clientCount, int callerCount, int callCount) throws IOException,
|
|
int clientCount, int callerCount, int callCount) throws IOException,
|
|
InterruptedException, ExecutionException {
|
|
InterruptedException, ExecutionException {
|
|
@@ -367,9 +396,7 @@ public class TestAsyncIPC {
|
|
server.start();
|
|
server.start();
|
|
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
|
|
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
|
|
caller.run();
|
|
caller.run();
|
|
- caller.waitForReturnValues();
|
|
|
|
- String msg = String.format("Expected not failed for caller: %s.", caller);
|
|
|
|
- assertFalse(msg, caller.failed);
|
|
|
|
|
|
+ caller.assertReturnValues();
|
|
} finally {
|
|
} finally {
|
|
client.stop();
|
|
client.stop();
|
|
server.stop();
|
|
server.stop();
|
|
@@ -406,9 +433,7 @@ public class TestAsyncIPC {
|
|
server.start();
|
|
server.start();
|
|
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
|
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
|
caller.run();
|
|
caller.run();
|
|
- caller.waitForReturnValues();
|
|
|
|
- String msg = String.format("Expected not failed for caller: %s.", caller);
|
|
|
|
- assertFalse(msg, caller.failed);
|
|
|
|
|
|
+ caller.assertReturnValues();
|
|
} finally {
|
|
} finally {
|
|
client.stop();
|
|
client.stop();
|
|
server.stop();
|
|
server.stop();
|
|
@@ -443,9 +468,7 @@ public class TestAsyncIPC {
|
|
server.start();
|
|
server.start();
|
|
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
|
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
|
|
caller.run();
|
|
caller.run();
|
|
- caller.waitForReturnValues();
|
|
|
|
- String msg = String.format("Expected not failed for caller: %s.", caller);
|
|
|
|
- assertFalse(msg, caller.failed);
|
|
|
|
|
|
+ caller.assertReturnValues();
|
|
} finally {
|
|
} finally {
|
|
client.stop();
|
|
client.stop();
|
|
server.stop();
|
|
server.stop();
|
|
@@ -489,10 +512,7 @@ public class TestAsyncIPC {
|
|
}
|
|
}
|
|
for (int i = 0; i < callerCount; ++i) {
|
|
for (int i = 0; i < callerCount; ++i) {
|
|
callers[i].join();
|
|
callers[i].join();
|
|
- callers[i].waitForReturnValues();
|
|
|
|
- String msg = String.format("Expected not failed for caller-%d: %s.", i,
|
|
|
|
- callers[i]);
|
|
|
|
- assertFalse(msg, callers[i].failed);
|
|
|
|
|
|
+ callers[i].assertReturnValues();
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
client.stop();
|
|
client.stop();
|