|
@@ -43,6 +43,7 @@ import java.util.Iterator;
|
|
|
import java.util.Map.Entry;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
@@ -56,6 +57,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
|
|
import javax.net.SocketFactory;
|
|
|
import javax.security.sasl.Sasl;
|
|
|
|
|
|
+import com.google.common.cache.Cache;
|
|
|
+import com.google.common.cache.CacheBuilder;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -122,8 +125,8 @@ public class Client {
|
|
|
retryCount.set(rc);
|
|
|
}
|
|
|
|
|
|
- private Hashtable<ConnectionId, Connection> connections =
|
|
|
- new Hashtable<ConnectionId, Connection>();
|
|
|
+ private final Cache<ConnectionId, Connection> connections =
|
|
|
+ CacheBuilder.newBuilder().build();
|
|
|
|
|
|
private Class<? extends Writable> valueClass; // class of call values
|
|
|
private AtomicBoolean running = new AtomicBoolean(true); // if client runs
|
|
@@ -1167,13 +1170,7 @@ public class Client {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // release the resources
|
|
|
- // first thing to do;take the connection out of the connection list
|
|
|
- synchronized (connections) {
|
|
|
- if (connections.get(remoteId) == this) {
|
|
|
- connections.remove(remoteId);
|
|
|
- }
|
|
|
- }
|
|
|
+ connections.invalidate(remoteId);
|
|
|
|
|
|
// close the streams and therefore the socket
|
|
|
IOUtils.closeStream(out);
|
|
@@ -1260,14 +1257,12 @@ public class Client {
|
|
|
}
|
|
|
|
|
|
// wake up all connections
|
|
|
- synchronized (connections) {
|
|
|
- for (Connection conn : connections.values()) {
|
|
|
- conn.interrupt();
|
|
|
- }
|
|
|
+ for (Connection conn : connections.asMap().values()) {
|
|
|
+ conn.interrupt();
|
|
|
}
|
|
|
|
|
|
// wait until all connections are closed
|
|
|
- while (!connections.isEmpty()) {
|
|
|
+ while (connections.size() > 0) {
|
|
|
try {
|
|
|
Thread.sleep(100);
|
|
|
} catch (InterruptedException e) {
|
|
@@ -1283,56 +1278,12 @@ public class Client {
|
|
|
*/
|
|
|
public Writable call(Writable param, InetSocketAddress address)
|
|
|
throws IOException {
|
|
|
- return call(RPC.RpcKind.RPC_BUILTIN, param, address);
|
|
|
-
|
|
|
- }
|
|
|
- /** Make a call, passing <code>param</code>, to the IPC server running at
|
|
|
- * <code>address</code>, returning the value. Throws exceptions if there are
|
|
|
- * network problems or if the remote code threw an exception.
|
|
|
- * @deprecated Use {@link #call(RPC.RpcKind, Writable,
|
|
|
- * ConnectionId)} instead
|
|
|
- */
|
|
|
- @Deprecated
|
|
|
- public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address)
|
|
|
- throws IOException {
|
|
|
- return call(rpcKind, param, address, null);
|
|
|
- }
|
|
|
-
|
|
|
- /** Make a call, passing <code>param</code>, to the IPC server running at
|
|
|
- * <code>address</code> with the <code>ticket</code> credentials, returning
|
|
|
- * the value.
|
|
|
- * Throws exceptions if there are network problems or if the remote code
|
|
|
- * threw an exception.
|
|
|
- * @deprecated Use {@link #call(RPC.RpcKind, Writable,
|
|
|
- * ConnectionId)} instead
|
|
|
- */
|
|
|
- @Deprecated
|
|
|
- public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
|
- UserGroupInformation ticket) throws IOException {
|
|
|
- ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
|
|
|
+ ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0,
|
|
|
conf);
|
|
|
- return call(rpcKind, param, remoteId);
|
|
|
- }
|
|
|
-
|
|
|
- /** Make a call, passing <code>param</code>, to the IPC server running at
|
|
|
- * <code>address</code> which is servicing the <code>protocol</code> protocol,
|
|
|
- * with the <code>ticket</code> credentials and <code>rpcTimeout</code> as
|
|
|
- * timeout, returning the value.
|
|
|
- * Throws exceptions if there are network problems or if the remote code
|
|
|
- * threw an exception.
|
|
|
- * @deprecated Use {@link #call(RPC.RpcKind, Writable,
|
|
|
- * ConnectionId)} instead
|
|
|
- */
|
|
|
- @Deprecated
|
|
|
- public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
|
|
|
- Class<?> protocol, UserGroupInformation ticket,
|
|
|
- int rpcTimeout) throws IOException {
|
|
|
- ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
|
|
|
- ticket, rpcTimeout, conf);
|
|
|
- return call(rpcKind, param, remoteId);
|
|
|
+ return call(RpcKind.RPC_BUILTIN, param, remoteId);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
|
|
|
* Class, UserGroupInformation, int, Configuration)}
|
|
@@ -1506,15 +1457,14 @@ public class Client {
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Unstable
|
|
|
Set<ConnectionId> getConnectionIds() {
|
|
|
- synchronized (connections) {
|
|
|
- return connections.keySet();
|
|
|
- }
|
|
|
+ return connections.asMap().keySet();
|
|
|
}
|
|
|
|
|
|
/** Get a connection from the pool, or create a new one and add it to the
|
|
|
* pool. Connections to a given ConnectionId are reused. */
|
|
|
- private Connection getConnection(ConnectionId remoteId,
|
|
|
- Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
|
|
|
+ private Connection getConnection(
|
|
|
+ final ConnectionId remoteId,
|
|
|
+ Call call, final int serviceClass, AtomicBoolean fallbackToSimpleAuth)
|
|
|
throws IOException {
|
|
|
if (!running.get()) {
|
|
|
// the client is stopped
|
|
@@ -1525,15 +1475,23 @@ public class Client {
|
|
|
* connectionsId object and with set() method. We need to manage the
|
|
|
* refs for keys in HashMap properly. For now its ok.
|
|
|
*/
|
|
|
- do {
|
|
|
- synchronized (connections) {
|
|
|
- connection = connections.get(remoteId);
|
|
|
- if (connection == null) {
|
|
|
- connection = new Connection(remoteId, serviceClass);
|
|
|
- connections.put(remoteId, connection);
|
|
|
- }
|
|
|
+ while(true) {
|
|
|
+ try {
|
|
|
+ connection = connections.get(remoteId, new Callable<Connection>() {
|
|
|
+ @Override
|
|
|
+ public Connection call() throws Exception {
|
|
|
+ return new Connection(remoteId, serviceClass);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ if (connection.addCall(call)) {
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ connections.invalidate(remoteId);
|
|
|
}
|
|
|
- } while (!connection.addCall(call));
|
|
|
+ }
|
|
|
|
|
|
//we don't invoke the method below inside "synchronized (connections)"
|
|
|
//block above. The reason for that is if the server happens to be slow,
|