|
@@ -22,19 +22,19 @@ import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
-import java.nio.ByteBuffer;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.Random;
|
|
|
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
+import io.netty.channel.ChannelException;
|
|
|
+import io.netty.channel.ChannelHandler;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
import org.apache.hadoop.oncrpc.RpcUtil.RpcFrameDecoder;
|
|
|
import org.apache.hadoop.oncrpc.security.CredentialsNone;
|
|
|
import org.apache.hadoop.oncrpc.security.VerifierNone;
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
-import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
|
|
|
-import org.jboss.netty.buffer.ChannelBuffer;
|
|
|
-import org.jboss.netty.buffer.ChannelBuffers;
|
|
|
-import org.jboss.netty.channel.Channel;
|
|
|
-import org.jboss.netty.channel.ChannelException;
|
|
|
-import org.jboss.netty.channel.ChannelHandlerContext;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
import org.slf4j.event.Level;
|
|
@@ -55,6 +55,7 @@ public class TestFrameDecoder {
|
|
|
tcpClient.run();
|
|
|
}
|
|
|
|
|
|
+ @ChannelHandler.Sharable
|
|
|
static class TestRpcProgram extends RpcProgram {
|
|
|
|
|
|
protected TestRpcProgram(String program, String host, int port,
|
|
@@ -83,7 +84,7 @@ public class TestFrameDecoder {
|
|
|
new VerifierNone());
|
|
|
XDR out = new XDR();
|
|
|
reply.write(out);
|
|
|
- ChannelBuffer b = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
|
|
|
+ ByteBuf b = Unpooled.wrappedBuffer(out.asReadOnlyWrap().buffer());
|
|
|
RpcResponse rsp = new RpcResponse(b, info.remoteAddress());
|
|
|
RpcUtil.sendRpcResponse(ctx, rsp);
|
|
|
}
|
|
@@ -99,13 +100,14 @@ public class TestFrameDecoder {
|
|
|
RpcFrameDecoder decoder = new RpcFrameDecoder();
|
|
|
|
|
|
// Test "Length field is not received yet"
|
|
|
- ByteBuffer buffer = ByteBuffer.allocate(1);
|
|
|
- ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
|
|
|
- ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
|
|
|
- Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
|
|
|
- buf);
|
|
|
- assertTrue(channelBuffer == null);
|
|
|
+ ByteBuf buf = Unpooled.directBuffer(1);
|
|
|
+ List<Object> outputBufs = new ArrayList<>();
|
|
|
+ decoder.decode(
|
|
|
+ Mockito.mock(ChannelHandlerContext.class), buf,
|
|
|
+ outputBufs);
|
|
|
+ assertTrue(outputBufs.isEmpty());
|
|
|
|
|
|
+ decoder = new RpcFrameDecoder();
|
|
|
// Test all bytes are not received yet
|
|
|
byte[] fragment = new byte[4 + 9];
|
|
|
fragment[0] = (byte) (1 << 7); // final fragment
|
|
@@ -114,15 +116,16 @@ public class TestFrameDecoder {
|
|
|
fragment[3] = (byte) 10; // fragment size = 10 bytes
|
|
|
assertTrue(XDR.isLastFragment(fragment));
|
|
|
assertTrue(XDR.fragmentSize(fragment)==10);
|
|
|
+ buf.release();
|
|
|
|
|
|
- buffer = ByteBuffer.allocate(4 + 9);
|
|
|
- buffer.put(fragment);
|
|
|
- buffer.flip();
|
|
|
- buf = new ByteBufferBackedChannelBuffer(buffer);
|
|
|
- channelBuffer = (ChannelBuffer) decoder.decode(
|
|
|
- Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
|
|
|
- buf);
|
|
|
- assertTrue(channelBuffer == null);
|
|
|
+ buf = Unpooled.directBuffer(4 + 9);
|
|
|
+ buf.writeBytes(fragment);
|
|
|
+ outputBufs = new ArrayList<>();
|
|
|
+ decoder.decode(
|
|
|
+ Mockito.mock(ChannelHandlerContext.class), buf,
|
|
|
+ outputBufs);
|
|
|
+ assertTrue(decoder.isLast());
|
|
|
+ buf.release();
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -137,16 +140,15 @@ public class TestFrameDecoder {
|
|
|
fragment1[3] = (byte) 10; // fragment size = 10 bytes
|
|
|
assertFalse(XDR.isLastFragment(fragment1));
|
|
|
assertTrue(XDR.fragmentSize(fragment1)==10);
|
|
|
+
|
|
|
+ List<Object> outputBufs = new ArrayList<>();
|
|
|
|
|
|
// decoder should wait for the final fragment
|
|
|
- ByteBuffer buffer = ByteBuffer.allocate(4 + 10);
|
|
|
- buffer.put(fragment1);
|
|
|
- buffer.flip();
|
|
|
- ChannelBuffer buf = new ByteBufferBackedChannelBuffer(buffer);
|
|
|
- ChannelBuffer channelBuffer = (ChannelBuffer) decoder.decode(
|
|
|
- Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
|
|
|
- buf);
|
|
|
- assertTrue(channelBuffer == null);
|
|
|
+ ByteBuf buf = Unpooled.directBuffer(4 + 10, 4 + 10);
|
|
|
+ buf.writeBytes(fragment1);
|
|
|
+ decoder.decode(
|
|
|
+ Mockito.mock(ChannelHandlerContext.class), buf,
|
|
|
+ outputBufs);
|
|
|
|
|
|
byte[] fragment2 = new byte[4 + 10];
|
|
|
fragment2[0] = (byte) (1 << 7); // final fragment
|
|
@@ -155,21 +157,22 @@ public class TestFrameDecoder {
|
|
|
fragment2[3] = (byte) 10; // fragment size = 10 bytes
|
|
|
assertTrue(XDR.isLastFragment(fragment2));
|
|
|
assertTrue(XDR.fragmentSize(fragment2)==10);
|
|
|
+ buf.release();
|
|
|
|
|
|
- buffer = ByteBuffer.allocate(4 + 10);
|
|
|
- buffer.put(fragment2);
|
|
|
- buffer.flip();
|
|
|
- buf = new ByteBufferBackedChannelBuffer(buffer);
|
|
|
- channelBuffer = (ChannelBuffer) decoder.decode(
|
|
|
- Mockito.mock(ChannelHandlerContext.class), Mockito.mock(Channel.class),
|
|
|
- buf);
|
|
|
- assertTrue(channelBuffer != null);
|
|
|
- // Complete frame should have to total size 10+10=20
|
|
|
- assertEquals(20, channelBuffer.readableBytes());
|
|
|
+ buf = Unpooled.directBuffer(4 + 10, 4 + 10);
|
|
|
+ buf.writeBytes(fragment2);
|
|
|
+ decoder.decode(
|
|
|
+ Mockito.mock(ChannelHandlerContext.class), buf,
|
|
|
+ outputBufs);
|
|
|
+ // Expect two completed frames each 10 bytes
|
|
|
+ decoder.isLast();
|
|
|
+ assertEquals(2, outputBufs.size());
|
|
|
+ outputBufs.forEach(b -> assertEquals(((ByteBuf)b).readableBytes(), 10));
|
|
|
+ buf.release();
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testFrames() {
|
|
|
+ public void testFrames() throws InterruptedException {
|
|
|
int serverPort = startRpcServer(true);
|
|
|
|
|
|
XDR xdrOut = createGetportMount();
|
|
@@ -187,7 +190,7 @@ public class TestFrameDecoder {
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testUnprivilegedPort() {
|
|
|
+ public void testUnprivilegedPort() throws InterruptedException {
|
|
|
// Don't allow connections from unprivileged ports. Given that this test is
|
|
|
// presumably not being run by root, this will be the case.
|
|
|
int serverPort = startRpcServer(false);
|
|
@@ -218,23 +221,28 @@ public class TestFrameDecoder {
|
|
|
assertEquals(requestSize, resultSize);
|
|
|
}
|
|
|
|
|
|
- private static int startRpcServer(boolean allowInsecurePorts) {
|
|
|
+ private static int startRpcServer(boolean allowInsecurePorts)
|
|
|
+ throws InterruptedException {
|
|
|
Random rand = new Random();
|
|
|
int serverPort = 30000 + rand.nextInt(10000);
|
|
|
int retries = 10; // A few retries in case initial choice is in use.
|
|
|
|
|
|
while (true) {
|
|
|
+ SimpleTcpServer tcpServer = null;
|
|
|
try {
|
|
|
RpcProgram program = new TestFrameDecoder.TestRpcProgram("TestRpcProgram",
|
|
|
"localhost", serverPort, 100000, 1, 2, allowInsecurePorts);
|
|
|
- SimpleTcpServer tcpServer = new SimpleTcpServer(serverPort, program, 1);
|
|
|
+ tcpServer = new SimpleTcpServer(serverPort, program, 1);
|
|
|
tcpServer.run();
|
|
|
break; // Successfully bound a port, break out.
|
|
|
- } catch (ChannelException ce) {
|
|
|
+ } catch (InterruptedException | ChannelException e) {
|
|
|
+ if (tcpServer != null) {
|
|
|
+ tcpServer.shutdown();
|
|
|
+ }
|
|
|
if (retries-- > 0) {
|
|
|
serverPort += rand.nextInt(20); // Port in use? Try another.
|
|
|
} else {
|
|
|
- throw ce; // Out of retries.
|
|
|
+ throw e; // Out of retries.
|
|
|
}
|
|
|
}
|
|
|
}
|