HDDS-1555. Disable install snapshot for ContainerStateMachine. Contributed by Siddharth Wagle. (#846)

Mukul Kumar Singh, 5 years ago
commit 9df6275954

+ 37 - 35
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -18,52 +18,54 @@
 
 package org.apache.hadoop.hdds.scm;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-
-import io.opentracing.Scope;
-import io.opentracing.util.GlobalTracer;
-import org.apache.hadoop.util.Time;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.thirdparty.com.google.protobuf
-    .InvalidProtocolBufferException;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
-
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import io.opentracing.Scope;
+import io.opentracing.util.GlobalTracer;
 
 /**
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -309,10 +311,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
               Time.monotonicNowNanos() - requestTime);
         }).thenApply(reply -> {
           try {
-            // we need to handle RaftRetryFailure Exception
-            RaftRetryFailureException raftRetryFailureException =
-                reply.getRetryFailureException();
-            if (raftRetryFailureException != null) {
+            if (!reply.isSuccess()) {
               // in case of raft retry failure, the raft client is
               // not able to connect to the leader hence the pipeline
               // can not be used but this instance of RaftClient will close
@@ -324,7 +323,10 @@ public final class XceiverClientRatis extends XceiverClientSpi {
               // to SCM as in this case, it is the raft client which is not
               // able to connect to leader in the pipeline, though the
               // pipeline can still be functional.
-              throw new CompletionException(raftRetryFailureException);
+              RaftException exception = reply.getException();
+              Preconditions.checkNotNull(exception, "Raft reply failure but " +
+                  "no exception propagated.");
+              throw new CompletionException(exception);
             }
             ContainerCommandResponseProto response =
                 ContainerCommandResponseProto
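
For reference, below is a minimal, hypothetical sketch of the reply-handling pattern introduced above: instead of checking only for RaftRetryFailureException, any unsuccessful RaftClientReply is treated as a failure and the exception propagated by Ratis is rethrown. The surrounding class and the sendAsync() call are illustrative assumptions, not part of this patch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftException;

import com.google.common.base.Preconditions;

class ReplyCheckSketch {
  // Sends a request and converts an unsuccessful reply into a
  // CompletionException carrying the RaftException from the reply.
  CompletableFuture<RaftClientReply> send(RaftClient client, Message request) {
    return client.sendAsync(request).thenApply(reply -> {
      if (!reply.isSuccess()) {
        RaftException exception = reply.getException();
        Preconditions.checkNotNull(exception,
            "Raft reply failure but no exception propagated.");
        throw new CompletionException(exception);
      }
      return reply;
    });
  }
}
```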

+ 6 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -107,6 +107,11 @@ public final class ScmConfigKeys {
       "dfs.container.ratis.log.appender.queue.byte-limit";
   public static final String
       DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
+  public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
+      "dfs.container.ratis.log.purge.gap";
+  // TODO: Set to 1024 once RATIS issue around purge is fixed.
+  public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
+      1000000000;
   // expiry interval stateMachineData cache entry inside containerStateMachine
   public static final String
       DFS_CONTAINER_RATIS_STATEMACHINEDATA_CACHE_EXPIRY_INTERVAL =
@@ -146,7 +151,7 @@ public final class ScmConfigKeys {
 
   public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
       "dfs.ratis.snapshot.threshold";
-  public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 10000;
+  public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 100000;
 
   public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
       "dfs.ratis.server.failure.duration";

+ 4 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -322,6 +322,10 @@ public final class OzoneConfigKeys {
   public static final String
       DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT =
       ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_LOG_PURGE_GAP =
+      ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP;
+  public static final int DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT =
+      ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT;
   public static final String DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY =
       ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_KEY;
   public static final TimeDuration

+ 8 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -104,6 +104,14 @@
     <description>Byte limit for ratis leader's log appender queue.
     </description>
   </property>
+  <property>
+    <name>dfs.container.ratis.log.purge.gap</name>
+    <value>1000000000</value>
+    <tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
+    <description>Purge gap between the last purged commit index
+      and the current index, when the leader decides to purge its log.
+    </description>
+  </property>
   <property>
     <name>dfs.container.ratis.datanode.storage.dir</name>
     <value/>
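
As a usage note (not part of the patch), the new dfs.container.ratis.log.purge.gap property above can be overridden through the normal Ozone configuration path. The snippet below is a minimal sketch assuming an OzoneConfiguration instance is available; it simply lowers the gap from the shipped default.

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;

class PurgeGapConfigSketch {
  static OzoneConfiguration withSmallerPurgeGap() {
    OzoneConfiguration conf = new OzoneConfiguration();
    // The shipped default is deliberately huge (1000000000) until the RATIS
    // purge issue noted in ScmConfigKeys is fixed; a deployment could lower
    // it once that is resolved.
    conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP, 1024);
    return conf;
  }
}
```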

+ 18 - 10
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -28,12 +28,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
-import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
@@ -195,12 +194,12 @@ public class ContainerStateMachine extends BaseStateMachine {
       throws IOException {
     if (snapshot == null) {
       TermIndex empty =
-          TermIndex.newTermIndex(0, RaftServerConstants.INVALID_LOG_INDEX);
+          TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
       LOG.info(
           "The snapshot info is null." + "Setting the last applied index to:"
               + empty);
       setLastAppliedTermIndex(empty);
-      return RaftServerConstants.INVALID_LOG_INDEX;
+      return RaftLog.INVALID_LOG_INDEX;
     }
 
     final File snapshotFile = snapshot.getFile().getPath().toFile();
@@ -243,7 +242,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   public long takeSnapshot() throws IOException {
     TermIndex ti = getLastAppliedTermIndex();
     LOG.info("Taking snapshot at termIndex:" + ti);
-    if (ti != null && ti.getIndex() != RaftServerConstants.INVALID_LOG_INDEX) {
+    if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
       final File snapshotFile =
           storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
       LOG.info("Taking a snapshot to file {}", snapshotFile);
@@ -651,14 +650,13 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
-    ratisServer.handleNodeSlowness(group, roleInfoProto);
+  public void notifySlowness(RoleInfoProto roleInfoProto) {
+    ratisServer.handleNodeSlowness(gid, roleInfoProto);
   }
 
   @Override
-  public void notifyExtendedNoLeader(RaftGroup group,
-      RoleInfoProto roleInfoProto) {
-    ratisServer.handleNoLeader(group, roleInfoProto);
+  public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
+    ratisServer.handleNoLeader(gid, roleInfoProto);
   }
 
   @Override
@@ -667,6 +665,16 @@ public class ContainerStateMachine extends BaseStateMachine {
     evictStateMachineCache();
   }
 
+  @Override
+  public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+      RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+    ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto,
+        firstTermIndexInLog);
+    final CompletableFuture<TermIndex> future = new CompletableFuture<>();
+    future.complete(firstTermIndexInLog);
+    return future;
+  }
+
   @Override
   public void close() throws IOException {
     evictStateMachineCache();

+ 41 - 10
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -57,7 +57,6 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -66,6 +65,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -240,8 +240,9 @@ public final class XceiverServerRatis extends XceiverServer {
         OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT_DEFAULT,
         StorageUnit.BYTES);
-    RaftServerConfigKeys.Log.setElementLimit(properties, logQueueNumElements);
-    RaftServerConfigKeys.Log.setByteLimit(properties, logQueueByteLimit);
+    RaftServerConfigKeys.Log.setQueueElementLimit(
+        properties, logQueueNumElements);
+    RaftServerConfigKeys.Log.setQueueByteLimit(properties, logQueueByteLimit);
 
     int numSyncRetries = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
@@ -251,8 +252,17 @@ public final class XceiverServerRatis extends XceiverServer {
         numSyncRetries);
 
     // Enable the StateMachineCaching
-    RaftServerConfigKeys.Log.StateMachineData
-        .setCachingEnabled(properties, true);
+    RaftServerConfigKeys.Log.StateMachineData.setCachingEnabled(
+        properties, true);
+
+    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+        false);
+
+    int purgeGap = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP_DEFAULT);
+    RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap);
+
     return properties;
   }
 
@@ -590,11 +600,32 @@ public final class XceiverServerRatis extends XceiverServer {
     return pipelineIDs;
   }
 
-  void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
-    handlePipelineFailure(group.getGroupId(), roleInfoProto);
+  void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
+    handlePipelineFailure(groupId, roleInfoProto);
   }
 
-  void handleNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
-    handlePipelineFailure(group.getGroupId(), roleInfoProto);
+  void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
+    handlePipelineFailure(groupId, roleInfoProto);
+  }
+
+  /**
+   * Since the snapshot contents cannot be used to actually catch up
+   * the follower, the pipeline is closed rather than installing the
+   * snapshot; the follower would otherwise never be able to catch up.
+   *
+   * @param groupId raft group information
+   * @param roleInfoProto information about the current node role and
+   *                      rpc delay information.
+   * @param firstTermIndexInLog After the snapshot installation is complete,
+   * return the last included term index in the snapshot.
+   */
+  void handleInstallSnapshotFromLeader(RaftGroupId groupId,
+                                       RoleInfoProto roleInfoProto,
+                                       TermIndex firstTermIndexInLog) {
+    LOG.warn("Install snapshot notification received from Leader with " +
+        "termIndex: {}, terminating pipeline: {}",
+        firstTermIndexInLog, groupId);
+    handlePipelineFailure(groupId, roleInfoProto);
   }
-}
+}
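
The two Ratis log settings above are the core of the change: the leader no longer sends snapshots to lagging followers, and the (currently very large) purge gap governs when the leader purges its log. A minimal sketch of applying the same settings to a standalone RaftProperties instance, using only calls that appear in this patch:

```java
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServerConfigKeys;

class RatisLogSettingsSketch {
  static RaftProperties newProperties(int purgeGap) {
    RaftProperties properties = new RaftProperties();
    // Followers that fall behind are no longer sent a snapshot by the
    // leader; the notification handler closes the pipeline instead.
    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, false);
    // Gap between the last purged commit index and the current index
    // before the leader purges its log.
    RaftServerConfigKeys.Log.setPurgeGap(properties, purgeGap);
    return properties;
  }
}
```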

+ 1 - 1
hadoop-hdds/pom.xml

@@ -47,7 +47,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
+    <ratis.version>0.4.0-2337318-SNAPSHOT</ratis.version>
 
     <bouncycastle.version>1.60</bouncycastle.version>
 

+ 27 - 32
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java

@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.om.ratis;
 
+import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -24,23 +26,18 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.ServiceException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftException;
 import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
@@ -51,7 +48,9 @@ import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.om.exceptions.OMException.STATUS_CODE;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ServiceException;
 
 /**
  * OM Ratis client to interact with OM Ratis server endpoint.
@@ -167,29 +166,25 @@ public final class OzoneManagerRatisClient implements Closeable {
     CompletableFuture<RaftClientReply> raftClientReply =
         sendRequestAsync(request);
 
-    CompletableFuture<OMResponse> omRatisResponse =
-        raftClientReply.whenComplete((reply, e) -> LOG.debug(
-            "received reply {} for request: cmdType={} traceID={} " +
-                "exception: {}", reply, request.getCmdType(),
-            request.getTraceID(), e))
-            .thenApply(reply -> {
-              try {
-                // we need to handle RaftRetryFailure Exception
-                RaftRetryFailureException raftRetryFailureException =
-                    reply.getRetryFailureException();
-                if (raftRetryFailureException != null) {
-                  throw new CompletionException(raftRetryFailureException);
-                }
-
-                OMResponse response = OMRatisHelper
-                    .getOMResponseFromRaftClientReply(reply);
-
-                return response;
-              } catch (InvalidProtocolBufferException e) {
-                throw new CompletionException(e);
-              }
-            });
-    return omRatisResponse;
+    return raftClientReply.whenComplete((reply, e) -> LOG.debug(
+        "received reply {} for request: cmdType={} traceID={} " +
+            "exception: {}", reply, request.getCmdType(),
+        request.getTraceID(), e))
+        .thenApply(reply -> {
+          try {
+            Preconditions.checkNotNull(reply);
+            if (!reply.isSuccess()) {
+              RaftException exception = reply.getException();
+              Preconditions.checkNotNull(exception, "Raft reply failure " +
+                  "but no exception propagated.");
+              throw new CompletionException(exception);
+            }
+            return OMRatisHelper.getOMResponseFromRaftClientReply(reply);
+
+          } catch (InvalidProtocolBufferException e) {
+            throw new CompletionException(e);
+          }
+        });
   }
 
   /**

+ 1 - 1
hadoop-ozone/pom.xml

@@ -29,7 +29,7 @@
     <hadoop.version>3.2.0</hadoop.version>
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
     <ozone.version>0.5.0-SNAPSHOT</ozone.version>
-    <ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
+    <ratis.version>0.4.0-2337318-SNAPSHOT</ratis.version>
     <bouncycastle.version>1.60</bouncycastle.version>
     <ozone.release>Crater Lake</ozone.release>
     <declared.ozone.version>${ozone.version}</declared.ozone.version>