
HDFS-12543. Ozone : allow create key without specifying size. Contributed by Chen Liang.

Nandakumar committed 7 years ago
commit a6f51d8cc7
25 changed files with 867 additions and 198 deletions
  1. + 5 - 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  2. + 2 - 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
  3. + 199 - 91
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
  4. + 10 - 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  5. + 5 - 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java
  6. + 9 - 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java
  7. + 5 - 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java
  8. + 41 - 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java
  9. + 27 - 3
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocol/KeySpaceManagerProtocol.java
  10. + 70 - 11
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
  11. + 34 - 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
  12. + 21 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
  13. + 10 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
  14. + 41 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java
  15. + 27 - 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
  16. + 164 - 51
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
  17. + 28 - 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
  18. + 68 - 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
  19. + 10 - 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  20. + 20 - 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
  21. + 1 - 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
  22. + 3 - 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMMetrcis.java
  23. + 5 - 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMSQLCli.java
  24. + 52 - 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
  25. + 10 - 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -141,6 +141,11 @@ public final class OzoneConfigKeys {
   public static final int OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT
       = 300000; // 300s for default
 
+  public static final String OZONE_KEY_PREALLOCATION_MAXSIZE =
+      "ozone.key.preallocation.maxsize";
+  public static final long OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT
+      = 128 * OzoneConsts.MB;
+
   public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
       "ozone.block.deleting.limit.per.task";
   public static final int OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT
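
A hedged aside on the new knob: a deployment could lower the preallocation cap programmatically as below. conf.setLong is the standard Configuration setter; the 64 MB value is purely illustrative, not a recommendation.

    OzoneConfiguration conf = new OzoneConfiguration();
    // Cap how much space KSM preallocates when a key is opened; anything
    // beyond this cap is allocated lazily via allocateBlock() during writes.
    conf.setLong(OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE,
        64 * OzoneConsts.MB);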

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

@@ -95,6 +95,8 @@ public final class OzoneConsts {
   public static final String OZONE_HANDLER_LOCAL = "local";
 
   public static final String DELETING_KEY_PREFIX = "#deleting#";
+  public static final String OPEN_KEY_PREFIX = "#open#";
+  public static final String OPEN_KEY_ID_DELIMINATOR = "#";
 
   /**
    * KSM LevelDB prefixes.

+ 199 - 91
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -20,8 +20,11 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.XceiverClientSpi;
@@ -56,53 +59,126 @@ public class ChunkGroupOutputStream extends OutputStream {
   // array list's get(index) is O(1)
   private final ArrayList<ChunkOutputStreamEntry> streamEntries;
   private int currentStreamIndex;
-  private long totalSize;
   private long byteOffset;
+  private final KeySpaceManagerProtocolClientSideTranslatorPB ksmClient;
+  private final
+      StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
+  private final KsmKeyArgs keyArgs;
+  private final int openID;
+  private final XceiverClientManager xceiverClientManager;
+  private final int chunkSize;
+  private final String requestID;
 
+  /**
+   * A constructor for testing purposes only.
+   */
+  @VisibleForTesting
   public ChunkGroupOutputStream() {
+    streamEntries = new ArrayList<>();
+    ksmClient = null;
+    scmClient = null;
+    keyArgs = null;
+    openID = -1;
+    xceiverClientManager = null;
+    chunkSize = 0;
+    requestID = null;
+  }
+
+  /**
+   * For testing purposes only. Instead of building the output stream from
+   * blocks, take a stream supplied externally.
+   *
+   * @param outputStream the stream to add
+   * @param length the number of bytes this stream can accept
+   */
+  @VisibleForTesting
+  public synchronized void addStream(OutputStream outputStream, long length) {
+    streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
+  }
+
+  public ChunkGroupOutputStream(
+      OpenKeySession handler, XceiverClientManager xceiverClientManager,
+      StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
+      KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
+      int chunkSize, String requestId) throws IOException {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
-    this.totalSize = 0;
     this.byteOffset = 0;
+    this.ksmClient = ksmClient;
+    this.scmClient = scmClient;
+    KsmKeyInfo info = handler.getKeyInfo();
+    this.keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName())
+        .setKeyName(info.getKeyName())
+        .setDataSize(info.getDataSize()).build();
+    this.openID = handler.getId();
+    this.xceiverClientManager = xceiverClientManager;
+    this.chunkSize = chunkSize;
+    this.requestID = requestId;
+    LOG.debug("Expecting open key with one block, but got" +
+        info.getKeyLocationList().size());
+    // server may return any number of blocks, (0 to any)
+    int idx = 0;
+    for (KsmKeyLocationInfo subKeyInfo : info.getKeyLocationList()) {
+      subKeyInfo.setIndex(idx++);
+      checkKeyLocationInfo(subKeyInfo);
+    }
   }
 
-  @VisibleForTesting
-  public long getByteOffset() {
-    return byteOffset;
+  private void checkKeyLocationInfo(KsmKeyLocationInfo subKeyInfo)
+      throws IOException {
+    String containerKey = subKeyInfo.getBlockID();
+    String containerName = subKeyInfo.getContainerName();
+    Pipeline pipeline = scmClient.getContainer(containerName);
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    // create container if needed
+    if (subKeyInfo.getShouldCreateContainer()) {
+      try {
+        scmClient.notifyObjectCreationStage(
+            NotifyObjectCreationStageRequestProto.Type.container,
+            containerName,
+            NotifyObjectCreationStageRequestProto.Stage.begin);
+        ContainerProtocolCalls.createContainer(xceiverClient, requestID);
+        scmClient.notifyObjectCreationStage(
+            NotifyObjectCreationStageRequestProto.Type.container,
+            containerName,
+            NotifyObjectCreationStageRequestProto.Stage.complete);
+      } catch (StorageContainerException ex) {
+        if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
+          // container already exists; this should never happen
+          LOG.debug("Container {} already exists.", containerName);
+        } else {
+          LOG.error("Container creation failed for {}.", containerName, ex);
+          throw ex;
+        }
+      }
+    }
+    streamEntries.add(new ChunkOutputStreamEntry(containerKey,
+        keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
+        chunkSize, subKeyInfo.getLength()));
   }
 
-  /**
-   * Append another stream to the end of the list. Note that the streams are not
-   * actually created to this point, only enough meta data about the stream is
-   * stored. When something is to be actually written to the stream, the stream
-   * will be created (if not already).
-   *
-   * @param containerKey the key to store in the container
-   * @param key the ozone key
-   * @param xceiverClientManager xceiver manager instance
-   * @param xceiverClient xceiver manager instance
-   * @param requestID the request id
-   * @param chunkSize the chunk size for this key chunks
-   * @param length the total length of this key
-   */
-  public synchronized void addStream(String containerKey, String key,
-      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
-      String requestID, int chunkSize, long length) {
-    streamEntries.add(new ChunkOutputStreamEntry(containerKey, key,
-        xceiverClientManager, xceiverClient, requestID, chunkSize, length));
-    totalSize += length;
-  }
 
   @VisibleForTesting
-  public synchronized void addStream(OutputStream outputStream, long length) {
-    streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
-    totalSize += length;
+  public long getByteOffset() {
+    return byteOffset;
   }
 
+
   @Override
   public synchronized void write(int b) throws IOException {
     if (streamEntries.size() <= currentStreamIndex) {
-      throw new IndexOutOfBoundsException();
+      Preconditions.checkNotNull(ksmClient);
+      // allocate a new block; if an exception happens, log an error and
+      // rethrow it to the caller directly, and the write fails.
+      try {
+        allocateNewBlock(currentStreamIndex);
+      } catch (IOException ioe) {
+        LOG.error("Allocate block fail when writing.");
+        throw ioe;
+      }
     }
     ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
     entry.write(b);
@@ -137,15 +213,21 @@ public class ChunkGroupOutputStream extends OutputStream {
     if (len == 0) {
       return;
     }
-    if (streamEntries.size() <= currentStreamIndex) {
-      throw new IOException("Write out of stream range! stream index:" +
-          currentStreamIndex);
-    }
-    if (totalSize - byteOffset < len) {
-      throw new IOException("Can not write " + len + " bytes with only " +
-          (totalSize - byteOffset) + " byte space");
-    }
+    int succeededAllocates = 0;
     while (len > 0) {
+      if (streamEntries.size() <= currentStreamIndex) {
+        Preconditions.checkNotNull(ksmClient);
+        // allocate a new block; if an exception happens, log an error and
+        // rethrow it to the caller directly, and the write fails.
+        try {
+          allocateNewBlock(currentStreamIndex);
+          succeededAllocates += 1;
+        } catch (IOException ioe) {
+          LOG.error("Try to allocate more blocks for write failed, already " +
+              "allocated " + succeededAllocates + " blocks for this write.");
+          throw ioe;
+        }
+      }
      // in theory, this condition should never be violated due to the check
      // above; still, do a sanity check.
       Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
@@ -161,6 +243,21 @@ public class ChunkGroupOutputStream extends OutputStream {
     }
   }
 
+  /**
+   * Contact KSM to get a new block, and tag it with the given index (the
+   * first block has index 0, the second index 1, and so on).
+   *
+   * The returned block is wrapped in a new ChunkOutputStreamEntry for writing.
+   *
+   * @param index the index of the block.
+   * @throws IOException
+   */
+  private void allocateNewBlock(int index) throws IOException {
+    KsmKeyLocationInfo subKeyInfo = ksmClient.allocateBlock(keyArgs, openID);
+    subKeyInfo.setIndex(index);
+    checkKeyLocationInfo(subKeyInfo);
+  }
+
   @Override
   public synchronized void flush() throws IOException {
     for (int i = 0; i <= currentStreamIndex; i++) {
@@ -168,10 +265,73 @@ public class ChunkGroupOutputStream extends OutputStream {
     }
   }
 
+  /**
+   * Commit the key to KSM; this will add the blocks as the new key's blocks.
+   *
+   * @throws IOException
+   */
   @Override
   public synchronized void close() throws IOException {
     for (ChunkOutputStreamEntry entry : streamEntries) {
-      entry.close();
+      if (entry != null) {
+        entry.close();
+      }
+    }
+    if (keyArgs != null) {
+      // in test, this could be null
+      keyArgs.setDataSize(byteOffset);
+      ksmClient.commitKey(keyArgs, openID);
+    } else {
+      LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
+    }
+  }
+
+  /**
+   * Builder class of ChunkGroupOutputStream.
+   */
+  public static class Builder {
+    private OpenKeySession openHandler;
+    private XceiverClientManager xceiverManager;
+    private StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
+    private KeySpaceManagerProtocolClientSideTranslatorPB ksmClient;
+    private int chunkSize;
+    private String requestID;
+
+    public Builder setHandler(OpenKeySession handler) {
+      this.openHandler = handler;
+      return this;
+    }
+
+    public Builder setXceiverClientManager(XceiverClientManager manager) {
+      this.xceiverManager = manager;
+      return this;
+    }
+
+    public Builder setScmClient(
+        StorageContainerLocationProtocolClientSideTranslatorPB client) {
+      this.scmClient = client;
+      return this;
+    }
+
+    public Builder setKsmClient(
+        KeySpaceManagerProtocolClientSideTranslatorPB client) {
+      this.ksmClient = client;
+      return this;
+    }
+
+    public Builder setChunkSize(int size) {
+      this.chunkSize = size;
+      return this;
+    }
+
+    public Builder setRequestID(String id) {
+      this.requestID = id;
+      return this;
+    }
+
+    public ChunkGroupOutputStream build() throws IOException {
+      return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
+          ksmClient, chunkSize, requestID);
     }
   }
 
@@ -266,56 +426,4 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
     }
   }
-
-  public static ChunkGroupOutputStream getFromKsmKeyInfo(
-      KsmKeyInfo keyInfo, XceiverClientManager xceiverClientManager,
-      StorageContainerLocationProtocolClientSideTranslatorPB
-          storageContainerLocationClient,
-      int chunkSize, String requestId) throws IOException {
-    // TODO: the following createContainer and key writes may fail, in which
-    // case we should revert the above allocateKey to KSM.
-    // check index as sanity check
-    int index = 0;
-    String blockID;
-    ChunkGroupOutputStream groupOutputStream = new ChunkGroupOutputStream();
-    for (KsmKeyLocationInfo subKeyInfo : keyInfo.getKeyLocationList()) {
-      blockID = subKeyInfo.getBlockID();
-
-      Preconditions.checkArgument(index++ == subKeyInfo.getIndex());
-      String containerName = subKeyInfo.getContainerName();
-      Pipeline pipeline =
-          storageContainerLocationClient.getContainer(containerName);
-      XceiverClientSpi xceiverClient =
-          xceiverClientManager.acquireClient(pipeline);
-      // create container if needed
-      if (subKeyInfo.getShouldCreateContainer()) {
-        try {
-          storageContainerLocationClient.notifyObjectCreationStage(
-              NotifyObjectCreationStageRequestProto.Type.container,
-              containerName,
-              NotifyObjectCreationStageRequestProto.Stage.begin);
-
-          ContainerProtocolCalls.createContainer(xceiverClient, requestId);
-
-          storageContainerLocationClient.notifyObjectCreationStage(
-              NotifyObjectCreationStageRequestProto.Type.container,
-              containerName,
-              NotifyObjectCreationStageRequestProto.Stage.complete);
-        } catch (StorageContainerException ex) {
-          if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
-            //container already exist, this should never happen
-            LOG.debug("Container {} already exists.", containerName);
-          } else {
-            LOG.error("Container creation failed for {}.", containerName, ex);
-            throw ex;
-          }
-        }
-      }
-
-      groupOutputStream.addStream(blockID, keyInfo.getKeyName(),
-          xceiverClientManager, xceiverClient, requestId, chunkSize,
-          subKeyInfo.getLength());
-    }
-    return groupOutputStream;
-  }
 }
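
The behavioral change in this file is that the stream no longer fails when it runs past the preallocated space; write() asks KSM for another block on demand. Below is a self-contained sketch of that lazy-allocation pattern, with in-memory buffers standing in for SCM blocks; all names here are illustrative, not part of the Ozone API.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;

    // Grows by fixed-size "blocks", allocating a new one only when the
    // current one is full; the same shape as write() above, where the
    // allocation step is ksmClient.allocateBlock(keyArgs, openID).
    class LazyBlockOutputStream extends OutputStream {
      private final int blockSize;
      private final List<ByteArrayOutputStream> blocks = new ArrayList<>();

      LazyBlockOutputStream(int blockSize) {
        this.blockSize = blockSize;
      }

      @Override
      public void write(int b) throws IOException {
        ByteArrayOutputStream current =
            blocks.isEmpty() ? null : blocks.get(blocks.size() - 1);
        if (current == null || current.size() >= blockSize) {
          current = new ByteArrayOutputStream(blockSize);
          blocks.add(current);
        }
        current.write(b);
      }
    }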

+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.ksm.protocolPB
     .KeySpaceManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.ksm.protocolPB
@@ -444,10 +445,16 @@ public class RpcClient implements ClientProtocol {
         .setFactor(factor)
         .build();
 
-    KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
+    OpenKeySession openKey = keySpaceManagerClient.openKey(keyArgs);
     ChunkGroupOutputStream groupOutputStream =
-        ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
-            storageContainerLocationClient, chunkSize, requestId);
+        new ChunkGroupOutputStream.Builder()
+            .setHandler(openKey)
+            .setXceiverClientManager(xceiverClientManager)
+            .setScmClient(storageContainerLocationClient)
+            .setKsmClient(keySpaceManagerClient)
+            .setChunkSize(chunkSize)
+            .setRequestID(requestId)
+            .build();
     return new OzoneOutputStream(groupOutputStream);
   }
 

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyArgs.java

@@ -27,7 +27,7 @@ public final class KsmKeyArgs {
   private final String volumeName;
   private final String bucketName;
   private final String keyName;
-  private final long dataSize;
+  private long dataSize;
   private final ReplicationType type;
   private final ReplicationFactor factor;
 
@@ -65,6 +65,10 @@ public final class KsmKeyArgs {
     return dataSize;
   }
 
+  public void setDataSize(long size) {
+    dataSize = size;
+  }
+
   /**
    * Builder class of KsmKeyArgs.
    */

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java

@@ -32,7 +32,7 @@ public final class KsmKeyInfo {
   private final String bucketName;
   // name of key client specified
   private final String keyName;
-  private final long dataSize;
+  private long dataSize;
   private List<KsmKeyLocationInfo> keyLocationList;
   private final long creationTime;
   private final long modificationTime;
@@ -65,10 +65,18 @@ public final class KsmKeyInfo {
     return dataSize;
   }
 
+  public void setDataSize(long size) {
+    this.dataSize = size;
+  }
+
   public List<KsmKeyLocationInfo> getKeyLocationList() {
     return keyLocationList;
   }
 
+  public void appendKeyLocation(KsmKeyLocationInfo newLocation) {
+    keyLocationList.add(newLocation);
+  }
+
   public long getCreationTime() {
     return creationTime;
   }

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java

@@ -28,7 +28,7 @@ public final class KsmKeyLocationInfo {
   private final String blockID;
   private final boolean shouldCreateContainer;
   // the id of this subkey in all the subkeys.
-  private final int index;
+  private int index;
   private final long length;
   private final long offset;
 
@@ -59,6 +59,10 @@ public final class KsmKeyLocationInfo {
     return index;
   }
 
+  public void setIndex(int idx) {
+    index = idx;
+  }
+
   public long getLength() {
     return length;
   }

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.ksm.helpers;
+
+/**
+ * This class represents an open key "session". A session here means a key is
+ * opened by a specific client; the client sends this handle back to the
+ * server, so that the server can recognize the client and thus knows how to
+ * close the key.
+ */
+public class OpenKeySession {
+  private final int id;
+  private final KsmKeyInfo keyInfo;
+
+  public OpenKeySession(int id, KsmKeyInfo info) {
+    this.id = id;
+    this.keyInfo = info;
+  }
+
+  public KsmKeyInfo getKeyInfo() {
+    return keyInfo;
+  }
+
+  public int getId() {
+    return id;
+  }
+}
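
Putting the new protocol pieces together, the client-side lifecycle of a key write looks roughly like the fragment below. This is a sketch only: it assumes an already constructed ksmClient, omits the datanode writes that ChunkGroupOutputStream performs between these calls, and bytesActuallyWritten is a placeholder.

    KsmKeyArgs args = new KsmKeyArgs.Builder()
        .setVolumeName("vol").setBucketName("bucket").setKeyName("key")
        .setDataSize(0)  // the size no longer has to be known up front
        .build();
    OpenKeySession session = ksmClient.openKey(args);   // may preallocate blocks
    KsmKeyLocationInfo block =
        ksmClient.allocateBlock(args, session.getId()); // more space on demand
    args.setDataSize(bytesActuallyWritten);             // final size, set at close
    ksmClient.commitKey(args, session.getId());         // makes the key visible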

+ 27 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocol/KeySpaceManagerProtocol.java

@@ -21,7 +21,9 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.OzoneAclInfo;
 import java.io.IOException;
@@ -129,13 +131,35 @@ public interface KeySpaceManagerProtocol {
   void setBucketProperty(KsmBucketArgs args) throws IOException;
 
   /**
-   * Allocate a block to a container, the block is returned to the client.
+   * Open the given key and return an open key session.
    *
    * @param args the args of the key.
-   * @return KsmKeyInfo isntacne that client uses to talk to container.
+   * @return OpenKeySession instance that client uses to talk to container.
+   * @throws IOException
+   */
+  OpenKeySession openKey(KsmKeyArgs args) throws IOException;
+
+  /**
+   * Commit a key. This will make the change from the client visible. The client
+   * is identified by the clientID.
+   *
+   * @param args the key to commit
+   * @param clientID the client identification
    * @throws IOException
    */
-  KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
+  void commitKey(KsmKeyArgs args, int clientID) throws IOException;
+
+  /**
+   * Allocate a new block. It is assumed that the client has an open key
+   * session in progress; the block will be appended to that open key session.
+   *
+   * @param args the key to append
+   * @param clientID the client identification
+   * @return an allocated block
+   * @throws IOException
+   */
+  KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
+      throws IOException;
 
   /**
    * Look up for the container of an existing key.

+ 70 - 11
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java

@@ -28,8 +28,18 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.AllocateBlockRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.AllocateBlockResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CommitKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CommitKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.BucketArgs;
 import org.apache.hadoop.ozone.protocol.proto
@@ -498,38 +508,87 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
   }
 
   /**
-   * Allocate a block for a key, then use the returned meta info to talk to data
-   * node to actually write the key.
+   * Create a new open session for the key, then use the returned meta info
+   * to talk to the data nodes to actually write the key.
    * @param args the args for the key to be allocated
    * @return a handler to the key, returned client
    * @throws IOException
    */
   @Override
-  public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
+  public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
     LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
+    KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName());
+    if (args.getDataSize() > 0) {
+      keyArgs.setDataSize(args.getDataSize());
+    }
+    req.setKeyArgs(keyArgs.build());
+
+    final LocateKeyResponse resp;
+    try {
+      resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() != Status.OK) {
+      throw new IOException("Create key failed, error:" + resp.getStatus());
+    }
+    return new OpenKeySession(resp.getID(),
+        KsmKeyInfo.getFromProtobuf(resp.getKeyInfo()));
+  }
+
+  @Override
+  public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
+      throws IOException {
+    AllocateBlockRequest.Builder req = AllocateBlockRequest.newBuilder();
     KeyArgs keyArgs = KeyArgs.newBuilder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
-        .setDataSize(args.getDataSize())
-        .setType(args.getType())
-        .setFactor(args.getFactor())
-        .build();
+        .setDataSize(args.getDataSize()).build();
     req.setKeyArgs(keyArgs);
+    req.setClientID(clientID);
 
-    final LocateKeyResponse resp;
+    final AllocateBlockResponse resp;
     try {
-      resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build());
+      resp = rpcProxy.allocateBlock(NULL_RPC_CONTROLLER, req.build());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
     if (resp.getStatus() != Status.OK) {
-      throw new IOException("Create key failed, error:" +
+      throw new IOException("Allocate block failed, error:" +
           resp.getStatus());
     }
-    return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
+    return KsmKeyLocationInfo.getFromProtobuf(resp.getKeyLocation());
   }
 
+  @Override
+  public void commitKey(KsmKeyArgs args, int clientID)
+      throws IOException {
+    CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName())
+        .setDataSize(args.getDataSize()).build();
+    req.setKeyArgs(keyArgs);
+    req.setClientID(clientID);
+
+    final CommitKeyResponse resp;
+    try {
+      resp = rpcProxy.commitKey(NULL_RPC_CONTROLLER, req.build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (resp.getStatus() != Status.OK) {
+      throw new IOException("Commit key failed, error:" +
+          resp.getStatus());
+    }
+  }
+
+
   @Override
   public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
     LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();

+ 34 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto

@@ -255,6 +255,9 @@ message LocateKeyRequest {
 message LocateKeyResponse {
     required Status status = 1;
     optional KeyInfo keyInfo = 2;
+    // a client's follow-up requests may carry this ID for stateful
+    // operations (similar to a cookie).
+    optional uint32 ID = 3;
 }
 
 message SetBucketPropertyRequest {
@@ -287,6 +290,25 @@ message ListKeysResponse {
     repeated KeyInfo keyInfo = 2;
 }
 
+message AllocateBlockRequest {
+    required KeyArgs keyArgs = 1;
+    required uint32 clientID = 2;
+}
+
+message AllocateBlockResponse {
+    required Status status = 1;
+    required KeyLocation keyLocation = 2;
+}
+
+message CommitKeyRequest {
+    required KeyArgs keyArgs = 1;
+    required uint32 clientID = 2;
+}
+
+message CommitKeyResponse {
+    required Status status = 1;
+}
+
 /**
  The KSM service that takes care of Ozone namespace.
 */
@@ -380,4 +402,16 @@ service KeySpaceManagerService {
     */
     rpc listKeys(ListKeysRequest)
     returns(ListKeysResponse);
+
+    /**
+      Commit a key.
+    */
+    rpc commitKey(CommitKeyRequest)
+    returns(CommitKeyResponse);
+
+    /**
+      Allocate a new block for a key.
+    */
+    rpc allocateBlock(AllocateBlockRequest)
+    returns(AllocateBlockResponse);
 }

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java

@@ -125,6 +125,27 @@ public interface KSMMetadataManager {
    */
   byte[] getDeletedKeyName(byte[] keyName);
 
+  /**
+   * Returns the DB key name of an open key in the KSM metadata store.
+   * Should be the #open# prefix plus the open id, followed by the actual
+   * key name.
+   * @param keyName - key name
+   * @param id - the id for this open
+   * @return bytes of DB key.
+   */
+  byte[] getOpenKeyNameBytes(String keyName, int id);
+
+  /**
+   * Returns the full name of a key, given the volume name, bucket name and
+   * key name. Generally done by joining the parts with certain delimiters.
+   *
+   * @param volumeName - volume name
+   * @param bucketName - bucket name
+   * @param keyName - key name
+   * @return the full key name.
+   */
+  String getKeyWithDBPrefix(String volumeName, String bucketName,
+      String keyName);
+
   /**
    * Given a volume, check if it is empty,
    * i.e there are no buckets inside it.

+ 10 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java

@@ -53,6 +53,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
+import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX;
 import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
     .OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
@@ -153,7 +155,8 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
     return sb.toString();
   }
 
-  private String getKeyWithDBPrefix(String volume, String bucket, String key) {
+  @Override
+  public String getKeyWithDBPrefix(String volume, String bucket, String key) {
     String keyVB = OzoneConsts.KSM_KEY_PREFIX + volume
         + OzoneConsts.KSM_KEY_PREFIX + bucket
         + OzoneConsts.KSM_KEY_PREFIX;
@@ -171,6 +174,12 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
         DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
   }
 
+  @Override
+  public byte[] getOpenKeyNameBytes(String keyName, int id) {
+    return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id +
+        OPEN_KEY_ID_DELIMINATOR + keyName);
+  }
+
   /**
    * Returns the read lock used on Metadata DB.
    * @return readLock
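
To make the resulting DB layout concrete (the path and id below are hypothetical): opening the object key /vol/bucket/key with id 42 stores the pending key under a name that cannot collide with committed keys.

    byte[] openKey = metadataManager.getOpenKeyNameBytes("/vol/bucket/key", 42);
    // openKey decodes to "#open#42#/vol/bucket/key"; commitKey() later
    // deletes this entry and writes the real key entry in a single batch.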

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java

@@ -56,6 +56,8 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numBucketLists;
   private @Metric MutableCounterLong numKeyLists;
   private @Metric MutableCounterLong numVolumeLists;
+  private @Metric MutableCounterLong numKeyCommits;
+  private @Metric MutableCounterLong numAllocateBlockCalls;
 
   // Failure Metrics
   private @Metric MutableCounterLong numVolumeCreateFails;
@@ -73,6 +75,8 @@ public class KSMMetrics {
   private @Metric MutableCounterLong numBucketListFails;
   private @Metric MutableCounterLong numKeyListFails;
   private @Metric MutableCounterLong numVolumeListFails;
+  private @Metric MutableCounterLong numKeyCommitFails;
+  private @Metric MutableCounterLong numBlockAllocateCallFails;
 
   public KSMMetrics() {
   }
@@ -207,6 +211,23 @@ public class KSMMetrics {
     numKeyDeletes.incr();
   }
 
+  public void incNumKeyCommits() {
+    numKeyOps.incr();
+    numKeyCommits.incr();
+  }
+
+  public void incNumKeyCommitFails() {
+    numKeyCommitFails.incr();
+  }
+
+  public void incNumBlockAllocateCalls() {
+    numAllocateBlockCalls.incr();
+  }
+
+  public void incNumBlockAllocateCallFails() {
+    numBlockAllocateCallFails.incr();
+  }
+
   public void incNumBucketListFails() {
     numBucketListFails.incr();
   }
@@ -369,6 +390,26 @@ public class KSMMetrics {
     return numVolumeListFails.value();
   }
 
+  @VisibleForTesting
+  public long getNumKeyCommits() {
+    return numKeyCommits.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyCommitFails() {
+    return numKeyCommitFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBlockAllocates() {
+    return numAllocateBlockCalls.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBlockAllocateFails() {
+    return numBlockAllocateCallFails.value();
+  }
+
   public void unRegister() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     ms.unregisterSource(SOURCE_NAME);

+ 27 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.ksm;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 
 import java.io.IOException;
 import java.util.List;
@@ -39,22 +41,39 @@ public interface KeyManager {
   void stop() throws IOException;
 
   /**
-   * Given the args of a key to put, return a pipeline for the key. Writes
-   * the key to pipeline mapping to meta data.
+   * After calling commit, the key will be made visible. There can be multiple
+   * open key writes in parallel (identified by client id). The most recently
+   * committed one will be the one visible.
    *
-   * Note that this call only allocate a block for key, and adds the
-   * corresponding entry to metadata. The block will be returned to client side
-   * handler DistributedStorageHandler. Which will make another call to
-   * datanode to create container (if needed) and writes the key.
+   * @param args the key to commit.
+   * @param clientID the client that is committing.
+   * @throws IOException
+   */
+  void commitKey(KsmKeyArgs args, int clientID) throws IOException;
+
+  /**
+   * A client calls this on an open key to request allocation of a new block,
+   * which is appended to the tail of the open key's current block list.
+   *
+   * @param args the key to append
+   * @param clientID the client requesting block.
+   * @return the reference to the new block.
+   * @throws IOException
+   */
+  KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
+      throws IOException;
+  /**
+   * Given the args of a key to put, write an open key entry to meta data.
    *
    * In case that the container creation or key write failed on
    * DistributedStorageHandler, this key's metadata will still stay in KSM.
+   * TODO garbage collect the open keys that never get closed
    *
    * @param args the args of the key provided by client.
-   * @return a KsmKeyInfo instance client uses to talk to container.
+   * @return an OpenKeySession instance the client uses to talk to container.
    * @throws Exception
    */
-  KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
+  OpenKeySession openKey(KsmKeyArgs args) throws IOException;
 
   /**
    * Look up an existing key. Return the info of the key to client side, which

+ 164 - 51
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
@@ -47,6 +49,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVI
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
@@ -69,6 +73,9 @@ public class KeyManagerImpl implements KeyManager {
   private final boolean useRatis;
   private final BackgroundService keyDeletingService;
 
+  private final long preallocateMax;
+  private final Random random;
+
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       KSMMetadataManager metadataManager, OzoneConfiguration conf) {
     this.scmBlockClient = scmBlockClient;
@@ -83,8 +90,12 @@ public class KeyManagerImpl implements KeyManager {
     long serviceTimeout = conf.getTimeDuration(
         OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
         OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+    this.preallocateMax = conf.getLong(
+        OZONE_KEY_PREALLOCATION_MAXSIZE,
+        OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
     keyDeletingService = new KeyDeletingService(
         scmBlockClient, this, svcInterval, serviceTimeout, conf);
+    random = new Random();
   }
 
   @Override
@@ -97,8 +108,28 @@ public class KeyManagerImpl implements KeyManager {
     keyDeletingService.shutdown();
   }
 
+  private void validateBucket(String volumeName, String bucketName)
+      throws IOException {
+    byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+    byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+    //Check if the volume exists
+    if(metadataManager.get(volumeKey) == null) {
+      LOG.error("volume not found: {}", volumeName);
+      throw new KSMException("Volume not found",
+          KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+    }
+    //Check if bucket already exists
+    if(metadataManager.get(bucketKey) == null) {
+      LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
+      throw new KSMException("Bucket not found",
+          KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+    }
+  }
+
   @Override
-  public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
+  public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
+      throws IOException {
     Preconditions.checkNotNull(args);
     metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
@@ -118,80 +149,162 @@ public class KeyManagerImpl implements KeyManager {
     }
 
     try {
-      byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
-      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
-      byte[] keyKey =
-          metadataManager.getDBKeyBytes(volumeName, bucketName, keyName);
-
-      //Check if the volume exists
-      if (metadataManager.get(volumeKey) == null) {
-        LOG.debug("volume not found: {}", volumeName);
-        throw new KSMException("Volume not found",
-            KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-      //Check if bucket already exists
-      if (metadataManager.get(bucketKey) == null) {
-        LOG.debug("bucket not found: {}/{} ", volumeName, bucketName);
-        throw new KSMException("Bucket not found",
-            KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+      validateBucket(volumeName, bucketName);
+      String objectKey = metadataManager.getKeyWithDBPrefix(
+          volumeName, bucketName, keyName);
+      byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID);
+      byte[] keyData = metadataManager.get(openKey);
+      if (keyData == null) {
+        LOG.error("Allocate block for a key not in open status in meta store " +
+            objectKey + " with ID " + clientID);
+        throw new KSMException("Open Key not found",
+            KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
       }
+      AllocatedBlock allocatedBlock =
+          scmBlockClient.allocateBlock(scmBlockSize, type, factor);
+      KsmKeyInfo keyInfo =
+          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
+      KsmKeyLocationInfo info = new KsmKeyLocationInfo.Builder()
+          .setContainerName(allocatedBlock.getPipeline().getContainerName())
+          .setBlockID(allocatedBlock.getKey())
+          .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+          .setLength(scmBlockSize)
+          .setOffset(0)
+          .setIndex(keyInfo.getKeyLocationList().size())
+          .build();
+      keyInfo.appendKeyLocation(info);
+      metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
+      return info;
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
 
-      // TODO: Garbage collect deleted blocks due to overwrite of a key.
-      // FIXME: BUG: Please see HDFS-11922.
-      // If user overwrites a key, then we are letting it pass without
-      // corresponding process.
-      // In reality we need to garbage collect those blocks by telling SCM to
-      // clean up those blocks when it can. Right now making this change
-      // allows us to pass tests that expect ozone can overwrite a key.
+  @Override
+  public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    ReplicationFactor factor = args.getFactor();
+    ReplicationType type = args.getType();
 
-      // When we talk to SCM make sure that we ask for at least a byte in the
-      // block. This way even if the call is for a zero length key, we back it
-      // with a actual SCM block.
-      // TODO : Review this decision later. We can get away with only a
-      // metadata entry in case of 0 length key.
-      long targetSize = args.getDataSize();
-      List<KsmKeyLocationInfo> subKeyInfos = new ArrayList<>();
-      int idx = 0;
-      long offset = 0;
+    // If user does not specify a replication strategy or
+    // replication factor, KSM will use defaults.
+    if(factor == null) {
+      factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
+    }
 
-      // in case targetSize == 0, subKeyInfos will be an empty list
-      while (targetSize > 0) {
-        long allocateSize = Math.min(targetSize, scmBlockSize);
+    if(type == null) {
+      type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
+    }
+
+    try {
+      validateBucket(volumeName, bucketName);
+      long requestedSize = Math.min(preallocateMax, args.getDataSize());
+      List<KsmKeyLocationInfo> locations = new ArrayList<>();
+      String objectKey = metadataManager.getKeyWithDBPrefix(
+          volumeName, bucketName, keyName);
+      // requested size is not required, but more of an optimization: KSM
+      // looks at the requested size; if it is 0, no block will be allocated
+      // at this point, and if the client needs more blocks, it can always
+      // call allocateBlock. But if the requested size is not 0, KSM will
+      // preallocate some blocks and piggyback them to the client, to save
+      // RPC calls.
+      int idx = 0;
+      while (requestedSize > 0) {
+        long allocateSize = Math.min(scmBlockSize, requestedSize);
         AllocatedBlock allocatedBlock =
             scmBlockClient.allocateBlock(allocateSize, type, factor);
         KsmKeyLocationInfo subKeyInfo = new KsmKeyLocationInfo.Builder()
             .setContainerName(allocatedBlock.getPipeline().getContainerName())
             .setBlockID(allocatedBlock.getKey())
             .setShouldCreateContainer(allocatedBlock.getCreateContainer())
-            .setIndex(idx)
+            .setIndex(idx++)
             .setLength(allocateSize)
-            .setOffset(offset)
+            .setOffset(0)
             .build();
-        idx += 1;
-        offset += allocateSize;
-        targetSize -= allocateSize;
-        subKeyInfos.add(subKeyInfo);
+        locations.add(subKeyInfo);
+        requestedSize -= allocateSize;
       }
-
       long currentTime = Time.now();
-      KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
+      // NOTE: the size of a key is not a hard limit on anything; it is the
+      // value the client should expect as the current size of the key. If
+      // the client sets a value, that value is used; otherwise we record a
+      // single block's size, which a reading client will see as the current
+      // size.
+      long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
+      KsmKeyInfo keyInfo = new KsmKeyInfo.Builder()
           .setVolumeName(args.getVolumeName())
           .setBucketName(args.getBucketName())
           .setKeyName(args.getKeyName())
-          .setDataSize(args.getDataSize())
-          .setKsmKeyLocationInfos(subKeyInfos)
+          .setKsmKeyLocationInfos(locations)
           .setCreationTime(currentTime)
           .setModificationTime(currentTime)
+          .setDataSize(size)
           .build();
-      metadataManager.put(keyKey, keyBlock.getProtobuf().toByteArray());
-      LOG.debug("Key {} allocated in volume {} bucket {}", keyName, volumeName,
-          bucketName);
-      return keyBlock;
+      // Generate a random ID which is not already in the meta db. In
+      // general this should succeed within a couple of iterations; the
+      // arbitrarily large bound only guards against an infinite loop.
+      int id = -1;
+      boolean idFound = false;
+      for (int j = 0; j < 10000 && !idFound; j++) {
+        id = random.nextInt();
+        byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, id);
+        if (metadataManager.get(openKey) == null) {
+          metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
+          idFound = true;
+        }
+      }
+      if (!idFound) {
+        throw new IOException("Failed to find a usable id for " + objectKey);
+      }
+      LOG.debug("Key {} allocated in volume {} bucket {}",
+          keyName, volumeName, bucketName);
+      return new OpenKeySession(id, keyInfo);
+    } catch (KSMException e) {
+      throw e;
+    } catch (IOException ex) {
+      if (!(ex instanceof KSMException)) {
+        LOG.error("Key open failed for volume:{} bucket:{} key:{}",
+            volumeName, bucketName, keyName, ex);
+      }
+      throw new KSMException(ex.getMessage(),
+          KSMException.ResultCodes.FAILED_KEY_ALLOCATION);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void commitKey(KsmKeyArgs args, int clientID) throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    try {
+      validateBucket(volumeName, bucketName);
+      String objectKey = metadataManager.getKeyWithDBPrefix(
+          volumeName, bucketName, keyName);
+      byte[] objectKeyBytes = metadataManager.getDBKeyBytes(volumeName,
+          bucketName, keyName);
+      byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID);
+      byte[] openKeyData = metadataManager.get(openKey);
+      if (openKeyData == null) {
+        throw new KSMException("Commit a key without corresponding entry " +
+            DFSUtil.bytes2String(openKey), ResultCodes.FAILED_KEY_NOT_FOUND);
+      }
+      KsmKeyInfo keyInfo =
+          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
+      keyInfo.setDataSize(args.getDataSize());
+      BatchOperation batch = new BatchOperation();
+      batch.delete(openKey);
+      batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray());
+      metadataManager.writeBatch(batch);
     } catch (KSMException e) {
       throw e;
     } catch (IOException ex) {
       if (!(ex instanceof KSMException)) {
-        LOG.error("Key allocation failed for volume:{} bucket:{} key:{}",
+        LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
             volumeName, bucketName, keyName, ex);
       }
       throw new KSMException(ex.getMessage(),

+ 28 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java

@@ -28,7 +28,9 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -466,16 +468,40 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
    * @throws IOException
    */
   @Override
-  public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
+  public OpenKeySession openKey(KsmKeyArgs args) throws IOException {
     try {
       metrics.incNumKeyAllocates();
-      return keyManager.allocateKey(args);
+      return keyManager.openKey(args);
     } catch (Exception ex) {
       metrics.incNumKeyAllocateFails();
       throw ex;
     }
   }
 
+  @Override
+  public void commitKey(KsmKeyArgs args, int clientID)
+      throws IOException {
+    try {
+      metrics.incNumKeyCommits();
+      keyManager.commitKey(args, clientID);
+    } catch (Exception ex) {
+      metrics.incNumKeyCommitFails();
+      throw ex;
+    }
+  }
+
+  @Override
+  public KsmKeyLocationInfo allocateBlock(KsmKeyArgs args, int clientID)
+      throws IOException {
+    try {
+      metrics.incNumBlockAllocateCalls();
+      return keyManager.allocateBlock(args, clientID);
+    } catch (Exception ex) {
+      metrics.incNumBlockAllocateCallFails();
+      throw ex;
+    }
+  }
+
   /**
    * Lookup a key.
    *

+ 68 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java

@@ -23,10 +23,20 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.ksm.protocol.KeySpaceManagerProtocol;
 import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.AllocateBlockRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.AllocateBlockResponse;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CommitKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.CommitKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.CreateBucketRequest;
 import org.apache.hadoop.ozone.protocol.proto
@@ -81,6 +91,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.List
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.Status;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -303,16 +314,26 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
         LocateKeyResponse.newBuilder();
     try {
       KeyArgs keyArgs = request.getKeyArgs();
+      OzoneProtos.ReplicationType type =
+          keyArgs.hasType() ? keyArgs.getType() : null;
+      OzoneProtos.ReplicationFactor factor =
+          keyArgs.hasFactor() ? keyArgs.getFactor() : null;
       KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
           .setVolumeName(keyArgs.getVolumeName())
           .setBucketName(keyArgs.getBucketName())
           .setKeyName(keyArgs.getKeyName())
           .setDataSize(keyArgs.getDataSize())
-          .setType(keyArgs.getType())
-          .setFactor(keyArgs.getFactor())
+          .setType(type)
+          .setFactor(factor)
           .build();
-      KsmKeyInfo keyInfo = impl.allocateKey(ksmKeyArgs);
-      resp.setKeyInfo(keyInfo.getProtobuf());
+      if (keyArgs.hasDataSize()) {
+        ksmKeyArgs.setDataSize(keyArgs.getDataSize());
+      } else {
+        ksmKeyArgs.setDataSize(0);
+      }
+      OpenKeySession openKey = impl.openKey(ksmKeyArgs);
+      resp.setKeyInfo(openKey.getKeyInfo().getProtobuf());
+      resp.setID(openKey.getId());
       resp.setStatus(Status.OK);
     } catch (IOException e) {
       resp.setStatus(exceptionToResponseStatus(e));
@@ -332,7 +353,6 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
           .setVolumeName(keyArgs.getVolumeName())
           .setBucketName(keyArgs.getBucketName())
           .setKeyName(keyArgs.getKeyName())
-          .setDataSize(keyArgs.getDataSize())
           .build();
       KsmKeyInfo keyInfo = impl.lookupKey(ksmKeyArgs);
       resp.setKeyInfo(keyInfo.getProtobuf());
@@ -436,4 +456,47 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
     }
     return resp.build();
   }
+
+  @Override
+  public CommitKeyResponse commitKey(RpcController controller,
+      CommitKeyRequest request) throws ServiceException {
+    CommitKeyResponse.Builder resp =
+        CommitKeyResponse.newBuilder();
+    try {
+      KeyArgs keyArgs = request.getKeyArgs();
+      KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
+          .setVolumeName(keyArgs.getVolumeName())
+          .setBucketName(keyArgs.getBucketName())
+          .setKeyName(keyArgs.getKeyName())
+          .build();
+      int id = request.getClientID();
+      impl.commitKey(ksmKeyArgs, id);
+      resp.setStatus(Status.OK);
+    } catch (IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
+    }
+    return resp.build();
+  }
+
+  @Override
+  public AllocateBlockResponse allocateBlock(RpcController controller,
+      AllocateBlockRequest request) throws ServiceException {
+    AllocateBlockResponse.Builder resp =
+        AllocateBlockResponse.newBuilder();
+    try {
+      KeyArgs keyArgs = request.getKeyArgs();
+      KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
+          .setVolumeName(keyArgs.getVolumeName())
+          .setBucketName(keyArgs.getBucketName())
+          .setKeyName(keyArgs.getKeyName())
+          .build();
+      int id = request.getClientID();
+      KsmKeyLocationInfo newLocation = impl.allocateBlock(ksmKeyArgs, id);
+      resp.setKeyLocation(newLocation.getProtobuf());
+      resp.setStatus(Status.OK);
+    } catch (IOException e) {
+      resp.setStatus(exceptionToResponseStatus(e));
+    }
+    return resp.build();
+  }
 }

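Both new RPCs carry the same (KeyArgs, clientID) pair, so KSM can match them to the open-key session created by openKey. A hedged sketch of the client-side request construction (the real counterpart lives in KeySpaceManagerProtocolClientSideTranslatorPB, which is not shown in this excerpt):

    // Assumed mirror of the server handler above: the ID handed out in the
    // LocateKeyResponse is echoed back on every follow-up call.
    AllocateBlockRequest req = AllocateBlockRequest.newBuilder()
        .setKeyArgs(keyArgs)      // same volume/bucket/key triple as openKey
        .setClientID(clientID)    // ID returned when the key was opened
        .build();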
+ 10 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.ksm.protocolPB
     .KeySpaceManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.conf.OzoneConfiguration;
@@ -409,10 +410,16 @@ public final class DistributedStorageHandler implements StorageHandler {
         .setFactor(xceiverClientManager.getFactor())
         .build();
-    // contact KSM to allocate a block for key.
+    // contact KSM to open the key; KSM preallocates its initial blocks.
-    KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
+    OpenKeySession openKey = keySpaceManagerClient.openKey(keyArgs);
     ChunkGroupOutputStream groupOutputStream =
-        ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
-            storageContainerLocationClient, chunkSize, args.getRequestID());
+        new ChunkGroupOutputStream.Builder()
+            .setHandler(openKey)
+            .setXceiverClientManager(xceiverClientManager)
+            .setScmClient(storageContainerLocationClient)
+            .setKsmClient(keySpaceManagerClient)
+            .setChunkSize(chunkSize)
+            .setRequestID(args.getRequestID())
+            .build();
     return new OzoneOutputStream(groupOutputStream);
   }
 
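Replacing the static factory with a Builder is what makes size-free writes possible here: the KSM client is now handed to the stream, so the stream itself can ask for more blocks mid-write and commit the key when it is closed. A rough sketch of the assumed behavior inside ChunkGroupOutputStream (currentEntry, addStreamEntry and openID are hypothetical names):

    // Hypothetical write path: when the current block is exhausted, request
    // another one from KSM instead of failing the write.
    if (currentEntry.getRemaining() == 0) {
      KsmKeyLocationInfo next = ksmClient.allocateBlock(keyArgs, openID);
      addStreamEntry(next);   // hypothetical helper wiring in the new block
    }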

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml

@@ -1062,4 +1062,24 @@
       exceed this count then the oldest idle connection is evicted.
     </description>
   </property>
+
+  <property>
+    <name>ozone.key.preallocation.maxsize</name>
+    <value>134217728</value>
+    <tag>OZONE, KSM, PERFORMANCE</tag>
+    <description>
+      The upper limit, in bytes, on the space preallocated when a key is
+      opened. When a new key write request is sent to KSM with a requested
+      size, at most this much space (128MB by default) is allocated up front.
+      If the client needs more space for the write, it issues separate block
+      allocation requests as the write proceeds.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.client.list.cache</name>
+    <value>1000</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      The number of entries cached on the client for list calls (listing
+      volumes, buckets and keys).
+    </description>
+  </property>
 </configuration>
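As a worked example of how the preallocation cap plays out, a sketch of the clamp KSM presumably applies when a key is opened (variable names are illustrative, not the actual KeyManagerImpl code):

    // With the default cap of 134217728 bytes (128MB), a 1GB create request
    // preallocates only 128MB; the rest arrives via later allocateBlock calls.
    long maxPrealloc = conf.getLong("ozone.key.preallocation.maxsize",
        134217728L);
    long toAllocate = Math.min(requestedSize, maxPrealloc);
    long numBlocks = (toAllocate + blockSize - 1) / blockSize;  // ceiling divide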

+ 1 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java

@@ -25,7 +25,6 @@ import org.junit.rules.ExpectedException;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -103,9 +102,7 @@ public class TestChunkStreams {
      // TODO : if we decide to accept the first 400 bytes instead in the
      // future, or to add a more informative error code rather than an
      // exception, this part needs to change.
-      exception.expect(IOException.class);
-      exception.expectMessage(
-          "Can not write 500 bytes with only 400 byte space");
+      exception.expect(Exception.class);
       groupOutputStream.write(RandomStringUtils.randomAscii(500).getBytes());
       assertEquals(100, groupOutputStream.getByteOffset());
     }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMMetrcis.java

@@ -176,7 +176,7 @@ public class TestKSMMetrcis {
         .getInternalState(ksmManager, "keyManager");
     KeyManager mockKm = Mockito.spy(bucketManager);
 
-    Mockito.doReturn(null).when(mockKm).allocateKey(null);
+    Mockito.doReturn(null).when(mockKm).openKey(null);
     Mockito.doNothing().when(mockKm).deleteKey(null);
     Mockito.doReturn(null).when(mockKm).lookupKey(null);
     Mockito.doReturn(null).when(mockKm).listKeys(null, null, null, null, 0);
@@ -192,7 +192,7 @@ public class TestKSMMetrcis {
     assertCounter("NumKeyLists", 1L, ksmMetrics);
 
     // inject exception to test for Failure Metrics
-    Mockito.doThrow(exception).when(mockKm).allocateKey(null);
+    Mockito.doThrow(exception).when(mockKm).openKey(null);
     Mockito.doThrow(exception).when(mockKm).deleteKey(null);
     Mockito.doThrow(exception).when(mockKm).lookupKey(null);
     Mockito.doThrow(exception).when(mockKm).listKeys(
@@ -284,7 +284,7 @@ public class TestKSMMetrcis {
    */
   private void doKeyOps() {
     try {
-      ksmManager.allocateKey(null);
+      ksmManager.openKey(null);
     } catch (IOException ignored) {
     }
 

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKSMSQLCli.java

@@ -232,14 +232,15 @@ public class TestKSMSQLCli {
     sql = "SELECT * FROM keyInfo";
     rs = executeQuery(conn, sql);
     HashMap<String, List<String>> expectedMap2 = new HashMap<>();
+    // no data has been written yet, so the reported data size is 0
     expectedMap2.put(keyName0,
-        Arrays.asList(volumeName0, bucketName0, Integer.toString(100)));
+        Arrays.asList(volumeName0, bucketName0, "0"));
     expectedMap2.put(keyName1,
-        Arrays.asList(volumeName1, bucketName1, Integer.toString(200)));
+        Arrays.asList(volumeName1, bucketName1, "0"));
     expectedMap2.put(keyName2,
-        Arrays.asList(volumeName0, bucketName2, Integer.toString(300)));
+        Arrays.asList(volumeName0, bucketName2, "0"));
     expectedMap2.put(keyName3,
-        Arrays.asList(volumeName0, bucketName2, Integer.toString(400)));
+        Arrays.asList(volumeName0, bucketName2, "0"));
     while (rs.next()) {
       String volumeName = rs.getString("volumeName");
       String bucketName = rs.getString("bucketName");

+ 52 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

@@ -990,6 +990,57 @@ public class TestKeySpaceManager {
     Assert.assertTrue((OzoneUtils.formatDate(keyInfo.getModifiedOn())
         / 1000) >= (currentTime / 1000));
     Assert.assertEquals(keyName, keyInfo.getKeyName());
-    Assert.assertEquals(4096, keyInfo.getSize());
+    // without data written, the size would be 0
+    Assert.assertEquals(0, keyInfo.getSize());
+  }
+
+  /**
+   * Test that a write can proceed without the key size being set, or with
+   * a declared size smaller than the data actually written.
+   *
+   * @throws IOException
+   * @throws OzoneException
+   */
+  @Test
+  public void testWriteSize() throws IOException, OzoneException {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    String dataString = RandomStringUtils.randomAscii(100);
+    // write a key without specifying size at all
+    String keyName = "testKey";
+    KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    byte[] data = new byte[dataString.length()];
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    Assert.assertEquals(dataString, DFSUtil.bytes2String(data));
+
+    // write a key with a declared size, but write more data than that.
+    String keyName1 = "testKey1";
+    KeyArgs keyArgs1 = new KeyArgs(keyName1, bucketArgs);
+    keyArgs1.setSize(30);
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs1)) {
+      stream.write(dataString.getBytes());
+    }
+    byte[] data1 = new byte[dataString.length()];
+    try (InputStream in = storageHandler.newKeyReader(keyArgs1)) {
+      in.read(data1);
+    }
+    Assert.assertEquals(dataString, DFSUtil.bytes2String(data1));
   }
 }

+ 10 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -70,7 +71,7 @@ public class TestMultipleContainerReadWrite {
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    // set to as small as 100 bytes per block.
+    // use blocks as small as 1MB so a single key spans multiple blocks.
-    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 100);
+    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, 1);
     conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5);
     conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
@@ -110,9 +111,9 @@ public class TestMultipleContainerReadWrite {
     bucketArgs.setStorageType(StorageType.DISK);
     storageHandler.createBucket(bucketArgs);
 
-    String dataString = RandomStringUtils.randomAscii(500);
+    String dataString = RandomStringUtils.randomAscii(3 * (int)OzoneConsts.MB);
     KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    keyArgs.setSize(500);
+    keyArgs.setSize(3 * (int)OzoneConsts.MB);
 
     try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
       outputStream.write(dataString.getBytes());
@@ -126,10 +127,14 @@ public class TestMultipleContainerReadWrite {
     // check that the container metadata has the chunk file persisted.
     MetricsRecordBuilder containerMetrics = getMetrics(
         "StorageContainerMetrics");
-    assertCounter("numWriteChunk", 5L, containerMetrics);
-    assertCounter("numReadChunk", 5L, containerMetrics);
+    assertCounter("numWriteChunk", 3L, containerMetrics);
+    assertCounter("numReadChunk", 3L, containerMetrics);
   }
 
+  // Disabled because this test assumes that writing beyond a declared size
+  // is not allowed, which is no longer true. Keeping it around in case we
+  // reintroduce that restriction in the future.
+  @Ignore
   @Test
   public void testErrorWrite() throws Exception {
     String userName = "user" + RandomStringUtils.randomNumeric(5);