|
@@ -26,6 +26,9 @@ import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertNotSame;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
+import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.timeout;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
|
|
|
import java.io.Closeable;
|
|
|
import java.io.InterruptedIOException;
|
|
@@ -67,6 +70,7 @@ import org.apache.hadoop.io.retry.RetryPolicy;
|
|
|
import org.apache.hadoop.io.retry.RetryProxy;
|
|
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
|
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
|
|
+import org.apache.hadoop.ipc.Server.Call;
|
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
@@ -82,6 +86,8 @@ import org.apache.hadoop.test.MetricsAsserts;
|
|
|
import org.apache.hadoop.test.MockitoUtil;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.Mockito;
|
|
|
+import org.mockito.internal.util.reflection.Whitebox;
|
|
|
|
|
|
import com.google.protobuf.DescriptorProtos;
|
|
|
import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
|
|
@@ -1104,8 +1110,13 @@ public class TestRPC {
|
|
|
.setBindAddress(ADDRESS).setPort(0)
|
|
|
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
|
|
.build();
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ CallQueueManager<Call> spy = spy((CallQueueManager<Call>) Whitebox
|
|
|
+ .getInternalState(server, "callQueue"));
|
|
|
+ Whitebox.setInternalState(server, "callQueue", spy);
|
|
|
server.start();
|
|
|
|
|
|
+ Exception lastException = null;
|
|
|
final TestProtocol proxy =
|
|
|
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
|
|
NetUtils.getConnectAddress(server), conf);
|
|
@@ -1122,10 +1133,7 @@ public class TestRPC {
|
|
|
return null;
|
|
|
}
|
|
|
}));
|
|
|
- }
|
|
|
- while (server.getCallQueueLen() != 1
|
|
|
- && countThreads(CallQueueManager.class.getName()) != 1) {
|
|
|
- Thread.sleep(100);
|
|
|
+ verify(spy, timeout(500).times(i + 1)).offer(Mockito.<Call>anyObject());
|
|
|
}
|
|
|
try {
|
|
|
proxy.sleep(100);
|
|
@@ -1133,6 +1141,8 @@ public class TestRPC {
|
|
|
IOException unwrapExeption = e.unwrapRemoteException();
|
|
|
if (unwrapExeption instanceof RetriableException) {
|
|
|
succeeded = true;
|
|
|
+ } else {
|
|
|
+ lastException = unwrapExeption;
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
@@ -1140,6 +1150,9 @@ public class TestRPC {
|
|
|
RPC.stopProxy(proxy);
|
|
|
executorService.shutdown();
|
|
|
}
|
|
|
+ if (lastException != null) {
|
|
|
+ LOG.error("Last received non-RetriableException:", lastException);
|
|
|
+ }
|
|
|
assertTrue("RetriableException not received", succeeded);
|
|
|
}
|
|
|
|