Browse Source

HDDS-1731. Implement File CreateFile Request to use Cache and DoubleBuffer. (#1044)

Bharat Viswanadham 5 years ago
parent
commit
8965ddcf59

+ 3 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -684,6 +684,9 @@ message CreateFileRequest {
     required KeyArgs keyArgs = 1;
     required KeyArgs keyArgs = 1;
     required bool isRecursive = 2;
     required bool isRecursive = 2;
     required bool isOverwrite = 3;
     required bool isOverwrite = 3;
+    // Set in OM HA during preExecute step. This way all OM's use same ID in
+    // OM HA.
+    optional uint64 clientID = 4;
 }
 }
 
 
 message CreateFileResponse {
 message CreateFileResponse {

+ 158 - 17
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java

@@ -16,6 +16,23 @@
  */
  */
 package org.apache.hadoop.ozone.om;
 package org.apache.hadoop.ozone.om;
 
 
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+import org.apache.log4j.Logger;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -30,6 +47,7 @@ import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -41,22 +59,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.Timeout;
 
 
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
 
 
 import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
 import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
     .NODE_FAILURE_TIMEOUT;
     .NODE_FAILURE_TIMEOUT;
@@ -69,6 +72,9 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
     .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
     .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
+import static org.junit.Assert.fail;
 
 
 /**
 /**
  * Test Ozone Manager operation in distributed handler scenario.
  * Test Ozone Manager operation in distributed handler scenario.
@@ -285,6 +291,141 @@ public class TestOzoneManagerHA {
 
 
   }
   }
 
 
+
+  @Test
+  public void testFileOperationsWithRecursive() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+
+    String data = "random data";
+
+    // one level key name
+    String keyName = UUID.randomUUID().toString();
+    testCreateFile(ozoneBucket, keyName, data, true, false);
+
+    // multi level key name
+    keyName = "dir1/dir2/dir3/file1";
+    testCreateFile(ozoneBucket, keyName, data, true, false);
+
+
+    data = "random data random data";
+
+    // multi level key name with over write set.
+    testCreateFile(ozoneBucket, keyName, data, true, true);
+
+
+    try {
+      testCreateFile(ozoneBucket, keyName, data, true, false);
+      fail("testFileOperationsWithRecursive");
+    } catch (OMException ex) {
+      Assert.assertEquals(FILE_ALREADY_EXISTS, ex.getResult());
+    }
+
+    // Try now with a file name which is same as a directory.
+    try {
+      keyName = "folder/folder2";
+      ozoneBucket.createDirectory(keyName);
+      testCreateFile(ozoneBucket, keyName, data, true, false);
+      fail("testFileOperationsWithNonRecursive");
+    } catch (OMException ex) {
+      Assert.assertEquals(NOT_A_FILE, ex.getResult());
+    }
+
+  }
+
+
+  @Test
+  public void testFileOperationsWithNonRecursive() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+
+    String data = "random data";
+
+    // one level key name
+    String keyName = UUID.randomUUID().toString();
+    testCreateFile(ozoneBucket, keyName, data, false, false);
+
+    // multi level key name
+    keyName = "dir1/dir2/dir3/file1";
+
+    // Should fail, as this is non-recursive and no parent directories exist
+    try {
+      testCreateFile(ozoneBucket, keyName, data, false, false);
+    } catch (OMException ex) {
+      Assert.assertEquals(NOT_A_FILE, ex.getResult());
+    }
+
+    // create directory, now this should pass.
+    ozoneBucket.createDirectory("dir1/dir2/dir3");
+    testCreateFile(ozoneBucket, keyName, data, false, false);
+    data = "random data random data";
+
+    // multi level key name with over write set.
+    testCreateFile(ozoneBucket, keyName, data, false, true);
+
+    try {
+      testCreateFile(ozoneBucket, keyName, data, false, false);
+      fail("testFileOperationsWithRecursive");
+    } catch (OMException ex) {
+      Assert.assertEquals(FILE_ALREADY_EXISTS, ex.getResult());
+    }
+
+
+    // Try now with a file which already exists under the path
+    ozoneBucket.createDirectory("folder1/folder2/folder3/folder4");
+
+    keyName = "folder1/folder2/folder3/folder4/file1";
+    testCreateFile(ozoneBucket, keyName, data, false, false);
+
+    keyName = "folder1/folder2/folder3/file1";
+    testCreateFile(ozoneBucket, keyName, data, false, false);
+
+    // Try now with a file under path already. This should fail.
+    try {
+      keyName = "folder/folder2";
+      ozoneBucket.createDirectory(keyName);
+      testCreateFile(ozoneBucket, keyName, data, false, false);
+      fail("testFileOperationsWithNonRecursive");
+    } catch (OMException ex) {
+      Assert.assertEquals(NOT_A_FILE, ex.getResult());
+    }
+
+  }
+
+  /**
+   * This method createFile and verifies the file is successfully created or
+   * not.
+   * @param ozoneBucket
+   * @param keyName
+   * @param data
+   * @param recursive
+   * @param overwrite
+   * @throws Exception
+   */
+  public void testCreateFile(OzoneBucket ozoneBucket, String keyName,
+      String data, boolean recursive, boolean overwrite)
+      throws Exception {
+
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createFile(keyName,
+        data.length(), ReplicationType.RATIS, ReplicationFactor.ONE,
+        overwrite, recursive);
+
+    ozoneOutputStream.write(data.getBytes(), 0, data.length());
+    ozoneOutputStream.close();
+
+    OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(keyName);
+
+    Assert.assertEquals(keyName, ozoneKeyDetails.getName());
+    Assert.assertEquals(ozoneBucket.getName(), ozoneKeyDetails.getBucketName());
+    Assert.assertEquals(ozoneBucket.getVolumeName(),
+        ozoneKeyDetails.getVolumeName());
+    Assert.assertEquals(data.length(), ozoneKeyDetails.getDataSize());
+
+    OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
+
+    byte[] fileContent = new byte[data.getBytes().length];
+    ozoneInputStream.read(fileContent);
+    Assert.assertEquals(data, new String(fileContent));
+  }
+
   @Test
   @Test
   public void testMultipartUploadWithOneOmNodeDown() throws Exception {
   public void testMultipartUploadWithOneOmNodeDown() throws Exception {
 
 
@@ -437,7 +578,7 @@ public class TestOzoneManagerHA {
         Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
         Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
       } else {
       } else {
         // Verify that the request failed
         // Verify that the request failed
-        Assert.fail("There is no quorum. Request should have failed");
+        fail("There is no quorum. Request should have failed");
       }
       }
     } catch (ConnectException | RemoteException e) {
     } catch (ConnectException | RemoteException e) {
       if (!checkSuccess) {
       if (!checkSuccess) {
@@ -566,7 +707,7 @@ public class TestOzoneManagerHA {
 
 
     try {
     try {
       createVolumeTest(true);
       createVolumeTest(true);
-      Assert.fail("TestOMRetryProxy should fail when there are no OMs running");
+      fail("TestOMRetryProxy should fail when there are no OMs running");
     } catch (ConnectException e) {
     } catch (ConnectException e) {
       // Each retry attempt tries upto 10 times to connect. So there should be
       // Each retry attempt tries upto 10 times to connect. So there should be
       // 10*10 "Retrying connect to server" messages
       // 10*10 "Retrying connect to server" messages

+ 3 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.om.request.bucket.OMBucketDeleteRequest;
 import org.apache.hadoop.ozone.om.request.bucket.OMBucketSetPropertyRequest;
 import org.apache.hadoop.ozone.om.request.bucket.OMBucketSetPropertyRequest;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest;
+import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest;
 import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequest;
 import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
@@ -93,6 +94,8 @@ public final class OzoneManagerRatisUtils {
       return new OMKeyRenameRequest(omRequest);
       return new OMKeyRenameRequest(omRequest);
     case CreateDirectory:
     case CreateDirectory:
       return new OMDirectoryCreateRequest(omRequest);
       return new OMDirectoryCreateRequest(omRequest);
+    case CreateFile:
+      return new OMFileCreateRequest(omRequest);
     default:
     default:
       // TODO: will update once all request types are implemented.
       // TODO: will update once all request types are implemented.
       return null;
       return null;

+ 4 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java

@@ -44,6 +44,8 @@ import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
 
+import javax.annotation.Nonnull;
+
 /**
 /**
  * OMClientRequest provides methods which every write OM request should
  * OMClientRequest provides methods which every write OM request should
  * implement.
  * implement.
@@ -170,8 +172,8 @@ public abstract class OMClientRequest implements RequestAuditor {
    * @param ex - IOException
    * @param ex - IOException
    * @return error response need to be returned to client - OMResponse.
    * @return error response need to be returned to client - OMResponse.
    */
    */
-  protected OMResponse createErrorOMResponse(OMResponse.Builder omResponse,
-      IOException ex) {
+  protected OMResponse createErrorOMResponse(
+      @Nonnull OMResponse.Builder omResponse, @Nonnull IOException ex) {
 
 
     omResponse.setSuccess(false);
     omResponse.setSuccess(false);
     if (ex.getMessage() != null) {
     if (ex.getMessage() != null) {

+ 11 - 56
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.ozone.om.request.file;
 package org.apache.hadoop.ozone.om.request.file;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
@@ -67,6 +66,10 @@ import org.apache.hadoop.utils.db.cache.CacheValue;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
 import static  org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static  org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS_IN_GIVENPATH;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.NONE;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS;
 /**
 /**
  * Handle create directory request.
  * Handle create directory request.
  */
  */
@@ -156,16 +159,17 @@ public class OMDirectoryCreateRequest extends OMClientRequest
 
 
       // Need to check if any files exist in the given path, if they exist we
       // Need to check if any files exist in the given path, if they exist we
       // cannot create a directory with the given key.
       // cannot create a directory with the given key.
-      OMDirectoryResult omDirectoryResult = verifyFilesInPath(omMetadataManager,
-          volumeName, bucketName, omMetadataManager.getOzoneDirKey(volumeName,
-              bucketName, keyName), Paths.get(keyName));
+      OMFileRequest.OMDirectoryResult omDirectoryResult =
+          OMFileRequest.verifyFilesInPath(omMetadataManager,
+          volumeName, bucketName, keyName, Paths.get(keyName));
 
 
-      if (omDirectoryResult == OMDirectoryResult.FILE_ALREADY_EXISTS) {
+      if (omDirectoryResult == FILE_EXISTS ||
+          omDirectoryResult == FILE_EXISTS_IN_GIVENPATH) {
         throw new OMException("Unable to create directory: " +keyName
         throw new OMException("Unable to create directory: " +keyName
             + " in volume/bucket: " + volumeName + "/" + bucketName,
             + " in volume/bucket: " + volumeName + "/" + bucketName,
             FILE_ALREADY_EXISTS);
             FILE_ALREADY_EXISTS);
-      } else if (omDirectoryResult == OMDirectoryResult.SUB_DIRECTORY_EXISTS ||
-          omDirectoryResult == OMDirectoryResult.NONE) {
+      } else if (omDirectoryResult == DIRECTORY_EXISTS_IN_GIVENPATH ||
+          omDirectoryResult == NONE) {
         dirKeyInfo = createDirectoryKeyInfo(ozoneManager, omBucketInfo,
         dirKeyInfo = createDirectoryKeyInfo(ozoneManager, omBucketInfo,
             volumeName, bucketName, keyName, keyArgs);
             volumeName, bucketName, keyName, keyArgs);
 
 
@@ -206,45 +210,6 @@ public class OMDirectoryCreateRequest extends OMClientRequest
     }
     }
   }
   }
 
 
-  /**
-   * Verify any files exist in the given path in the specified volume/bucket.
-   * @param omMetadataManager
-   * @param volumeName
-   * @param bucketName
-   * @param keyPath
-   * @return true - if file exist in the given path, else false.
-   * @throws IOException
-   */
-  private OMDirectoryResult verifyFilesInPath(
-      OMMetadataManager omMetadataManager, String volumeName, String bucketName,
-      String directoryName, Path keyPath) throws IOException {
-
-    while (keyPath != null) {
-      String keyName = keyPath.toString();
-
-      String dbKeyName = omMetadataManager.getOzoneKey(volumeName,
-          bucketName, keyName);
-      String dbDirKeyName = omMetadataManager.getOzoneDirKey(volumeName,
-          bucketName, keyName);
-
-      if (omMetadataManager.getKeyTable().get(dbKeyName) != null) {
-        // Found a file in the given path.
-        return OMDirectoryResult.FILE_ALREADY_EXISTS;
-      } else if (omMetadataManager.getKeyTable().get(dbDirKeyName) != null) {
-        if (dbDirKeyName.equals(directoryName)) {
-          return OMDirectoryResult.DIRECTORY_ALREADY_EXISTS;
-        } else {
-          return OMDirectoryResult.SUB_DIRECTORY_EXISTS;
-        }
-      }
-      keyPath = keyPath.getParent();
-    }
-
-    // Found no files/ directories in the given path.
-    return OMDirectoryResult.NONE;
-  }
-
-
   private OmKeyInfo createDirectoryKeyInfo(OzoneManager ozoneManager,
   private OmKeyInfo createDirectoryKeyInfo(OzoneManager ozoneManager,
       OmBucketInfo omBucketInfo, String volumeName, String bucketName,
       OmBucketInfo omBucketInfo, String volumeName, String bucketName,
       String keyName, KeyArgs keyArgs)
       String keyName, KeyArgs keyArgs)
@@ -269,14 +234,4 @@ public class OMDirectoryCreateRequest extends OMClientRequest
         .build();
         .build();
   }
   }
 
 
-  /**
-   * Return codes used by verifyFilesInPath method.
-   */
-  enum OMDirectoryResult {
-    DIRECTORY_ALREADY_EXISTS,
-    FILE_ALREADY_EXISTS,
-    SUB_DIRECTORY_EXISTS,
-    NONE
-  }
-
 }
 }

+ 349 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java

@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.file;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.UniqueId;
+import org.apache.hadoop.utils.db.Table;
+import org.apache.hadoop.utils.db.TableIterator;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS_IN_GIVENPATH;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.NONE;
+
+/**
+ * Handles create file request.
+ */
+public class OMFileCreateRequest extends OMKeyCreateRequest
+    implements OMKeyRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMFileCreateRequest.class);
+  public OMFileCreateRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    CreateFileRequest createFileRequest = getOmRequest().getCreateFileRequest();
+    Preconditions.checkNotNull(createFileRequest);
+
+    KeyArgs keyArgs = createFileRequest.getKeyArgs();
+
+    if (keyArgs.getKeyName().length() == 0) {
+      // Check if this is the root of the filesystem.
+      // Not throwing exception here, as need to throw exception after
+      // checking volume/bucket exists.
+      return getOmRequest().toBuilder().setUserInfo(getUserInfo()).build();
+    }
+
+    long scmBlockSize = ozoneManager.getScmBlockSize();
+
+    // NOTE size of a key is not a hard limit on anything, it is a value that
+    // client should expect, in terms of current size of key. If client sets
+    // a value, then this value is used, otherwise, we allocate a single
+    // block which is the current size, if read by the client.
+    final long requestedSize = keyArgs.getDataSize() > 0 ?
+        keyArgs.getDataSize() : scmBlockSize;
+
+    boolean useRatis = ozoneManager.shouldUseRatis();
+
+    HddsProtos.ReplicationFactor factor = keyArgs.getFactor();
+    if (factor == null) {
+      factor = useRatis ? HddsProtos.ReplicationFactor.THREE :
+          HddsProtos.ReplicationFactor.ONE;
+    }
+
+    HddsProtos.ReplicationType type = keyArgs.getType();
+    if (type == null) {
+      type = useRatis ? HddsProtos.ReplicationType.RATIS :
+          HddsProtos.ReplicationType.STAND_ALONE;
+    }
+
+    // TODO: Here we are allocating block with out any check for
+    //  bucket/key/volume or not and also with out any authorization checks.
+
+    List< OmKeyLocationInfo > omKeyLocationInfoList =
+        allocateBlock(ozoneManager.getScmClient(),
+              ozoneManager.getBlockTokenSecretManager(), type, factor,
+              new ExcludeList(), requestedSize, scmBlockSize,
+              ozoneManager.getPreallocateBlocksMax(),
+              ozoneManager.isGrpcBlockTokenEnabled(),
+              ozoneManager.getOMNodeId());
+
+    KeyArgs.Builder newKeyArgs = keyArgs.toBuilder()
+        .setModificationTime(Time.now()).setType(type).setFactor(factor)
+        .setDataSize(requestedSize);
+
+    newKeyArgs.addAllKeyLocations(omKeyLocationInfoList.stream()
+        .map(OmKeyLocationInfo::getProtobuf).collect(Collectors.toList()));
+
+    CreateFileRequest.Builder newCreateFileRequest =
+        createFileRequest.toBuilder().setKeyArgs(newKeyArgs)
+            .setClientID(UniqueId.next());
+
+    return getOmRequest().toBuilder()
+        .setCreateFileRequest(newCreateFileRequest).setUserInfo(getUserInfo())
+        .build();
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex) {
+
+    CreateFileRequest createFileRequest = getOmRequest().getCreateFileRequest();
+    KeyArgs keyArgs = createFileRequest.getKeyArgs();
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String keyName = keyArgs.getKeyName();
+
+    // if isRecursive is true, file would be created even if parent
+    // directories does not exist.
+    boolean isRecursive = createFileRequest.getIsRecursive();
+
+    // if isOverWrite is true, file would be over written.
+    boolean isOverWrite = createFileRequest.getIsOverwrite();
+
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumCreateFile();
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    boolean acquiredLock = false;
+    IOException exception = null;
+    FileEncryptionInfo encryptionInfo = null;
+    OmKeyInfo omKeyInfo = null;
+
+    final List<OmKeyLocationInfo> locations = new ArrayList<>();
+
+    try {
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.BUCKET,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+            volumeName, bucketName, keyName);
+      }
+
+      // acquire lock
+      acquiredLock = omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
+          volumeName, bucketName);
+
+      OmBucketInfo bucketInfo =
+          omMetadataManager.getBucketTable().get(
+              omMetadataManager.getBucketKey(volumeName, bucketName));
+
+      if (bucketInfo == null) {
+        throw new OMException("Bucket " + bucketName + " not found",
+            OMException.ResultCodes.BUCKET_NOT_FOUND);
+      }
+
+      if (keyName.length() == 0) {
+        // Check if this is the root of the filesystem.
+        throw new OMException("Can not write to directory: " + keyName,
+            OMException.ResultCodes.NOT_A_FILE);
+      }
+
+      OMFileRequest.OMDirectoryResult omDirectoryResult =
+          OMFileRequest.verifyFilesInPath(omMetadataManager, volumeName,
+              bucketName, keyName, Paths.get(keyName));
+
+      // Check if a file or directory exists with same key name.
+      if (omDirectoryResult == FILE_EXISTS) {
+        if (!isOverWrite) {
+          throw new OMException("File " + keyName + " already exists",
+              OMException.ResultCodes.FILE_ALREADY_EXISTS);
+        }
+      } else if (omDirectoryResult == DIRECTORY_EXISTS) {
+        throw new OMException("Can not write to directory: " + keyName,
+            OMException.ResultCodes.NOT_A_FILE);
+      } else if (omDirectoryResult == FILE_EXISTS_IN_GIVENPATH) {
+        throw new OMException("Can not create file: " + keyName + "as there " +
+            "is already file in the given path",
+            OMException.ResultCodes.NOT_A_FILE);
+      }
+
+      if (!isRecursive) {
+        // We cannot create a file if complete parent directories does not exist
+
+        // verifyFilesInPath, checks only the path and its parent directories.
+        // But there may be some keys below the given path. So this method
+        // checks them.
+
+        // Example:
+        // Existing keys in table
+        // a/b/c/d/e
+        // a/b/c/d/f
+        // a/b
+
+        // Take an example if given key to be created with isRecursive set
+        // to false is "a/b/c/e".
+
+        // There is no key in keyTable with the provided path.
+        // Check in case if there are keys exist in given path. (This can
+        // happen if keys are directly created using key requests.)
+
+        // We need to do this check only in the case of non-recursive, so
+        // not included the checks done in checkKeysUnderPath in
+        // verifyFilesInPath method, as that method is common method for
+        // directory and file create request. This also avoid's this
+        // unnecessary check which is not required for those cases.
+        if (omDirectoryResult == NONE ||
+            omDirectoryResult == DIRECTORY_EXISTS_IN_GIVENPATH) {
+          boolean canBeCreated = checkKeysUnderPath(omMetadataManager,
+              volumeName, bucketName, keyName);
+          if (!canBeCreated) {
+            throw new OMException("Can not create file: " + keyName + "as one" +
+                " of parent directory is not created",
+                OMException.ResultCodes.NOT_A_FILE);
+          }
+        }
+      }
+
+      // do open key
+      encryptionInfo = getFileEncryptionInfo(ozoneManager, bucketInfo);
+      omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs,
+          omMetadataManager.getOzoneKey(volumeName, bucketName,
+              keyName), keyArgs.getDataSize(), locations, encryptionInfo);
+
+    } catch (IOException ex) {
+      exception = ex;
+    } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+    }
+
+    return prepareCreateKeyResponse(keyArgs, omKeyInfo, locations,
+        encryptionInfo, exception, createFileRequest.getClientID(),
+        transactionLogIndex, volumeName, bucketName, keyName, ozoneManager,
+        OMAction.CREATE_FILE);
+  }
+
+
+
+  /**
+   * Check if any keys exist under given path.
+   * @param omMetadataManager
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @return if exists true, else false. If key name is one level path return
+   * true.
+   * @throws IOException
+   */
+  private boolean checkKeysUnderPath(OMMetadataManager omMetadataManager,
+      @Nonnull String volumeName, @Nonnull String bucketName,
+      @Nonnull String keyName) throws IOException {
+
+    Path parentPath =  Paths.get(keyName).getParent();
+
+    if (parentPath != null) {
+      String dbKeyPath = omMetadataManager.getOzoneDirKey(volumeName,
+          bucketName, parentPath.toString());
+
+      // First check in key table cache.
+      Iterator< Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> iterator =
+          omMetadataManager.getKeyTable().cacheIterator();
+
+      while (iterator.hasNext()) {
+        Map.Entry< CacheKey< String >, CacheValue< OmKeyInfo > > entry =
+            iterator.next();
+        String key = entry.getKey().getCacheKey();
+        OmKeyInfo omKeyInfo = entry.getValue().getCacheValue();
+        // Making sure that entry is not for delete key request.
+        if (key.startsWith(dbKeyPath) && omKeyInfo != null) {
+          return true;
+        }
+      }
+      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+               keyIter = omMetadataManager.getKeyTable().iterator()) {
+        Table.KeyValue<String, OmKeyInfo> kv = keyIter.seek(dbKeyPath);
+
+
+        if (kv != null) {
+          // Check the entry in db is not marked for delete. This can happen
+          // while entry is marked for delete, but it is not flushed to DB.
+          CacheValue<OmKeyInfo> cacheValue = omMetadataManager.getKeyTable()
+              .getCacheValue(new CacheKey<>(kv.getKey()));
+          if (cacheValue != null) {
+            if (kv.getKey().startsWith(dbKeyPath)
+                && cacheValue.getCacheValue() != null) {
+              return true; // we found at least one key with this db key path
+            }
+          } else {
+            if (kv.getKey().startsWith(dbKeyPath)) {
+              return true; // we found at least one key with this db key path
+            }
+          }
+        }
+      }
+    } else {
+      // one level key path.
+      // We can safely return true, as this method is called after
+      // verifyFilesInPath, so with this keyName there is no file and directory.
+      return true;
+    }
+    return false;
+  }
+}

+ 116 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileRequest.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.file;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Base class for file requests.
+ */
+public final class OMFileRequest {
+
+  private OMFileRequest() {
+  }
+  /**
+   * Verify any files exist in the given path in the specified volume/bucket.
+   * @param omMetadataManager
+   * @param volumeName
+   * @param bucketName
+   * @param keyPath
+   * @return true - if file exist in the given path, else false.
+   * @throws IOException
+   */
+  public static OMDirectoryResult verifyFilesInPath(
+      @Nonnull OMMetadataManager omMetadataManager,
+      @Nonnull String volumeName,
+      @Nonnull String bucketName, @Nonnull String keyName,
+      @Nonnull Path keyPath) throws IOException {
+
+    String fileNameFromDetails = omMetadataManager.getOzoneKey(volumeName,
+        bucketName, keyName);
+    String dirNameFromDetails = omMetadataManager.getOzoneDirKey(volumeName,
+        bucketName, keyName);
+
+    while (keyPath != null) {
+      String pathName = keyPath.toString();
+
+      String dbKeyName = omMetadataManager.getOzoneKey(volumeName,
+          bucketName, pathName);
+      String dbDirKeyName = omMetadataManager.getOzoneDirKey(volumeName,
+          bucketName, pathName);
+
+      if (omMetadataManager.getKeyTable().get(dbKeyName) != null) {
+        // Found a file in the given path.
+        // Check if this is actual file or a file in the given path
+        if (dbKeyName.equals(fileNameFromDetails)) {
+          return OMDirectoryResult.FILE_EXISTS;
+        } else {
+          return OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH;
+        }
+      } else if (omMetadataManager.getKeyTable().get(dbDirKeyName) != null) {
+        // Found a directory in the given path.
+        // Check if this is actual directory or a directory in the given path
+        if (dbDirKeyName.equals(dirNameFromDetails)) {
+          return OMDirectoryResult.DIRECTORY_EXISTS;
+        } else {
+          return OMDirectoryResult.DIRECTORY_EXISTS_IN_GIVENPATH;
+        }
+      }
+      keyPath = keyPath.getParent();
+    }
+
+    // Found no files/ directories in the given path.
+    return OMDirectoryResult.NONE;
+  }
+
+  /**
+   * Return codes used by verifyFilesInPath method.
+   */
+  enum OMDirectoryResult {
+
+    // In below examples path is assumed as "a/b/c" in volume volume1 and
+    // bucket b1.
+
+    // When a directory exists in given path.
+    // If we have a directory with name "a/b" we return this enum value.
+    DIRECTORY_EXISTS_IN_GIVENPATH,
+
+    // When a file exists in given path.
+    // If we have a file with name "a/b" we return this enum value.
+    FILE_EXISTS_IN_GIVENPATH,
+
+    // When file already exists with the given path.
+    // If we have a file with name "a/b/c" we return this enum value.
+    FILE_EXISTS,
+
+    // When directory exists with the given path.
+    // If we have a file with name "a/b/c" we return this enum value.
+    DIRECTORY_EXISTS,
+
+    // If no file/directory exists with the given path.
+    // If we don't have any file/directory name with "a/b/c" or any
+    // sub-directory or file name from the given path we return this enum value.
+    NONE
+  }
+}

+ 163 - 83
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java

@@ -24,6 +24,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 
 import com.google.common.base.Optional;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
@@ -34,18 +36,18 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -66,7 +68,8 @@ import org.apache.hadoop.utils.db.cache.CacheKey;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 import org.apache.hadoop.utils.db.cache.CacheValue;
 
 
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
-
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CreateKey;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CreateFile;
 /**
 /**
  * Handles CreateKey request.
  * Handles CreateKey request.
  */
  */
@@ -162,7 +165,6 @@ public class OMKeyCreateRequest extends OMClientRequest
 
 
     KeyArgs keyArgs = createKeyRequest.getKeyArgs();
     KeyArgs keyArgs = createKeyRequest.getKeyArgs();
 
 
-
     String volumeName = keyArgs.getVolumeName();
     String volumeName = keyArgs.getVolumeName();
     String bucketName = keyArgs.getBucketName();
     String bucketName = keyArgs.getBucketName();
     String keyName = keyArgs.getKeyName();
     String keyName = keyArgs.getKeyName();
@@ -170,14 +172,12 @@ public class OMKeyCreateRequest extends OMClientRequest
     OMMetrics omMetrics = ozoneManager.getMetrics();
     OMMetrics omMetrics = ozoneManager.getMetrics();
     omMetrics.incNumKeyAllocates();
     omMetrics.incNumKeyAllocates();
 
 
-    AuditLogger auditLogger = ozoneManager.getAuditLogger();
-
-    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
-
-    OMResponse.Builder omResponse = OMResponse.newBuilder().setCmdType(
-            OzoneManagerProtocolProtos.Type.CreateKey).setStatus(
-            OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
-
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    OmKeyInfo omKeyInfo = null;
+    final List< OmKeyLocationInfo > locations = new ArrayList<>();
+    FileEncryptionInfo encryptionInfo = null;
+    IOException exception = null;
+    boolean acquireLock = false;
     try {
     try {
       // check Acl
       // check Acl
       if (ozoneManager.getAclsEnabled()) {
       if (ozoneManager.getAclsEnabled()) {
@@ -185,50 +185,70 @@ public class OMKeyCreateRequest extends OMClientRequest
             OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
             OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
             volumeName, bucketName, keyName);
             volumeName, bucketName, keyName);
       }
       }
-    } catch (IOException ex) {
-      LOG.error("Open failed for Key: {} in volume/bucket:{}/{}",
-          keyName, bucketName, volumeName, ex);
-      omMetrics.incNumKeyAllocateFails();
-      auditLog(auditLogger, buildAuditMessage(OMAction.ALLOCATE_KEY, auditMap,
-          ex, getOmRequest().getUserInfo()));
-      return new OMKeyCreateResponse(null, -1L,
-          createErrorOMResponse(omResponse, ex));
-    }
-
-    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    String dbOpenKeyName = omMetadataManager.getOpenKey(volumeName,
-        bucketName, keyName, createKeyRequest.getClientID());
-    String dbKeyName = omMetadataManager.getOzoneKey(volumeName, bucketName,
-        keyName);
-    String dbBucketKey = omMetadataManager.getBucketKey(volumeName, bucketName);
 
 
-    OmKeyInfo omKeyInfo = null;
-    final List< OmKeyLocationInfo > locations = new ArrayList<>();
-    FileEncryptionInfo encryptionInfo = null;
-    long openVersion = 0L;
-    IOException exception = null;
-    omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
-        bucketName);
-    try {
+      acquireLock = omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
+          volumeName, bucketName);
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
       //TODO: We can optimize this get here, if getKmsProvider is null, then
       //TODO: We can optimize this get here, if getKmsProvider is null, then
       // bucket encryptionInfo will be not set. If this assumption holds
       // bucket encryptionInfo will be not set. If this assumption holds
       // true, we can avoid get from bucket table.
       // true, we can avoid get from bucket table.
-      OmBucketInfo bucketInfo =
-          omMetadataManager.getBucketTable().get(dbBucketKey);
+
+      OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
+              omMetadataManager.getBucketKey(volumeName, bucketName));
+
       encryptionInfo = getFileEncryptionInfo(ozoneManager, bucketInfo);
       encryptionInfo = getFileEncryptionInfo(ozoneManager, bucketInfo);
-      omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs, dbKeyName,
+
+      omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs,
+          omMetadataManager.getOzoneKey(volumeName, bucketName, keyName),
           keyArgs.getDataSize(), locations, encryptionInfo);
           keyArgs.getDataSize(), locations, encryptionInfo);
+
     } catch (IOException ex) {
     } catch (IOException ex) {
       LOG.error("Key open failed for volume:{} bucket:{} key:{}",
       LOG.error("Key open failed for volume:{} bucket:{} key:{}",
           volumeName, bucketName, keyName, ex);
           volumeName, bucketName, keyName, ex);
       exception = ex;
       exception = ex;
     } finally {
     } finally {
-      omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
-          bucketName);
+      if (acquireLock) {
+        omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
     }
     }
 
 
+    return prepareCreateKeyResponse(keyArgs, omKeyInfo, locations,
+        encryptionInfo, exception, createKeyRequest.getClientID(),
+        transactionLogIndex, volumeName, bucketName, keyName, ozoneManager,
+        OMAction.ALLOCATE_KEY);
+  }
+
+  /**
+   * Prepare the response returned to the client.
+   * @param keyArgs
+   * @param omKeyInfo
+   * @param locations
+   * @param encryptionInfo
+   * @param exception
+   * @param clientID
+   * @param transactionLogIndex
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param ozoneManager
+   * @return OMClientResponse
+   */
+  @SuppressWarnings("parameternumber")
+  protected OMClientResponse prepareCreateKeyResponse(@Nonnull KeyArgs keyArgs,
+      OmKeyInfo omKeyInfo, @Nonnull List<OmKeyLocationInfo> locations,
+      FileEncryptionInfo encryptionInfo, @Nullable IOException exception,
+      long clientID, long transactionLogIndex, @Nonnull String volumeName,
+      @Nonnull String bucketName, @Nonnull String keyName,
+      @Nonnull OzoneManager ozoneManager, @Nonnull OMAction omAction) {
+
+    OMResponse.Builder omResponse = OMResponse.newBuilder().setStatus(
+        OzoneManagerProtocolProtos.Status.OK);
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
 
+    Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
+
+    OMClientResponse omClientResponse = null;
     if (exception == null) {
     if (exception == null) {
       if (omKeyInfo == null) {
       if (omKeyInfo == null) {
         // the key does not exist, create a new object, the new blocks are the
         // the key does not exist, create a new object, the new blocks are the
@@ -238,59 +258,103 @@ public class OMKeyCreateRequest extends OMClientRequest
             encryptionInfo);
             encryptionInfo);
       }
       }
 
 
-      openVersion = omKeyInfo.getLatestVersionLocations().getVersion();
+      long openVersion = omKeyInfo.getLatestVersionLocations().getVersion();
 
 
+      // Append blocks
       try {
       try {
         omKeyInfo.appendNewBlocks(keyArgs.getKeyLocationsList().stream()
         omKeyInfo.appendNewBlocks(keyArgs.getKeyLocationsList().stream()
             .map(OmKeyLocationInfo::getFromProtobuf)
             .map(OmKeyLocationInfo::getFromProtobuf)
             .collect(Collectors.toList()), false);
             .collect(Collectors.toList()), false);
 
 
       } catch (IOException ex) {
       } catch (IOException ex) {
-        LOG.error("Open failed for Key: {} in volume/bucket:{}/{}",
-            keyName, bucketName, volumeName, ex);
-        omMetrics.incNumKeyAllocateFails();
-        auditLog(auditLogger, buildAuditMessage(OMAction.ALLOCATE_KEY, auditMap,
-            ex, getOmRequest().getUserInfo()));
-        return new OMKeyCreateResponse(null, -1L,
-            createErrorOMResponse(omResponse, ex));
+        exception = ex;
       }
       }
 
 
-      // Add to cache entry can be done outside of lock for this openKey.
-      // Even if bucket gets deleted, when commitKey we shall identify if
-      // bucket gets deleted.
-      omMetadataManager.getOpenKeyTable().addCacheEntry(
-          new CacheKey<>(dbOpenKeyName),
-          new CacheValue<>(Optional.of(omKeyInfo), transactionLogIndex));
-
-      LOG.debug("Key {} allocated in volume/bucket: {}/{}", keyName, volumeName,
-          bucketName);
-
-      auditLog(auditLogger, buildAuditMessage(OMAction.ALLOCATE_KEY, auditMap,
-          exception, getOmRequest().getUserInfo()));
-
-      long clientID = createKeyRequest.getClientID();
-
-      omResponse.setCreateKeyResponse(CreateKeyResponse.newBuilder()
-          .setKeyInfo(omKeyInfo.getProtobuf())
-          .setID(clientID).setOpenVersion(openVersion)
-          .build());
+      if (exception != null) {
+        LOG.error("{} failed for Key: {} in volume/bucket:{}/{}",
+            omAction.getAction(), keyName, bucketName, volumeName, exception);
+        omClientResponse = createKeyErrorResponse(ozoneManager.getMetrics(),
+            omAction, exception, omResponse);
+      } else {
+        String dbOpenKeyName = omMetadataManager.getOpenKey(volumeName,
+            bucketName, keyName, clientID);
+
+        // Add to cache entry can be done outside of lock for this openKey.
+        // Even if bucket gets deleted, when commitKey we shall identify if
+        // bucket gets deleted.
+        omMetadataManager.getOpenKeyTable().addCacheEntry(
+            new CacheKey<>(dbOpenKeyName),
+            new CacheValue<>(Optional.of(omKeyInfo), transactionLogIndex));
+
+        LOG.debug("{} for Key: {} in volume/bucket: {}/{}",
+            omAction.getAction(), keyName, volumeName, bucketName);
+
+
+        if (omAction == OMAction.CREATE_FILE) {
+          ozoneManager.getMetrics().incNumCreateFile();
+          omResponse.setCreateFileResponse(
+              OzoneManagerProtocolProtos.CreateFileResponse.newBuilder()
+                  .setKeyInfo(omKeyInfo.getProtobuf())
+                  .setID(clientID)
+                  .setOpenVersion(openVersion).build());
+          omResponse.setCmdType(OzoneManagerProtocolProtos.Type.CreateFile);
+          omClientResponse = new OMFileCreateResponse(omKeyInfo, clientID,
+              omResponse.build());
+        } else {
+          ozoneManager.getMetrics().incNumKeyAllocates();
+          omResponse.setCreateKeyResponse(CreateKeyResponse.newBuilder()
+              .setKeyInfo(omKeyInfo.getProtobuf())
+              .setID(clientID).setOpenVersion(openVersion)
+              .build());
+          omResponse.setCmdType(OzoneManagerProtocolProtos.Type.CreateKey);
+          omClientResponse = new OMKeyCreateResponse(omKeyInfo, clientID,
+            omResponse.build());
+        }
+      }
 
 
-      return new OMKeyCreateResponse(omKeyInfo, clientID, omResponse.build());
+    } else {
+      LOG.error("{} failed for Key: {} in volume/bucket:{}/{}",
+          omAction.getAction(), keyName, volumeName, bucketName, exception);
+      omClientResponse = createKeyErrorResponse(ozoneManager.getMetrics(),
+          omAction, exception, omResponse);
+    }
+    // audit log
+    auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(omAction,
+        auditMap, exception, getOmRequest().getUserInfo()));
+    return omClientResponse;
+  }
 
 
+  private OMClientResponse createKeyErrorResponse(@Nonnull OMMetrics omMetrics,
+      @Nonnull OMAction omAction, @Nonnull IOException exception,
+      @Nonnull OMResponse.Builder omResponse) {
+    if (omAction == OMAction.CREATE_FILE) {
+      omMetrics.incNumCreateFileFails();
+      omResponse.setCmdType(CreateFile);
+      return new OMFileCreateResponse(null, -1L,
+          createErrorOMResponse(omResponse, exception));
     } else {
     } else {
-      auditLog(auditLogger, buildAuditMessage(OMAction.ALLOCATE_KEY, auditMap,
-          exception, getOmRequest().getUserInfo()));
-      LOG.error("Open failed for Key: {} in volume/bucket:{}/{}",
-          keyName, bucketName, volumeName, exception);
       omMetrics.incNumKeyAllocateFails();
       omMetrics.incNumKeyAllocateFails();
+      omResponse.setCmdType(CreateKey);
       return new OMKeyCreateResponse(null, -1L,
       return new OMKeyCreateResponse(null, -1L,
           createErrorOMResponse(omResponse, exception));
           createErrorOMResponse(omResponse, exception));
     }
     }
   }
   }
 
 
-  private OmKeyInfo prepareKeyInfo(OMMetadataManager omMetadataManager,
-      KeyArgs keyArgs, String dbKeyName, long size,
-      List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo)
+  /**
+   * Prepare OmKeyInfo which will be persisted to openKeyTable.
+   * @param omMetadataManager
+   * @param keyArgs
+   * @param dbKeyName
+   * @param size
+   * @param locations
+   * @param encInfo
+   * @return OmKeyInfo
+   * @throws IOException
+   */
+  protected OmKeyInfo prepareKeyInfo(
+      @Nonnull OMMetadataManager omMetadataManager,
+      @Nonnull KeyArgs keyArgs, @Nonnull String dbKeyName, long size,
+      @Nonnull List<OmKeyLocationInfo> locations, FileEncryptionInfo encInfo)
       throws IOException {
       throws IOException {
     OmKeyInfo keyInfo = null;
     OmKeyInfo keyInfo = null;
     if (keyArgs.getIsMultipartKey()) {
     if (keyArgs.getIsMultipartKey()) {
@@ -313,8 +377,21 @@ public class OMKeyCreateRequest extends OMClientRequest
     return keyInfo;
     return keyInfo;
   }
   }
 
 
-  private OmKeyInfo prepareMultipartKeyInfo(OMMetadataManager omMetadataManager,
-      KeyArgs args, long size, List<OmKeyLocationInfo> locations,
+  /**
+   * Prepare OmKeyInfo for multi-part upload part key which will be persisted
+   * to openKeyTable.
+   * @param omMetadataManager
+   * @param args
+   * @param size
+   * @param locations
+   * @param encInfo
+   * @return OmKeyInfo
+   * @throws IOException
+   */
+  private OmKeyInfo prepareMultipartKeyInfo(
+      @Nonnull OMMetadataManager omMetadataManager,
+      @Nonnull KeyArgs args, long size,
+      @Nonnull List<OmKeyLocationInfo> locations,
       FileEncryptionInfo encInfo) throws IOException {
       FileEncryptionInfo encInfo) throws IOException {
     HddsProtos.ReplicationFactor factor;
     HddsProtos.ReplicationFactor factor;
     HddsProtos.ReplicationType type;
     HddsProtos.ReplicationType type;
@@ -353,11 +430,13 @@ public class OMKeyCreateRequest extends OMClientRequest
    * @param type
    * @param type
    * @param size
    * @param size
    * @param encInfo
    * @param encInfo
-   * @return
+   * @return OmKeyInfo
    */
    */
-  private OmKeyInfo createKeyInfo(KeyArgs keyArgs,
-      List<OmKeyLocationInfo> locations, HddsProtos.ReplicationFactor factor,
-      HddsProtos.ReplicationType type, long size, FileEncryptionInfo encInfo) {
+  private OmKeyInfo createKeyInfo(@Nonnull KeyArgs keyArgs,
+      @Nonnull List<OmKeyLocationInfo> locations,
+      @Nonnull HddsProtos.ReplicationFactor factor,
+      @Nonnull HddsProtos.ReplicationType type, long size,
+      FileEncryptionInfo encInfo) {
     OmKeyInfo.Builder builder = new OmKeyInfo.Builder()
     OmKeyInfo.Builder builder = new OmKeyInfo.Builder()
         .setVolumeName(keyArgs.getVolumeName())
         .setVolumeName(keyArgs.getVolumeName())
         .setBucketName(keyArgs.getBucketName())
         .setBucketName(keyArgs.getBucketName())
@@ -375,4 +454,5 @@ public class OMKeyCreateRequest extends OMClientRequest
     }
     }
     return builder.build();
     return builder.build();
   }
   }
+
 }
 }

+ 40 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMFileCreateResponse.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.file;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+
+
+
+/**
+ * Response for crate file request.
+ */
+public class OMFileCreateResponse extends OMKeyCreateResponse {
+
+  public OMFileCreateResponse(@Nullable OmKeyInfo omKeyInfo,
+      long openKeySessionID, OMResponse omResponse) {
+    super(omKeyInfo, openKeySessionID, omResponse);
+  }
+
+}

+ 5 - 4
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCreateResponse.java

@@ -18,6 +18,9 @@
 
 
 package org.apache.hadoop.ozone.om.response.key;
 package org.apache.hadoop.ozone.om.response.key;
 
 
+import java.io.IOException;
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -26,8 +29,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
     .OMResponse;
 import org.apache.hadoop.utils.db.BatchOperation;
 import org.apache.hadoop.utils.db.BatchOperation;
 
 
-import java.io.IOException;
-
 /**
 /**
  * Response for CreateKey request.
  * Response for CreateKey request.
  */
  */
@@ -36,8 +37,8 @@ public class OMKeyCreateResponse extends OMClientResponse {
   private OmKeyInfo omKeyInfo;
   private OmKeyInfo omKeyInfo;
   private long openKeySessionID;
   private long openKeySessionID;
 
 
-  public OMKeyCreateResponse(OmKeyInfo omKeyInfo, long openKeySessionID,
-      OMResponse omResponse) {
+  public OMKeyCreateResponse(@Nullable OmKeyInfo omKeyInfo,
+      long openKeySessionID, OMResponse omResponse) {
     super(omResponse);
     super(omResponse);
     this.omKeyInfo = omKeyInfo;
     this.omKeyInfo = omKeyInfo;
     this.openKeySessionID = openKeySessionID;
     this.openKeySessionID = openKeySessionID;

+ 1 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java

@@ -109,6 +109,7 @@ public class OzoneManagerHARequestHandlerImpl
     case DeleteKey:
     case DeleteKey:
     case RenameKey:
     case RenameKey:
     case CreateDirectory:
     case CreateDirectory:
+    case CreateFile:
       //TODO: We don't need to pass transactionID, this will be removed when
       //TODO: We don't need to pass transactionID, this will be removed when
       // complete write requests is changed to new model. And also we can
       // complete write requests is changed to new model. And also we can
       // return OMClientResponse, then adding to doubleBuffer can be taken
       // return OMClientResponse, then adding to doubleBuffer can be taken

+ 371 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMFileCreateRequest.java

@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.file;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.request.key.TestOMKeyRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.FILE_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.NOT_A_FILE;
+
+/**
+ * Tests OMFileCreateRequest.
+ */
+public class TestOMFileCreateRequest extends TestOMKeyRequest {
+
+
+  @Test
+  public void testPreExecute() throws Exception{
+    OMRequest omRequest = createFileRequest(volumeName, bucketName, keyName,
+        HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
+        false, false);
+
+    OMFileCreateRequest omFileCreateRequest =
+        new OMFileCreateRequest(omRequest);
+
+    OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
+    Assert.assertNotEquals(omRequest, modifiedOmRequest);
+
+
+    // Check clientID and modification time is set or not.
+    Assert.assertTrue(modifiedOmRequest.hasCreateFileRequest());
+    Assert.assertTrue(
+        modifiedOmRequest.getCreateFileRequest().getClientID() > 0);
+
+    KeyArgs keyArgs = modifiedOmRequest.getCreateFileRequest().getKeyArgs();
+    Assert.assertNotNull(keyArgs);
+    Assert.assertTrue(keyArgs.getModificationTime() > 0);
+
+    // As our data size is 100, and scmBlockSize is default to 1000, so we
+    // shall have only one block.
+    List< OzoneManagerProtocolProtos.KeyLocation> keyLocations =
+        keyArgs.getKeyLocationsList();
+
+    // KeyLocation should be set.
+    Assert.assertTrue(keyLocations.size() == 1);
+    Assert.assertEquals(containerID,
+        keyLocations.get(0).getBlockID().getContainerBlockID()
+            .getContainerID());
+    Assert.assertEquals(localID,
+        keyLocations.get(0).getBlockID().getContainerBlockID()
+            .getLocalID());
+    Assert.assertTrue(keyLocations.get(0).hasPipeline());
+
+    Assert.assertEquals(0, keyLocations.get(0).getOffset());
+
+    Assert.assertEquals(scmBlockSize, keyLocations.get(0).getLength());
+  }
+
+  @Test
+  public void testPreExecuteWithBlankKey() throws Exception{
+    OMRequest omRequest = createFileRequest(volumeName, bucketName, "",
+        HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
+        false, false);
+
+    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
+        omRequest);
+
+    OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
+    Assert.assertNotEquals(omRequest, modifiedOmRequest);
+
+
+    // When KeyName is root, nothing will be set.
+    Assert.assertTrue(modifiedOmRequest.hasCreateFileRequest());
+    Assert.assertFalse(
+        modifiedOmRequest.getCreateFileRequest().getClientID() > 0);
+
+    KeyArgs keyArgs = modifiedOmRequest.getCreateFileRequest().getKeyArgs();
+    Assert.assertNotNull(keyArgs);
+    Assert.assertTrue(keyArgs.getModificationTime() == 0);
+    Assert.assertTrue(keyArgs.getKeyLocationsList().size() == 0);
+  }
+
+  @Test
+  public void testValidateAndUpdateCache() throws Exception {
+    OMRequest omRequest = createFileRequest(volumeName, bucketName, keyName,
+        HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
+        false, true);
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
+        omRequest);
+
+    OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
+
+
+    long id = modifiedOmRequest.getCreateFileRequest().getClientID();
+
+    String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
+        keyName, id);
+
+    // Before calling
+    OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
+    Assert.assertNull(omKeyInfo);
+
+    omFileCreateRequest = new OMFileCreateRequest(modifiedOmRequest);
+
+    OMClientResponse omFileCreateResponse =
+        omFileCreateRequest.validateAndUpdateCache(ozoneManager, 100L);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omFileCreateResponse.getOMResponse().getStatus());
+
+    // Check open table whether key is added or not.
+
+    omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
+    Assert.assertNotNull(omKeyInfo);
+
+    List< OmKeyLocationInfo > omKeyLocationInfoList =
+        omKeyInfo.getLatestVersionLocations().getLocationList();
+    Assert.assertTrue(omKeyLocationInfoList.size() == 1);
+
+    OmKeyLocationInfo omKeyLocationInfo = omKeyLocationInfoList.get(0);
+
+    // Check modification time
+    Assert.assertEquals(modifiedOmRequest.getCreateFileRequest()
+        .getKeyArgs().getModificationTime(), omKeyInfo.getModificationTime());
+
+    Assert.assertEquals(omKeyInfo.getModificationTime(),
+        omKeyInfo.getCreationTime());
+
+
+    // Check data of the block
+    OzoneManagerProtocolProtos.KeyLocation keyLocation =
+        modifiedOmRequest.getCreateFileRequest().getKeyArgs()
+            .getKeyLocations(0);
+
+    Assert.assertEquals(keyLocation.getBlockID().getContainerBlockID()
+        .getContainerID(), omKeyLocationInfo.getContainerID());
+    Assert.assertEquals(keyLocation.getBlockID().getContainerBlockID()
+        .getLocalID(), omKeyLocationInfo.getLocalID());
+
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
+    OMRequest omRequest = createFileRequest(volumeName, bucketName, keyName,
+        HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
+        false, true);
+
+    TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
+    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
+        omRequest);
+
+    OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
+
+    omFileCreateRequest = new OMFileCreateRequest(modifiedOmRequest);
+
+
+    OMClientResponse omFileCreateResponse =
+        omFileCreateRequest.validateAndUpdateCache(ozoneManager, 100L);
+    Assert.assertEquals(BUCKET_NOT_FOUND,
+        omFileCreateResponse.getOMResponse().getStatus());
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithNonRecursive() throws Exception {
+    testNonRecursivePath(UUID.randomUUID().toString(), false, false, false);
+    testNonRecursivePath("a/b", false, false, true);
+
+    // Create some child keys for the path
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+        "a/b/c/d", 0L,  HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+    testNonRecursivePath("a/b/c", false, false, false);
+
+    // Delete child key and add a path "a/b/ to key table
+    omMetadataManager.getKeyTable().delete(omMetadataManager.getOzoneKey(
+        volumeName, bucketName, "a/b/c/d"));
+
+
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+        "a/b/", 0L,  HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+    testNonRecursivePath("a/b/e", false, false, false);
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithRecursive() throws Exception {
+    // Should be able to create file even if parent directories does not
+    // exist and key already exist, as this is with overwrite enabled.
+    testNonRecursivePath(UUID.randomUUID().toString(), false, false, false);
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+        "c/d/e/f", 0L,  HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+    testNonRecursivePath("c/d/e/f", true, true, false);
+    // Create some child keys for the path
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+        "a/b/c/d", 0L,  HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+    testNonRecursivePath("a/b/c", false, true, false);
+
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithRecursiveAndOverWrite()
+      throws Exception {
+
+    String key = "c/d/e/f";
+    // Should be able to create file even if parent directories does not exist
+    testNonRecursivePath(key, false, true, false);
+
+    // Add the key to key table
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+        key, 0L,  HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+    // Even if key exists, should be able to create file as overwrite is set
+    // to true
+    testNonRecursivePath(key, true, true, false);
+    testNonRecursivePath(key, false, true, true);
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheWithNonRecursiveAndOverWrite()
+      throws Exception {
+
+    String key = "c/d/e/f";
+    // Need to add the path which starts with "c/d/e" to keyTable as this is
+    // non-recursive parent should exist.
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+        "c/d/e/h", 0L,  HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+    testNonRecursivePath(key, false, false, false);
+
+    // Add the key to key table
+    TestOMRequestUtils.addKeyToTable(false, volumeName, bucketName,
+        key, 0L,  HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+    // Even if key exists, should be able to create file as overwrite is set
+    // to true
+    testNonRecursivePath(key, true, false, false);
+    testNonRecursivePath(key, false, false, true);
+  }
+
+
+  private void testNonRecursivePath(String key,
+      boolean overWrite, boolean recursive, boolean fail) throws Exception {
+    OMRequest omRequest = createFileRequest(volumeName, bucketName, key,
+        HddsProtos.ReplicationFactor.ONE, HddsProtos.ReplicationType.RATIS,
+        overWrite, recursive);
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+    OMFileCreateRequest omFileCreateRequest = new OMFileCreateRequest(
+        omRequest);
+
+    OMRequest modifiedOmRequest = omFileCreateRequest.preExecute(ozoneManager);
+
+    omFileCreateRequest = new OMFileCreateRequest(modifiedOmRequest);
+
+    OMClientResponse omFileCreateResponse =
+        omFileCreateRequest.validateAndUpdateCache(ozoneManager, 100L);
+
+    if (fail) {
+      Assert.assertTrue(omFileCreateResponse.getOMResponse()
+          .getStatus() == NOT_A_FILE || omFileCreateResponse.getOMResponse()
+          .getStatus() == FILE_ALREADY_EXISTS);
+    } else {
+      long id = modifiedOmRequest.getCreateFileRequest().getClientID();
+
+      String openKey = omMetadataManager.getOpenKey(volumeName, bucketName,
+          key, id);
+      OmKeyInfo omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
+      Assert.assertNotNull(omKeyInfo);
+
+      List< OmKeyLocationInfo > omKeyLocationInfoList =
+          omKeyInfo.getLatestVersionLocations().getLocationList();
+      Assert.assertTrue(omKeyLocationInfoList.size() == 1);
+
+      OmKeyLocationInfo omKeyLocationInfo = omKeyLocationInfoList.get(0);
+
+      // Check modification time
+      Assert.assertEquals(modifiedOmRequest.getCreateFileRequest()
+          .getKeyArgs().getModificationTime(), omKeyInfo.getModificationTime());
+
+
+      // Check data of the block
+      OzoneManagerProtocolProtos.KeyLocation keyLocation =
+          modifiedOmRequest.getCreateFileRequest().getKeyArgs()
+              .getKeyLocations(0);
+
+      Assert.assertEquals(keyLocation.getBlockID().getContainerBlockID()
+          .getContainerID(), omKeyLocationInfo.getContainerID());
+      Assert.assertEquals(keyLocation.getBlockID().getContainerBlockID()
+          .getLocalID(), omKeyLocationInfo.getLocalID());
+    }
+  }
+
+
+  /**
+   * Create OMRequest which encapsulates OMFileCreateRequest.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param replicationFactor
+   * @param replicationType
+   * @return OMRequest
+   */
+  private OMRequest createFileRequest(
+      String volumeName, String bucketName, String keyName,
+      HddsProtos.ReplicationFactor replicationFactor,
+      HddsProtos.ReplicationType replicationType, boolean overWrite,
+      boolean recursive) {
+
+    KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName)
+            .setKeyName(keyName).setFactor(replicationFactor)
+            .setType(replicationType).setDataSize(dataSize);
+
+    CreateFileRequest createFileRequest = CreateFileRequest.newBuilder()
+        .setKeyArgs(keyArgs)
+        .setIsOverwrite(overWrite)
+        .setIsRecursive(recursive).build();
+
+    return OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.CreateKey)
+        .setClientId(UUID.randomUUID().toString())
+        .setCreateFileRequest(createFileRequest).build();
+
+  }
+}