Selaa lähdekoodia

HDFS-13331. [SBN read] Add lastSeenStateId to RpcRequestHeader. Contributed by Plamen Jeliazkov.

Erik Krogen 7 vuotta sitten
vanhempi
commit
d0c85f83da

+ 14 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 /**
@@ -48,4 +49,17 @@ public interface AlignmentContext {
    */
   void receiveResponseState(RpcResponseHeaderProto header);
 
+  /**
+   * This is the intended client method call to pull last seen state info
+   * into RPC request processing.
+   * @param header The RPC request header builder.
+   */
+  void updateRequestState(RpcRequestHeaderProto.Builder header);
+
+  /**
+   * This is the intended server method call to implement to receive
+   * client state info during RPC response header processing.
+   * @param header The RPC request header.
+   */
+  void receiveRequestState(RpcRequestHeaderProto header);
 }

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -1115,7 +1115,7 @@ public class Client implements AutoCloseable {
       // Items '1' and '2' are prepared here. 
       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
           call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
-          clientId);
+          clientId, alignmentContext);
 
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -2408,6 +2408,11 @@ public abstract class Server {
         }
       }
 
+      if (alignmentContext != null) {
+        // Check incoming RPC request's state.
+        alignmentContext.receiveRequestState(header);
+      }
+
       CallerContext callerContext = null;
       if (header.hasCallerContext()) {
         callerContext =

+ 13 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.util;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
@@ -165,6 +166,13 @@ public abstract class ProtoUtil {
   public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
       RpcRequestHeaderProto.OperationProto operation, int callId,
       int retryCount, byte[] uuid) {
+    return makeRpcRequestHeader(rpcKind, operation, callId, retryCount, uuid,
+        null);
+  }
+
+  public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
+      RpcRequestHeaderProto.OperationProto operation, int callId,
+      int retryCount, byte[] uuid, AlignmentContext alignmentContext) {
     RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
     result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
         .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
@@ -190,6 +198,11 @@ public abstract class ProtoUtil {
       result.setCallerContext(contextBuilder);
     }
 
+    // Add alignment context if it is not null
+    if (alignmentContext != null) {
+      alignmentContext.updateRequestState(result);
+    }
+
     return result.build();
   }
 }

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto

@@ -90,6 +90,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
   optional sint32 retryCount = 5 [default = -1];
   optional RPCTraceInfoProto traceInfo = 6; // tracing info
   optional RPCCallerContextProto callerContext = 7; // call context
+  optional int64 stateId = 8; // The last seen Global State ID
 }
 
 

+ 21 - 8
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java

@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 /**
@@ -32,15 +33,10 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 @InterfaceStability.Stable
 class ClientGCIContext implements AlignmentContext {
 
-  private final DFSClient dfsClient;
   private final AtomicLong lastSeenStateId = new AtomicLong(Long.MIN_VALUE);
 
-  /**
-   * Client side constructor.
-   * @param dfsClient client side state receiver
-   */
-  ClientGCIContext(DFSClient dfsClient) {
-    this.dfsClient = dfsClient;
+  long getLastSeenStateId() {
+    return lastSeenStateId.get();
   }
 
   /**
@@ -53,13 +49,30 @@ class ClientGCIContext implements AlignmentContext {
   }
 
   /**
-   * Client side implementation for receiving state alignment info.
+   * Client side implementation for receiving state alignment info in responses.
    */
   @Override
   public void receiveResponseState(RpcResponseHeaderProto header) {
     updateMax(header.getStateId());
   }
 
+  /**
+   * Client side implementation for providing state alignment info in requests.
+   */
+  @Override
+  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+    header.setStateId(lastSeenStateId.longValue());
+  }
+
+  /**
+   * Client side implementation only provides state alignment info in requests.
+   * Client does not receive RPC requests therefore this does nothing.
+   */
+  @Override
+  public void receiveRequestState(RpcRequestHeaderProto header) {
+    // Do nothing.
+  }
+
   private void updateMax(long sample) {
     while (true) {
       long curMax = lastSeenStateId.get();

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -218,7 +218,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
   volatile long lastLeaseRenewal;
-  volatile long lastSeenStateId;
   private volatile FsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
   final String clientName;
@@ -240,6 +239,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   private final int smallBufferSize;
   private final long serverDefaultsValidityPeriod;
+  private final ClientGCIContext alignmentContext;
 
   public DfsClientConf getConf() {
     return dfsClientConf;
@@ -391,7 +391,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.saslClient = new SaslDataTransferClient(
         conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
         TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
-    Client.setAlignmentContext(new ClientGCIContext(this));
+    this.alignmentContext = new ClientGCIContext();
+    Client.setAlignmentContext(alignmentContext);
   }
 
   /**
@@ -540,6 +541,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return clientRunning;
   }
 
+  @VisibleForTesting
+  ClientGCIContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
   long getLastLeaseRenewal() {
     return lastLeaseRenewal;
   }

+ 25 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 /**
@@ -41,7 +42,7 @@ class GlobalStateIdContext implements AlignmentContext {
   }
 
   /**
-   * Server side implementation for providing state alignment info.
+   * Server side implementation for providing state alignment info in responses.
    */
   @Override
   public void updateResponseState(RpcResponseHeaderProto.Builder header) {
@@ -56,4 +57,27 @@ class GlobalStateIdContext implements AlignmentContext {
   public void receiveResponseState(RpcResponseHeaderProto header) {
     // Do nothing.
   }
+
+  /**
+   * Server side implementation only receives state alignment info.
+   * It does not build RPC requests therefore this does nothing.
+   */
+  @Override
+  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+    // Do nothing.
+  }
+
+  /**
+   * Server side implementation for processing state alignment info in requests.
+   */
+  @Override
+  public void receiveRequestState(RpcRequestHeaderProto header) {
+    long serverStateId = namesystem.getLastWrittenTransactionId();
+    long clientStateId = header.getStateId();
+    if (clientStateId > serverStateId) {
+      FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
+          ", but server state is: " + serverStateId);
+    }
+  }
+
 }

+ 94 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java

@@ -18,20 +18,33 @@
 
 package org.apache.hadoop.hdfs;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertThat;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 
 /**
  * Class is used to test server sending state alignment information to clients
@@ -91,7 +104,7 @@ public class TestStateAlignmentContext {
   public void testStateTransferOnWrite() throws Exception {
     long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
     DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
-    long clientState = dfs.dfs.lastSeenStateId;
+    long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
     long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
     // Write(s) should have increased state. Check for greater than.
     assertThat(clientState > preWriteState, is(true));
@@ -109,7 +122,8 @@ public class TestStateAlignmentContext {
     long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
     DFSTestUtil.readFile(dfs, new Path("/testFile2"));
     // Read should catch client up to last written state.
-    assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
+    long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
+    assertThat(clientState, is(lastWrittenId));
   }
 
   /**
@@ -122,10 +136,86 @@ public class TestStateAlignmentContext {
     long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
     try (DistributedFileSystem clearDfs =
              (DistributedFileSystem) FileSystem.get(CONF)) {
-      assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
+      ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext();
+      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
       DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
-      assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
+      assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
     }
   }
 
+  /**
+   * This test mocks an AlignmentContext and ensures that DFSClient
+   * writes its lastSeenStateId into RPC requests.
+   */
+  @Test
+  public void testClientSendsState() throws Exception {
+    AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
+    AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
+    Client.setAlignmentContext(spiedAlignContext);
+
+    // Collect RpcRequestHeaders for verification later.
+    final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders =
+        new ArrayList<>();
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock a) throws Throwable {
+        Object[] arguments = a.getArguments();
+        RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+            (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+        collectedHeaders.add(header);
+        return a.callRealMethod();
+      }
+    }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
+
+    // Ensure first header and last header have different state.
+    assertThat(collectedHeaders.size() > 1, is(true));
+    assertThat(collectedHeaders.get(0).getStateId(),
+        is(not(collectedHeaders.get(collectedHeaders.size() - 1))));
+
+    // Ensure collected RpcRequestHeaders are in increasing order.
+    long lastHeader = collectedHeaders.get(0).getStateId();
+    for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
+        collectedHeaders.subList(1, collectedHeaders.size())) {
+      long currentHeader = header.getStateId();
+      assertThat(currentHeader >= lastHeader, is(true));
+      lastHeader = header.getStateId();
+    }
+  }
+
+  /**
+   * This test mocks an AlignmentContext to send stateIds greater than
+   * server's stateId in RPC requests.
+   */
+  @Test
+  public void testClientSendsGreaterState() throws Exception {
+    AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
+    AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
+    Client.setAlignmentContext(spiedAlignContext);
+
+    // Make every client call have a stateId > server's stateId.
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock a) throws Throwable {
+        Object[] arguments = a.getArguments();
+        RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+            (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+        try {
+          return a.callRealMethod();
+        } finally {
+          header.setStateId(Long.MAX_VALUE);
+        }
+      }
+    }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
+    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
+    logCapturer.stopCapturing();
+
+    String output = logCapturer.getOutput();
+    assertThat(output, containsString("A client sent stateId: "));
+  }
+
 }