Browse Source

HADOOP-7474. Refactor ClientCache out of WritableRpcEngine. Contributed by jitendra.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1151978 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 13 years ago
parent
commit
817df4d5d3

+ 2 - 0
common/CHANGES.txt

@@ -292,6 +292,8 @@ Trunk (unreleased changes)
     HADOOP-7378. Add -d option to ls to not expand directories.
     (Daryn Sharp via suresh)
 
+    HADOOP-7474. Refactor ClientCache out of WritableRpcEngine. (jitendra)
+
   OPTIMIZATIONS
   
     HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole

+ 14 - 2
common/src/java/org/apache/hadoop/ipc/Client.java

@@ -1195,7 +1195,9 @@ public class Client {
    * This class holds the address and the user ticket. The client connections
    * to servers are uniquely identified by <remoteAddress, protocol, ticket>
    */
-  static class ConnectionId {
+  @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+  @InterfaceStability.Evolving
+  public static class ConnectionId {
     InetSocketAddress address;
     UserGroupInformation ticket;
     Class<?> protocol;
@@ -1266,7 +1268,17 @@ public class Client {
       return pingInterval;
     }
     
-    static ConnectionId getConnectionId(InetSocketAddress addr,
+    /**
+     * Returns a ConnectionId object. 
+     * @param addr Remote address for the connection.
+     * @param protocol Protocol for RPC.
+     * @param ticket UGI
+     * @param rpcTimeout timeout
+     * @param conf Configuration object
+     * @return A ConnectionId instance
+     * @throws IOException
+     */
+    public static ConnectionId getConnectionId(InetSocketAddress addr,
         Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
         Configuration conf) throws IOException {
       String remotePrincipal = getRemotePrincipal(conf, addr, protocol);

+ 103 - 0
common/src/java/org/apache/hadoop/ipc/ClientCache.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/* Cache a client using its socket factory as the hash key */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Evolving
+public class ClientCache {
+  private Map<SocketFactory, Client> clients =
+    new HashMap<SocketFactory, Client>();
+
+  /**
+   * Construct & cache an IPC client with the user-provided SocketFactory 
+   * if no cached client exists.
+   * 
+   * @param conf Configuration
+   * @param factory SocketFactory for client socket
+   * @param valueClass Class of the expected response
+   * @return an IPC client
+   */
+  public synchronized Client getClient(Configuration conf,
+      SocketFactory factory, Class<? extends Writable> valueClass) {
+    // Construct & cache client.  The configuration is only used for timeout,
+    // and Clients have connection pools.  So we can either (a) lose some
+    // connection pooling and leak sockets, or (b) use the same timeout for all
+    // configurations.  Since the IPC is usually intended globally, not
+    // per-job, we choose (a).
+    Client client = clients.get(factory);
+    if (client == null) {
+      client = new Client(valueClass, conf, factory);
+      clients.put(factory, client);
+    } else {
+      client.incCount();
+    }
+    return client;
+  }
+
+  /**
+   * Construct & cache an IPC client with the default SocketFactory 
+   * and default valueClass if no cached client exists. 
+   * 
+   * @param conf Configuration
+   * @return an IPC client
+   */
+  public synchronized Client getClient(Configuration conf) {
+    return getClient(conf, SocketFactory.getDefault(), ObjectWritable.class);
+  }
+  
+  /**
+   * Construct & cache an IPC client with the user-provided SocketFactory 
+   * if no cached client exists. Default response type is ObjectWritable.
+   * 
+   * @param conf Configuration
+   * @param factory SocketFactory for client socket
+   * @return an IPC client
+   */
+  public synchronized Client getClient(Configuration conf, SocketFactory factory) {
+    return this.getClient(conf, factory, ObjectWritable.class);
+  }
+
+  /**
+   * Stop a RPC client connection 
+   * A RPC client is closed only when its reference count becomes zero.
+   */
+  public void stopClient(Client client) {
+    synchronized (this) {
+      client.decCount();
+      if (client.isZeroReference()) {
+        clients.remove(client.getSocketFactory());
+      }
+    }
+    if (client.isZeroReference()) {
+      client.stop();
+    }
+  }
+}

+ 0 - 57
common/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -167,63 +167,6 @@ public class WritableRpcEngine implements RpcEngine {
 
   }
 
-  /* Cache a client using its socket factory as the hash key */
-  static private class ClientCache {
-    private Map<SocketFactory, Client> clients =
-      new HashMap<SocketFactory, Client>();
-
-    /**
-     * Construct & cache an IPC client with the user-provided SocketFactory 
-     * if no cached client exists.
-     * 
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    private synchronized Client getClient(Configuration conf,
-        SocketFactory factory) {
-      // Construct & cache client.  The configuration is only used for timeout,
-      // and Clients have connection pools.  So we can either (a) lose some
-      // connection pooling and leak sockets, or (b) use the same timeout for all
-      // configurations.  Since the IPC is usually intended globally, not
-      // per-job, we choose (a).
-      Client client = clients.get(factory);
-      if (client == null) {
-        client = new Client(ObjectWritable.class, conf, factory);
-        clients.put(factory, client);
-      } else {
-        client.incCount();
-      }
-      return client;
-    }
-
-    /**
-     * Construct & cache an IPC client with the default SocketFactory 
-     * if no cached client exists.
-     * 
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    private synchronized Client getClient(Configuration conf) {
-      return getClient(conf, SocketFactory.getDefault());
-    }
-
-    /**
-     * Stop a RPC client connection 
-     * A RPC client is closed only when its reference count becomes zero.
-     */
-    private void stopClient(Client client) {
-      synchronized (this) {
-        client.decCount();
-        if (client.isZeroReference()) {
-          clients.remove(client.getSocketFactory());
-        }
-      }
-      if (client.isZeroReference()) {
-        client.stop();
-      }
-    }
-  }
-
   private static ClientCache CLIENTS=new ClientCache();
   
   private static class Invoker implements InvocationHandler {