
HDFS-13880. Add mechanism to allow certain RPC calls to bypass sync. Contributed by Chen Liang.

Chen Liang, 6 years ago
parent
commit
bf9d8ba66b

+ 16 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java

@@ -38,6 +38,7 @@ public interface AlignmentContext {
   /**
    * This is the intended server method call to implement to pass state info
    * during RPC response header construction.
+   *
    * @param header The RPC response header builder.
    */
   void updateResponseState(RpcResponseHeaderProto.Builder header);
@@ -45,6 +46,7 @@ public interface AlignmentContext {
   /**
   * This is the intended client method call to implement to receive state info
    * during RPC response processing.
+   *
    * @param header The RPC response header.
    */
   void receiveResponseState(RpcResponseHeaderProto header);
@@ -52,6 +54,7 @@ public interface AlignmentContext {
   /**
    * This is the intended client method call to pull last seen state info
    * into RPC request processing.
+   *
    * @param header The RPC request header builder.
    */
   void updateRequestState(RpcRequestHeaderProto.Builder header);
@@ -59,6 +62,7 @@ public interface AlignmentContext {
   /**
    * This is the intended server method call to implement to receive
    * client state info during RPC response header processing.
+   *
    * @param header The RPC request header.
   * @return state id in the request header.
    */
@@ -66,7 +70,19 @@ public interface AlignmentContext {
 
   /**
    * Returns the last seen state id of the alignment context instance.
+   *
    * @return the value of the last seen state id.
    */
   long getLastSeenStateId();
+
+  /**
+   * Return true if this method call needs to be synced, false otherwise.
+   * Syncing means the server state needs to have caught up with the
+   * client state.
+   *
+   * @param protocolName the name of the protocol
+   * @param method the method call to check
+   * @return true if this method call is coordinated, false otherwise.
+   */
+  boolean isCoordinatedCall(String protocolName, String method);
 }
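
For illustration, a minimal implementation of the extended interface might look like the sketch below. The SimpleAlignmentContext class, its in-memory state counter, and the hard-coded method set are all hypothetical, invented for this example; only the AlignmentContext interface and the stateId accessors on the protobuf headers (added by HDFS-12977) are assumed from the codebase.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;

// Hypothetical sketch: a server-side context backed by an in-memory counter.
class SimpleAlignmentContext implements AlignmentContext {
  // Methods of the (hypothetical) protocol that must wait for state sync.
  private static final Set<String> COORDINATED =
      new HashSet<>(Arrays.asList("read", "list"));
  private final String protocolName;
  private volatile long stateId = 0;

  SimpleAlignmentContext(String protocolName) {
    this.protocolName = protocolName;
  }

  @Override
  public void updateResponseState(RpcResponseHeaderProto.Builder header) {
    header.setStateId(stateId); // stamp the server's current state id
  }

  @Override
  public void receiveResponseState(RpcResponseHeaderProto header) {
    // client-side method; a server-side context has nothing to do here
  }

  @Override
  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
    // client-side method; a server-side context has nothing to do here
  }

  @Override
  public long receiveRequestState(RpcRequestHeaderProto header) {
    return header.getStateId(); // state id the client last saw
  }

  @Override
  public long getLastSeenStateId() {
    return stateId;
  }

  @Override
  public boolean isCoordinatedCall(String protocol, String method) {
    // Only selected methods of the known protocol need to be synced;
    // everything else bypasses the state-id check entirely.
    return protocolName.equals(protocol) && COORDINATED.contains(method);
  }
}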

+ 37 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -705,6 +705,7 @@ public abstract class Server {
     private int priorityLevel;
     // the priority level assigned by scheduler, 0 by default
     private long clientStateId;
+    private boolean isCallCoordinated;
 
     Call() {
       this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
@@ -736,6 +737,7 @@ public abstract class Server {
       this.traceScope = traceScope;
       this.callerContext = callerContext;
       this.clientStateId = Long.MIN_VALUE;
+      this.isCallCoordinated = false;
     }
 
     @Override
@@ -821,6 +823,14 @@ public abstract class Server {
       this.clientStateId = stateId;
     }
 
+    public void markCallCoordinated(boolean flag) {
+      this.isCallCoordinated = flag;
+    }
+
+    public boolean isCallCoordinated() {
+      return this.isCallCoordinated;
+    }
+
     @InterfaceStability.Unstable
     public void deferResponse() {
       this.deferredResponse = true;
@@ -2448,9 +2458,31 @@ public abstract class Server {
 
       // Save the priority level assignment by the scheduler
       call.setPriorityLevel(callQueue.getPriorityLevel(call));
-      if(alignmentContext != null) {
-        long stateId = alignmentContext.receiveRequestState(header);
-        call.setClientStateId(stateId);
+      if(alignmentContext != null && call.rpcRequest != null &&
+          (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
+        // If call.rpcRequest is not a RpcProtobufRequest, skip the following
+        // step and treat the call as uncoordinated, because currently only
+        // certain ClientProtocol method requests made through protobuf RPC
+        // need to be coordinated.
+        String methodName;
+        String protoName;
+        try {
+          ProtobufRpcEngine.RpcProtobufRequest req =
+              (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
+          methodName = req.getRequestHeader().getMethodName();
+          protoName = req.getRequestHeader().getDeclaringClassProtocolName();
+        } catch (IOException ioe) {
+          throw new RpcServerException("Rpc request header check failed", ioe);
+        }
+        if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
+          call.markCallCoordinated(false);
+        } else {
+          call.markCallCoordinated(true);
+          long stateId = alignmentContext.receiveRequestState(header);
+          call.setClientStateId(stateId);
+        }
+      } else {
+        call.markCallCoordinated(false);
       }
 
       try {
@@ -2634,8 +2666,8 @@ public abstract class Server {
         TraceScope traceScope = null;
         try {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
-          if (alignmentContext != null && call.getClientStateId() >
-              alignmentContext.getLastSeenStateId()) {
+          if (alignmentContext != null && call.isCallCoordinated() &&
+              call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
             /*
              * The call processing should be postponed until the client call's
              * state id is aligned (>=) with the server state id.

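The hunk above shows only the gate condition; the surrounding handler loop (elided here) postpones a coordinated call whose client state id is ahead of the server's. Below is a self-contained sketch of that idea, with hypothetical PendingCall, process, and handlerLoop names standing in for the Server's internal Call and call-queue types; the re-queueing shown is one plausible way to postpone without blocking the handler thread, not a claim about the exact upstream mechanics.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Standalone sketch of the coordinated-call gate; all names are hypothetical.
class HandlerLoopSketch {
  static final class PendingCall {
    final boolean coordinated;   // as set by markCallCoordinated(...)
    final long clientStateId;    // as set by setClientStateId(...)
    PendingCall(boolean coordinated, long clientStateId) {
      this.coordinated = coordinated;
      this.clientStateId = clientStateId;
    }
  }

  private final BlockingQueue<PendingCall> callQueue =
      new LinkedBlockingQueue<>();
  private volatile long lastSeenStateId = 0; // the server's applied state id

  void handlerLoop() throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      PendingCall call = callQueue.take(); // may block until a call arrives
      if (call.coordinated && call.clientStateId > lastSeenStateId) {
        // Server state is behind the client's: postpone the coordinated
        // call by re-queueing it instead of blocking this handler thread.
        callQueue.put(call);
        continue;
      }
      process(call); // uncoordinated calls never wait on the state id
    }
  }

  private void process(PendingCall call) {
    // stand-in for the actual RPC processing path
  }
}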
+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java

@@ -42,6 +42,12 @@ public class ClientGSIContext implements AlignmentContext {
     return lastSeenStateId.get();
   }
 
+  @Override
+  public boolean isCoordinatedCall(String protocolName, String method) {
+    throw new UnsupportedOperationException(
+        "Client should not be checking whether a call is coordinated");
+  }
+
   /**
    * Client side implementation only receives state alignment info.
    * It does not provide state alignment info therefore this does nothing.

+ 27 - 27
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -126,7 +126,7 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly(atimeAffected = true)
+  @ReadOnly(atimeAffected = true, isCoordinated = true)
   LocatedBlocks getBlockLocations(String src, long offset, long length)
       throws IOException;
 
@@ -136,7 +136,7 @@ public interface ClientProtocol {
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   FsServerDefaults getServerDefaults() throws IOException;
 
   /**
@@ -269,7 +269,7 @@ public interface ClientProtocol {
    * @return All the in-use block storage policies currently.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BlockStoragePolicy[] getStoragePolicies() throws IOException;
 
   /**
@@ -312,7 +312,7 @@ public interface ClientProtocol {
    *           If file/dir <code>src</code> is not found
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BlockStoragePolicy getStoragePolicy(String path) throws IOException;
 
   /**
@@ -679,7 +679,7 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException;
 
@@ -690,7 +690,7 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   SnapshottableDirectoryStatus[] getSnapshottableDirListing()
       throws IOException;
 
@@ -793,7 +793,7 @@ public interface ClientProtocol {
    *           a symlink.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   long getPreferredBlockSize(String filename)
       throws IOException;
 
@@ -933,7 +933,7 @@ public interface ClientProtocol {
    * cookie returned from the previous call.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException;
 
@@ -969,7 +969,7 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   HdfsFileStatus getFileInfo(String src) throws IOException;
 
   /**
@@ -984,7 +984,7 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   boolean isFileClosed(String src) throws IOException;
 
   /**
@@ -1001,7 +1001,7 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   HdfsFileStatus getFileLinkInfo(String src) throws IOException;
 
   /**
@@ -1015,7 +1015,7 @@ public interface ClientProtocol {
    * @throws IOException If an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   ContentSummary getContentSummary(String path) throws IOException;
 
   /**
@@ -1128,7 +1128,7 @@ public interface ClientProtocol {
    *           or an I/O error occurred
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   String getLinkTarget(String path) throws IOException;
 
   /**
@@ -1199,7 +1199,7 @@ public interface ClientProtocol {
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   DataEncryptionKey getDataEncryptionKey() throws IOException;
 
   /**
@@ -1268,7 +1268,7 @@ public interface ClientProtocol {
    * @throws IOException on error
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
       String fromSnapshot, String toSnapshot) throws IOException;
 
@@ -1314,7 +1314,7 @@ public interface ClientProtocol {
    * @return A batch of CacheDirectiveEntry objects.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
       long prevId, CacheDirectiveInfo filter) throws IOException;
 
@@ -1356,7 +1356,7 @@ public interface ClientProtocol {
    * @return A batch of CachePoolEntry objects.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<CachePoolEntry> listCachePools(String prevPool)
       throws IOException;
 
@@ -1403,7 +1403,7 @@ public interface ClientProtocol {
    * Gets the ACLs of files and directories.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   AclStatus getAclStatus(String src) throws IOException;
 
   /**
@@ -1417,7 +1417,7 @@ public interface ClientProtocol {
    * Get the encryption zone for a path.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   EncryptionZone getEZForPath(String src)
     throws IOException;
 
@@ -1429,7 +1429,7 @@ public interface ClientProtocol {
    * @return Batch of encryption zones.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<EncryptionZone> listEncryptionZones(
       long prevId) throws IOException;
 
@@ -1463,7 +1463,7 @@ public interface ClientProtocol {
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
       throws IOException;
 
@@ -1479,7 +1479,7 @@ public interface ClientProtocol {
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   List<XAttr> listXAttrs(String src)
       throws IOException;
 
@@ -1514,7 +1514,7 @@ public interface ClientProtocol {
    * @throws IOException see specific implementation
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call
   void checkAccess(String path, FsAction mode) throws IOException;
 
   /**
@@ -1523,7 +1523,7 @@ public interface ClientProtocol {
    * the starting point for the inotify event stream.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   long getCurrentEditLogTxid() throws IOException;
 
   /**
@@ -1531,7 +1531,7 @@ public interface ClientProtocol {
    * transactions for txids equal to or greater than txid.
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   EventBatchList getEditsFromTxid(long txid) throws IOException;
 
   /**
@@ -1563,7 +1563,7 @@ public interface ClientProtocol {
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(isCoordinated = true)
   BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
 
   /**
@@ -1574,6 +1574,6 @@ public interface ClientProtocol {
    * @throws IOException
    */
   @Idempotent
-  @ReadOnly
+  @ReadOnly(activeOnly = true)
   void msync() throws IOException;
 }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java

@@ -44,4 +44,11 @@ public @interface ReadOnly {
    * is only available on the active namenode.
    */
   boolean activeOnly() default false;
+
+  /**
+   * @return if true, when processing the RPC call of the target method, the
+   * server side will wait until its state id has caught up with the client's
+   * (msync). If false, the method will be processed regardless of the server
+   * side state.
+   */
+  boolean isCoordinated() default false;
 }

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java

@@ -18,9 +18,13 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.lang.reflect.Method;
+import java.util.HashSet;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -34,12 +38,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 class GlobalStateIdContext implements AlignmentContext {
   private final FSNamesystem namesystem;
 
+  private final HashSet<String> coordinatedMethods;
+
   /**
    * Server side constructor.
    * @param namesystem server side state provider
    */
   GlobalStateIdContext(FSNamesystem namesystem) {
     this.namesystem = namesystem;
+    this.coordinatedMethods = new HashSet<>();
+    // For now, only ClientProtocol methods can be coordinated, so we only
+    // check against ClientProtocol.
+    for (Method method : ClientProtocol.class.getDeclaredMethods()) {
+      if (method.isAnnotationPresent(ReadOnly.class) &&
+          method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) {
+        coordinatedMethods.add(method.getName());
+      }
+    }
   }
 
   /**
@@ -92,4 +107,10 @@ class GlobalStateIdContext implements AlignmentContext {
   public long getLastSeenStateId() {
     return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
   }
+
+  @Override
+  public boolean isCoordinatedCall(String protocolName, String methodName) {
+    return protocolName.equals(ClientProtocol.class.getCanonicalName())
+        && coordinatedMethods.contains(methodName);
+  }
 }
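
Given the annotation scan in the constructor above, the predicate can be exercised directly. A hedged usage sketch, as it might appear in a same-package test: ctx is assumed to be a constructed GlobalStateIdContext instance; getFileInfo is marked @ReadOnly(isCoordinated = true) in this change, while getDatanodeReport (the ClientProtocol method behind DFSClient#datanodeReport) carries no such annotation.

String proto = ClientProtocol.class.getCanonicalName();
// getFileInfo is @ReadOnly(isCoordinated = true), so it is coordinated.
assertTrue(ctx.isCoordinatedCall(proto, "getFileInfo"));
// getDatanodeReport is not annotated isCoordinated, so it bypasses sync.
assertFalse(ctx.isCoordinatedCall(proto, "getDatanodeReport"));
// Methods of any other protocol are never coordinated here.
assertFalse(ctx.isCoordinatedCall("org.example.OtherProtocol", "getFileInfo"));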

+ 52 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
@@ -351,6 +352,57 @@ public class TestObserverNode {
     assertEquals(1, readStatus.get());
   }
 
+  @Test
+  public void testUncoordinatedCall() throws Exception {
+    // disable fast tailing so that coordination takes time.
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
+    conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
+    setUpCluster(1);
+    setObserverRead(true);
+
+    // Make a write call so that the client will be ahead of the
+    // observer for now.
+    dfs.mkdir(testPath, FsPermission.getDefault());
+
+    // A status flag, initialized to 0; after the reader finishes, it is
+    // updated to 1, or to -1 on error.
+    AtomicInteger readStatus = new AtomicInteger(0);
+
+    // create a separate thread to make a blocking read.
+    Thread reader = new Thread(() -> {
+      try {
+        // This read call will block until the server state catches up,
+        // which, given the configuration above, will take a very long time.
+        dfs.getClient().getFileInfo("/");
+        readStatus.set(1);
+        fail("Should have been interrupted before getting here.");
+      } catch (IOException e) {
+        e.printStackTrace();
+        readStatus.set(-1);
+      }
+    });
+    reader.start();
+
+    long before = System.currentTimeMillis();
+    dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
+    long after = System.currentTimeMillis();
+
+    // Should succeed immediately, because datanodeReport is marked as an
+    // uncoordinated call and will not wait for the server to catch up.
+    assertTrue(after - before < 200);
+    // By this time, the reader thread should still be blocking, so the
+    // status is not updated.
+    assertEquals(0, readStatus.get());
+    Thread.sleep(5000);
+    // The reader thread status should still be unchanged after 5 seconds...
+    assertEquals(0, readStatus.get());
+    // ...and the reader thread is not dead, so it must still be waiting.
+    assertEquals(Thread.State.WAITING, reader.getState());
+    reader.interrupt();
+  }
+
   private void setUpCluster(int numObservers) throws Exception {
     qjmhaCluster = new MiniQJMHACluster.Builder(conf)
         .setNumNameNodes(2 + numObservers)