|
@@ -17,15 +17,9 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.ipc;
|
|
|
|
|
|
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
|
|
-import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.net.InetSocketAddress;
|
|
|
-import java.net.URISyntaxException;
|
|
|
-
|
|
|
+import com.google.protobuf.BlockingService;
|
|
|
+import com.google.protobuf.RpcController;
|
|
|
+import com.google.protobuf.ServiceException;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
@@ -36,76 +30,37 @@ import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
|
|
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
|
|
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
|
|
|
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
|
|
|
-import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
|
|
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
|
|
|
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
|
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
|
-import org.junit.Test;
|
|
|
import org.junit.Before;
|
|
|
-import org.junit.After;
|
|
|
+import org.junit.Test;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
+import java.net.URISyntaxException;
|
|
|
|
|
|
-import com.google.protobuf.BlockingService;
|
|
|
-import com.google.protobuf.RpcController;
|
|
|
-import com.google.protobuf.ServiceException;
|
|
|
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
|
|
|
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
/**
|
|
|
* Test for testing protocol buffer based RPC mechanism.
|
|
|
* This test depends on test.proto definition of types in src/test/proto
|
|
|
* and protobuf service definition from src/test/test_rpc_service.proto
|
|
|
*/
|
|
|
-public class TestProtoBufRpc {
|
|
|
- public final static String ADDRESS = "0.0.0.0";
|
|
|
- public final static int PORT = 0;
|
|
|
- private static InetSocketAddress addr;
|
|
|
- private static Configuration conf;
|
|
|
+public class TestProtoBufRpc extends TestRpcBase {
|
|
|
private static RPC.Server server;
|
|
|
private final static int SLEEP_DURATION = 1000;
|
|
|
|
|
|
- @ProtocolInfo(protocolName = "testProto", protocolVersion = 1)
|
|
|
- public interface TestRpcService
|
|
|
- extends TestProtobufRpcProto.BlockingInterface {
|
|
|
- }
|
|
|
-
|
|
|
@ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
|
|
|
public interface TestRpcService2 extends
|
|
|
TestProtobufRpc2Proto.BlockingInterface {
|
|
|
}
|
|
|
|
|
|
- public static class PBServerImpl implements TestRpcService {
|
|
|
-
|
|
|
- @Override
|
|
|
- public EmptyResponseProto ping(RpcController unused,
|
|
|
- EmptyRequestProto request) throws ServiceException {
|
|
|
- // Ensure clientId is received
|
|
|
- byte[] clientId = Server.getClientId();
|
|
|
- Assert.assertNotNull(Server.getClientId());
|
|
|
- Assert.assertEquals(16, clientId.length);
|
|
|
- return EmptyResponseProto.newBuilder().build();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
|
|
|
- throws ServiceException {
|
|
|
- return EchoResponseProto.newBuilder().setMessage(request.getMessage())
|
|
|
- .build();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public EmptyResponseProto error(RpcController unused,
|
|
|
- EmptyRequestProto request) throws ServiceException {
|
|
|
- throw new ServiceException("error", new RpcServerException("error"));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public EmptyResponseProto error2(RpcController unused,
|
|
|
- EmptyRequestProto request) throws ServiceException {
|
|
|
- throw new ServiceException("error", new URISyntaxException("",
|
|
|
- "testException"));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
public static class PBServer2Impl implements TestRpcService2 {
|
|
|
|
|
|
@Override
|
|
@@ -133,12 +88,13 @@ public class TestProtoBufRpc {
|
|
|
}
|
|
|
|
|
|
@Before
|
|
|
- public void setUp() throws IOException { // Setup server for both protocols
|
|
|
+ public void setUp() throws IOException { // Setup server for both protocols
|
|
|
conf = new Configuration();
|
|
|
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
|
|
|
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
|
|
|
// Set RPC engine to protobuf RPC engine
|
|
|
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
|
|
+ RPC.setProtocolEngine(conf, TestRpcService2.class, ProtobufRpcEngine.class);
|
|
|
|
|
|
// Create server side implementation
|
|
|
PBServerImpl serverImpl = new PBServerImpl();
|
|
@@ -149,12 +105,12 @@ public class TestProtoBufRpc {
|
|
|
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
|
|
|
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build();
|
|
|
addr = NetUtils.getConnectAddress(server);
|
|
|
-
|
|
|
+
|
|
|
// now the second protocol
|
|
|
PBServer2Impl server2Impl = new PBServer2Impl();
|
|
|
BlockingService service2 = TestProtobufRpc2Proto
|
|
|
.newReflectiveBlockingService(server2Impl);
|
|
|
-
|
|
|
+
|
|
|
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
|
|
|
service2);
|
|
|
server.start();
|
|
@@ -166,31 +122,20 @@ public class TestProtoBufRpc {
|
|
|
server.stop();
|
|
|
}
|
|
|
|
|
|
- private static TestRpcService getClient() throws IOException {
|
|
|
- // Set RPC engine to protobuf RPC engine
|
|
|
- RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
|
|
|
- return RPC.getProxy(TestRpcService.class, 0, addr, conf);
|
|
|
- }
|
|
|
-
|
|
|
- private static TestRpcService2 getClient2() throws IOException {
|
|
|
- // Set RPC engine to protobuf RPC engine
|
|
|
- RPC.setProtocolEngine(conf, TestRpcService2.class,
|
|
|
- ProtobufRpcEngine.class);
|
|
|
- return RPC.getProxy(TestRpcService2.class, 0, addr,
|
|
|
- conf);
|
|
|
+ private TestRpcService2 getClient2() throws IOException {
|
|
|
+ return RPC.getProxy(TestRpcService2.class, 0, addr, conf);
|
|
|
}
|
|
|
|
|
|
@Test (timeout=5000)
|
|
|
public void testProtoBufRpc() throws Exception {
|
|
|
- TestRpcService client = getClient();
|
|
|
+ TestRpcService client = getClient(addr, conf);
|
|
|
testProtoBufRpc(client);
|
|
|
}
|
|
|
|
|
|
// separated test out so that other tests can call it.
|
|
|
public static void testProtoBufRpc(TestRpcService client) throws Exception {
|
|
|
// Test ping method
|
|
|
- EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
|
|
- client.ping(null, emptyRequest);
|
|
|
+ client.ping(null, newEmptyRequest());
|
|
|
|
|
|
// Test echo method
|
|
|
EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
|
@@ -200,7 +145,7 @@ public class TestProtoBufRpc {
|
|
|
|
|
|
// Test error method - error should be thrown as RemoteException
|
|
|
try {
|
|
|
- client.error(null, emptyRequest);
|
|
|
+ client.error(null, newEmptyRequest());
|
|
|
Assert.fail("Expected exception is not thrown");
|
|
|
} catch (ServiceException e) {
|
|
|
RemoteException re = (RemoteException)e.getCause();
|
|
@@ -217,13 +162,11 @@ public class TestProtoBufRpc {
|
|
|
TestRpcService2 client = getClient2();
|
|
|
|
|
|
// Test ping method
|
|
|
- EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
|
|
- client.ping2(null, emptyRequest);
|
|
|
+ client.ping2(null, newEmptyRequest());
|
|
|
|
|
|
// Test echo method
|
|
|
- EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
|
|
- .setMessage("hello").build();
|
|
|
- EchoResponseProto echoResponse = client.echo2(null, echoRequest);
|
|
|
+ EchoResponseProto echoResponse = client.echo2(null,
|
|
|
+ newEchoRequest("hello"));
|
|
|
Assert.assertEquals(echoResponse.getMessage(), "hello");
|
|
|
|
|
|
// Ensure RPC metrics are updated
|
|
@@ -238,11 +181,10 @@ public class TestProtoBufRpc {
|
|
|
|
|
|
@Test (timeout=5000)
|
|
|
public void testProtoBufRandomException() throws Exception {
|
|
|
- TestRpcService client = getClient();
|
|
|
- EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
|
|
+ TestRpcService client = getClient(addr, conf);
|
|
|
|
|
|
try {
|
|
|
- client.error2(null, emptyRequest);
|
|
|
+ client.error2(null, newEmptyRequest());
|
|
|
} catch (ServiceException se) {
|
|
|
Assert.assertTrue(se.getCause() instanceof RemoteException);
|
|
|
RemoteException re = (RemoteException) se.getCause();
|
|
@@ -258,17 +200,14 @@ public class TestProtoBufRpc {
|
|
|
public void testExtraLongRpc() throws Exception {
|
|
|
TestRpcService2 client = getClient2();
|
|
|
final String shortString = StringUtils.repeat("X", 4);
|
|
|
- EchoRequestProto echoRequest = EchoRequestProto.newBuilder()
|
|
|
- .setMessage(shortString).build();
|
|
|
// short message goes through
|
|
|
- EchoResponseProto echoResponse = client.echo2(null, echoRequest);
|
|
|
+ EchoResponseProto echoResponse = client.echo2(null,
|
|
|
+ newEchoRequest(shortString));
|
|
|
Assert.assertEquals(shortString, echoResponse.getMessage());
|
|
|
|
|
|
final String longString = StringUtils.repeat("X", 4096);
|
|
|
- echoRequest = EchoRequestProto.newBuilder()
|
|
|
- .setMessage(longString).build();
|
|
|
try {
|
|
|
- echoResponse = client.echo2(null, echoRequest);
|
|
|
+ client.echo2(null, newEchoRequest(longString));
|
|
|
Assert.fail("expected extra-long RPC to fail");
|
|
|
} catch (ServiceException se) {
|
|
|
// expected
|
|
@@ -281,8 +220,7 @@ public class TestProtoBufRpc {
|
|
|
// make 10 K fast calls
|
|
|
for (int x = 0; x < 10000; x++) {
|
|
|
try {
|
|
|
- EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
|
|
- client.ping2(null, emptyRequest);
|
|
|
+ client.ping2(null, newEmptyRequest());
|
|
|
} catch (Exception ex) {
|
|
|
throw ex;
|
|
|
}
|
|
@@ -294,10 +232,7 @@ public class TestProtoBufRpc {
|
|
|
long before = rpcMetrics.getRpcSlowCalls();
|
|
|
|
|
|
// make a really slow call. Sleep sleeps for 1000ms
|
|
|
- TestProtos.SleepRequestProto sleepRequest =
|
|
|
- TestProtos.SleepRequestProto.newBuilder()
|
|
|
- .setMilliSeconds(SLEEP_DURATION * 3).build();
|
|
|
- TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
|
|
|
+ client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
|
|
|
|
|
|
long after = rpcMetrics.getRpcSlowCalls();
|
|
|
// Ensure slow call is logged.
|
|
@@ -312,8 +247,7 @@ public class TestProtoBufRpc {
|
|
|
|
|
|
// make 10 K fast calls
|
|
|
for (int x = 0; x < 10000; x++) {
|
|
|
- EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build();
|
|
|
- client.ping2(null, emptyRequest);
|
|
|
+ client.ping2(null, newEmptyRequest());
|
|
|
}
|
|
|
|
|
|
// Ensure RPC metrics are updated
|
|
@@ -322,10 +256,7 @@ public class TestProtoBufRpc {
|
|
|
long before = rpcMetrics.getRpcSlowCalls();
|
|
|
|
|
|
// make a really slow call. Sleep sleeps for 1000ms
|
|
|
- TestProtos.SleepRequestProto sleepRequest =
|
|
|
- TestProtos.SleepRequestProto.newBuilder()
|
|
|
- .setMilliSeconds(SLEEP_DURATION).build();
|
|
|
- TestProtos.SleepResponseProto Response = client.sleep(null, sleepRequest);
|
|
|
+ client.sleep(null, newSleepRequest(SLEEP_DURATION));
|
|
|
|
|
|
long after = rpcMetrics.getRpcSlowCalls();
|
|
|
|