
HDDS-9. Add GRPC protocol interceptors for Ozone Block Token. Contributed by Xiaoyu Yao.

Xiaoyu Yao committed 6 years ago · commit 7e2770699c
26 changed files with 843 additions and 61 deletions
  1. +65 -0    hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java
  2. +57 -7    hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
  3. +3 -2     hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
  4. +52 -0    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
  5. +23 -0    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/package-info.java
  6. +8 -19    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenException.java
  7. +113 -0   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
  8. +29 -8    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
  9. +1 -1     hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java
  10. +38 -0   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
  11. +22 -0   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/package-info.java
  12. +11 -0   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
  13. +1 -1    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
  14. +11 -0   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
  15. +1 -1    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/CertificateSignRequest.java
  16. +1 -1    hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/SelfSignedCertificate.java
  17. +2 -12   hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
  18. +14 -0   hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
  19. +1 -1    hadoop-hdds/common/src/main/proto/hdds.proto
  20. +62 -4   hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java
  21. +22 -0   hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/package-info.java
  22. +1 -1    hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
  23. +1 -1    hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestRootCertificate.java
  24. +74 -0   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java
  25. +21 -2   hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
  26. +209 -0  hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java

+ 65 - 0
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ClientCredentialInterceptor.java

@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.ratis.thirdparty.io.grpc.CallOptions;
+import org.apache.ratis.thirdparty.io.grpc.Channel;
+import org.apache.ratis.thirdparty.io.grpc.ClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OBT_METADATA_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.USER_METADATA_KEY;
+
+/**
+ * GRPC client interceptor for ozone block token.
+ */
+public class ClientCredentialInterceptor implements ClientInterceptor {
+
+  private final String user;
+  private final String token;
+
+  public ClientCredentialInterceptor(String user, String token) {
+    this.user = user;
+    this.token = token;
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+      MethodDescriptor<ReqT, RespT> method,
+      CallOptions callOptions,
+      Channel next) {
+
+    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+        next.newCall(method, callOptions)) {
+      @Override
+      public void start(Listener<RespT> responseListener, Metadata headers) {
+        if (token != null) {
+          headers.put(OBT_METADATA_KEY, token);
+        }
+        if (user != null) {
+          headers.put(USER_METADATA_KEY, user);
+        }
+        super.start(responseListener, headers);
+      }
+    };
+  }
+}
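
A minimal sketch of how this interceptor is attached on the client side (it mirrors the XceiverClientGrpc change later in this commit; the address is a placeholder):

    // Sketch only: host and port are placeholders; user/encodedToken come
    // from UserGroupInformation and the selected block token.
    ManagedChannel channel = NettyChannelBuilder
        .forAddress("datanode-host", 9859)
        .usePlaintext()
        .intercept(new ClientCredentialInterceptor(user, encodedToken))
        .build();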

+ 57 - 7
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -31,22 +31,32 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServi
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.UUID;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -93,7 +103,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     connectToDatanode(dn);
   }
 
-  private void connectToDatanode(DatanodeDetails dn) {
+
+  private void connectToDatanode(DatanodeDetails dn) throws IOException,
+      SCMSecurityException {
     // read port from the data node, on failure use default configured
     // port.
     int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
@@ -101,16 +113,49 @@ public class XceiverClientGrpc extends XceiverClientSpi {
       port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
           OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     }
+
+    // Add credential context to the client call
+    String userName = UserGroupInformation.getCurrentUser()
+        .getShortUserName();
+
+    // Add a block token if a block token (mutual auth) is required but the
+    // client does not have mTLS (a private key and CA-signed certificate)
+    String encodedToken = null;
+    SecurityConfig secConfig = new SecurityConfig(config);
+    if (secConfig.isGrpcBlockTokenEnabled()) {
+      InetSocketAddress addr = new InetSocketAddress(dn.getIpAddress(), port);
+      encodedToken = getEncodedBlockToken(addr);
+      if (encodedToken == null) {
+        throw new SCMSecurityException("No block token available to access " +
+            "service at: " + addr.toString());
+      }
+    }
     LOG.debug("Connecting to server Port : " + dn.getIpAddress());
-    ManagedChannel channel =
-        NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
+    NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
+            .getIpAddress(), port).usePlaintext()
             .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
-            .build();
+            .intercept(new ClientCredentialInterceptor(userName, encodedToken));
+    ManagedChannel channel = channelBuilder.build();
     XceiverClientProtocolServiceStub asyncStub =
         XceiverClientProtocolServiceGrpc.newStub(channel);
     asyncStubs.put(dn.getUuid(), asyncStub);
     channels.put(dn.getUuid(), channel);
   }
+
+  private String getEncodedBlockToken(InetSocketAddress addr)
+      throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    OzoneBlockTokenSelector tokenSelector = new OzoneBlockTokenSelector();
+    Text service = SecurityUtil.buildTokenService(addr);
+    Token<OzoneBlockTokenIdentifier> token = tokenSelector.selectToken(
+        service, ugi.getTokens());
+    if (token != null) {
+      token.setService(service);
+      return token.encodeToUrlString();
+    }
+    return null;
+  }
+
   /**
    * Returns if the xceiver client connects to all servers in the pipeline.
    *
@@ -173,6 +218,11 @@ public class XceiverClientGrpc extends XceiverClientSpi {
       } catch (ExecutionException | InterruptedException e) {
         LOG.warn("Failed to execute command " + request + " on datanode " + dn
             .getUuidString(), e);
+        if (Status.fromThrowable(e.getCause()).getCode()
+            == Status.UNAUTHENTICATED.getCode()) {
+          throw new SCMSecurityException("Failed to authenticate with " +
+              "GRPC XceiverServer with Ozone block token.");
+        }
       }
     }
 

+ 3 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java

@@ -145,6 +145,7 @@ public final class HddsConfigKeys {
       "hdds.x509.signature.algorithm";
       "hdds.x509.signature.algorithm";
   public static final String HDDS_X509_SIGNATURE_ALGO_DEFAULT = "SHA256withRSA";
   public static final String HDDS_X509_SIGNATURE_ALGO_DEFAULT = "SHA256withRSA";
 
 
-
-
+  public static final String HDDS_GRPC_BLOCK_TOKEN_ENABLED = "hdds.grpc.block" +
+      ".token.enabled";
+  public static final boolean HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT = false;
 }
 }
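
Block tokens stay disabled unless the new key is turned on. A sketch of enabling the flag programmatically, using the same classes this commit touches (the snippet itself is illustrative, not part of the change):

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED, true);
    // SecurityConfig reads the flag from the Configuration it wraps
    boolean enabled = new SecurityConfig(conf).isGrpcBlockTokenEnabled();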

+ 52 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.exception;
+
+import java.io.IOException;
+
+/**
+ * Root security exception class for all SCM security related exceptions.
+ */
+public class SCMSecurityException extends IOException {
+
+  /**
+   * Ctor.
+   * @param message - Error Message.
+   */
+  public SCMSecurityException(String message) {
+    super(message);
+  }
+
+  /**
+   * Ctor.
+   * @param message - Message.
+   * @param cause  - Actual cause.
+   */
+  public SCMSecurityException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Ctor.
+   * @param cause - Base Exception.
+   */
+  public SCMSecurityException(Throwable cause) {
+    super(cause);
+  }
+}

+ 23 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/package-info.java

@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Exceptions thrown by SCM security classes.
+ */
+package org.apache.hadoop.hdds.security.exception;

+ 8 - 19
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/SCMSecurityException.java → hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenException.java

@@ -17,18 +17,20 @@
  *
  */
 
-package org.apache.hadoop.hdds.security.x509.exceptions;
+package org.apache.hadoop.hdds.security.token;
+
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 
 /**
- * Root Security Exception call for all Certificate related Execptions.
+ * Block Token Exceptions from the SCM Security layer.
  */
-public class SCMSecurityException extends Exception {
+public class BlockTokenException extends SCMSecurityException {
 
   /**
    * Ctor.
    * @param message - Error Message.
    */
-  public SCMSecurityException(String message) {
+  public BlockTokenException(String message) {
     super(message);
   }
 
@@ -37,7 +39,7 @@ public class SCMSecurityException extends Exception {
    * @param message - Message.
    * @param cause  - Actual cause.
    */
-  public SCMSecurityException(String message, Throwable cause) {
+  public BlockTokenException(String message, Throwable cause) {
     super(message, cause);
   }
 
@@ -45,20 +47,7 @@ public class SCMSecurityException extends Exception {
    * Ctor.
    * @param cause - Base Exception.
    */
-  public SCMSecurityException(Throwable cause) {
+  public BlockTokenException(Throwable cause) {
     super(cause);
   }
-
-
-  /**
-   * Ctor.
-   * @param message - Error Message
-   * @param cause  - Cause
-   * @param enableSuppression - Enable suppression.
-   * @param writableStackTrace - Writable stack trace.
-   */
-  public SCMSecurityException(String message, Throwable cause,
-      boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
 }

+ 113 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java

@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.token;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.security.cert.X509Certificate;
+
+
+/**
+ * Verify token and return a UGI with token if authenticated.
+ */
+public class BlockTokenVerifier implements TokenVerifier {
+
+  private final CertificateClient caClient;
+  private final SecurityConfig conf;
+
+  public BlockTokenVerifier(SecurityConfig conf, CertificateClient caClient) {
+    this.conf = conf;
+    this.caClient = caClient;
+  }
+
+  private boolean isExpired(long expiryDate) {
+    return Time.now() > expiryDate;
+  }
+
+  @Override
+  public UserGroupInformation verify(String user, String tokenStr)
+      throws SCMSecurityException {
+    if (conf.isGrpcBlockTokenEnabled()) {
+      if (Strings.isNullOrEmpty(tokenStr)) {
+        throw new BlockTokenException("Fail to find any token (empty or " +
+            "null.");
+      }
+      final Token<OzoneBlockTokenIdentifier> token = new Token();
+      OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
+      try {
+        token.decodeFromUrlString(tokenStr);
+        ByteArrayInputStream buf = new ByteArrayInputStream(
+            token.getIdentifier());
+        DataInputStream in = new DataInputStream(buf);
+        tokenId.readFields(in);
+      } catch (IOException ex) {
+        throw new BlockTokenException("Failed to decode token : " + tokenStr);
+      }
+
+      // TODO: revisit this when caClient is ready; skip the signature check
+      // for now. The final code should look like:
+      /*
+       * if (caClient == null) {
+       *   throw new SCMSecurityException("Certificate client not available " +
+       *       "to validate token");
+       * }
+       */
+      if (caClient != null) {
+        X509Certificate signerCert = caClient.queryCertificate(
+            "certId=" + tokenId.getOmCertSerialId());
+        if (signerCert == null) {
+          throw new BlockTokenException("Can't find signer certificate " +
+              "(OmCertSerialId: " + tokenId.getOmCertSerialId() +
+              ") of the block token for user: " + tokenId.getUser());
+        }
+        boolean validToken = caClient.verifySignature(tokenId.getBytes(),
+            token.getPassword(), signerCert);
+        if (!validToken) {
+          throw new BlockTokenException("Invalid block token for user: " +
+              tokenId.getUser());
+        }
+      }
+      // check expiration
+      if (isExpired(tokenId.getExpiryDate())) {
+        UserGroupInformation tokenUser = tokenId.getUser();
+        tokenUser.setAuthenticationMethod(
+            UserGroupInformation.AuthenticationMethod.TOKEN);
+        throw new BlockTokenException("Expired block token for user: " +
+            tokenUser);
+      }
+      // defer access mode, bcsid and maxLength check to container dispatcher
+      UserGroupInformation ugi = tokenId.getUser();
+      ugi.addToken(token);
+      ugi.setAuthenticationMethod(UserGroupInformation
+          .AuthenticationMethod.TOKEN);
+      return ugi;
+    } else {
+      return UserGroupInformation.createRemoteUser(user);
+    }
+  }
+}

+ 29 - 8
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenIdentifier.java → hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.security;
+package org.apache.hadoop.hdds.security.token;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -49,16 +49,22 @@ public class OzoneBlockTokenIdentifier extends TokenIdentifier {
   private long expiryDate;
   private String ownerId;
   private String blockId;
-  private final EnumSet<AccessModeProto> modes;
-  private final String omCertSerialId;
+  private EnumSet<AccessModeProto> modes;
+  private String omCertSerialId;
+  private long maxLength;
+
+  public OzoneBlockTokenIdentifier() {
+  }
 
   public OzoneBlockTokenIdentifier(String ownerId, String blockId,
-      EnumSet<AccessModeProto> modes, long expiryDate, String omCertSerialId) {
+      EnumSet<AccessModeProto> modes, long expiryDate, String omCertSerialId,
+      long maxLength) {
     this.ownerId = ownerId;
     this.blockId = blockId;
     this.expiryDate = expiryDate;
     this.modes = modes == null ? EnumSet.noneOf(AccessModeProto.class) : modes;
     this.omCertSerialId = omCertSerialId;
+    this.maxLength = maxLength;
   }
 
   @Override
@@ -89,6 +95,10 @@ public class OzoneBlockTokenIdentifier extends TokenIdentifier {
     return omCertSerialId;
   }
 
+  public long getMaxLength() {
+    return maxLength;
+  }
+
   @Override
   public Text getKind() {
     return KIND_NAME;
@@ -100,7 +110,7 @@ public class OzoneBlockTokenIdentifier extends TokenIdentifier {
         + ", ownerId=" + this.getOwnerId()
         + ", omCertSerialId=" + this.getOmCertSerialId()
         + ", blockId=" + this.getBlockId() + ", access modes="
-        + this.getAccessModes() + ")";
+        + this.getAccessModes() + ", maxLength=" + this.getMaxLength() + ")";
   }
 
   static boolean isEqual(Object a, Object b) {
@@ -121,6 +131,7 @@ public class OzoneBlockTokenIdentifier extends TokenIdentifier {
           .append(this.blockId, that.blockId)
           .append(this.modes, that.modes)
           .append(this.omCertSerialId, that.omCertSerialId)
+          .append(this.maxLength, that.maxLength)
           .build();
     }
     return false;
@@ -134,6 +145,7 @@ public class OzoneBlockTokenIdentifier extends TokenIdentifier {
         .append(this.ownerId)
         .append(this.modes)
         .append(this.omCertSerialId)
+        .append(this.maxLength)
         .build();
   }
 
@@ -143,7 +155,14 @@ public class OzoneBlockTokenIdentifier extends TokenIdentifier {
     if (!dis.markSupported()) {
       throw new IOException("Could not peek first byte.");
     }
-    readFieldsProtobuf(dis);
+    BlockTokenSecretProto tokenProto =
+        BlockTokenSecretProto.parseFrom((DataInputStream) in);
+    this.ownerId = tokenProto.getOwnerId();
+    this.blockId = tokenProto.getBlockId();
+    this.modes = EnumSet.copyOf(tokenProto.getModesList());
+    this.expiryDate = tokenProto.getExpiryDate();
+    this.omCertSerialId = tokenProto.getOmCertSerialId();
+    this.maxLength = tokenProto.getMaxLength();
   }
 
   @VisibleForTesting
@@ -153,7 +172,8 @@ public class OzoneBlockTokenIdentifier extends TokenIdentifier {
         BlockTokenSecretProto.parseFrom((DataInputStream) in);
     return new OzoneBlockTokenIdentifier(tokenPtoto.getOwnerId(),
         tokenPtoto.getBlockId(), EnumSet.copyOf(tokenPtoto.getModesList()),
-        tokenPtoto.getExpiryDate(), tokenPtoto.getOmCertSerialId());
+        tokenPtoto.getExpiryDate(), tokenPtoto.getOmCertSerialId(),
+        tokenPtoto.getMaxLength());
   }
 
   @Override
@@ -167,7 +187,8 @@ public class OzoneBlockTokenIdentifier extends TokenIdentifier {
         .setBlockId(this.getBlockId())
         .setOwnerId(this.getOwnerId())
         .setOmCertSerialId(this.getOmCertSerialId())
-        .setExpiryDate(this.getExpiryDate());
+        .setExpiryDate(this.getExpiryDate())
+        .setMaxLength(this.getMaxLength());
     // Add access mode allowed
     for (AccessModeProto mode : this.getAccessModes()) {
       builder.addModes(AccessModeProto.valueOf(mode.name()));
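
With the new no-arg constructor and maxLength field, an identifier now round-trips through Hadoop's Token URL encoding. A condensed sketch of the flow covered by testTokenSerialization later in this commit (the password bytes, serial id, and service are placeholders):

    OzoneBlockTokenIdentifier id = new OzoneBlockTokenIdentifier("testUser",
        "84940",
        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
        Time.now() + 60_000, "1" /* placeholder omCertSerialId */, 128L);
    Token<OzoneBlockTokenIdentifier> token = new Token<>(id.getBytes(),
        new byte[0] /* placeholder password */, id.getKind(),
        new Text("host:port"));
    String encoded = token.encodeToUrlString();

    Token<OzoneBlockTokenIdentifier> decoded = new Token<>();
    decoded.decodeFromUrlString(encoded);
    OzoneBlockTokenIdentifier decodedId = new OzoneBlockTokenIdentifier();
    decodedId.readFields(new DataInputStream(
        new ByteArrayInputStream(decoded.getIdentifier())));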

+ 1 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSelector.java → hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSelector.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.ozone.security;
+package org.apache.hadoop.hdds.security.token;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;

+ 38 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.token;
+
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Ozone GRPC token header verifier.
+ */
+public interface TokenVerifier {
+  /**
+   * Given a user and tokenStr header, return a UGI object with token if
+   * verified.
+   * @param user user of the request
+   * @param tokenStr token str of the request
+   * @return UGI
+   * @throws SCMSecurityException if token verification fails.
+   */
+  UserGroupInformation verify(String user, String tokenStr)
+      throws SCMSecurityException;
+}

+ 22 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/package-info.java

@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the block token related classes.
+ */
+package org.apache.hadoop.hdds.security.token;

+ 11 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java

@@ -34,6 +34,8 @@ import java.time.Duration;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_KEY_ALGORITHM;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_KEY_LEN;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DEFAULT_SECURITY_PROVIDER;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_ALGORITHM;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME_DEFAULT;
@@ -70,6 +72,7 @@ public class SecurityConfig {
   private final String publicKeyFileName;
   private final Duration certDuration;
   private final String x509SignatureAlgo;
+  private final boolean grpcBlockTokenEnabled;
 
   /**
    * Constructs a SecurityConfig.
@@ -106,6 +109,10 @@ public class SecurityConfig {
     this.x509SignatureAlgo = this.configuration.get(HDDS_X509_SIGNATURE_ALGO,
         HDDS_X509_SIGNATURE_ALGO_DEFAULT);
 
+    this.grpcBlockTokenEnabled = this.configuration.getBoolean(
+        HDDS_GRPC_BLOCK_TOKEN_ENABLED,
+        HDDS_GRPC_BLOCK_TOKEN_ENABLED_DEFAULT);
+
     // First Startup -- if the provider is null, check for the provider.
     if (SecurityConfig.provider == null) {
       synchronized (SecurityConfig.class) {
@@ -213,6 +220,10 @@ public class SecurityConfig {
     return this.certDuration;
   }
 
+  public boolean isGrpcBlockTokenEnabled() {
+    return this.grpcBlockTokenEnabled;
+  }
+
   /**
    * Adds a security provider dynamically if it is not loaded already.
    *

+ 1 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java

@@ -19,8 +19,8 @@
 
 package org.apache.hadoop.hdds.security.x509.certificate.authority;
 
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.certificates.CertificateSignRequest;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.bouncycastle.cert.X509CertificateHolder;
 

+ 11 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java

@@ -90,6 +90,17 @@ public interface CertificateClient {
   boolean verifySignature(InputStream stream, byte[] signature,
       X509Certificate cert);
 
+  /**
+   * Verifies a digital Signature, given the signature and the certificate of
+   * the signer.
+   * @param data - Data in byte array.
+   * @param signature - Byte Array containing the signature.
+   * @param cert - Certificate of the Signer.
+   * @return true if verified, false if not.
+   */
+  boolean verifySignature(byte[] data, byte[] signature,
+      X509Certificate cert);
+
   /**
    * Returns a CSR builder that can be used to creates a Certificate sigining
    * request.

+ 1 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/CertificateSignRequest.java

@@ -20,9 +20,9 @@ package org.apache.hadoop.hdds.security.x509.certificates;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.keys.SecurityUtil;
 import org.apache.logging.log4j.util.Strings;
 import org.bouncycastle.asn1.DEROctetString;

+ 1 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/SelfSignedCertificate.java

@@ -22,9 +22,9 @@ package org.apache.hadoop.hdds.security.x509.certificates;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.util.Time;
 import org.apache.logging.log4j.util.Strings;
 import org.bouncycastle.asn1.x500.X500Name;

+ 2 - 12
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java

@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hdds.security.x509.exceptions;
 
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+
 /**
  * Certificate Exceptions from the SCM Security layer.
  */
@@ -48,16 +50,4 @@ public class CertificateException extends SCMSecurityException {
   public CertificateException(Throwable cause) {
     super(cause);
   }
-
-  /**
-   * Ctor.
-   * @param message - Error Message
-   * @param cause  - Cause
-   * @param enableSuppression - Enable suppression.
-   * @param writableStackTrace - Writable stack trace.
-   */
-  public CertificateException(String message, Throwable cause,
-      boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
 }

+ 14 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

@@ -19,6 +19,11 @@
 package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.thirdparty.io.grpc.Context;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+
+import static org.apache.ratis.thirdparty.io.grpc.Metadata.ASCII_STRING_MARSHALLER;
 
 /**
  * Set of constants used in Ozone implementation.
@@ -248,4 +253,13 @@ public final class OzoneConsts {
   // For Multipart upload
   public static final int OM_MULTIPART_MIN_SIZE = 5 * 1024 * 1024;
 
+  // GRPC block token metadata header and context key
+  public static final String OZONE_BLOCK_TOKEN = "blocktoken";
+  public static final Context.Key<UserGroupInformation> UGI_CTX_KEY =
+      Context.key("UGI");
+
+  public static final Metadata.Key<String> OBT_METADATA_KEY =
+      Metadata.Key.of(OZONE_BLOCK_TOKEN, ASCII_STRING_MARSHALLER);
+  public static final Metadata.Key<String> USER_METADATA_KEY =
+      Metadata.Key.of(OZONE_USER, ASCII_STRING_MARSHALLER);
 }
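
The two Metadata.Key constants give the client and server interceptors a shared header vocabulary. A sketch of the round trip (standard grpc Metadata API from the relocated ratis-thirdparty package; values are placeholders):

    Metadata headers = new Metadata();
    // client side (ClientCredentialInterceptor)
    headers.put(OzoneConsts.OBT_METADATA_KEY, encodedToken);
    headers.put(OzoneConsts.USER_METADATA_KEY, "testUser");
    // server side (ServerCredentialInterceptor)
    String token = headers.get(OzoneConsts.OBT_METADATA_KEY);
    String user = headers.get(OzoneConsts.USER_METADATA_KEY);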

+ 1 - 1
hadoop-hdds/common/src/main/proto/hdds.proto

@@ -216,7 +216,7 @@ message BlockTokenSecretProto {
     required uint64 expiryDate = 3;
     required string omCertSerialId = 4;
     repeated AccessModeProto modes = 5;
-
+    required uint64 maxLength = 6;
 }
 
 message BlockID {

+ 62 - 4
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenIdentifier.java → hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java

@@ -15,8 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.ozone.security;
+package org.apache.hadoop.hdds.security.token;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
@@ -38,14 +40,19 @@ import java.util.Map;
 import javax.crypto.KeyGenerator;
 import javax.crypto.Mac;
 import javax.crypto.SecretKey;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -109,7 +116,7 @@ public class TestOzoneBlockTokenIdentifier {
     OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
         "testUser", "84940",
         EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString());
+        expiryTime, cert.getSerialNumber().toString(), 128L);
     byte[] signedToken = signTokenAsymmetric(tokenId, privateKey);
 
     // Verify a valid signed OzoneMaster Token with Ozone Master
@@ -121,12 +128,63 @@ public class TestOzoneBlockTokenIdentifier {
     // public key(certificate)
     tokenId = new OzoneBlockTokenIdentifier("", "",
         EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString());
+        expiryTime, cert.getSerialNumber().toString(), 128L);
     LOG.info("Unsigned token {} is {}", tokenId,
         verifyTokenAsymmetric(tokenId, RandomUtils.nextBytes(128), cert));
 
   }
 
+  @Test
+  public void testTokenSerialization() throws GeneralSecurityException,
+      IOException {
+    String keystore = new File(KEYSTORES_DIR, "keystore.jks")
+        .getAbsolutePath();
+    String truststore = new File(KEYSTORES_DIR, "truststore.jks")
+        .getAbsolutePath();
+    String trustPassword = "trustPass";
+    String keyStorePassword = "keyStorePass";
+    String keyPassword = "keyPass";
+    long maxLength = 128L;
+
+    KeyStoreTestUtil.createKeyStore(keystore, keyStorePassword, keyPassword,
+        "OzoneMaster", keyPair.getPrivate(), cert);
+
+    // Create trust store and put the certificate in the trust store
+    Map<String, X509Certificate> certs = Collections.singletonMap("server",
+        cert);
+    KeyStoreTestUtil.createTrustStore(truststore, trustPassword, certs);
+
+    // Sign the OzoneMaster Token with Ozone Master private key
+    PrivateKey privateKey = keyPair.getPrivate();
+    OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
+        "testUser", "84940",
+        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
+        expiryTime, cert.getSerialNumber().toString(), maxLength);
+    byte[] signedToken = signTokenAsymmetric(tokenId, privateKey);
+
+    Token<BlockTokenIdentifier> token = new Token<>(tokenId.getBytes(),
+        signedToken, tokenId.getKind(), new Text("host:port"));
+
+    String encodeToUrlString = token.encodeToUrlString();
+
+    Token<BlockTokenIdentifier> decodedToken = new Token<>();
+    decodedToken.decodeFromUrlString(encodeToUrlString);
+
+    OzoneBlockTokenIdentifier decodedTokenId = new OzoneBlockTokenIdentifier();
+    decodedTokenId.readFields(new DataInputStream(
+        new ByteArrayInputStream(decodedToken.getIdentifier())));
+
+    Assert.assertEquals(tokenId, decodedTokenId);
+    Assert.assertEquals(maxLength, decodedTokenId.getMaxLength());
+
+    // Verify a decoded signed Token with public key(certificate)
+    boolean isValidToken = verifyTokenAsymmetric(decodedTokenId, decodedToken
+        .getPassword(), cert);
+    LOG.info("{} is {}", tokenId, isValidToken ? "valid." : "invalid.");
+  }
+
   public byte[] signTokenAsymmetric(OzoneBlockTokenIdentifier tokenId,
       PrivateKey privateKey) throws NoSuchAlgorithmException,
       InvalidKeyException, SignatureException {
@@ -162,7 +220,7 @@ public class TestOzoneBlockTokenIdentifier {
     return new OzoneBlockTokenIdentifier(RandomStringUtils.randomAlphabetic(6),
         RandomStringUtils.randomAlphabetic(5),
         EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString());
+        expiryTime, cert.getSerialNumber().toString(), 1024768L);
   }
 
   @Test

+ 22 - 0
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/token/package-info.java

@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the block token related test classes.
+ */
+package org.apache.hadoop.hdds.security.token;

+ 1 - 1
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java

@@ -18,8 +18,8 @@
 package org.apache.hadoop.hdds.security.x509.certificates;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.SecurityUtil;
 import org.bouncycastle.asn1.ASN1Sequence;

+ 1 - 1
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestRootCertificate.java

@@ -20,8 +20,8 @@
 package org.apache.hadoop.hdds.security.x509.certificates;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.exceptions.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.bouncycastle.asn1.x509.Extension;
 import org.bouncycastle.cert.X509CertificateHolder;

+ 74 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.thirdparty.io.grpc.Context;
+import org.apache.ratis.thirdparty.io.grpc.Contexts;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.ServerCall;
+import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.Status;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OBT_METADATA_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.USER_METADATA_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.UGI_CTX_KEY;
+
+/**
+ * Grpc Server Interceptor for Ozone Block token.
+ */
+public class ServerCredentialInterceptor implements ServerInterceptor {
+
+  private static final ServerCall.Listener NOOP_LISTENER =
+      new ServerCall.Listener() {
+  };
+
+  private final TokenVerifier verifier;
+
+  ServerCredentialInterceptor(TokenVerifier verifier) {
+    this.verifier = verifier;
+  }
+
+  @Override
+  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
+      ServerCall<ReqT, RespT> call, Metadata headers,
+      ServerCallHandler<ReqT, RespT> next) {
+    String token = headers.get(OBT_METADATA_KEY);
+    String user = headers.get(USER_METADATA_KEY);
+    Context ctx = Context.current();
+    try {
+      UserGroupInformation ugi = verifier.verify(user, token);
+      if (ugi == null) {
+        call.close(Status.UNAUTHENTICATED.withDescription("Missing Block " +
+            "Token from headers when block token is required."), headers);
+        return NOOP_LISTENER;
+      } else {
+        ctx = ctx.withValue(UGI_CTX_KEY, ugi);
+      }
+    } catch (SCMSecurityException e) {
+      call.close(Status.UNAUTHENTICATED.withDescription(e.getMessage())
+          .withCause(e), headers);
+      return NOOP_LISTENER;
+    }
+    return Contexts.interceptCall(ctx, call, headers, next);
+  }
+}

+ 21 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 package org.apache.hadoop.ozone.container.common.transport.server;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -30,6 +31,9 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.
 import org.apache.hadoop.hdds.scm.container.common.helpers.
     StorageContainerException;
     StorageContainerException;
+import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -37,6 +41,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.ratis.thirdparty.io.grpc.BindableService;
 import org.apache.ratis.thirdparty.io.grpc.Server;
 import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,8 +97,16 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
     NettyServerBuilder nettyServerBuilder =
         ((NettyServerBuilder) ServerBuilder.forPort(port))
-            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
-            .addService(new GrpcXceiverService(dispatcher));
+            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+
+    // Populate the UGI context via ServerCredentialInterceptor.
+    SecurityConfig secConfig = new SecurityConfig(conf);
+    ServerCredentialInterceptor credInterceptor =
+        new ServerCredentialInterceptor(
+            new BlockTokenVerifier(secConfig, getCaClient()));
+    nettyServerBuilder.addService(ServerInterceptors.intercept(
+        new GrpcXceiverService(dispatcher), credInterceptor));
+
     for (BindableService service : additionalServices) {
       nettyServerBuilder.addService(service);
     }
@@ -101,6 +114,12 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
     storageContainer = dispatcher;
   }
 
+  @VisibleForTesting
+  public CertificateClient getCaClient() {
+    // TODO: instantiate CertificateClient
+    return null;
+  }
+
   @Override
   public int getIPCPort() {
     return this.port;

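The server-side change above is the key wiring step: GrpcXceiverService is no longer registered directly; ServerInterceptors.intercept wraps it so every call passes through the block-token check before reaching the dispatcher. A standalone sketch of the same pattern, with placeholder service and interceptor arguments (an illustration of the gRPC API, not code from this patch):

import org.apache.ratis.thirdparty.io.grpc.BindableService;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;

final class InterceptedServerSketch {
  private InterceptedServerSketch() {
  }

  // Wraps the service with the interceptor so that interceptCall runs
  // before any request reaches the service's handlers.
  static Server build(int port, BindableService service,
      ServerInterceptor interceptor) {
    return ServerBuilder.forPort(port)
        .addService(ServerInterceptors.intercept(service, interceptor))
        .build();
  }
}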
+ 209 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java

@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests ozone containers via secure grpc/netty.
+ */
+@RunWith(Parameterized.class)
+public class TestSecureOzoneContainer {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestSecureOzoneContainer.class);
+
+  /**
+   * Set the timeout for every test.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300000);
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private OzoneConfiguration conf;
+  private SecurityConfig secConfig;
+  private Boolean requireBlockToken;
+  private Boolean hasBlockToken;
+  private Boolean blockTokenExpired;
+
+  public TestSecureOzoneContainer(Boolean requireBlockToken,
+      Boolean hasBlockToken, Boolean blockTokenExpired) {
+    this.requireBlockToken = requireBlockToken;
+    this.hasBlockToken = hasBlockToken;
+    this.blockTokenExpired = blockTokenExpired;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> blockTokenOptions() {
+    return Arrays.asList(new Object[][] {
+        {true, true, false},
+        {true, true, true},
+        {true, false, false},
+        {false, true, false},
+        {false, false, false}});
+  }
+
+  @Before
+  public void setup() throws IOException {
+    conf = new OzoneConfiguration();
+    String ozoneMetaPath =
+        GenericTestUtils.getTempPath("ozoneMeta");
+    conf.set(OZONE_METADATA_DIRS, ozoneMetaPath);
+
+    secConfig = new SecurityConfig(conf);
+  }
+
+  @Test
+  public void testCreateOzoneContainer() throws Exception {
+    LOG.info("Test case: requireBlockToken: {} hasBlockToken: {} " +
+        "blockTokenExpired: {}.", requireBlockToken, hasBlockToken,
+        blockTokenExpired);
+    conf.setBoolean(HddsConfigKeys.HDDS_GRPC_BLOCK_TOKEN_ENABLED,
+        requireBlockToken);
+
+    long containerID = ContainerTestHelper.getTestContainerID();
+    OzoneContainer container = null;
+    LOG.debug("java.library.path: {}",
+        System.getProperty("java.library.path"));
+    try {
+      Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
+      conf.set(HDDS_DATANODE_DIR_KEY, tempFolder.getRoot().getPath());
+      conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, pipeline
+          .getFirstNode().getPort(DatanodeDetails.Port.Name.STANDALONE)
+          .getValue());
+      conf.setBoolean(
+          OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
+
+      DatanodeDetails dn = TestUtils.randomDatanodeDetails();
+      container = new OzoneContainer(dn, conf, null);
+      //Setting scmId, as we start manually ozone container.
+      container.getDispatcher().setScmId(UUID.randomUUID().toString());
+      container.start();
+
+      UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+          "user1",  new String[] {"usergroup"});
+      // Time.now() is in milliseconds, so scale the hour-based offsets to ms.
+      long expiryDate = blockTokenExpired
+          ? Time.now() - 2 * 60 * 60 * 1000L
+          : Time.now() + 24 * 60 * 60 * 1000L;
+
+      OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
+          "testUser", "cid:lud:bcsid",
+          EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
+          expiryDate, "1234", 128L);
+
+      int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+      if (port == 0) {
+        port = secConfig.getConfiguration().getInt(OzoneConfigKeys
+                .DFS_CONTAINER_IPC_PORT,
+            OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+      }
+      InetSocketAddress addr =
+          new InetSocketAddress(dn.getIpAddress(), port);
+
+      Token<OzoneBlockTokenIdentifier> token =
+          new Token<>(tokenId.getBytes(), new byte[2], tokenId.getKind(),
+              SecurityUtil.buildTokenService(addr));
+      if (hasBlockToken) {
+        ugi.addToken(token);
+      }
+
+      ugi.doAs(new PrivilegedAction<Void>() {
+        @Override
+        public Void run() {
+          try {
+            XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
+            client.connect();
+            createContainerForTesting(client, containerID);
+          } catch (Exception e) {
+            if (requireBlockToken && hasBlockToken && !blockTokenExpired) {
+              fail("Client with a valid block token should succeed when " +
+                  "block tokens are required.");
+            }
+            if (requireBlockToken && hasBlockToken && blockTokenExpired) {
+              assertTrue("Expected SCMSecurityException for expired token",
+                  e instanceof SCMSecurityException);
+            }
+            if (requireBlockToken && !hasBlockToken) {
+              assertTrue("Expected SCMSecurityException for missing token",
+                  e instanceof SCMSecurityException);
+            }
+          }
+          return null;
+        }
+      });
+    } finally {
+      if (container != null) {
+        container.stop();
+      }
+    }
+  }
+
+  public static void createContainerForTesting(XceiverClientSpi client,
+      long containerID) throws Exception {
+    // Create container
+    ContainerProtos.ContainerCommandRequestProto request =
+        ContainerTestHelper.getCreateContainerRequest(
+            containerID, client.getPipeline());
+    ContainerProtos.ContainerCommandResponseProto response =
+        client.sendCommand(request);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(request.getTraceID(), response.getTraceID());
+  }
+}
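
The try/catch assertions in testCreateOzoneContainer encode an outcome matrix over the three parameters: a call should succeed only when block tokens are not required, or when a present and unexpired token accompanies it. A hypothetical helper (not part of this patch) states that expectation compactly:

final class ExpectedOutcome {
  private ExpectedOutcome() {
  }

  // Mirrors the test's assertions: success is expected unless tokens are
  // required and the client's token is missing or expired.
  static boolean shouldSucceed(boolean requireBlockToken,
      boolean hasBlockToken, boolean blockTokenExpired) {
    return !requireBlockToken || (hasBlockToken && !blockTokenExpired);
  }
}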