瀏覽代碼

HADOOP-10300. Allowed deferred sending of call responses. (Daryn Sharp via yliu)

yliu 9 年之前
父節點
當前提交
146f297d7d

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -33,6 +33,9 @@ Release 2.8.0 - UNRELEASED
 
     HADOOP-12360. Create StatsD metrics2 sink. (Dave Marion via stevel)
 
+    HADOOP-10300. Allowed deferred sending of call responses. (Daryn Sharp via
+    yliu)
+
   IMPROVEMENTS
 
     HADOOP-12458. Retries is typoed to spell Retires in parts of

+ 58 - 18
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -576,6 +576,7 @@ public abstract class Server {
     private long timestamp;               // time received when response is null
                                           // time served when response is not null
     private ByteBuffer rpcResponse;       // the response for this call
+    private AtomicInteger responseWaitCount = new AtomicInteger(1);
     private final RPC.RpcKind rpcKind;
     private final byte[] clientId;
     private final TraceScope traceScope; // the HTrace scope on the server side
@@ -610,10 +611,47 @@ public abstract class Server {
           + retryCount;
     }
 
+    public void setResponse(Throwable t) throws IOException {
+      setupResponse(new ByteArrayOutputStream(), this,
+          RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
+          null, t.getClass().getName(), StringUtils.stringifyException(t));
+    }
+
     public void setResponse(ByteBuffer response) {
       this.rpcResponse = response;
     }
 
+    /**
+     * Allow a IPC response to be postponed instead of sent immediately
+     * after the handler returns from the proxy method.  The intended use
+     * case is freeing up the handler thread when the response is known,
+     * but an expensive pre-condition must be satisfied before it's sent
+     * to the client.
+     */
+    @InterfaceStability.Unstable
+    @InterfaceAudience.LimitedPrivate({"HDFS"})
+    public void postponeResponse() {
+      int count = responseWaitCount.incrementAndGet();
+      assert count > 0 : "response has already been sent";
+    }
+
+    @InterfaceStability.Unstable
+    @InterfaceAudience.LimitedPrivate({"HDFS"})
+    public void sendResponse() throws IOException {
+      int count = responseWaitCount.decrementAndGet();
+      assert count >= 0 : "response has already been sent";
+      if (count == 0) {
+        if (rpcResponse == null) {
+          // needed by postponed operations to indicate an exception has
+          // occurred.  it's too late to re-encode the response so just
+          // drop the connection.
+          connection.close();
+        } else {
+          connection.sendResponse(this);
+        }
+      }
+    }
+
     // For Schedulable
     @Override
     public UserGroupInformation getUserGroupInformation() {
@@ -1224,10 +1262,6 @@ public abstract class Server {
         RpcConstants.INVALID_RETRY_COUNT, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
     
-    private final Call saslCall = new Call(AuthProtocol.SASL.callId,
-        RpcConstants.INVALID_RETRY_COUNT, null, this);
-    private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
-    
     private boolean sentNegotiate = false;
     private boolean useWrap = false;
     
@@ -1502,21 +1536,24 @@ public abstract class Server {
       }
       return response.build();
     }
-    
+
     private void doSaslReply(Message message) throws IOException {
+      final Call saslCall = new Call(AuthProtocol.SASL.callId,
+          RpcConstants.INVALID_RETRY_COUNT, null, this);
+      final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
       setupResponse(saslResponse, saslCall,
           RpcStatusProto.SUCCESS, null,
           new RpcResponseWrapper(message), null, null);
-      responder.doRespond(saslCall);
+      saslCall.sendResponse();
     }
-    
+
     private void doSaslReply(Exception ioe) throws IOException {
       setupResponse(authFailedResponse, authFailedCall,
           RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_UNAUTHORIZED,
           null, ioe.getClass().getName(), ioe.getLocalizedMessage());
-      responder.doRespond(authFailedCall);
+      authFailedCall.sendResponse();
     }
-    
+
     private void disposeSasl() {
       if (saslServer != null) {
         try {
@@ -1692,7 +1729,7 @@ public abstract class Server {
         setupResponse(buffer, fakeCall, 
             RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
             null, VersionMismatch.class.getName(), errMsg);
-        responder.doRespond(fakeCall);
+        fakeCall.sendResponse();
       } else if (clientVersion >= 3) {
         Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
             this);
@@ -1700,7 +1737,7 @@ public abstract class Server {
         setupResponseOldVersionFatal(buffer, fakeCall,
             null, VersionMismatch.class.getName(), errMsg);
 
-        responder.doRespond(fakeCall);
+        fakeCall.sendResponse();
       } else if (clientVersion == 2) { // Hadoop 0.18.3
         Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
             this);
@@ -1710,8 +1747,7 @@ public abstract class Server {
         WritableUtils.writeString(out, VersionMismatch.class.getName());
         WritableUtils.writeString(out, errMsg);
         fakeCall.setResponse(ByteBuffer.wrap(buffer.toByteArray()));
-        
-        responder.doRespond(fakeCall);
+        fakeCall.sendResponse();
       }
     }
     
@@ -1719,7 +1755,7 @@ public abstract class Server {
       Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
       fakeCall.setResponse(ByteBuffer.wrap(
           RECEIVED_HTTP_REQ_RESPONSE.getBytes(Charsets.UTF_8)));
-      responder.doRespond(fakeCall);
+      fakeCall.sendResponse();
     }
 
     /** Reads the connection context following the connection header
@@ -1859,7 +1895,7 @@ public abstract class Server {
         setupResponse(authFailedResponse, call,
             RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
             ioe.getClass().getName(), ioe.getMessage());
-        responder.doRespond(call);
+        call.sendResponse();
         throw wrse;
       }
     }
@@ -2065,6 +2101,10 @@ public abstract class Server {
       }
     }
 
+    private void sendResponse(Call call) throws IOException {
+      responder.doRespond(call);
+    }
+
     /**
      * Get service class for connection
      * @return the serviceClass
@@ -2202,7 +2242,7 @@ public abstract class Server {
                   + call.toString());
               buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
             }
-            responder.doRespond(call);
+            call.sendResponse();
           }
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
@@ -2398,7 +2438,7 @@ public abstract class Server {
    * @param error error message, if the call failed
    * @throws IOException
    */
-  private void setupResponse(ByteArrayOutputStream responseBuf,
+  private static void setupResponse(ByteArrayOutputStream responseBuf,
                              Call call, RpcStatusProto status, RpcErrorCodeProto erCode,
                              Writable rv, String errorClass, String error) 
   throws IOException {
@@ -2494,7 +2534,7 @@ public abstract class Server {
   }
   
   
-  private void wrapWithSasl(ByteArrayOutputStream response, Call call)
+  private static void wrapWithSasl(ByteArrayOutputStream response, Call call)
       throws IOException {
     if (call.connection.saslServer != null) {
       byte[] token = response.toByteArray();

+ 151 - 10
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java

@@ -18,35 +18,43 @@
 
 package org.apache.hadoop.ipc;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Random;
-
-import junit.framework.TestCase;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.net.NetUtils;
+import org.junit.Assert;
+import org.junit.Test;
 
 /**
  * This test provokes partial writes in the server, which is 
  * serving multiple clients.
  */
-public class TestIPCServerResponder extends TestCase {
+public class TestIPCServerResponder {
 
   public static final Log LOG = 
             LogFactory.getLog(TestIPCServerResponder.class);
 
   private static Configuration conf = new Configuration();
 
-  public TestIPCServerResponder(final String name) {
-    super(name);
-  }
-
   private static final Random RANDOM = new Random();
 
   private static final String ADDRESS = "0.0.0.0";
@@ -115,21 +123,23 @@ public class TestIPCServerResponder extends TestCase {
     }
   }
 
+  @Test
   public void testResponseBuffer() 
       throws IOException, InterruptedException {
     Server.INITIAL_RESP_BUF_SIZE = 1;
     conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
                 1);
-    testServerResponder(1, true, 1, 1, 5);
+    checkServerResponder(1, true, 1, 1, 5);
     conf = new Configuration(); // reset configuration
   }
 
+  @Test
   public void testServerResponder()
       throws IOException, InterruptedException {
-    testServerResponder(10, true, 1, 10, 200);
+    checkServerResponder(10, true, 1, 10, 200);
   }
 
-  public void testServerResponder(final int handlerCount, 
+  public void checkServerResponder(final int handlerCount, 
                                   final boolean handlerSleep, 
                                   final int clientCount,
                                   final int callerCount,
@@ -159,4 +169,135 @@ public class TestIPCServerResponder extends TestCase {
     server.stop();
   }
 
+  // Test that IPC calls can be marked for a deferred response.
+  // call 0: immediate
+  // call 1: immediate
+  // call 2: delayed with wait for 1 sendResponse, check if blocked
+  // call 3: immediate, proves handler is freed
+  // call 4: delayed with wait for 2 sendResponses, check if blocked
+  // call 2: sendResponse, should return
+  // call 4: sendResponse, should remain blocked
+  // call 5: immediate, prove handler is still free
+  // call 4: sendResponse, expect it to return
+  @Test(timeout=10000)
+  public void testDeferResponse() throws IOException, InterruptedException {
+    final AtomicReference<Call> deferredCall = new AtomicReference<Call>();
+    final AtomicInteger count = new AtomicInteger();
+    final Writable wait0 = new IntWritable(0);
+    final Writable wait1 = new IntWritable(1);
+    final Writable wait2 = new IntWritable(2);
+
+    // use only 1 handler to prove it's freed after every call
+    Server server = new Server(ADDRESS, 0, IntWritable.class, 1, conf){
+      @Override
+      public Writable call(RPC.RpcKind rpcKind, String protocol,
+          Writable waitCount, long receiveTime) throws IOException {
+        Call call = Server.getCurCall().get();
+        int wait = ((IntWritable)waitCount).get();
+        while (wait-- > 0) {
+          call.postponeResponse();
+          deferredCall.set(call);
+        }
+        return new IntWritable(count.getAndIncrement());
+      }
+    };
+    server.start();
+
+    final InetSocketAddress address = NetUtils.getConnectAddress(server);
+    final Client client = new Client(IntWritable.class, conf);
+    Call[] waitingCalls = new Call[2];
+
+    // calls should return immediately, check the sequence number is
+    // increasing
+    assertEquals(0,
+        ((IntWritable)client.call(wait0, address)).get());
+    assertEquals(1,
+        ((IntWritable)client.call(wait0, address)).get());
+
+    // do a call in the background that will have a deferred response
+    final ExecutorService exec = Executors.newCachedThreadPool();
+    Future<Integer> future1 = exec.submit(new Callable<Integer>() {
+      @Override
+      public Integer call() throws IOException {
+        return ((IntWritable)client.call(wait1, address)).get();
+      }
+    });
+    // make sure it blocked
+    try {
+      future1.get(1, TimeUnit.SECONDS);
+      Assert.fail("ipc shouldn't have responded");
+    } catch (TimeoutException te) {
+      // ignore, expected
+    } catch (Exception ex) {
+      Assert.fail("unexpected exception:"+ex);
+    }
+    assertFalse(future1.isDone());
+    waitingCalls[0] = deferredCall.get();
+    assertNotNull(waitingCalls[0]);
+
+    // proves the handler isn't tied up, and that the prior sequence number
+    // was consumed
+    assertEquals(3,
+        ((IntWritable)client.call(wait0, address)).get());
+
+    // another call with wait count of 2
+    Future<Integer> future2 = exec.submit(new Callable<Integer>() {
+      @Override
+      public Integer call() throws IOException {
+        return ((IntWritable)client.call(wait2, address)).get();
+      }
+    });
+    // make sure it blocked
+    try {
+      future2.get(1, TimeUnit.SECONDS);
+      Assert.fail("ipc shouldn't have responded");
+    } catch (TimeoutException te) {
+      // ignore, expected
+    } catch (Exception ex) {
+      Assert.fail("unexpected exception:"+ex);
+    }
+    assertFalse(future2.isDone());
+    waitingCalls[1] = deferredCall.get();
+    assertNotNull(waitingCalls[1]);
+
+    // the background calls should still be blocked
+    assertFalse(future1.isDone());
+    assertFalse(future2.isDone());
+
+    // trigger responses
+    waitingCalls[0].sendResponse();
+    waitingCalls[1].sendResponse();
+    try {
+      int val = future1.get(1, TimeUnit.SECONDS);
+      assertEquals(2, val);
+    } catch (Exception ex) {
+      Assert.fail("unexpected exception:"+ex);
+    }
+
+    // make sure it's still blocked
+    try {
+      future2.get(1, TimeUnit.SECONDS);
+      Assert.fail("ipc shouldn't have responded");
+    } catch (TimeoutException te) {
+      // ignore, expected
+    } catch (Exception ex) {
+      Assert.fail("unexpected exception:"+ex);
+    }
+    assertFalse(future2.isDone());
+
+    // call should return immediately
+    assertEquals(5,
+        ((IntWritable)client.call(wait0, address)).get());
+
+    // trigger last waiting call
+    waitingCalls[1].sendResponse();
+    try {
+      int val = future2.get(1, TimeUnit.SECONDS);
+      assertEquals(4, val);
+    } catch (Exception ex) {
+      Assert.fail("unexpected exception:"+ex);
+    }
+
+    server.stop();
+  }
 }