浏览代码

HDFS-17156. Client may receive old state ID which will lead to inconsistent reads. (#5951)

Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
(cherry picked from commit 42b4525f75b828bf58170187f030b08622e238ab)
Chunyi Yang 1 年之前
父节点
当前提交
ebad33fd2d

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -1198,10 +1198,10 @@ public class Client implements AutoCloseable {
         if (status == RpcStatusProto.SUCCESS) {
           Writable value = packet.newInstance(valueClass, conf);
           final Call call = calls.remove(callId);
-          call.setRpcResponse(value);
           if (call.alignmentContext != null) {
             call.alignmentContext.receiveResponseState(header);
           }
+          call.setRpcResponse(value);
         }
         // verify that packet length was correct
         if (packet.remaining() > 0) {

+ 39 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

@@ -82,6 +82,7 @@ import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
@@ -162,9 +163,15 @@ public class TestIPC {
   static LongWritable call(Client client, LongWritable param,
       InetSocketAddress addr, int rpcTimeout, Configuration conf)
           throws IOException {
+    return call(client, param, addr, rpcTimeout, conf, null);
+  }
+
+  static LongWritable call(Client client, LongWritable param,
+      InetSocketAddress addr, int rpcTimeout, Configuration conf, AlignmentContext alignmentContext)
+      throws IOException {
     final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf);
     return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
-        RPC.RPC_SERVICE_CLASS_DEFAULT, null);
+        RPC.RPC_SERVICE_CLASS_DEFAULT, null, alignmentContext);
   }
 
   static class TestServer extends Server {
@@ -1332,6 +1339,37 @@ public class TestIPC {
       server.stop();
     }
   }
+
+  /**
+   * Verify that stateID is received into call before
+   * caller is notified.
+   * @throws IOException
+   */
+  @Test(timeout=60000)
+  public void testReceiveStateBeforeCallerNotification() throws IOException {
+    AtomicBoolean stateReceived = new AtomicBoolean(false);
+    AlignmentContext alignmentContext = Mockito.mock(AlignmentContext.class);
+    Mockito.doAnswer((Answer<Void>) invocation -> {
+      Thread.sleep(1000);
+      stateReceived.set(true);
+      return null;
+    }).when(alignmentContext)
+        .receiveResponseState(any(RpcHeaderProtos.RpcResponseHeaderProto.class));
+
+    final Client client = new Client(LongWritable.class, conf);
+    final TestServer server = new TestServer(1, false);
+
+    try {
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+      call(client, new LongWritable(RANDOM.nextLong()), addr,
+          0, conf, alignmentContext);
+      Assert.assertTrue(stateReceived.get());
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
   
   /** A dummy protocol */
   interface DummyProtocol {