Browse source code

HDDS-943. Add block token validation in HddsDispatcher/XceiverServer. Contributed by Ajay Kumar.

Xiaoyu Yao 6 years ago
parent
commit
2aaaf12f9e
18 changed files with 684 additions and 92 deletions
  1. +3 -3    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
  2. +26 -0   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
  3. +9 -1    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
  4. +8 -8    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
  5. +1 -1    hadoop-hdds/common/src/main/resources/ozone-default.xml
  6. +16 -0   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
  7. +86 -0   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
  8. +17 -23  hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
  9. +34 -9   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
  10. +6 -5   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
  11. +151 -0 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
  12. +1 -1   hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
  13. +35 -10 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  14. +13 -7  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
  15. +32 -11 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
  16. +238 -0 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
  17. +4 -4   hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  18. +4 -9   hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

+ 3 - 3
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java

@@ -126,9 +126,9 @@ public final class HddsConfigKeys {
   public static final String HDDS_X509_SIGNATURE_ALGO =
       "hdds.x509.signature.algorithm";
   public static final String HDDS_X509_SIGNATURE_ALGO_DEFAULT = "SHA256withRSA";
-  public static final String HDDS_GRPC_BLOCK_TOKEN_ENABLED =
-      "hdds.grpc.block.token.enabled";
-  public static final boolean HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT = false;
+  public static final String HDDS_BLOCK_TOKEN_ENABLED =
+      "hdds.block.token.enabled";
+  public static final boolean HDDS_BLOCK_TOKEN_ENABLED_DEFAULT = false;
 
   public static final String HDDS_X509_DIR_NAME = "hdds.x509.dir.name";
   public static final String HDDS_X509_DIR_NAME_DEFAULT = "certs";
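
With validation now wired into both the gRPC and the Ratis paths, the key drops
its "grpc" qualifier. A minimal sketch of turning the feature on, using only
names introduced or renamed in this patch (the class name is illustrative):

    import org.apache.hadoop.hdds.HddsConfigKeys;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.security.x509.SecurityConfig;

    public final class BlockTokenConfigSketch {
      public static void main(String[] args) {
        // Enable block tokens for every datanode transport (gRPC and Ratis).
        OzoneConfiguration conf = new OzoneConfiguration();
        conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);
        // SecurityConfig exposes the flag via the renamed accessor.
        System.out.println(new SecurityConfig(conf).isBlockTokenEnabled());
      }
    }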

+ 26 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java

@@ -24,6 +24,7 @@ import java.io.IOException;
  * Root Security Exception class for all Certificate related Exceptions.
  */
 public class SCMSecurityException extends IOException {
+  private final ErrorCode errorCode;
 
   /**
    * Ctor.
@@ -31,6 +32,7 @@ public class SCMSecurityException extends IOException {
    */
   public SCMSecurityException(String message) {
     super(message);
+    this.errorCode = ErrorCode.DEFAULT;
   }
 
   /**
@@ -40,6 +42,17 @@ public class SCMSecurityException extends IOException {
    */
   public SCMSecurityException(String message, Throwable cause) {
     super(message, cause);
+    this.errorCode = ErrorCode.DEFAULT;
+  }
+
+  /**
+   * Ctor.
+   * @param message - Message.
+   * @param error   - error code.
+   */
+  public SCMSecurityException(String message, ErrorCode error) {
+    super(message);
+    this.errorCode = error;
   }
 
   /**
@@ -48,5 +61,18 @@ public class SCMSecurityException extends IOException {
    */
   public SCMSecurityException(Throwable cause) {
     super(cause);
+    this.errorCode = ErrorCode.DEFAULT;
+  }
+
+  public ErrorCode getErrorCode() {
+    return errorCode;
+  }
+
+  /**
+   * Error codes to make it easy to decode these exceptions.
+   */
+  public enum ErrorCode {
+    DEFAULT,
+    MISSING_BLOCK_TOKEN
   }
 }
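
The error code lets callers tell a missing token apart from other security
failures. A hedged sketch of a call site (the method and variable names are
illustrative; submitRequest is the server entry point added later in this
patch):

    // Illustrative call site: branch on the new error code.
    void submitChecked(XceiverServerSpi server,
        ContainerCommandRequestProto request,
        HddsProtos.PipelineID pipelineID) throws IOException {
      try {
        server.submitRequest(request, pipelineID);
      } catch (SCMSecurityException e) {
        if (e.getErrorCode()
            == SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN) {
          // The client attached no token at all; it should fetch one from
          // OM (see KeyManagerImpl below) and retry.
        }
        throw e;
      }
    }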

+ 9 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -40,6 +42,8 @@ public class BlockTokenVerifier implements TokenVerifier {
   private final CertificateClient caClient;
   private final SecurityConfig conf;
   private static boolean testStub = false;
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(BlockTokenVerifier.class);
 
   public BlockTokenVerifier(SecurityConfig conf, CertificateClient caClient) {
     this.conf = conf;
@@ -53,7 +57,9 @@ public class BlockTokenVerifier implements TokenVerifier {
   @Override
   public UserGroupInformation verify(String user, String tokenStr)
       throws SCMSecurityException {
-    if (conf.isGrpcBlockTokenEnabled()) {
+    if (conf.isBlockTokenEnabled()) {
+      // TODO: add audit logs.
+
       if (Strings.isNullOrEmpty(tokenStr) || isTestStub()) {
        throw new BlockTokenException("Failed to find any token (empty or " +
            "null).");
@@ -62,10 +68,12 @@ public class BlockTokenVerifier implements TokenVerifier {
       OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
       try {
         token.decodeFromUrlString(tokenStr);
+        LOGGER.debug("Verifying token: {} for user: {}", token, user);
         ByteArrayInputStream buf = new ByteArrayInputStream(
             token.getIdentifier());
         DataInputStream in = new DataInputStream(buf);
         tokenId.readFields(in);
+
       } catch (IOException ex) {
         throw new BlockTokenException("Failed to decode token : " + tokenStr);
       }
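
End to end, the verifier is used as sketched below (names assembled from this
patch; caClient may still be null until the TODO in XceiverServer.getCaClient()
is resolved):

    // Sketch: verify an encoded block token on behalf of a caller.
    TokenVerifier verifier =
        new BlockTokenVerifier(new SecurityConfig(conf), caClient);
    // Throws BlockTokenException when the token is missing, cannot be
    // decoded, or fails the subsequent expiry/signature checks.
    UserGroupInformation ugi = verifier.verify(shortUserName, encodedToken);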

+ 8 - 8
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java

@@ -36,8 +36,8 @@ import java.time.Duration;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_KEY_ALGORITHM;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_KEY_LEN;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_SECURITY_PROVIDER;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER;
@@ -98,7 +98,7 @@ public class SecurityConfig {
   private final String publicKeyFileName;
   private final Duration certDuration;
   private final String x509SignatureAlgo;
-  private final boolean grpcBlockTokenEnabled;
+  private final boolean blockTokenEnabled;
   private final String certificateDir;
   private final String certificateFileName;
   private final boolean grpcTlsEnabled;
@@ -147,9 +147,9 @@ public class SecurityConfig {
     this.certificateFileName = this.configuration.get(HDDS_X509_FILE_NAME,
         HDDS_X509_FILE_NAME_DEFAULT);
 
-    this.grpcBlockTokenEnabled = this.configuration.getBoolean(
-        HDDS_GRPC_BLOCK_TOKEN_ENABLED,
-        HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT);
+    this.blockTokenEnabled = this.configuration.getBoolean(
+        HDDS_BLOCK_TOKEN_ENABLED,
+        HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
 
     this.grpcTlsEnabled = this.configuration.getBoolean(HDDS_GRPC_TLS_ENABLED,
         HDDS_GRPC_TLS_ENABLED_DEFAULT);
@@ -357,8 +357,8 @@ public class SecurityConfig {
     return this.certDuration;
   }
 
-  public boolean isGrpcBlockTokenEnabled() {
-    return this.grpcBlockTokenEnabled;
+  public boolean isBlockTokenEnabled() {
+    return this.blockTokenEnabled;
   }
 
   /**

+ 1 - 1
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -1658,7 +1658,7 @@
   </property>
   
   <property>
-    <name>hdds.grpc.block.token.enabled</name>
+    <name>hdds.block.token.enabled</name>
     <value>false</value>
     <tag>OZONE, HDDS, SECURITY, TOKEN</tag>
     <description>True if block tokens are enabled, else false.</description>

+ 16 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java

@@ -23,7 +23,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto
     .XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,9 +41,18 @@ public class GrpcXceiverService extends
       LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
 
   private final ContainerDispatcher dispatcher;
+  private final boolean isGrpcTokenEnabled;
+  private final TokenVerifier tokenVerifier;
 
   public GrpcXceiverService(ContainerDispatcher dispatcher) {
+    this(dispatcher, false, null);
+  }
+
+  public GrpcXceiverService(ContainerDispatcher dispatcher,
+      boolean grpcTokenEnabled, TokenVerifier tokenVerifier) {
     this.dispatcher = dispatcher;
+    this.isGrpcTokenEnabled = grpcTokenEnabled;
+    this.tokenVerifier = tokenVerifier;
   }
 
   @Override
@@ -53,6 +64,11 @@ public class GrpcXceiverService extends
       @Override
       public void onNext(ContainerCommandRequestProto request) {
         try {
+          if (isGrpcTokenEnabled) {
+            // ServerInterceptors intercepts incoming request and creates ugi.
+            tokenVerifier.verify(UserGroupInformation.getCurrentUser()
+                .getShortUserName(), request.getEncodedToken());
+          }
           ContainerCommandResponseProto resp =
               dispatcher.dispatch(request, null);
           responseObserver.onNext(resp);
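
The two new constructor arguments are supplied by XceiverServerGrpc further
down; the wiring reduces to:

    // From XceiverServerGrpc below: the in-stream check is active only
    // when block tokens are enabled in SecurityConfig.
    GrpcXceiverService service = new GrpcXceiverService(
        dispatcher,
        getSecurityConfig().isBlockTokenEnabled(),
        getBlockTokenVerifier());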

+ 86 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java

@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A server endpoint that acts as the communication layer for Ozone containers.
+ */
+public abstract class XceiverServer implements XceiverServerSpi {
+
+  private final SecurityConfig secConfig;
+  private final TokenVerifier tokenVerifier;
+
+  public XceiverServer(Configuration conf) {
+    Objects.requireNonNull(conf);
+    this.secConfig = new SecurityConfig(conf);
+    tokenVerifier = new BlockTokenVerifier(secConfig, getCaClient());
+  }
+
+  /**
+   * Default implementation which just validates security token if security is
+   * enabled.
+   *
+   * @param request ContainerCommandRequest
+   */
+  @Override
+  public void submitRequest(ContainerCommandRequestProto request,
+      HddsProtos.PipelineID pipelineID) throws IOException {
+    if (secConfig.isSecurityEnabled()) {
+      String encodedToken = request.getEncodedToken();
+      // getEncodedToken() returns an empty string (never null) when unset.
+      if (encodedToken == null || encodedToken.isEmpty()) {
+        throw new SCMSecurityException("Security is enabled but client " +
+            "request is missing block token.",
+            SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN);
+      }
+      // verify(user, tokenStr): the token goes in the second argument.
+      tokenVerifier.verify("", encodedToken);
+    }
+  }
+
+  @VisibleForTesting
+  protected CertificateClient getCaClient() {
+    // TODO: instantiate CertificateClient
+    return null;
+  }
+
+  protected SecurityConfig getSecurityConfig() {
+    return secConfig;
+  }
+
+  protected TokenVerifier getBlockTokenVerifier() {
+    return tokenVerifier;
+  }
+
+}
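
Both concrete servers now extend this class and call up before dispatching, so
the token check runs once per request regardless of transport. The pattern, as
seen in the XceiverServerGrpc and XceiverServerRatis hunks below:

    @Override
    public void submitRequest(ContainerCommandRequestProto request,
        HddsProtos.PipelineID pipelineID) throws IOException {
      super.submitRequest(request, pipelineID); // block-token validation
      // ...transport-specific dispatch (gRPC call or Ratis log entry)...
    }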

+ 17 - 23
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -31,9 +30,6 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.
     StorageContainerException;
-import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -62,7 +58,7 @@ import java.util.UUID;
  * Creates a Grpc server endpoint that acts as the communication layer for
  * Ozone containers.
  */
-public final class XceiverServerGrpc implements XceiverServerSpi {
+public final class XceiverServerGrpc extends XceiverServer {
   private static final Logger
       LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
   private int port;
@@ -77,6 +73,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
    */
   public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
       ContainerDispatcher dispatcher, BindableService... additionalServices) {
+    super(conf);
     Preconditions.checkNotNull(conf);
 
     this.id = datanodeDetails.getUuid();
@@ -103,33 +100,35 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
         ((NettyServerBuilder) ServerBuilder.forPort(port))
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
 
-    // Populate UGI context via ServerCredentialInterceptor
-    SecurityConfig secConfig = new SecurityConfig(conf);
     ServerCredentialInterceptor credInterceptor =
-        new ServerCredentialInterceptor(
-            new BlockTokenVerifier(secConfig, getCaClient()));
+        new ServerCredentialInterceptor(getBlockTokenVerifier());
     nettyServerBuilder.addService(ServerInterceptors.intercept(
-          new GrpcXceiverService(dispatcher), credInterceptor));
+        new GrpcXceiverService(dispatcher,
+            getSecurityConfig().isBlockTokenEnabled(),
+            getBlockTokenVerifier()), credInterceptor));
+
 
     for (BindableService service : additionalServices) {
       nettyServerBuilder.addService(service);
     }
 
-    if (secConfig.isGrpcTlsEnabled()) {
-      File privateKeyFilePath = secConfig.getServerPrivateKeyFile();
-      File serverCertChainFilePath = secConfig.getServerCertChainFile();
-      File clientCertChainFilePath = secConfig.getClientCertChainFile();
+    if (getSecurityConfig().isGrpcTlsEnabled()) {
+      File privateKeyFilePath = getSecurityConfig().getServerPrivateKeyFile();
+      File serverCertChainFilePath =
+          getSecurityConfig().getServerCertChainFile();
+      File clientCertChainFilePath =
+          getSecurityConfig().getClientCertChainFile();
       try {
         SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(
             serverCertChainFilePath, privateKeyFilePath);
-        if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFilePath
-            != null) {
+        if (getSecurityConfig().isGrpcMutualTlsRequired() &&
+            clientCertChainFilePath != null) {
           // Only needed for mutual TLS
           sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
           sslClientContextBuilder.trustManager(clientCertChainFilePath);
         }
         SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
-            sslClientContextBuilder, secConfig.getGrpcSslProvider());
+            sslClientContextBuilder, getSecurityConfig().getGrpcSslProvider());
         nettyServerBuilder.sslContext(sslContextBuilder.build());
       } catch (Exception ex) {
         LOG.error("Unable to setup TLS for secure datanode GRPC endpoint.", ex);
@@ -139,12 +138,6 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
     storageContainer = dispatcher;
   }
 
-  @VisibleForTesting
-  public CertificateClient getCaClient() {
-    // TODO: instantiate CertificateClient
-    return null;
-  }
-
   @Override
   public int getIPCPort() {
     return this.port;
@@ -173,6 +166,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   @Override
   public void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID) throws IOException {
+    super.submitRequest(request, pipelineID);
     ContainerProtos.ContainerCommandResponseProto response =
         storageContainer.dispatch(request, null);
     if (response.getResult() != ContainerProtos.Result.SUCCESS) {

+ 34 - 9
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -45,6 +45,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ReadChunkResponseProto;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -126,6 +128,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final Map<Long, Long> applyTransactionCompletionMap;
   private long lastIndex;
   private final Cache<Long, ByteString> stateMachineDataCache;
+  private final boolean isBlockTokenEnabled;
+  private final TokenVerifier tokenVerifier;
   /**
    * CSM metrics.
    */
@@ -133,7 +137,8 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
       ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
-      List<ExecutorService> executors, long expiryInterval) {
+      List<ExecutorService> executors, long expiryInterval,
+      boolean isBlockTokenEnabled, TokenVerifier tokenVerifier) {
     this.gid = gid;
     this.dispatcher = dispatcher;
     this.chunkExecutor = chunkExecutor;
@@ -149,6 +154,8 @@ public class ContainerStateMachine extends BaseStateMachine {
         // set the limit on no of cached entries equal to no of max threads
         // executing writeStateMachineData
         .maximumSize(chunkExecutor.getCorePoolSize()).build();
+    this.isBlockTokenEnabled = isBlockTokenEnabled;
+    this.tokenVerifier = tokenVerifier;
   }
 
   @Override
@@ -289,8 +296,13 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   private ContainerCommandResponseProto dispatchCommand(
       ContainerCommandRequestProto requestProto,
-      DispatcherContext context) {
+      DispatcherContext context) throws IOException {
     LOG.trace("dispatch {}", requestProto);
+    if (isBlockTokenEnabled) {
+      // ServerInterceptors intercepts incoming request and creates ugi.
+      tokenVerifier.verify(UserGroupInformation.getCurrentUser()
+          .getShortUserName(), requestProto.getEncodedToken());
+    }
     ContainerCommandResponseProto response =
         dispatcher.dispatch(requestProto, context);
     LOG.trace("response {}", response);
@@ -298,7 +310,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private Message runCommand(ContainerCommandRequestProto requestProto,
-      DispatcherContext context) {
+      DispatcherContext context) throws IOException {
     return dispatchCommand(requestProto, context)::toByteString;
   }
 
@@ -326,8 +338,15 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setLogIndex(entryIndex)
             .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
             .build();
-    CompletableFuture<Message> writeChunkFuture = CompletableFuture
-        .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor);
+    CompletableFuture<Message> writeChunkFuture;
+    try {
+      Message msg = runCommand(requestProto, context);
+      writeChunkFuture = CompletableFuture
+          .supplyAsync(() -> msg, chunkExecutor);
+    } catch (IOException ie) {
+      writeChunkFuture = completeExceptionally(ie);
+    }
+
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
         + " logIndex " + entryIndex + " chunkName " + write.getChunkData()
@@ -386,7 +405,8 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   private ByteString readStateMachineData(
-      ContainerCommandRequestProto requestProto, long term, long index) {
+      ContainerCommandRequestProto requestProto, long term, long index)
+      throws IOException {
     WriteChunkRequestProto writeChunkRequestProto =
         requestProto.getWriteChunk();
     ContainerProtos.ChunkInfo chunkInfo = writeChunkRequestProto.getChunkData();
@@ -559,9 +579,14 @@ public class ContainerStateMachine extends BaseStateMachine {
         builder
             .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
       }
-      future = CompletableFuture
-          .supplyAsync(() -> runCommand(requestProto, builder.build()),
-              getCommandExecutor(requestProto));
+      try {
+        Message msg = runCommand(requestProto, builder.build());
+        future = CompletableFuture.supplyAsync(() -> msg,
+            getCommandExecutor(requestProto));
+      } catch (IOException ie) {
+        future = completeExceptionally(ie);
+      }
+
       lastIndex = index;
       future.thenAccept(m -> {
         final Long previous =
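
completeExceptionally(...) is referenced above but lies outside these hunks;
its assumed shape, consistent with how it is used here, is:

    // Assumed helper: wrap the verification failure in an already-failed
    // future so Ratis callers observe it asynchronously.
    private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
      final CompletableFuture<T> future = new CompletableFuture<>();
      future.completeExceptionally(e);
      return future;
    }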

+ 6 - 5
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -38,8 +38,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.transport.server
-    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClientConfigKeys;
@@ -69,7 +68,6 @@ import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.HEAD;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -91,7 +89,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * Creates a ratis server endpoint that acts as the communication layer for
  * Ozone containers.
  */
-public final class XceiverServerRatis implements XceiverServerSpi {
+public final class XceiverServerRatis extends XceiverServer {
   private static final Logger LOG = LoggerFactory
       .getLogger(XceiverServerRatis.class);
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
@@ -115,6 +113,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
       ContainerDispatcher dispatcher, Configuration conf, StateContext
       context, GrpcTlsConfig tlsConfig)
       throws IOException {
+    super(conf);
     Objects.requireNonNull(dd, "id == null");
     this.port = port;
     RaftProperties serverProperties = newRaftProperties(conf);
@@ -155,7 +154,8 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
   private ContainerStateMachine getStateMachine(RaftGroupId gid) {
     return new ContainerStateMachine(gid, dispatcher, chunkExecutor, this,
-        Collections.unmodifiableList(executors), cacheEntryExpiryInteval);
+        Collections.unmodifiableList(executors), cacheEntryExpiryInteval,
+        getSecurityConfig().isBlockTokenEnabled(), getBlockTokenVerifier());
   }
 
   private RaftProperties newRaftProperties(Configuration conf) {
@@ -479,6 +479,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   @Override
   public void submitRequest(ContainerCommandRequestProto request,
       HddsProtos.PipelineID pipelineID) throws IOException {
+    super.submitRequest(request, pipelineID);
     RaftClientReply reply;
     RaftClientRequest raftClientRequest =
         createRaftClientRequest(request, pipelineID,

+ 151 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java

@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests the ContainerStateMachine failure handling with block tokens
+ * enabled.
+ */
+public class TestContainerStateMachine {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static String volumeName;
+  private static String bucketName;
+  private static String path;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    path = GenericTestUtils
+        .getTempPath(TestContainerStateMachine.class.getSimpleName());
+    File baseDir = new File(path);
+    baseDir.mkdirs();
+
+    conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
+  //  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    OzoneManager.setTestSecureOmFlag(true);
+  //  conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
+    cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
+            .setHbInterval(200)
+            .setCertificateClient(new CertificateClientTestImpl(conf))
+            .build();
+    cluster.waitForClusterToBeReady();
+    cluster.getOzoneManager().startSecretManager();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    volumeName = "testcontainerstatemachinefailures";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testContainerStateMachineFailures() throws Exception {
+    OzoneOutputStream key =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .createKey("ratis", 1024, ReplicationType.RATIS,
+                ReplicationFactor.ONE);
+    // First write and flush creates a container in the datanode
+    key.write("ratis".getBytes());
+    key.flush();
+    key.write("ratis".getBytes());
+
+    //get the name of a valid container
+    KeyOutputStream groupOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+
+    // delete the container dir
+    FileUtil.fullyDelete(new File(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
+            .getContainerPath()));
+
+    key.close();
+    // Make sure the container is marked unhealthy
+    Assert.assertTrue(
+        cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+            .getContainer().getContainerSet()
+            .getContainer(omKeyLocationInfo.getContainerID())
+            .getContainerState()
+            == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+  }
+}

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java

@@ -86,7 +86,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
     conf = new OzoneConfiguration();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
-    conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED, true);
+    conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);
     conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     CertificateClientTestImpl certificateClientTest =
         new CertificateClientTestImpl(conf);

+ 35 - 10
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -23,8 +23,10 @@ import java.security.MessageDigest;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto.Builder;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -35,6 +37,7 @@ import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.security.token.Token;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -213,7 +216,7 @@ public final class ContainerTestHelper {
     writeRequest.setChunkData(info.getProtoBufMessage());
     writeRequest.setData(ByteString.copyFrom(data));
 
-    ContainerCommandRequestProto.Builder request =
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.WriteChunk);
     request.setContainerID(blockID.getContainerID());
@@ -255,7 +258,7 @@ public final class ContainerTestHelper {
     smallFileRequest.setData(ByteString.copyFrom(data));
     smallFileRequest.setBlock(putRequest);
 
-    ContainerCommandRequestProto.Builder request =
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.PutSmallFile);
     request.setContainerID(blockID.getContainerID());
@@ -274,7 +277,7 @@ public final class ContainerTestHelper {
     ContainerCommandRequestProto getKey = getBlockRequest(pipeline, putKey);
     smallFileRequest.setBlock(getKey.getGetBlock());
 
-    ContainerCommandRequestProto.Builder request =
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.GetSmallFile);
     request.setContainerID(getKey.getGetBlock().getBlockID().getContainerID());
@@ -304,7 +307,7 @@ public final class ContainerTestHelper {
     readRequest.setBlockID(request.getBlockID());
     readRequest.setChunkData(request.getChunkData());
 
-    ContainerCommandRequestProto.Builder newRequest =
+    Builder newRequest =
         ContainerCommandRequestProto.newBuilder();
     newRequest.setCmdType(ContainerProtos.Type.ReadChunk);
     newRequest.setContainerID(readRequest.getBlockID().getContainerID());
@@ -337,7 +340,7 @@ public final class ContainerTestHelper {
     deleteRequest.setChunkData(writeRequest.getChunkData());
     deleteRequest.setBlockID(writeRequest.getBlockID());
 
-    ContainerCommandRequestProto.Builder request =
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.DeleteChunk);
     request.setContainerID(writeRequest.getBlockID().getContainerID());
@@ -356,8 +359,12 @@ public final class ContainerTestHelper {
   public static ContainerCommandRequestProto getCreateContainerRequest(
       long containerID, Pipeline pipeline) throws IOException {
     LOG.trace("addContainer: {}", containerID);
+    return getContainerCommandRequestBuilder(containerID, pipeline).build();
+  }
 
-    ContainerCommandRequestProto.Builder request =
+  private static Builder getContainerCommandRequestBuilder(long containerID,
+      Pipeline pipeline) throws IOException {
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.CreateContainer);
     request.setContainerID(containerID);
@@ -366,6 +373,24 @@ public final class ContainerTestHelper {
     request.setTraceID(UUID.randomUUID().toString());
     request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
 
+    return request;
+  }
+
+  /**
+   * Returns a create container command for test purposes. There are a bunch of
+   * tests where we need to just send a request and get a reply.
+   *
+   * @return ContainerCommandRequestProto.
+   */
+  public static ContainerCommandRequestProto getCreateContainerSecureRequest(
+      long containerID, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token) throws IOException {
+    LOG.trace("addContainer: {}", containerID);
+
+    Builder request = getContainerCommandRequestBuilder(containerID, pipeline);
+    if (token != null) {
+      request.setEncodedToken(token.encodeToUrlString());
+    }
     return request.build();
   }
 
@@ -393,7 +418,7 @@ public final class ContainerTestHelper {
     Pipeline pipeline =
         ContainerTestHelper.createSingleNodePipeline();
 
-    ContainerCommandRequestProto.Builder request =
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.UpdateContainer);
     request.setContainerID(containerID);
@@ -444,7 +469,7 @@ public final class ContainerTestHelper {
     blockData.setBlockCommitSequenceId(0);
     putRequest.setBlockData(blockData.getProtoBufMessage());
 
-    ContainerCommandRequestProto.Builder request =
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.PutBlock);
     request.setContainerID(blockData.getContainerID());
@@ -472,7 +497,7 @@ public final class ContainerTestHelper {
         ContainerProtos.GetBlockRequestProto.newBuilder();
     getRequest.setBlockID(blockID);
 
-    ContainerCommandRequestProto.Builder request =
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.GetBlock);
     request.setContainerID(blockID.getContainerID());
@@ -510,7 +535,7 @@ public final class ContainerTestHelper {
     ContainerProtos.DeleteBlockRequestProto.Builder delRequest =
         ContainerProtos.DeleteBlockRequestProto.newBuilder();
     delRequest.setBlockID(blockID);
-    ContainerCommandRequestProto.Builder request =
+    Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.DeleteBlock);
     request.setContainerID(blockID.getContainerID());
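
The new secure helper is exercised by the tests below; attaching a token is a
single extra argument on the shared builder:

    // From TestSecureOzoneContainer below: token may be null, in which
    // case no encoded token is set on the request.
    ContainerProtos.ContainerCommandRequestProto request =
        ContainerTestHelper.getCreateContainerSecureRequest(
            containerID, client.getPipeline(), token);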

+ 13 - 7
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java

@@ -120,7 +120,7 @@ public class TestSecureOzoneContainer {
     LOG.info("Test case: requireBlockToken: {} hasBlockToken: {} " +
         "blockTokenExpired: {}.", requireBlockToken, hasBlockToken,
         blockTokeExpired);
-    conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED,
+    conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED,
         requireBlockToken);
 
     long containerID = ContainerTestHelper.getTestContainerID();
@@ -161,7 +161,7 @@ public class TestSecureOzoneContainer {
           new InetSocketAddress(dn.getIpAddress(), port);
 
       Token<OzoneBlockTokenIdentifier> token =
-          new Token(tokenId.getBytes(), new byte[2], tokenId.getKind(),
+          new Token(tokenId.getBytes(), new byte[50], tokenId.getKind(),
               SecurityUtil.buildTokenService(addr));
       if (hasBlockToken) {
         ugi.addToken(token);
@@ -173,9 +173,15 @@ public class TestSecureOzoneContainer {
           try {
             XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
             client.connect(token.encodeToUrlString());
-            createContainerForTesting(client, containerID);
+            if (hasBlockToken) {
+              createContainerForTesting(client, containerID, token);
+            } else {
+              createContainerForTesting(client, containerID, null);
+            }
+
           } catch (Exception e) {
             if (requireBlockToken && hasBlockToken && !blockTokeExpired) {
+              LOG.error("Unexpected error. ", e);
               fail("Client with BlockToken should succeed when block token is" +
                   " required.");
             }
@@ -185,7 +191,7 @@ public class TestSecureOzoneContainer {
             }
             if (requireBlockToken && !hasBlockToken) {
               assertTrue("Receive expected exception", e instanceof
-                  SCMSecurityException);
+                  IOException);
             }
           }
           return null;
@@ -199,11 +205,11 @@ public class TestSecureOzoneContainer {
   }
 
   public static void createContainerForTesting(XceiverClientSpi client,
-      long containerID) throws Exception {
+      long containerID, Token token) throws Exception {
     // Create container
     ContainerProtos.ContainerCommandRequestProto request =
-        ContainerTestHelper.getCreateContainerRequest(
-            containerID, client.getPipeline());
+        ContainerTestHelper.getCreateContainerSecureRequest(
+            containerID, client.getPipeline(), token);
     ContainerProtos.ContainerCommandResponseProto response =
         client.sendCommand(request);
     Assert.assertNotNull(response);

+ 32 - 11
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java

@@ -19,11 +19,14 @@
 package org.apache.hadoop.ozone.container.server;
 
 import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
@@ -54,14 +57,17 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
@@ -72,8 +78,9 @@ import static org.mockito.Mockito.mock;
  */
 @Ignore("Takes too long to run this test. Ignoring for time being.")
 public class TestContainerServer {
-  static final String TEST_DIR
-      = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
+  static final String TEST_DIR = GenericTestUtils.getTestDir("dfs")
+      .getAbsolutePath() + File.separator;
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
 
   private GrpcReplicationService createReplicationService(
       ContainerController containerController) {
@@ -81,6 +88,11 @@ public class TestContainerServer {
         new OnDemandContainerReplicationSource(containerController));
   }
 
+  @BeforeClass
+  public static void setup() {
+    CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR);
+  }
+
   @Test
   public void testClientServer() throws Exception {
     DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
@@ -151,17 +163,16 @@ public class TestContainerServer {
     try {
       final Pipeline pipeline =
           ContainerTestHelper.createPipeline(numDatanodes);
-      final OzoneConfiguration conf = new OzoneConfiguration();
-      initConf.accept(pipeline, conf);
+      initConf.accept(pipeline, CONF);
 
       for (DatanodeDetails dn : pipeline.getNodes()) {
-        final XceiverServerSpi s = createServer.apply(dn, conf);
+        final XceiverServerSpi s = createServer.apply(dn, CONF);
         servers.add(s);
         s.start();
         initServer.accept(dn, pipeline);
       }
 
-      client = createClient.apply(pipeline, conf);
+      client = createClient.apply(pipeline, CONF);
       client.connect();
 
       final ContainerCommandRequestProto request =
@@ -184,7 +195,7 @@ public class TestContainerServer {
   public void testClientServerWithContainerDispatcher() throws Exception {
     XceiverServerGrpc server = null;
     XceiverClientGrpc client = null;
-
+    UUID scmId = UUID.randomUUID();
     try {
       Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
       OzoneConfiguration conf = new OzoneConfiguration();
@@ -196,16 +207,26 @@ public class TestContainerServer {
       VolumeSet volumeSet = mock(VolumeSet.class);
       ContainerMetrics metrics = ContainerMetrics.create(conf);
       Map<ContainerProtos.ContainerType, Handler> handlers = Maps.newHashMap();
+      DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+      DatanodeStateMachine stateMachine = Mockito.mock(
+          DatanodeStateMachine.class);
+      StateContext context = Mockito.mock(StateContext.class);
+      Mockito.when(stateMachine.getDatanodeDetails())
+          .thenReturn(datanodeDetails);
+      Mockito.when(context.getParent()).thenReturn(stateMachine);
+
+
       for (ContainerProtos.ContainerType containerType :
           ContainerProtos.ContainerType.values()) {
         handlers.put(containerType,
-            Handler.getHandlerForContainerType(
-                containerType, conf, null, containerSet, volumeSet, metrics));
+            Handler.getHandlerForContainerType(containerType, conf, context,
+                containerSet, volumeSet, metrics));
       }
       HddsDispatcher dispatcher = new HddsDispatcher(
-          conf, containerSet, volumeSet, handlers, null, metrics);
+          conf, containerSet, volumeSet, handlers, context, metrics);
+      dispatcher.setScmId(scmId.toString());
       dispatcher.init();
-      DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+
       server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
           createReplicationService(
               new ContainerController(containerSet, null)));

+ 238 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java

@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.server;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.RatisTestHelper;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
+import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
+
+/**
+ * Test Container servers when security is enabled.
+ */
+public class TestSecureContainerServer {
+  static final String TEST_DIR
+      = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
+
+  private GrpcReplicationService createReplicationService(
+      ContainerController containerController) {
+    return new GrpcReplicationService(
+        new OnDemandContainerReplicationSource(containerController));
+  }
+
+  @BeforeClass
+  public static void setup() {
+    CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR);
+    CONF.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    CONF.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
+  }
+
+  @Test
+  public void testClientServer() throws Exception {
+    DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+    ContainerSet containerSet = new ContainerSet();
+    ContainerController controller = new ContainerController(
+        containerSet, null);
+    runTestClientServer(1, (pipeline, conf) -> conf
+            .setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+                pipeline.getFirstNode()
+                    .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
+        XceiverClientGrpc::new,
+        (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
+            new TestContainerDispatcher(),
+            createReplicationService(controller)), (dn, p) -> {
+        });
+  }
+
+  @FunctionalInterface
+  interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
+    OUT apply(LEFT left, RIGHT right) throws THROWABLE;
+  }
+
+  @Test
+  public void testClientServerRatisGrpc() throws Exception {
+    runTestClientServerRatis(GRPC, 1);
+    runTestClientServerRatis(GRPC, 3);
+  }
+
+  @Test
+  @Ignore
+  public void testClientServerRatisNetty() throws Exception {
+    runTestClientServerRatis(NETTY, 1);
+    runTestClientServerRatis(NETTY, 3);
+  }
+
+  static XceiverServerRatis newXceiverServerRatis(
+      DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
+    conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
+        dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
+    final String dir = TEST_DIR + dn.getUuid();
+    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+
+    final ContainerDispatcher dispatcher = new TestContainerDispatcher();
+    return XceiverServerRatis
+        .newXceiverServerRatis(dn, conf, dispatcher, null);
+  }
+
+  static void runTestClientServerRatis(RpcType rpc, int numNodes)
+      throws Exception {
+    runTestClientServer(numNodes,
+        (pipeline, conf) -> RatisTestHelper.initRatisConf(rpc, conf),
+        XceiverClientRatis::newXceiverClientRatis,
+        TestSecureContainerServer::newXceiverServerRatis,
+        (dn, p) -> RatisTestHelper.initXceiverServerRatis(rpc, dn, p));
+  }
+
+  static void runTestClientServer(
+      int numDatanodes,
+      CheckedBiConsumer<Pipeline, OzoneConfiguration, IOException> initConf,
+      CheckedBiFunction<Pipeline, OzoneConfiguration, XceiverClientSpi,
+          IOException> createClient,
+      CheckedBiFunction<DatanodeDetails, OzoneConfiguration, XceiverServerSpi,
+          IOException> createServer,
+      CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
+      throws Exception {
+    final List<XceiverServerSpi> servers = new ArrayList<>();
+    XceiverClientSpi client = null;
+    String containerName = OzoneUtils.getRequestID();
+    try {
+      final Pipeline pipeline = ContainerTestHelper.createPipeline(numDatanodes);
+
+      initConf.accept(pipeline, CONF);
+
+      for (DatanodeDetails dn : pipeline.getNodes()) {
+        final XceiverServerSpi s = createServer.apply(dn, CONF);
+        servers.add(s);
+        s.start();
+        initServer.accept(dn, pipeline);
+      }
+
+      client = createClient.apply(pipeline, CONF);
+      client.connect();
+
+      // Test 1: Test failure in request without block token.
+      final ContainerCommandRequestProto request =
+          ContainerTestHelper
+              .getCreateContainerRequest(
+                  ContainerTestHelper.getTestContainerID(), pipeline);
+      Assert.assertNotNull(request.getTraceID());
+
+      XceiverClientSpi finalClient = client;
+      LambdaTestUtils.intercept(IOException.class,
+          () -> finalClient.sendCommand(request));
+
+      // Test 2: a request carrying an empty block token also fails.
+      final ContainerCommandRequestProto request2 =
+          ContainerTestHelper
+              .getCreateContainerSecureRequest(
+                  ContainerTestHelper.getTestContainerID(), pipeline,
+                  new Token<>());
+      Assert.assertNotNull(request2.getTraceID());
+
+      XceiverClientSpi finalClient2 = client;
+      LambdaTestUtils.intercept(IOException.class, "",
+          () -> finalClient2.sendCommand(request2));
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+      servers.forEach(XceiverServerSpi::stop);
+    }
+  }
+
+  private static class TestContainerDispatcher implements ContainerDispatcher {
+    /**
+     * Dispatches commands to container layer.
+     *
+     * @param msg - Command Request
+     * @return Command Response
+     */
+    @Override
+    public ContainerCommandResponseProto dispatch(
+        ContainerCommandRequestProto msg,
+        DispatcherContext context) {
+      return ContainerTestHelper.getCreateContainerResponse(msg);
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public void validateContainerCommand(
+        ContainerCommandRequestProto msg) throws StorageContainerException {
+    }
+
+    @Override
+    public void shutdown() {
+    }
+    @Override
+    public Handler getHandler(ContainerProtos.ContainerType containerType) {
+      return null;
+    }
+
+    @Override
+    public void setScmId(String scmId) {
+
+    }
+  }
+
+}

+ 4 - 4
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -63,8 +63,8 @@ import org.apache.hadoop.utils.db.DBStore;
 
 import com.google.common.base.Preconditions;
 
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
@@ -119,8 +119,8 @@ public class KeyManagerImpl implements KeyManager {
     start(conf);
     this.secretManager = secretManager;
     this.grpcBlockTokenEnabled = conf.getBoolean(
-        HDDS_GRPC_BLOCK_TOKEN_ENABLED,
-        HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT);
+        HDDS_BLOCK_TOKEN_ENABLED,
+        HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
   }
 
   @Override

+ 4 - 9
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -255,7 +255,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
     omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration));
     secConfig = new SecurityConfig(configuration);
-    if (secConfig.isGrpcBlockTokenEnabled()) {
+    if (secConfig.isBlockTokenEnabled()) {
       blockTokenMgr = createBlockTokenSecretManager(configuration);
     }
     if(secConfig.isSecurityEnabled()){
@@ -385,7 +385,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     if (testSecureOmFlag) {
       return new OzoneBlockTokenSecretManager(secConfig, expiryTime, "1");
     }
-    Objects.nonNull(certClient);
+    Objects.requireNonNull(certClient);
     return new OzoneBlockTokenSecretManager(secConfig, expiryTime,
         certClient.getCertificate(OM_DAEMON).getSerialNumber().toString());
   }
@@ -418,7 +418,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       LOG.error("Unable to read key pair for OM.", e);
       throw new RuntimeException(e);
     }
-    if (secConfig.isGrpcBlockTokenEnabled() && blockTokenMgr != null) {
+    if (secConfig.isBlockTokenEnabled() && blockTokenMgr != null) {
       try {
         LOG.info("Starting OM block token secret manager");
         blockTokenMgr.start(keyPair);
@@ -982,11 +982,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
-  private boolean shouldUseDelegationTokens() {
-    return UserGroupInformation.isSecurityEnabled();
-  }
-
-
   /**
    *
    * @return true if delegation token operation is allowed
@@ -1770,7 +1765,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   @Override
   public AuditMessage buildAuditMessageForSuccess(AuditAction op,
-                                                  Map<String, String> auditMap) {
+      Map<String, String> auditMap) {
     return new AuditMessage.Builder()
         .setUser((Server.getRemoteUser() == null) ? null :
             Server.getRemoteUser().getUserName())