|
@@ -18,8 +18,6 @@
|
|
|
package org.apache.hadoop.portmap;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.Map.Entry;
|
|
|
-import java.util.Set;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -34,31 +32,34 @@ import org.apache.hadoop.oncrpc.security.VerifierNone;
|
|
|
import org.jboss.netty.buffer.ChannelBuffer;
|
|
|
import org.jboss.netty.buffer.ChannelBuffers;
|
|
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
|
|
+import org.jboss.netty.channel.ChannelStateEvent;
|
|
|
+import org.jboss.netty.channel.ExceptionEvent;
|
|
|
+import org.jboss.netty.channel.MessageEvent;
|
|
|
+import org.jboss.netty.channel.group.ChannelGroup;
|
|
|
+import org.jboss.netty.handler.timeout.IdleState;
|
|
|
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
|
|
|
+import org.jboss.netty.handler.timeout.IdleStateEvent;
|
|
|
|
|
|
-/**
|
|
|
- * An rpcbind request handler.
|
|
|
- */
|
|
|
-public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
|
|
|
- public static final int PROGRAM = 100000;
|
|
|
- public static final int VERSION = 2;
|
|
|
-
|
|
|
+final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler implements PortmapInterface {
|
|
|
+ static final int PROGRAM = 100000;
|
|
|
+ static final int VERSION = 2;
|
|
|
private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class);
|
|
|
|
|
|
/** Map synchronized usis monitor lock of this instance */
|
|
|
private final HashMap<String, PortmapMapping> map;
|
|
|
|
|
|
- public RpcProgramPortmap() {
|
|
|
- super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
|
|
|
- map = new HashMap<String, PortmapMapping>(256);
|
|
|
- }
|
|
|
+ /** ChannelGroup that remembers all active channels for gracefully shutdown. */
|
|
|
+ private final ChannelGroup allChannels;
|
|
|
|
|
|
- /** Dump all the register RPC services */
|
|
|
- private synchronized void dumpRpcServices() {
|
|
|
- Set<Entry<String, PortmapMapping>> entrySet = map.entrySet();
|
|
|
- for (Entry<String, PortmapMapping> entry : entrySet) {
|
|
|
- LOG.info("Service: " + entry.getKey() + " portmapping: "
|
|
|
- + entry.getValue());
|
|
|
- }
|
|
|
+ RpcProgramPortmap(ChannelGroup allChannels) {
|
|
|
+ this.allChannels = allChannels;
|
|
|
+ map = new HashMap<String, PortmapMapping>(256);
|
|
|
+ PortmapMapping m = new PortmapMapping(PROGRAM, VERSION,
|
|
|
+ PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
|
|
|
+ PortmapMapping m1 = new PortmapMapping(PROGRAM, VERSION,
|
|
|
+ PortmapMapping.TRANSPORT_UDP, RpcProgram.RPCB_PORT);
|
|
|
+ map.put(PortmapMapping.key(m), m);
|
|
|
+ map.put(PortmapMapping.key(m1), m1);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -77,7 +78,6 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
|
|
|
PortmapMapping value = null;
|
|
|
synchronized(this) {
|
|
|
map.put(key, mapping);
|
|
|
- dumpRpcServices();
|
|
|
value = map.get(key);
|
|
|
}
|
|
|
return PortmapResponse.intReply(out, xid, value.getPort());
|
|
@@ -126,21 +126,15 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void register(PortmapMapping mapping) {
|
|
|
- String key = PortmapMapping.key(mapping);
|
|
|
- synchronized(this) {
|
|
|
- map.put(key, mapping);
|
|
|
- }
|
|
|
- }
|
|
|
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
|
|
+ throws Exception {
|
|
|
|
|
|
- @Override
|
|
|
- public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
|
|
|
+ RpcInfo info = (RpcInfo) e.getMessage();
|
|
|
RpcCall rpcCall = (RpcCall) info.header();
|
|
|
final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
|
|
|
int xid = rpcCall.getXid();
|
|
|
- byte[] data = new byte[info.data().readableBytes()];
|
|
|
- info.data().readBytes(data);
|
|
|
- XDR in = new XDR(data);
|
|
|
+ XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(),
|
|
|
+ XDR.State.READING);
|
|
|
XDR out = new XDR();
|
|
|
|
|
|
if (portmapProc == Procedure.PMAPPROC_NULL) {
|
|
@@ -162,13 +156,29 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
|
|
|
reply.write(out);
|
|
|
}
|
|
|
|
|
|
- ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
|
|
|
+ ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
|
|
|
+ .buffer());
|
|
|
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
|
|
|
RpcUtil.sendRpcResponse(ctx, rsp);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected boolean isIdempotent(RpcCall call) {
|
|
|
- return false;
|
|
|
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
|
|
|
+ throws Exception {
|
|
|
+ allChannels.add(e.getChannel());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
|
|
|
+ throws Exception {
|
|
|
+ if (e.getState() == IdleState.ALL_IDLE) {
|
|
|
+ e.getChannel().close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
|
|
|
+ LOG.warn("Encountered ", e.getCause());
|
|
|
+ e.getChannel().close();
|
|
|
}
|
|
|
}
|