|
@@ -21,7 +21,6 @@ package org.apache.hadoop.ipc;
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
-import java.io.DataOutput;
|
|
|
import java.io.DataOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.net.BindException;
|
|
@@ -43,9 +42,7 @@ import java.nio.channels.SocketChannel;
|
|
|
import java.nio.channels.WritableByteChannel;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
@@ -69,35 +66,28 @@ import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.io.IntWritable;
|
|
|
import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.io.WritableUtils;
|
|
|
-import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
|
|
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
|
|
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
|
|
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
|
|
|
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
|
|
|
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
|
|
-import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
|
import org.apache.hadoop.security.SaslRpcServer;
|
|
|
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
|
|
+import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
|
|
|
import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
|
|
|
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
|
|
|
-import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
|
|
|
-import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.authorize.ProxyUsers;
|
|
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
|
-import org.apache.hadoop.security.authorize.ProxyUsers;
|
|
|
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
|
|
+import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.security.token.SecretManager;
|
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
-import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
-import org.apache.hadoop.util.ProtoUtil;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
-
|
|
|
/** An abstract IPC service. IPC calls take a single {@link Writable} as a
|
|
|
* parameter, and return a {@link Writable} as their value. A service runs on
|
|
|
* a port and is defined by a parameter class and a value class.
|
|
@@ -113,105 +103,17 @@ public abstract class Server {
|
|
|
*/
|
|
|
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
|
|
|
|
|
- /**
|
|
|
- * Serialization type for ConnectionContext and RpcPayloadHeader
|
|
|
- */
|
|
|
- public enum IpcSerializationType {
|
|
|
- // Add new serialization type to the end without affecting the enum order
|
|
|
- PROTOBUF;
|
|
|
-
|
|
|
- void write(DataOutput out) throws IOException {
|
|
|
- out.writeByte(this.ordinal());
|
|
|
- }
|
|
|
-
|
|
|
- static IpcSerializationType fromByte(byte b) {
|
|
|
- return IpcSerializationType.values()[b];
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * If the user accidentally sends an HTTP GET to an IPC port, we detect this
|
|
|
- * and send back a nicer response.
|
|
|
- */
|
|
|
- private static final ByteBuffer HTTP_GET_BYTES = ByteBuffer.wrap(
|
|
|
- "GET ".getBytes());
|
|
|
-
|
|
|
- /**
|
|
|
- * An HTTP response to send back if we detect an HTTP request to our IPC
|
|
|
- * port.
|
|
|
- */
|
|
|
- static final String RECEIVED_HTTP_REQ_RESPONSE =
|
|
|
- "HTTP/1.1 404 Not Found\r\n" +
|
|
|
- "Content-type: text/plain\r\n\r\n" +
|
|
|
- "It looks like you are making an HTTP request to a Hadoop IPC port. " +
|
|
|
- "This is not the correct port for the web interface on this daemon.\r\n";
|
|
|
-
|
|
|
// 1 : Introduce ping and server does not throw away RPCs
|
|
|
// 3 : Introduce the protocol into the RPC connection header
|
|
|
// 4 : Introduced SASL security layer
|
|
|
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
|
|
|
// in ObjectWritable to efficiently transmit arrays of primitives
|
|
|
- // 6 : Made RPC payload header explicit
|
|
|
- // 7 : Changed Ipc Connection Header to use Protocol buffers
|
|
|
- public static final byte CURRENT_VERSION = 7;
|
|
|
+ public static final byte CURRENT_VERSION = 5;
|
|
|
|
|
|
/**
|
|
|
* Initial and max size of response buffer
|
|
|
*/
|
|
|
static int INITIAL_RESP_BUF_SIZE = 10240;
|
|
|
-
|
|
|
- static class RpcKindMapValue {
|
|
|
- final Class<? extends Writable> rpcRequestWrapperClass;
|
|
|
- final RpcInvoker rpcInvoker;
|
|
|
- RpcKindMapValue (Class<? extends Writable> rpcRequestWrapperClass,
|
|
|
- RpcInvoker rpcInvoker) {
|
|
|
- this.rpcInvoker = rpcInvoker;
|
|
|
- this.rpcRequestWrapperClass = rpcRequestWrapperClass;
|
|
|
- }
|
|
|
- }
|
|
|
- static Map<RpcKind, RpcKindMapValue> rpcKindMap = new
|
|
|
- HashMap<RpcKind, RpcKindMapValue>(4);
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * Register a RPC kind and the class to deserialize the rpc request.
|
|
|
- *
|
|
|
- * Called by static initializers of rpcKind Engines
|
|
|
- * @param rpcKind
|
|
|
- * @param rpcRequestWrapperClass - this class is used to deserialze the
|
|
|
- * the rpc request.
|
|
|
- * @param rpcInvoker - use to process the calls on SS.
|
|
|
- */
|
|
|
-
|
|
|
- public static void registerProtocolEngine(RpcKind rpcKind,
|
|
|
- Class<? extends Writable> rpcRequestWrapperClass,
|
|
|
- RpcInvoker rpcInvoker) {
|
|
|
- RpcKindMapValue old =
|
|
|
- rpcKindMap.put(rpcKind, new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker));
|
|
|
- if (old != null) {
|
|
|
- rpcKindMap.put(rpcKind, old);
|
|
|
- throw new IllegalArgumentException("ReRegistration of rpcKind: " +
|
|
|
- rpcKind);
|
|
|
- }
|
|
|
- LOG.debug("rpcKind=" + rpcKind +
|
|
|
- ", rpcRequestWrapperClass=" + rpcRequestWrapperClass +
|
|
|
- ", rpcInvoker=" + rpcInvoker);
|
|
|
- }
|
|
|
-
|
|
|
- public Class<? extends Writable> getRpcRequestWrapper(
|
|
|
- RpcKind rpcKind) {
|
|
|
- if (rpcRequestClass != null)
|
|
|
- return rpcRequestClass;
|
|
|
- RpcKindMapValue val = rpcKindMap.get(rpcKind);
|
|
|
- return (val == null) ? null : val.rpcRequestWrapperClass;
|
|
|
- }
|
|
|
-
|
|
|
- public static RpcInvoker getRpcInvoker(RpcKind rpcKind) {
|
|
|
- RpcKindMapValue val = rpcKindMap.get(rpcKind);
|
|
|
- return (val == null) ? null : val.rpcInvoker;
|
|
|
- }
|
|
|
-
|
|
|
|
|
|
public static final Log LOG = LogFactory.getLog(Server.class);
|
|
|
public static final Log AUDITLOG =
|
|
@@ -276,7 +178,7 @@ public abstract class Server {
|
|
|
private int port; // port we listen on
|
|
|
private int handlerCount; // number of handler threads
|
|
|
private int readThreads; // number of read threads
|
|
|
- private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
|
|
|
+ private Class<? extends Writable> paramClass; // class of call parameters
|
|
|
private int maxIdleTime; // the maximum idle time after
|
|
|
// which a client may be disconnected
|
|
|
private int thresholdIdleConnections; // the number of idle connections
|
|
@@ -337,21 +239,10 @@ public abstract class Server {
|
|
|
* Returns a handle to the rpcMetrics (required in tests)
|
|
|
* @return rpc metrics
|
|
|
*/
|
|
|
- @VisibleForTesting
|
|
|
public RpcMetrics getRpcMetrics() {
|
|
|
return rpcMetrics;
|
|
|
}
|
|
|
|
|
|
- @VisibleForTesting
|
|
|
- public RpcDetailedMetrics getRpcDetailedMetrics() {
|
|
|
- return rpcDetailedMetrics;
|
|
|
- }
|
|
|
-
|
|
|
- @VisibleForTesting
|
|
|
- Iterable<? extends Thread> getHandlers() {
|
|
|
- return Arrays.asList(handlers);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Refresh the service authorization ACL for the service handled by this server.
|
|
|
*/
|
|
@@ -370,33 +261,28 @@ public abstract class Server {
|
|
|
|
|
|
/** A call queued for handling. */
|
|
|
private static class Call {
|
|
|
- private final int callId; // the client's call id
|
|
|
- private final Writable rpcRequest; // Serialized Rpc request from client
|
|
|
- private final Connection connection; // connection to client
|
|
|
- private long timestamp; // time received when response is null
|
|
|
- // time served when response is not null
|
|
|
- private ByteBuffer rpcResponse; // the response for this call
|
|
|
- private final RpcKind rpcKind;
|
|
|
-
|
|
|
- public Call(int id, Writable param, Connection connection) {
|
|
|
- this( id, param, connection, RpcKind.RPC_BUILTIN );
|
|
|
- }
|
|
|
- public Call(int id, Writable param, Connection connection, RpcKind kind) {
|
|
|
- this.callId = id;
|
|
|
- this.rpcRequest = param;
|
|
|
+ private int id; // the client's call id
|
|
|
+ private Writable param; // the parameter passed
|
|
|
+ private Connection connection; // connection to client
|
|
|
+ private long timestamp; // the time received when response is null
|
|
|
+ // the time served when response is not null
|
|
|
+ private ByteBuffer response; // the response for this call
|
|
|
+
|
|
|
+ public Call(int id, Writable param, Connection connection) {
|
|
|
+ this.id = id;
|
|
|
+ this.param = param;
|
|
|
this.connection = connection;
|
|
|
this.timestamp = System.currentTimeMillis();
|
|
|
- this.rpcResponse = null;
|
|
|
- this.rpcKind = kind;
|
|
|
+ this.response = null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
- return rpcRequest.toString() + " from " + connection.toString();
|
|
|
+ return param.toString() + " from " + connection.toString();
|
|
|
}
|
|
|
|
|
|
public void setResponse(ByteBuffer response) {
|
|
|
- this.rpcResponse = response;
|
|
|
+ this.response = response;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -895,17 +781,17 @@ public abstract class Server {
|
|
|
call = responseQueue.removeFirst();
|
|
|
SocketChannel channel = call.connection.channel;
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
|
|
|
+ LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
|
|
call.connection);
|
|
|
}
|
|
|
//
|
|
|
// Send as much data as we can in the non-blocking fashion
|
|
|
//
|
|
|
- int numBytes = channelWrite(channel, call.rpcResponse);
|
|
|
+ int numBytes = channelWrite(channel, call.response);
|
|
|
if (numBytes < 0) {
|
|
|
return true;
|
|
|
}
|
|
|
- if (!call.rpcResponse.hasRemaining()) {
|
|
|
+ if (!call.response.hasRemaining()) {
|
|
|
call.connection.decRpcCount();
|
|
|
if (numElements == 1) { // last call fully processes.
|
|
|
done = true; // no more data for this channel.
|
|
@@ -913,7 +799,7 @@ public abstract class Server {
|
|
|
done = false; // more calls pending to be sent.
|
|
|
}
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
|
|
|
+ LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
|
|
call.connection + " Wrote " + numBytes + " bytes.");
|
|
|
}
|
|
|
} else {
|
|
@@ -941,7 +827,7 @@ public abstract class Server {
|
|
|
}
|
|
|
}
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(getName() + ": responding to #" + call.callId + " from " +
|
|
|
+ LOG.debug(getName() + ": responding to #" + call.id + " from " +
|
|
|
call.connection + " Wrote partial " + numBytes +
|
|
|
" bytes.");
|
|
|
}
|
|
@@ -988,9 +874,9 @@ public abstract class Server {
|
|
|
|
|
|
/** Reads calls from a connection and queues them for handling. */
|
|
|
public class Connection {
|
|
|
- private boolean connectionHeaderRead = false; // connection header is read?
|
|
|
- private boolean connectionContextRead = false; //if connection context that
|
|
|
- //follows connection header is read
|
|
|
+ private boolean rpcHeaderRead = false; // if initial rpc header is read
|
|
|
+ private boolean headerRead = false; //if the connection header that
|
|
|
+ //follows version is read.
|
|
|
|
|
|
private SocketChannel channel;
|
|
|
private ByteBuffer data;
|
|
@@ -1006,14 +892,14 @@ public abstract class Server {
|
|
|
private int remotePort;
|
|
|
private InetAddress addr;
|
|
|
|
|
|
- IpcConnectionContextProto connectionContext;
|
|
|
- String protocolName;
|
|
|
+ ConnectionHeader header = new ConnectionHeader();
|
|
|
+ Class<?> protocol;
|
|
|
boolean useSasl;
|
|
|
SaslServer saslServer;
|
|
|
private AuthMethod authMethod;
|
|
|
private boolean saslContextEstablished;
|
|
|
private boolean skipInitialSaslHandshake;
|
|
|
- private ByteBuffer connectionHeaderBuf = null;
|
|
|
+ private ByteBuffer rpcHeaderBuffer;
|
|
|
private ByteBuffer unwrappedData;
|
|
|
private ByteBuffer unwrappedDataLengthBuffer;
|
|
|
|
|
@@ -1027,7 +913,6 @@ public abstract class Server {
|
|
|
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
|
|
|
// Fake 'call' for SASL context setup
|
|
|
private static final int SASL_CALLID = -33;
|
|
|
-
|
|
|
private final Call saslCall = new Call(SASL_CALLID, null, this);
|
|
|
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
|
|
|
|
|
@@ -1131,7 +1016,6 @@ public abstract class Server {
|
|
|
throw new AccessControlException(
|
|
|
"Server is not configured to do DIGEST authentication.");
|
|
|
}
|
|
|
- secretManager.checkAvailableForRead();
|
|
|
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
|
|
|
.getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
|
|
|
SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
|
|
@@ -1261,30 +1145,21 @@ public abstract class Server {
|
|
|
if (count < 0 || dataLengthBuffer.remaining() > 0)
|
|
|
return count;
|
|
|
}
|
|
|
-
|
|
|
- if (!connectionHeaderRead) {
|
|
|
+
|
|
|
+ if (!rpcHeaderRead) {
|
|
|
//Every connection is expected to send the header.
|
|
|
- if (connectionHeaderBuf == null) {
|
|
|
- connectionHeaderBuf = ByteBuffer.allocate(3);
|
|
|
+ if (rpcHeaderBuffer == null) {
|
|
|
+ rpcHeaderBuffer = ByteBuffer.allocate(2);
|
|
|
}
|
|
|
- count = channelRead(channel, connectionHeaderBuf);
|
|
|
- if (count < 0 || connectionHeaderBuf.remaining() > 0) {
|
|
|
+ count = channelRead(channel, rpcHeaderBuffer);
|
|
|
+ if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
|
|
|
return count;
|
|
|
}
|
|
|
- int version = connectionHeaderBuf.get(0);
|
|
|
- byte[] method = new byte[] {connectionHeaderBuf.get(1)};
|
|
|
+ int version = rpcHeaderBuffer.get(0);
|
|
|
+ byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
|
|
|
authMethod = AuthMethod.read(new DataInputStream(
|
|
|
new ByteArrayInputStream(method)));
|
|
|
- dataLengthBuffer.flip();
|
|
|
-
|
|
|
- // Check if it looks like the user is hitting an IPC port
|
|
|
- // with an HTTP GET - this is a common error, so we can
|
|
|
- // send back a simple string indicating as much.
|
|
|
- if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
|
|
|
- setupHttpRequestOnIpcPortResponse();
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
+ dataLengthBuffer.flip();
|
|
|
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
|
|
|
//Warning is ok since this is not supposed to happen.
|
|
|
LOG.warn("Incorrect header or version mismatch from " +
|
|
@@ -1294,14 +1169,6 @@ public abstract class Server {
|
|
|
setupBadVersionResponse(version);
|
|
|
return -1;
|
|
|
}
|
|
|
-
|
|
|
- IpcSerializationType serializationType = IpcSerializationType
|
|
|
- .fromByte(connectionHeaderBuf.get(2));
|
|
|
- if (serializationType != IpcSerializationType.PROTOBUF) {
|
|
|
- respondUnsupportedSerialization(serializationType);
|
|
|
- return -1;
|
|
|
- }
|
|
|
-
|
|
|
dataLengthBuffer.clear();
|
|
|
if (authMethod == null) {
|
|
|
throw new IOException("Unable to read authentication method");
|
|
@@ -1331,8 +1198,8 @@ public abstract class Server {
|
|
|
useSasl = true;
|
|
|
}
|
|
|
|
|
|
- connectionHeaderBuf = null;
|
|
|
- connectionHeaderRead = true;
|
|
|
+ rpcHeaderBuffer = null;
|
|
|
+ rpcHeaderRead = true;
|
|
|
continue;
|
|
|
}
|
|
|
|
|
@@ -1363,7 +1230,7 @@ public abstract class Server {
|
|
|
skipInitialSaslHandshake = false;
|
|
|
continue;
|
|
|
}
|
|
|
- boolean isHeaderRead = connectionContextRead;
|
|
|
+ boolean isHeaderRead = headerRead;
|
|
|
if (useSasl) {
|
|
|
saslReadAndProcess(data.array());
|
|
|
} else {
|
|
@@ -1411,34 +1278,23 @@ public abstract class Server {
|
|
|
responder.doRespond(fakeCall);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private void respondUnsupportedSerialization(IpcSerializationType st) throws IOException {
|
|
|
- String errMsg = "Server IPC version " + CURRENT_VERSION
|
|
|
- + " do not support serilization " + st.toString();
|
|
|
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
|
|
-
|
|
|
- Call fakeCall = new Call(-1, null, this);
|
|
|
- setupResponse(buffer, fakeCall, Status.FATAL, null,
|
|
|
- IpcException.class.getName(), errMsg);
|
|
|
- responder.doRespond(fakeCall);
|
|
|
- }
|
|
|
-
|
|
|
- private void setupHttpRequestOnIpcPortResponse() throws IOException {
|
|
|
- Call fakeCall = new Call(0, null, this);
|
|
|
- fakeCall.setResponse(ByteBuffer.wrap(
|
|
|
- RECEIVED_HTTP_REQ_RESPONSE.getBytes()));
|
|
|
- responder.doRespond(fakeCall);
|
|
|
- }
|
|
|
|
|
|
- /** Reads the connection context following the connection header */
|
|
|
- private void processConnectionContext(byte[] buf) throws IOException {
|
|
|
+ /// Reads the connection header following version
|
|
|
+ private void processHeader(byte[] buf) throws IOException {
|
|
|
DataInputStream in =
|
|
|
new DataInputStream(new ByteArrayInputStream(buf));
|
|
|
- connectionContext = IpcConnectionContextProto.parseFrom(in);
|
|
|
- protocolName = connectionContext.hasProtocol() ? connectionContext
|
|
|
- .getProtocol() : null;
|
|
|
-
|
|
|
- UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
|
|
|
+ header.readFields(in);
|
|
|
+ try {
|
|
|
+ String protocolClassName = header.getProtocol();
|
|
|
+ if (protocolClassName != null) {
|
|
|
+ protocol = getProtocolClass(header.getProtocol(), conf);
|
|
|
+ rpcDetailedMetrics.init(protocol);
|
|
|
+ }
|
|
|
+ } catch (ClassNotFoundException cnfe) {
|
|
|
+ throw new IOException("Unknown protocol: " + header.getProtocol());
|
|
|
+ }
|
|
|
+
|
|
|
+ UserGroupInformation protocolUser = header.getUgi();
|
|
|
if (!useSasl) {
|
|
|
user = protocolUser;
|
|
|
if (user != null) {
|
|
@@ -1512,15 +1368,15 @@ public abstract class Server {
|
|
|
|
|
|
private void processOneRpc(byte[] buf) throws IOException,
|
|
|
InterruptedException {
|
|
|
- if (connectionContextRead) {
|
|
|
+ if (headerRead) {
|
|
|
processData(buf);
|
|
|
} else {
|
|
|
- processConnectionContext(buf);
|
|
|
- connectionContextRead = true;
|
|
|
+ processHeader(buf);
|
|
|
+ headerRead = true;
|
|
|
if (!authorizeConnection()) {
|
|
|
throw new AccessControlException("Connection from " + this
|
|
|
- + " for protocol " + connectionContext.getProtocol()
|
|
|
- + " is unauthorized for user " + user);
|
|
|
+ + " for protocol " + header.getProtocol()
|
|
|
+ + " is unauthorized for user " + user);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1528,43 +1384,18 @@ public abstract class Server {
|
|
|
private void processData(byte[] buf) throws IOException, InterruptedException {
|
|
|
DataInputStream dis =
|
|
|
new DataInputStream(new ByteArrayInputStream(buf));
|
|
|
- RpcPayloadHeader header = new RpcPayloadHeader();
|
|
|
- header.readFields(dis); // Read the RpcPayload header
|
|
|
+ int id = dis.readInt(); // try to read an id
|
|
|
|
|
|
if (LOG.isDebugEnabled())
|
|
|
- LOG.debug(" got #" + header.getCallId());
|
|
|
- if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) {
|
|
|
- throw new IOException("IPC Server does not implement operation" +
|
|
|
- header.getOperation());
|
|
|
- }
|
|
|
- // If we know the rpc kind, get its class so that we can deserialize
|
|
|
- // (Note it would make more sense to have the handler deserialize but
|
|
|
- // we continue with this original design.
|
|
|
- Class<? extends Writable> rpcRequestClass =
|
|
|
- getRpcRequestWrapper(header.getkind());
|
|
|
- if (rpcRequestClass == null) {
|
|
|
- LOG.warn("Unknown rpc kind " + header.getkind() +
|
|
|
- " from client " + getHostAddress());
|
|
|
- final Call readParamsFailedCall =
|
|
|
- new Call(header.getCallId(), null, this);
|
|
|
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
|
|
-
|
|
|
- setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
|
|
|
- IOException.class.getName(),
|
|
|
- "Unknown rpc kind " + header.getkind());
|
|
|
- responder.doRespond(readParamsFailedCall);
|
|
|
- return;
|
|
|
- }
|
|
|
- Writable rpcRequest;
|
|
|
- try { //Read the rpc request
|
|
|
- rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
|
|
|
- rpcRequest.readFields(dis);
|
|
|
+ LOG.debug(" got #" + id);
|
|
|
+ Writable param;
|
|
|
+ try {
|
|
|
+ param = ReflectionUtils.newInstance(paramClass, conf);//read param
|
|
|
+ param.readFields(dis);
|
|
|
} catch (Throwable t) {
|
|
|
LOG.warn("Unable to read call parameters for client " +
|
|
|
- getHostAddress() + "on connection protocol " +
|
|
|
- this.protocolName + " for rpcKind " + header.getkind(), t);
|
|
|
- final Call readParamsFailedCall =
|
|
|
- new Call(header.getCallId(), null, this);
|
|
|
+ getHostAddress(), t);
|
|
|
+ final Call readParamsFailedCall = new Call(id, null, this);
|
|
|
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
|
|
|
|
|
|
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
|
|
@@ -1574,7 +1405,7 @@ public abstract class Server {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind());
|
|
|
+ Call call = new Call(id, param, this);
|
|
|
callQueue.put(call); // queue the call; maybe blocked here
|
|
|
incRpcCount(); // Increment the rpc count
|
|
|
}
|
|
@@ -1589,9 +1420,9 @@ public abstract class Server {
|
|
|
&& (authMethod != AuthMethod.DIGEST)) {
|
|
|
ProxyUsers.authorize(user, this.getHostAddress(), conf);
|
|
|
}
|
|
|
- authorize(user, protocolName, getHostInetAddress());
|
|
|
+ authorize(user, header, getHostInetAddress());
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Successfully authorized " + connectionContext);
|
|
|
+ LOG.debug("Successfully authorized " + header);
|
|
|
}
|
|
|
rpcMetrics.incrAuthorizationSuccesses();
|
|
|
} catch (AuthorizationException ae) {
|
|
@@ -1636,10 +1467,11 @@ public abstract class Server {
|
|
|
while (running) {
|
|
|
try {
|
|
|
final Call call = callQueue.take(); // pop the queue; maybe blocked here
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(getName() + ": has Call#" + call.callId +
|
|
|
- "for RpcKind " + call.rpcKind + " from " + call.connection);
|
|
|
- }
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled())
|
|
|
+ LOG.debug(getName() + ": has #" + call.id + " from " +
|
|
|
+ call.connection);
|
|
|
+
|
|
|
String errorClass = null;
|
|
|
String error = null;
|
|
|
Writable value = null;
|
|
@@ -1649,7 +1481,7 @@ public abstract class Server {
|
|
|
// Make the call as the user via Subject.doAs, thus associating
|
|
|
// the call with the Subject
|
|
|
if (call.connection.user == null) {
|
|
|
- value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
|
|
|
+ value = call(call.connection.protocol, call.param,
|
|
|
call.timestamp);
|
|
|
} else {
|
|
|
value =
|
|
@@ -1658,28 +1490,15 @@ public abstract class Server {
|
|
|
@Override
|
|
|
public Writable run() throws Exception {
|
|
|
// make the call
|
|
|
- return call(call.rpcKind, call.connection.protocolName,
|
|
|
- call.rpcRequest, call.timestamp);
|
|
|
+ return call(call.connection.protocol,
|
|
|
+ call.param, call.timestamp);
|
|
|
|
|
|
}
|
|
|
}
|
|
|
);
|
|
|
}
|
|
|
} catch (Throwable e) {
|
|
|
- String logMsg = getName() + ", call " + call + ": error: " + e;
|
|
|
- if (e instanceof RuntimeException || e instanceof Error) {
|
|
|
- // These exception types indicate something is probably wrong
|
|
|
- // on the server side, as opposed to just a normal exceptional
|
|
|
- // result.
|
|
|
- LOG.warn(logMsg, e);
|
|
|
- } else if (e instanceof StandbyException) {
|
|
|
- // Don't log the whole stack trace of these exceptions.
|
|
|
- // Way too noisy!
|
|
|
- LOG.info(logMsg);
|
|
|
- } else {
|
|
|
- LOG.info(logMsg, e);
|
|
|
- }
|
|
|
-
|
|
|
+ LOG.info(getName() + ", call: " + call + ", error: ", e);
|
|
|
errorClass = e.getClass().getName();
|
|
|
error = StringUtils.stringifyException(e);
|
|
|
// Remove redundant error class name from the beginning of the stack trace
|
|
@@ -1724,33 +1543,24 @@ public abstract class Server {
|
|
|
Configuration conf)
|
|
|
throws IOException
|
|
|
{
|
|
|
- this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
|
|
|
- .toString(port), null);
|
|
|
+ this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Constructs a server listening on the named port and address. Parameters passed must
|
|
|
+ /** Constructs a server listening on the named port and address. Parameters passed must
|
|
|
* be of the named class. The <code>handlerCount</handlerCount> determines
|
|
|
* the number of handler threads that will be used to process calls.
|
|
|
* If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
|
|
|
* from configuration. Otherwise the configuration will be picked up.
|
|
|
- *
|
|
|
- * If rpcRequestClass is null then the rpcRequestClass must have been
|
|
|
- * registered via {@link #registerProtocolEngine(RpcPayloadHeader.RpcKind,
|
|
|
- * Class, RPC.RpcInvoker)}
|
|
|
- * This parameter has been retained for compatibility with existing tests
|
|
|
- * and usage.
|
|
|
*/
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- protected Server(String bindAddress, int port,
|
|
|
- Class<? extends Writable> rpcRequestClass, int handlerCount,
|
|
|
- int numReaders, int queueSizePerHandler, Configuration conf,
|
|
|
- String serverName, SecretManager<? extends TokenIdentifier> secretManager)
|
|
|
+ protected Server(String bindAddress, int port,
|
|
|
+ Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
|
|
|
+ Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
|
|
|
throws IOException {
|
|
|
this.bindAddress = bindAddress;
|
|
|
this.conf = conf;
|
|
|
this.port = port;
|
|
|
- this.rpcRequestClass = rpcRequestClass;
|
|
|
+ this.paramClass = paramClass;
|
|
|
this.handlerCount = handlerCount;
|
|
|
this.socketSendBufferSize = 0;
|
|
|
if (queueSizePerHandler != -1) {
|
|
@@ -1831,7 +1641,7 @@ public abstract class Server {
|
|
|
throws IOException {
|
|
|
response.reset();
|
|
|
DataOutputStream out = new DataOutputStream(response);
|
|
|
- out.writeInt(call.callId); // write call id
|
|
|
+ out.writeInt(call.id); // write call id
|
|
|
out.writeInt(status.state); // write status
|
|
|
|
|
|
if (status == Status.SUCCESS) {
|
|
@@ -1948,38 +1758,37 @@ public abstract class Server {
|
|
|
|
|
|
/**
|
|
|
* Called for each call.
|
|
|
- * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, String,
|
|
|
- * Writable, long)} instead
|
|
|
+ * @deprecated Use {@link #call(Class, Writable, long)} instead
|
|
|
*/
|
|
|
@Deprecated
|
|
|
- public Writable call(Writable param, long receiveTime) throws Exception {
|
|
|
- return call(RpcKind.RPC_BUILTIN, null, param, receiveTime);
|
|
|
+ public Writable call(Writable param, long receiveTime) throws IOException {
|
|
|
+ return call(null, param, receiveTime);
|
|
|
}
|
|
|
|
|
|
/** Called for each call. */
|
|
|
- public abstract Writable call(RpcKind rpcKind, String protocol,
|
|
|
- Writable param, long receiveTime) throws Exception;
|
|
|
+ public abstract Writable call(Class<?> protocol,
|
|
|
+ Writable param, long receiveTime)
|
|
|
+ throws IOException;
|
|
|
|
|
|
/**
|
|
|
* Authorize the incoming client connection.
|
|
|
*
|
|
|
* @param user client user
|
|
|
- * @param protocolName - the protocol
|
|
|
+ * @param connection incoming connection
|
|
|
* @param addr InetAddress of incoming connection
|
|
|
* @throws AuthorizationException when the client isn't authorized to talk the protocol
|
|
|
*/
|
|
|
- private void authorize(UserGroupInformation user, String protocolName,
|
|
|
- InetAddress addr) throws AuthorizationException {
|
|
|
+ public void authorize(UserGroupInformation user,
|
|
|
+ ConnectionHeader connection,
|
|
|
+ InetAddress addr
|
|
|
+ ) throws AuthorizationException {
|
|
|
if (authorize) {
|
|
|
- if (protocolName == null) {
|
|
|
- throw new AuthorizationException("Null protocol not authorized");
|
|
|
- }
|
|
|
Class<?> protocol = null;
|
|
|
try {
|
|
|
- protocol = getProtocolClass(protocolName, getConf());
|
|
|
+ protocol = getProtocolClass(connection.getProtocol(), getConf());
|
|
|
} catch (ClassNotFoundException cfne) {
|
|
|
throw new AuthorizationException("Unknown protocol: " +
|
|
|
- protocolName);
|
|
|
+ connection.getProtocol());
|
|
|
}
|
|
|
serviceAuthorizationManager.authorize(user, protocol, getConf(), addr);
|
|
|
}
|
|
@@ -2109,5 +1918,5 @@ public abstract class Server {
|
|
|
|
|
|
int nBytes = initialRemaining - buf.remaining();
|
|
|
return (nBytes > 0) ? nBytes : ret;
|
|
|
- }
|
|
|
+ }
|
|
|
}
|