|
@@ -18,9 +18,12 @@
|
|
|
package org.apache.hadoop.hdfs.web.http2;
|
|
|
|
|
|
import io.netty.buffer.ByteBuf;
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
import io.netty.channel.AbstractChannel;
|
|
|
import io.netty.channel.Channel;
|
|
|
import io.netty.channel.ChannelConfig;
|
|
|
+import io.netty.channel.ChannelFuture;
|
|
|
+import io.netty.channel.ChannelFutureListener;
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
|
import io.netty.channel.ChannelMetadata;
|
|
|
import io.netty.channel.ChannelOutboundBuffer;
|
|
@@ -32,8 +35,11 @@ import io.netty.handler.codec.UnsupportedMessageTypeException;
|
|
|
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
|
|
|
import io.netty.handler.codec.http2.Http2ConnectionHandler;
|
|
|
import io.netty.handler.codec.http2.Http2Error;
|
|
|
+import io.netty.handler.codec.http2.Http2Exception;
|
|
|
import io.netty.handler.codec.http2.Http2Headers;
|
|
|
+import io.netty.handler.codec.http2.Http2LocalFlowController;
|
|
|
import io.netty.handler.codec.http2.Http2Stream;
|
|
|
+import io.netty.util.ReferenceCountUtil;
|
|
|
import io.netty.util.internal.InternalThreadLocalMap;
|
|
|
|
|
|
import java.net.SocketAddress;
|
|
@@ -64,10 +70,33 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
private static final int MAX_READER_STACK_DEPTH = 8;
|
|
|
|
|
|
private final ChannelHandlerContext http2ConnHandlerCtx;
|
|
|
+
|
|
|
private final Http2Stream stream;
|
|
|
+
|
|
|
+ private final Http2LocalFlowController localFlowController;
|
|
|
+
|
|
|
private final Http2ConnectionEncoder encoder;
|
|
|
+
|
|
|
private final DefaultChannelConfig config;
|
|
|
- private final Queue<Object> inboundMessageQueue = new ArrayDeque<>();
|
|
|
+
|
|
|
+ private static final class InboundMessage {
|
|
|
+
|
|
|
+ public final Object msg;
|
|
|
+
|
|
|
+ public final int length;
|
|
|
+
|
|
|
+ public InboundMessage(Object msg, int length) {
|
|
|
+ this.msg = msg;
|
|
|
+ this.length = length;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private final Queue<InboundMessage> inboundMessageQueue = new ArrayDeque<>();
|
|
|
+
|
|
|
+ private boolean writePending = false;
|
|
|
+
|
|
|
+ private int pendingOutboundBytes;
|
|
|
|
|
|
private enum State {
|
|
|
OPEN, HALF_CLOSED_LOCAL, HALF_CLOSED_REMOTE, PRE_CLOSED, CLOSED
|
|
@@ -82,10 +111,16 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
Http2ConnectionHandler connHandler =
|
|
|
(Http2ConnectionHandler) http2ConnHandlerCtx.handler();
|
|
|
this.stream = stream;
|
|
|
+ this.localFlowController =
|
|
|
+ connHandler.connection().local().flowController();
|
|
|
this.encoder = connHandler.encoder();
|
|
|
this.config = new DefaultChannelConfig(this);
|
|
|
}
|
|
|
|
|
|
+ public Http2Stream stream() {
|
|
|
+ return stream;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public ChannelConfig config() {
|
|
|
return config;
|
|
@@ -98,8 +133,6 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
|
|
|
@Override
|
|
|
public boolean isActive() {
|
|
|
- // we create this channel after HTTP/2 stream active, so we do not have a
|
|
|
- // separated 'active' state.
|
|
|
return isOpen();
|
|
|
}
|
|
|
|
|
@@ -115,6 +148,10 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
SocketAddress localAddress, ChannelPromise promise) {
|
|
|
throw new UnsupportedOperationException();
|
|
|
}
|
|
|
+
|
|
|
+ public void forceFlush() {
|
|
|
+ super.flush0();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -149,7 +186,11 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
|
|
|
@Override
|
|
|
protected void doClose() throws Exception {
|
|
|
- if (stream.state() != Http2Stream.State.CLOSED) {
|
|
|
+ for (InboundMessage msg; (msg = inboundMessageQueue.poll()) != null;) {
|
|
|
+ ReferenceCountUtil.release(msg.msg);
|
|
|
+ localFlowController.consumeBytes(stream, msg.length);
|
|
|
+ }
|
|
|
+ if (state != State.PRE_CLOSED) {
|
|
|
encoder.writeRstStream(http2ConnHandlerCtx, stream.id(),
|
|
|
Http2Error.INTERNAL_ERROR.code(), http2ConnHandlerCtx.newPromise());
|
|
|
}
|
|
@@ -157,36 +198,44 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
}
|
|
|
|
|
|
private final Runnable readTask = new Runnable() {
|
|
|
-
|
|
|
@Override
|
|
|
public void run() {
|
|
|
ChannelPipeline pipeline = pipeline();
|
|
|
- int maxMessagesPerRead = config().getMaxMessagesPerRead();
|
|
|
- for (int i = 0; i < maxMessagesPerRead; i++) {
|
|
|
- Object m = inboundMessageQueue.poll();
|
|
|
- if (m == null) {
|
|
|
- break;
|
|
|
- }
|
|
|
- if (m == LastHttp2Message.get()) {
|
|
|
+ for (InboundMessage m; (m = inboundMessageQueue.poll()) != null;) {
|
|
|
+ if (m.msg == LastHttp2Message.get()) {
|
|
|
state =
|
|
|
state == State.HALF_CLOSED_LOCAL ? State.PRE_CLOSED
|
|
|
: State.HALF_CLOSED_REMOTE;
|
|
|
}
|
|
|
- pipeline.fireChannelRead(m);
|
|
|
+ try {
|
|
|
+ if (m.length > 0
|
|
|
+ && localFlowController.consumeBytes(stream, m.length)) {
|
|
|
+ http2ConnHandlerCtx.channel().flush();
|
|
|
+ }
|
|
|
+ } catch (Http2Exception e) {
|
|
|
+ // an Http2Exception at least means the stream is broken(maybe the
|
|
|
+ // whole connection), so we are out.
|
|
|
+ http2ConnHandlerCtx.pipeline().fireExceptionCaught(e);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ pipeline.fireChannelRead(m.msg);
|
|
|
}
|
|
|
pipeline.fireChannelReadComplete();
|
|
|
+ if (state == State.PRE_CLOSED) {
|
|
|
+ close();
|
|
|
+ }
|
|
|
}
|
|
|
};
|
|
|
|
|
|
@Override
|
|
|
protected void doBeginRead() throws Exception {
|
|
|
- State currentState = this.state;
|
|
|
- if (currentState == State.CLOSED) {
|
|
|
- throw new ClosedChannelException();
|
|
|
- }
|
|
|
if (inboundMessageQueue.isEmpty()) {
|
|
|
return;
|
|
|
}
|
|
|
+ State currentState = this.state;
|
|
|
+ if (remoteSideClosed(currentState)) {
|
|
|
+ throw new ClosedChannelException();
|
|
|
+ }
|
|
|
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
|
|
|
final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
|
|
|
if (stackDepth < MAX_READER_STACK_DEPTH) {
|
|
@@ -201,14 +250,25 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void resumeWrite() {
|
|
|
+ writePending = false;
|
|
|
+ ((Http2Unsafe) unsafe()).forceFlush();
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
|
|
State currentState = this.state;
|
|
|
- if (currentState == State.CLOSED) {
|
|
|
+ if (localSideClosed(currentState)) {
|
|
|
throw new ClosedChannelException();
|
|
|
}
|
|
|
+ int writeBufferHighWaterMark = config().getWriteBufferHighWaterMark();
|
|
|
+
|
|
|
boolean flush = false;
|
|
|
for (;;) {
|
|
|
+ if (pendingOutboundBytes >= writeBufferHighWaterMark) {
|
|
|
+ writePending = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
Object msg = in.current();
|
|
|
if (msg == null) {
|
|
|
break;
|
|
@@ -217,15 +277,30 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
this.state =
|
|
|
currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED
|
|
|
: State.HALF_CLOSED_LOCAL;
|
|
|
- encoder.writeData(http2ConnHandlerCtx, stream.id(), http2ConnHandlerCtx
|
|
|
- .alloc().buffer(0), 0, true, http2ConnHandlerCtx.newPromise());
|
|
|
+ encoder.writeData(http2ConnHandlerCtx, stream.id(),
|
|
|
+ Unpooled.EMPTY_BUFFER, 0, true, http2ConnHandlerCtx.newPromise());
|
|
|
} else if (msg instanceof Http2Headers) {
|
|
|
encoder.writeHeaders(http2ConnHandlerCtx, stream.id(),
|
|
|
(Http2Headers) msg, 0, false, http2ConnHandlerCtx.newPromise());
|
|
|
} else if (msg instanceof ByteBuf) {
|
|
|
ByteBuf data = (ByteBuf) msg;
|
|
|
+ final int pendingBytes = data.readableBytes();
|
|
|
+ pendingOutboundBytes += pendingBytes;
|
|
|
encoder.writeData(http2ConnHandlerCtx, stream.id(), data.retain(), 0,
|
|
|
- false, http2ConnHandlerCtx.newPromise());
|
|
|
+ false, http2ConnHandlerCtx.newPromise()).addListener(
|
|
|
+ new ChannelFutureListener() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void operationComplete(ChannelFuture future)
|
|
|
+ throws Exception {
|
|
|
+ pendingOutboundBytes -= pendingBytes;
|
|
|
+ if (writePending
|
|
|
+ && pendingOutboundBytes <= config()
|
|
|
+ .getWriteBufferLowWaterMark()) {
|
|
|
+ resumeWrite();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
} else {
|
|
|
throw new UnsupportedMessageTypeException(msg, Http2Headers.class,
|
|
|
ByteBuf.class);
|
|
@@ -236,33 +311,53 @@ public class Http2StreamChannel extends AbstractChannel {
|
|
|
if (flush) {
|
|
|
http2ConnHandlerCtx.channel().flush();
|
|
|
}
|
|
|
+ if (state == State.PRE_CLOSED) {
|
|
|
+ close();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Append a message to the inbound queue of this channel. You need to call
|
|
|
- * {@link #read()} if you want to pass the message to handlers.
|
|
|
- */
|
|
|
- void writeInbound(Object msg) {
|
|
|
- inboundMessageQueue.add(msg);
|
|
|
+ public void writeInbound(Object msg, int length) {
|
|
|
+ inboundMessageQueue.add(new InboundMessage(msg, length));
|
|
|
}
|
|
|
|
|
|
private static final ImmutableSet<State> REMOTE_SIDE_CLOSED_STATES =
|
|
|
ImmutableSet.of(State.HALF_CLOSED_REMOTE, State.PRE_CLOSED, State.CLOSED);
|
|
|
|
|
|
- /**
|
|
|
- * @return true if remote side finishes sending data to us.
|
|
|
- */
|
|
|
public boolean remoteSideClosed() {
|
|
|
+ return remoteSideClosed(state);
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean remoteSideClosed(State state) {
|
|
|
return REMOTE_SIDE_CLOSED_STATES.contains(state);
|
|
|
}
|
|
|
|
|
|
private static final ImmutableSet<State> LOCAL_SIDE_CLOSED_STATES =
|
|
|
ImmutableSet.of(State.HALF_CLOSED_LOCAL, State.PRE_CLOSED, State.CLOSED);
|
|
|
|
|
|
- /**
|
|
|
- * @return true if we finish sending data to remote side.
|
|
|
- */
|
|
|
public boolean localSideClosed() {
|
|
|
+ return localSideClosed(state);
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean localSideClosed(State state) {
|
|
|
return LOCAL_SIDE_CLOSED_STATES.contains(state);
|
|
|
}
|
|
|
+
|
|
|
+ public void setClosed() {
|
|
|
+ State currentState = this.state;
|
|
|
+ if (!remoteSideClosed(currentState)) {
|
|
|
+ writeInbound(LastHttp2Message.get(), 0);
|
|
|
+ if (config().isAutoRead()) {
|
|
|
+ read();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ currentState = this.state;
|
|
|
+ if (!localSideClosed(currentState)) {
|
|
|
+ this.state =
|
|
|
+ currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED
|
|
|
+ : State.HALF_CLOSED_LOCAL;
|
|
|
+ if (currentState == State.PRE_CLOSED) {
|
|
|
+ close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|