Browse Source

svn merge -c 1244185 from trunk for HADOOP-8070.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1291965 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 13 years ago
parent
commit
b3a969f03d

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -45,6 +45,8 @@ Release 0.23-PB - Unreleased
 
     HADOOP-7965. Support for protocol version and signature in PB. (jitendra)
 
+    HADOOP-8070. Add a standalone benchmark for RPC call performance. (todd)
+
   BUG FIXES
 
     HADOOP-7695. RPC.stopProxy can throw unintended exception while logging

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -42,6 +42,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -92,6 +93,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -301,6 +304,12 @@ public abstract class Server {
     return rpcMetrics;
   }
 
+  
+  @VisibleForTesting
+  Iterable<? extends Thread> getHandlers() {
+    return Arrays.asList(handlers);
+  }
+
   /**
    * Refresh the service authorization ACL for the service handled by this server.
    */

+ 420 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/RPCCallBenchmark.java

@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
+import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
+import org.apache.hadoop.ipc.TestRPC.TestProtocol;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
+import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
+import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.MultithreadedTestUtil;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Joiner;
+import com.google.protobuf.BlockingService;
+
+/**
+ * Benchmark for protobuf RPC.
+ * Run with --help option for usage.
+ */
+public class RPCCallBenchmark implements Tool, Configurable {
+  private Configuration conf;
+  private AtomicLong callCount = new AtomicLong(0);
+  private static ThreadMXBean threadBean =
+    ManagementFactory.getThreadMXBean();
+  
+  private static class MyOptions {
+    private boolean failed = false;
+    private int serverThreads = 0;
+    private int serverReaderThreads = 1;
+    private int clientThreads = 0;
+    private String host = "0.0.0.0";
+    private int port = 12345;
+    public int secondsToRun = 15;
+    private int msgSize = 1024;
+    public Class<? extends RpcEngine> rpcEngine =
+      WritableRpcEngine.class;
+    
+    private MyOptions(String args[]) {
+      try {
+        Options opts = buildOptions();
+        CommandLineParser parser = new GnuParser();
+        CommandLine line = parser.parse(opts, args, true);
+        processOptions(line, opts);
+        validateOptions();
+      } catch (ParseException e) {
+        System.err.println(e.getMessage());
+        System.err.println("Try \"--help\" option for details.");
+        failed = true;
+      }
+    }
+
+    private void validateOptions() throws ParseException {
+      if (serverThreads <= 0 && clientThreads <= 0) {
+        throw new ParseException("Must specify at least -c or -s");
+      }
+    }
+
+    @SuppressWarnings("static-access")
+    private Options buildOptions() {
+      Options opts = new Options();
+      opts.addOption(
+        OptionBuilder.withLongOpt("serverThreads").hasArg(true)
+        .withArgName("numthreads")
+        .withDescription("number of server threads (handlers) to run (or 0 to not run server)")
+        .create("s"));
+      opts.addOption(
+        OptionBuilder.withLongOpt("serverReaderThreads").hasArg(true)
+        .withArgName("threads")
+        .withDescription("number of server reader threads to run")
+        .create("r"));
+
+      
+      opts.addOption(
+        OptionBuilder.withLongOpt("clientThreads").hasArg(true)
+        .withArgName("numthreads")
+        .withDescription("number of client threads to run (or 0 to not run client)")
+        .create("c"));
+
+      opts.addOption(
+        OptionBuilder.withLongOpt("messageSize").hasArg(true)
+        .withArgName("bytes")
+        .withDescription("size of call parameter in bytes")
+        .create("m"));
+
+      opts.addOption(
+          OptionBuilder.withLongOpt("time").hasArg(true)
+          .withArgName("seconds")
+          .withDescription("number of seconds to run clients for")
+          .create("t"));
+      opts.addOption(
+          OptionBuilder.withLongOpt("port").hasArg(true)
+          .withArgName("port")
+          .withDescription("port to listen or connect on")
+          .create("p"));
+      opts.addOption(
+          OptionBuilder.withLongOpt("host").hasArg(true)
+          .withArgName("addr")
+          .withDescription("host to listen or connect on")
+          .create('h'));
+      
+      opts.addOption(
+          OptionBuilder.withLongOpt("engine").hasArg(true)
+          .withArgName("writable|protobuf")
+          .withDescription("engine to use")
+          .create('e'));
+      
+      opts.addOption(
+          OptionBuilder.withLongOpt("help").hasArg(false)
+          .withDescription("show this screen")
+          .create('?'));
+
+      return opts;
+    }
+    
+    private void processOptions(CommandLine line, Options opts)
+      throws ParseException {
+      if (line.hasOption("help") || line.hasOption('?')) {
+        HelpFormatter formatter = new HelpFormatter();
+        System.out.println("Protobuf IPC benchmark.");
+        System.out.println();
+        formatter.printHelp(100,
+            "java ... PBRPCBenchmark [options]",
+            "\nSupported options:", opts, "");
+        return;
+      }
+
+      if (line.hasOption('s')) {
+        serverThreads = Integer.parseInt(line.getOptionValue('s'));
+      }
+      if (line.hasOption('r')) {
+        serverReaderThreads = Integer.parseInt(line.getOptionValue('r'));
+      }
+      if (line.hasOption('c')) {
+        clientThreads = Integer.parseInt(line.getOptionValue('c'));
+      }
+      if (line.hasOption('t')) {
+        secondsToRun = Integer.parseInt(line.getOptionValue('t'));
+      }
+      if (line.hasOption('m')) {
+        msgSize = Integer.parseInt(line.getOptionValue('m'));
+      }
+      if (line.hasOption('p')) {
+        port = Integer.parseInt(line.getOptionValue('p'));
+      }
+      if (line.hasOption('h')) {
+        host = line.getOptionValue('h');
+      }
+      if (line.hasOption('e')) {
+        String eng = line.getOptionValue('e');
+        if ("protobuf".equals(eng)) {
+          rpcEngine = ProtobufRpcEngine.class;
+        } else if ("writable".equals(eng)) {
+          rpcEngine = WritableRpcEngine.class;
+        } else {
+          throw new ParseException("invalid engine: " + eng);
+        }
+      }
+      
+      String[] remainingArgs = line.getArgs();
+      if (remainingArgs.length != 0) {
+        throw new ParseException("Extra arguments: " +
+            Joiner.on(" ").join(remainingArgs));
+      }
+    }
+    
+    @Override
+    public String toString() {
+      return "rpcEngine=" + rpcEngine + "\nserverThreads=" + serverThreads
+          + "\nserverReaderThreads=" + serverReaderThreads + "\nclientThreads="
+          + clientThreads + "\nhost=" + host + "\nport=" + port
+          + "\nsecondsToRun=" + secondsToRun + "\nmsgSize=" + msgSize;
+    }
+  }
+
+
+  
+  private Server startServer(MyOptions opts) throws IOException {
+    if (opts.serverThreads <= 0) {
+      return null;
+    }
+    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
+        opts.serverReaderThreads);
+    
+    RPC.Server server;
+    // Get RPC server for server side implementation
+    if (opts.rpcEngine == ProtobufRpcEngine.class) {
+      // Create server side implementation
+      PBServerImpl serverImpl = new PBServerImpl();
+      BlockingService service = TestProtobufRpcProto
+          .newReflectiveBlockingService(serverImpl);
+
+      server = RPC.getServer(TestRpcService.class, service,
+          opts.host, opts.port, opts.serverThreads, false, conf, null);
+    } else if (opts.rpcEngine == WritableRpcEngine.class) {
+      server = RPC.getServer(TestProtocol.class, new TestRPC.TestImpl(),
+          opts.host, opts.port, opts.serverThreads, false, conf, null);
+    } else {
+      throw new RuntimeException("Bad engine: " + opts.rpcEngine);
+    }
+    server.start();
+    return server;
+  }
+  
+  private long getTotalCpuTime(Iterable<? extends Thread> threads) {
+    long total = 0;
+    for (Thread t : threads) {
+      long tid = t.getId();
+      total += threadBean.getThreadCpuTime(tid);
+    }
+    return total;
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    MyOptions opts = new MyOptions(args);
+    if (opts.failed) {
+      return -1;
+    }
+    
+    // Set RPC engine to the configured RPC engine
+    RPC.setProtocolEngine(conf, TestRpcService.class, opts.rpcEngine);
+
+    Server server = startServer(opts);
+    try {
+      
+      TestContext ctx = setupClientTestContext(opts);
+      if (ctx != null) {
+        long totalCalls = 0;
+        ctx.startThreads();
+        long veryStart = System.nanoTime();
+
+        // Loop printing results every second until the specified
+        // time has elapsed
+        for (int i = 0; i < opts.secondsToRun ; i++) {
+          long st = System.nanoTime();
+          ctx.waitFor(1000);
+          long et = System.nanoTime();
+          long ct = callCount.getAndSet(0);
+          totalCalls += ct;
+          double callsPerSec = (ct * 1000000000)/(et - st);
+          System.out.println("Calls per second: " + callsPerSec);
+        }
+        
+        // Print results
+
+        if (totalCalls > 0) {
+          long veryEnd = System.nanoTime();
+          double callsPerSec =
+            (totalCalls * 1000000000)/(veryEnd - veryStart);
+          long cpuNanosClient = getTotalCpuTime(ctx.getTestThreads());
+          long cpuNanosServer = -1;
+          if (server != null) {
+            cpuNanosServer = getTotalCpuTime(server.getHandlers());; 
+          }
+          System.out.println("====== Results ======");
+          System.out.println("Options:\n" + opts);
+          System.out.println("Total calls per second: " + callsPerSec);
+          System.out.println("CPU time per call on client: " +
+              (cpuNanosClient / totalCalls) + " ns");
+          if (server != null) {
+            System.out.println("CPU time per call on server: " +
+                (cpuNanosServer / totalCalls) + " ns");
+          }
+        } else {
+          System.out.println("No calls!");
+        }
+
+        ctx.stop();
+      } else {
+        while (true) {
+          Thread.sleep(10000);
+        }
+      }
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }
+    
+    return 0;
+  }
+
+
+  private TestContext setupClientTestContext(final MyOptions opts)
+      throws IOException, InterruptedException {
+    if (opts.clientThreads <= 0) {
+      return null;
+    }
+
+    // Set up a separate proxy for each client thread,
+    // rather than making them share TCP pipes.
+    int numProxies = opts.clientThreads;
+    final RpcServiceWrapper proxies[] = new RpcServiceWrapper[numProxies];
+    for (int i = 0; i < numProxies; i++) {
+      proxies[i] =
+        UserGroupInformation.createUserForTesting("proxy-" + i,new String[]{})
+        .doAs(new PrivilegedExceptionAction<RpcServiceWrapper>() {
+          @Override
+          public RpcServiceWrapper run() throws Exception {
+            return createRpcClient(opts);
+          }
+        });
+    }
+
+    // Create an echo message of the desired length
+    final StringBuilder msgBuilder = new StringBuilder(opts.msgSize);
+    for (int c = 0; c < opts.msgSize; c++) {
+      msgBuilder.append('x');
+    }
+    final String echoMessage = msgBuilder.toString();
+
+    // Create the clients in a test context
+    TestContext ctx = new TestContext();
+    for (int i = 0; i < opts.clientThreads; i++) {
+      final RpcServiceWrapper proxy = proxies[i % numProxies];
+      
+      ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+        @Override
+        public void doAnAction() throws Exception {
+          proxy.doEcho(echoMessage);
+          callCount.incrementAndGet();
+        }
+      });
+    }
+    return ctx;
+  }
+
+  /**
+   * Simple interface that can be implemented either by the
+   * protobuf or writable implementations.
+   */
+  private interface RpcServiceWrapper {
+    public String doEcho(String msg) throws Exception;
+  }
+
+  /**
+   * Create a client proxy for the specified engine.
+   */
+  private RpcServiceWrapper createRpcClient(MyOptions opts) throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(opts.host, opts.port);
+    
+    if (opts.rpcEngine == ProtobufRpcEngine.class) {
+      final TestRpcService proxy = RPC.getProxy(TestRpcService.class, 0, addr, conf);
+      return new RpcServiceWrapper() {
+        @Override
+        public String doEcho(String msg) throws Exception {
+          EchoRequestProto req = EchoRequestProto.newBuilder()
+            .setMessage(msg)
+            .build();
+          EchoResponseProto responseProto = proxy.echo(null, req);
+          return responseProto.getMessage();
+        }
+      };
+    } else if (opts.rpcEngine == WritableRpcEngine.class) {
+      final TestProtocol proxy = (TestProtocol)RPC.getProxy(
+          TestProtocol.class, TestProtocol.versionID, addr, conf);
+      return new RpcServiceWrapper() {
+        @Override
+        public String doEcho(String msg) throws Exception {
+          return proxy.echo(msg);
+        }
+      };
+    } else {
+      throw new RuntimeException("unsupported engine: " + opts.rpcEngine);
+    }
+  }
+
+  public static void main(String []args) throws Exception {
+    int rc = ToolRunner.run(new RPCCallBenchmark(), args);
+    System.exit(rc);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}

+ 53 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCallBenchmark.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+
+public class TestRPCCallBenchmark {
+
+  @Test(timeout=20000)
+  public void testBenchmarkWithWritable() throws Exception {
+    int rc = ToolRunner.run(new RPCCallBenchmark(),
+        new String[] {
+      "--clientThreads", "30",
+      "--serverThreads", "30",
+      "--time", "5",
+      "--serverReaderThreads", "4",
+      "--messageSize", "1024",
+      "--engine", "writable"});
+    assertEquals(0, rc);
+  }
+  
+  @Test(timeout=20000)
+  public void testBenchmarkWithProto() throws Exception {
+    int rc = ToolRunner.run(new RPCCallBenchmark(),
+        new String[] {
+      "--clientThreads", "30",
+      "--serverThreads", "30",
+      "--time", "5",
+      "--serverReaderThreads", "4",
+      "--messageSize", "1024",
+      "--engine", "protobuf"});
+    assertEquals(0, rc);
+  }
+}

+ 4 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java

@@ -164,6 +164,10 @@ public abstract class MultithreadedTestUtil {
       }
       checkException();
     }
+
+    public Iterable<? extends Thread> getTestThreads() {
+      return testThreads;
+    }
   }
 
   /**