ソースを参照

HDFS-12000. Ozone: Container : Add key versioning support-1. Contributed by Chen Liang.

Xiaoyu Yao 7 年 前
コミット
43a133486d
17 ファイル変更582 行追加74 行削除
  1. 2 5
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
  2. 31 6
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
  3. 3 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  4. 80 15
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java
  5. 17 22
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java
  6. 118 0
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfoGroup.java
  7. 10 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java
  8. 1 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java
  9. 11 2
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
  10. 9 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
  11. 34 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
  12. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java
  13. 3 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  14. 4 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
  15. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
  16. 254 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java
  17. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java

+ 2 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
@@ -166,13 +165,11 @@ public class ChunkGroupInputStream extends InputStream {
       StorageContainerLocationProtocolClientSideTranslatorPB
           storageContainerLocationClient, String requestId)
       throws IOException {
-    int index = 0;
     long length = 0;
     String containerKey;
     ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
-    for (KsmKeyLocationInfo ksmKeyLocationInfo : keyInfo.getKeyLocationList()) {
-      // check index as sanity check
-      Preconditions.checkArgument(index++ == ksmKeyLocationInfo.getIndex());
+    for (KsmKeyLocationInfo ksmKeyLocationInfo :
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
       String containerName = ksmKeyLocationInfo.getContainerName();
       Pipeline pipeline =
           storageContainerLocationClient.getContainer(containerName);

+ 31 - 6
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
@@ -98,6 +100,11 @@ public class ChunkGroupOutputStream extends OutputStream {
     streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
   }
 
+  @VisibleForTesting
+  public List<ChunkOutputStreamEntry> getStreamEntries() {
+    return streamEntries;
+  }
+
   public ChunkGroupOutputStream(
       OpenKeySession handler, XceiverClientManager xceiverClientManager,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
@@ -122,12 +129,31 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.chunkSize = chunkSize;
     this.requestID = requestId;
     LOG.debug("Expecting open key with one block, but got" +
-        info.getKeyLocationList().size());
+        info.getKeyLocationVersions().size());
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(KsmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
     // server may return any number of blocks, (0 to any)
-    int idx = 0;
-    for (KsmKeyLocationInfo subKeyInfo : info.getKeyLocationList()) {
-      subKeyInfo.setIndex(idx++);
-      checkKeyLocationInfo(subKeyInfo);
+    // only the blocks allocated in this open session (block createVersion
+    // equals to open session version)
+    for (KsmKeyLocationInfo subKeyInfo : version.getLocationList()) {
+      if (subKeyInfo.getCreateVersion() == openVersion) {
+        checkKeyLocationInfo(subKeyInfo);
+      }
     }
   }
 
@@ -255,7 +281,6 @@ public class ChunkGroupOutputStream extends OutputStream {
    */
   private void allocateNewBlock(int index) throws IOException {
     KsmKeyLocationInfo subKeyInfo = ksmClient.allocateBlock(keyArgs, openID);
-    subKeyInfo.setIndex(index);
     checkKeyLocationInfo(subKeyInfo);
   }
 

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -464,6 +464,9 @@ public class RpcClient implements ClientProtocol {
             .setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
             .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
             .build();
+    groupOutputStream.addPreallocateBlocks(
+        openKey.getKeyInfo().getLatestVersionLocations(),
+        openKey.getOpenVersion());
     return new OzoneOutputStream(groupOutputStream);
   }
 

+ 80 - 15
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java

@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.ozone.ksm.helpers;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.util.Time;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -34,18 +36,29 @@ public final class KsmKeyInfo {
   // name of key client specified
   private final String keyName;
   private long dataSize;
-  private List<KsmKeyLocationInfo> keyLocationList;
+  private List<KsmKeyLocationInfoGroup> keyLocationVersions;
   private final long creationTime;
   private long modificationTime;
 
   private KsmKeyInfo(String volumeName, String bucketName, String keyName,
-      List<KsmKeyLocationInfo> locationInfos, long dataSize, long creationTime,
-      long modificationTime) {
+      List<KsmKeyLocationInfoGroup> versions, long dataSize,
+      long creationTime, long modificationTime) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
     this.dataSize = dataSize;
-    this.keyLocationList = locationInfos;
+    // it is important that the versions are ordered from old to new.
+    // Do this sanity check when versions got loaded on creating KsmKeyInfo.
+    // TODO : this is not necessary, here only because versioning is still a
+    // work in-progress, remove this following check when versioning is
+    // complete and prove correctly functioning
+    long currentVersion = -1;
+    for (KsmKeyLocationInfoGroup version : versions) {
+      Preconditions.checkArgument(
+            currentVersion + 1 == version.getVersion());
+      currentVersion = version.getVersion();
+    }
+    this.keyLocationVersions = versions;
     this.creationTime = creationTime;
     this.modificationTime = modificationTime;
   }
@@ -70,16 +83,64 @@ public final class KsmKeyInfo {
     this.dataSize = size;
   }
 
-  public List<KsmKeyLocationInfo> getKeyLocationList() {
-    return keyLocationList;
+  public synchronized KsmKeyLocationInfoGroup getLatestVersionLocations()
+      throws IOException {
+    return keyLocationVersions.size() == 0? null :
+        keyLocationVersions.get(keyLocationVersions.size() - 1);
+  }
+
+  public List<KsmKeyLocationInfoGroup> getKeyLocationVersions() {
+    return keyLocationVersions;
   }
 
   public void updateModifcationTime() {
     this.modificationTime = Time.monotonicNow();
   }
 
-  public void appendKeyLocation(KsmKeyLocationInfo newLocation) {
-    keyLocationList.add(newLocation);
+  /**
+   * Append a set of blocks to the latest version. Note that these blocks are
+   * part of the latest version, not a new version.
+   *
+   * @param newLocationList the list of new blocks to be added.
+   * @throws IOException
+   */
+  public synchronized void appendNewBlocks(
+      List<KsmKeyLocationInfo> newLocationList) throws IOException {
+    if (keyLocationVersions.size() == 0) {
+      throw new IOException("Appending new block, but no version exist");
+    }
+    KsmKeyLocationInfoGroup currentLatestVersion =
+        keyLocationVersions.get(keyLocationVersions.size() - 1);
+    currentLatestVersion.appendNewBlocks(newLocationList);
+    setModificationTime(Time.now());
+  }
+
+  /**
+   * Add a new set of blocks. The new blocks will be added as appending a new
+   * version to the all version list.
+   *
+   * @param newLocationList the list of new blocks to be added.
+   * @throws IOException
+   */
+  public synchronized long addNewVersion(
+      List<KsmKeyLocationInfo> newLocationList) throws IOException {
+    long latestVersionNum;
+    if (keyLocationVersions.size() == 0) {
+      // no version exist, these blocks are the very first version.
+      keyLocationVersions.add(new KsmKeyLocationInfoGroup(0, newLocationList));
+      latestVersionNum = 0;
+    } else {
+      // it is important that the new version are always at the tail of the list
+      KsmKeyLocationInfoGroup currentLatestVersion =
+          keyLocationVersions.get(keyLocationVersions.size() - 1);
+      // the new version is created based on the current latest version
+      KsmKeyLocationInfoGroup newVersion =
+          currentLatestVersion.generateNextVersion(newLocationList);
+      keyLocationVersions.add(newVersion);
+      latestVersionNum = newVersion.getVersion();
+    }
+    setModificationTime(Time.now());
+    return latestVersionNum;
   }
 
   public long getCreationTime() {
@@ -102,7 +163,7 @@ public final class KsmKeyInfo {
     private String bucketName;
     private String keyName;
     private long dataSize;
-    private List<KsmKeyLocationInfo> ksmKeyLocationInfos;
+    private List<KsmKeyLocationInfoGroup> ksmKeyLocationInfoGroups;
     private long creationTime;
     private long modificationTime;
 
@@ -122,8 +183,8 @@ public final class KsmKeyInfo {
     }
 
     public Builder setKsmKeyLocationInfos(
-        List<KsmKeyLocationInfo> ksmKeyLocationInfoList) {
-      this.ksmKeyLocationInfos = ksmKeyLocationInfoList;
+        List<KsmKeyLocationInfoGroup> ksmKeyLocationInfoList) {
+      this.ksmKeyLocationInfoGroups = ksmKeyLocationInfoList;
       return this;
     }
 
@@ -144,19 +205,23 @@ public final class KsmKeyInfo {
 
     public KsmKeyInfo build() {
       return new KsmKeyInfo(
-          volumeName, bucketName, keyName, ksmKeyLocationInfos,
+          volumeName, bucketName, keyName, ksmKeyLocationInfoGroups,
           dataSize, creationTime, modificationTime);
     }
   }
 
   public KeyInfo getProtobuf() {
+    long latestVersion = keyLocationVersions.size() == 0 ? -1 :
+        keyLocationVersions.get(keyLocationVersions.size() - 1).getVersion();
     return KeyInfo.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setKeyName(keyName)
         .setDataSize(dataSize)
-        .addAllKeyLocationList(keyLocationList.stream()
-            .map(KsmKeyLocationInfo::getProtobuf).collect(Collectors.toList()))
+        .addAllKeyLocationList(keyLocationVersions.stream()
+            .map(KsmKeyLocationInfoGroup::getProtobuf)
+            .collect(Collectors.toList()))
+        .setLatestVersion(latestVersion)
         .setCreationTime(creationTime)
         .setModificationTime(modificationTime)
         .build();
@@ -168,7 +233,7 @@ public final class KsmKeyInfo {
         keyInfo.getBucketName(),
         keyInfo.getKeyName(),
         keyInfo.getKeyLocationListList().stream()
-            .map(KsmKeyLocationInfo::getFromProtobuf)
+            .map(KsmKeyLocationInfoGroup::getFromProtobuf)
             .collect(Collectors.toList()),
         keyInfo.getDataSize(),
         keyInfo.getCreationTime(),

+ 17 - 22
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfo.java

@@ -28,21 +28,29 @@ public final class KsmKeyLocationInfo {
   private final String blockID;
   private final boolean shouldCreateContainer;
   // the id of this subkey in all the subkeys.
-  private int index;
   private final long length;
   private final long offset;
+  // the version number indicating when this block was added
+  private long createVersion;
 
   private KsmKeyLocationInfo(String containerName,
-      String blockID, boolean shouldCreateContainer, int index,
+      String blockID, boolean shouldCreateContainer,
       long length, long offset) {
     this.containerName = containerName;
     this.blockID = blockID;
     this.shouldCreateContainer = shouldCreateContainer;
-    this.index = index;
     this.length = length;
     this.offset = offset;
   }
 
+  public void setCreateVersion(long version) {
+    createVersion = version;
+  }
+
+  public long getCreateVersion() {
+    return createVersion;
+  }
+
   public String getContainerName() {
     return containerName;
   }
@@ -55,14 +63,6 @@ public final class KsmKeyLocationInfo {
     return shouldCreateContainer;
   }
 
-  public int getIndex() {
-    return index;
-  }
-
-  public void setIndex(int idx) {
-    index = idx;
-  }
-
   public long getLength() {
     return length;
   }
@@ -78,10 +78,9 @@ public final class KsmKeyLocationInfo {
     private String containerName;
     private String blockID;
     private boolean shouldCreateContainer;
-    // the id of this subkey in all the subkeys.
-    private int index;
     private long length;
     private long offset;
+
     public Builder setContainerName(String container) {
       this.containerName = container;
       return this;
@@ -97,11 +96,6 @@ public final class KsmKeyLocationInfo {
       return this;
     }
 
-    public Builder setIndex(int id) {
-      this.index = id;
-      return this;
-    }
-
     public Builder setLength(long len) {
       this.length = len;
       return this;
@@ -114,7 +108,7 @@ public final class KsmKeyLocationInfo {
 
     public KsmKeyLocationInfo build() {
       return new KsmKeyLocationInfo(containerName, blockID,
-          shouldCreateContainer, index, length, offset);
+          shouldCreateContainer, length, offset);
     }
   }
 
@@ -123,19 +117,20 @@ public final class KsmKeyLocationInfo {
         .setContainerName(containerName)
         .setBlockID(blockID)
         .setShouldCreateContainer(shouldCreateContainer)
-        .setIndex(index)
         .setLength(length)
         .setOffset(offset)
+        .setCreateVersion(createVersion)
         .build();
   }
 
   public static KsmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
-    return new KsmKeyLocationInfo(
+    KsmKeyLocationInfo info = new KsmKeyLocationInfo(
         keyLocation.getContainerName(),
         keyLocation.getBlockID(),
         keyLocation.getShouldCreateContainer(),
-        keyLocation.getIndex(),
         keyLocation.getLength(),
         keyLocation.getOffset());
+    info.setCreateVersion(keyLocation.getCreateVersion());
+    return info;
   }
 }

+ 118 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyLocationInfoGroup.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyLocationList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A list of key locations. This class represents one single version of the
+ * blocks of a key.
+ */
+public class KsmKeyLocationInfoGroup {
+  private final long version;
+  private final List<KsmKeyLocationInfo> locationList;
+
+  public KsmKeyLocationInfoGroup(long version,
+      List<KsmKeyLocationInfo> locations) {
+    this.version = version;
+    this.locationList = locations;
+  }
+
+  /**
+   * Return only the blocks that are created in the most recent version.
+   *
+   * @return the list of blocks that are created in the latest version.
+   */
+  public List<KsmKeyLocationInfo> getBlocksLatestVersionOnly() {
+    List<KsmKeyLocationInfo> list = new ArrayList<>();
+    locationList.stream().filter(x -> x.getCreateVersion() == version)
+        .forEach(list::add);
+    return list;
+  }
+
+  public long getVersion() {
+    return version;
+  }
+
+  public List<KsmKeyLocationInfo> getLocationList() {
+    return locationList;
+  }
+
+  public KeyLocationList getProtobuf() {
+    return KeyLocationList.newBuilder()
+        .setVersion(version)
+        .addAllKeyLocations(
+            locationList.stream().map(KsmKeyLocationInfo::getProtobuf)
+                .collect(Collectors.toList()))
+        .build();
+  }
+
+  public static KsmKeyLocationInfoGroup getFromProtobuf(
+      KeyLocationList keyLocationList) {
+    return new KsmKeyLocationInfoGroup(
+        keyLocationList.getVersion(),
+        keyLocationList.getKeyLocationsList().stream()
+            .map(KsmKeyLocationInfo::getFromProtobuf)
+            .collect(Collectors.toList()));
+  }
+
+  /**
+   * Given a new block location, generate a new version list based upon this
+   * one.
+   *
+   * @param newLocationList a list of new location to be added.
+   * @return
+   */
+  KsmKeyLocationInfoGroup generateNextVersion(
+      List<KsmKeyLocationInfo> newLocationList) throws IOException {
+    // TODO : revisit if we can do this method more efficiently
+    // one potential inefficiency here is that later version always include
+    // older ones. e.g. v1 has B1, then v2, v3...will all have B1 and only add
+    // more
+    List<KsmKeyLocationInfo> newList = new ArrayList<>();
+    newList.addAll(locationList);
+    for (KsmKeyLocationInfo newInfo : newLocationList) {
+      // all these new blocks will have addVersion of current version + 1
+      newInfo.setCreateVersion(version + 1);
+      newList.add(newInfo);
+    }
+    return new KsmKeyLocationInfoGroup(version + 1, newList);
+  }
+
+  void appendNewBlocks(List<KsmKeyLocationInfo> newLocationList)
+      throws IOException {
+    for (KsmKeyLocationInfo info : newLocationList) {
+      info.setCreateVersion(version);
+      locationList.add(info);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("version:").append(version).append(" ");
+    for (KsmKeyLocationInfo kli : locationList) {
+      sb.append(kli.getBlockID()).append(" || ");
+    }
+    return sb.toString();
+  }
+}

+ 10 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/OpenKeySession.java

@@ -25,10 +25,19 @@ package org.apache.hadoop.ozone.ksm.helpers;
 public class OpenKeySession {
   private final int id;
   private final KsmKeyInfo keyInfo;
+  // the version of the key when it is being opened in this session.
+  // a block that has a create version equals to open version means it will
+  // be committed only when this open session is closed.
+  private long openVersion;
 
-  public OpenKeySession(int id, KsmKeyInfo info) {
+  public OpenKeySession(int id, KsmKeyInfo info, long version) {
     this.id = id;
     this.keyInfo = info;
+    this.openVersion = version;
+  }
+
+  public long getOpenVersion() {
+    return this.openVersion;
   }
 
   public KsmKeyInfo getKeyInfo() {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java

@@ -538,7 +538,7 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
       throw new IOException("Create key failed, error:" + resp.getStatus());
     }
     return new OpenKeySession(resp.getID(),
-        KsmKeyInfo.getFromProtobuf(resp.getKeyInfo()));
+        KsmKeyInfo.getFromProtobuf(resp.getKeyInfo()), resp.getOpenVersion());
   }
 
   @Override

+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto

@@ -235,7 +235,13 @@ message KeyLocation {
     required bool shouldCreateContainer = 3;
     required uint64 offset = 4;
     required uint64 length = 5;
-    required uint32 index = 6;
+    // indicated at which version this block gets created.
+    optional uint64 createVersion = 6;
+}
+
+message KeyLocationList {
+    optional uint64 version = 1;
+    repeated KeyLocation keyLocations = 2;
 }
 
 message KeyInfo {
@@ -243,9 +249,10 @@ message KeyInfo {
     required string bucketName = 2;
     required string keyName = 3;
     required uint64 dataSize = 4;
-    repeated KeyLocation keyLocationList = 5;
+    repeated KeyLocationList keyLocationList = 5;
     required uint64 creationTime = 6;
     required uint64 modificationTime = 7;
+    optional uint64 latestVersion = 8;
 }
 
 message LocateKeyRequest {
@@ -258,6 +265,8 @@ message LocateKeyResponse {
     // clients' followup request may carry this ID for stateful operations (similar
     // to a cookie).
     optional uint32 ID = 3;
+    // TODO : allow specifiying a particular version to read.
+    optional uint64 openVersion = 4;
 }
 
 message SetBucketPropertyRequest {

+ 9 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java

@@ -23,6 +23,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -45,6 +46,7 @@ import org.apache.hadoop.utils.MetadataStoreBuilder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
@@ -473,7 +475,11 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
       KsmKeyInfo info =
           KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
       // Get block keys as a list.
-      List<String> item = info.getKeyLocationList().stream()
+      KsmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
+      if (latest == null) {
+        return Collections.emptyList();
+      }
+      List<String> item = latest.getLocationList().stream()
           .map(KsmKeyLocationInfo::getBlockID)
           .collect(Collectors.toList());
       BlockGroup keyBlocks = BlockGroup.newBuilder()
@@ -503,7 +509,8 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
         continue;
       }
       // Get block keys as a list.
-      List<String> item = info.getKeyLocationList().stream()
+      List<String> item = info.getLatestVersionLocations()
+          .getBlocksLatestVersionOnly().stream()
           .map(KsmKeyLocationInfo::getBlockID)
           .collect(Collectors.toList());
       BlockGroup keyBlocks = BlockGroup.newBuilder()

+ 34 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.KeyInfo;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
@@ -195,9 +197,10 @@ public class KeyManagerImpl implements KeyManager {
           .setShouldCreateContainer(allocatedBlock.getCreateContainer())
           .setLength(scmBlockSize)
           .setOffset(0)
-          .setIndex(keyInfo.getKeyLocationList().size())
           .build();
-      keyInfo.appendKeyLocation(info);
+      // current version not committed, so new blocks coming now are added to
+      // the same version
+      keyInfo.appendNewBlocks(Collections.singletonList(info));
       keyInfo.updateModifcationTime();
       metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
       return info;
@@ -237,7 +240,6 @@ public class KeyManagerImpl implements KeyManager {
       // the point, if client needs more blocks, client can always call
       // allocateBlock. But if requested size is not 0, KSM will preallocate
       // some blocks and piggyback to client, to save RPC calls.
-      int idx = 0;
       while (requestedSize > 0) {
         long allocateSize = Math.min(scmBlockSize, requestedSize);
         AllocatedBlock allocatedBlock =
@@ -246,28 +248,45 @@ public class KeyManagerImpl implements KeyManager {
             .setContainerName(allocatedBlock.getPipeline().getContainerName())
             .setBlockID(allocatedBlock.getKey())
             .setShouldCreateContainer(allocatedBlock.getCreateContainer())
-            .setIndex(idx++)
             .setLength(allocateSize)
             .setOffset(0)
             .build();
         locations.add(subKeyInfo);
         requestedSize -= allocateSize;
       }
-      long currentTime = Time.now();
       // NOTE size of a key is not a hard limit on anything, it is a value that
       // client should expect, in terms of current size of key. If client sets a
       // value, then this value is used, otherwise, we allocate a single block
       // which is the current size, if read by the client.
       long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
-      KsmKeyInfo keyInfo = new KsmKeyInfo.Builder()
-          .setVolumeName(args.getVolumeName())
-          .setBucketName(args.getBucketName())
-          .setKeyName(args.getKeyName())
-          .setKsmKeyLocationInfos(locations)
-          .setCreationTime(currentTime)
-          .setModificationTime(currentTime)
-          .setDataSize(size)
-          .build();
+      byte[] keyKey = metadataManager.getDBKeyBytes(
+          volumeName, bucketName, keyName);
+      byte[] value = metadataManager.get(keyKey);
+      KsmKeyInfo keyInfo;
+      long openVersion;
+      if (value != null) {
+        // the key already exist, the new blocks will be added as new version
+        keyInfo = KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
+        // when locations.size = 0, the new version will have identical blocks
+        // as its previous version
+        openVersion = keyInfo.addNewVersion(locations);
+        keyInfo.setDataSize(size + keyInfo.getDataSize());
+      } else {
+        // the key does not exist, create a new object, the new blocks are the
+        // version 0
+        long currentTime = Time.now();
+        keyInfo = new KsmKeyInfo.Builder()
+            .setVolumeName(args.getVolumeName())
+            .setBucketName(args.getBucketName())
+            .setKeyName(args.getKeyName())
+            .setKsmKeyLocationInfos(Collections.singletonList(
+                new KsmKeyLocationInfoGroup(0, locations)))
+            .setCreationTime(currentTime)
+            .setModificationTime(currentTime)
+            .setDataSize(size)
+            .build();
+        openVersion = 0;
+      }
       // Generate a random ID which is not already in meta db.
       int id = -1;
       // in general this should finish in a couple times at most. putting some
@@ -285,7 +304,7 @@ public class KeyManagerImpl implements KeyManager {
       }
       LOG.debug("Key {} allocated in volume {} bucket {}",
           keyName, volumeName, bucketName);
-      return new OpenKeySession(id, keyInfo);
+      return new OpenKeySession(id, keyInfo, openVersion);
     } catch (KSMException e) {
       throw e;
     } catch (IOException ex) {

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java

@@ -334,6 +334,7 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
       OpenKeySession openKey = impl.openKey(ksmKeyArgs);
       resp.setKeyInfo(openKey.getKeyInfo().getProtobuf());
       resp.setID(openKey.getId());
+      resp.setOpenVersion(openKey.getOpenVersion());
       resp.setStatus(Status.OK);
     } catch (IOException e) {
       resp.setStatus(exceptionToResponseStatus(e));

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -419,6 +419,9 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setType(xceiverClientManager.getType())
             .setFactor(xceiverClientManager.getFactor())
             .build();
+    groupOutputStream.addPreallocateBlocks(
+        openKey.getKeyInfo().getLatestVersionLocations(),
+        openKey.getOpenVersion());
     return new OzoneOutputStream(groupOutputStream);
   }
 

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java

@@ -323,7 +323,7 @@ public class TestStorageContainerManager {
     // on datanodes.
     Set<String> containerNames = new HashSet<>();
     for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
-      entry.getValue().getKeyLocationList()
+      entry.getValue().getLatestVersionLocations().getLocationList()
           .forEach(loc -> containerNames.add(loc.getContainerName()));
     }
 
@@ -331,7 +331,7 @@ public class TestStorageContainerManager {
     // total number of containerBlocks via creation call.
     int totalCreatedBlocks = 0;
     for (KsmKeyInfo info : keyLocations.values()) {
-      totalCreatedBlocks += info.getKeyLocationList().size();
+      totalCreatedBlocks += info.getKeyLocationVersions().size();
     }
     Assert.assertTrue(totalCreatedBlocks > 0);
     Assert.assertEquals(totalCreatedBlocks,
@@ -340,7 +340,8 @@ public class TestStorageContainerManager {
     // Create a deletion TX for each key.
     Map<String, List<String>> containerBlocks = Maps.newHashMap();
     for (KsmKeyInfo info : keyLocations.values()) {
-      List<KsmKeyLocationInfo> list = info.getKeyLocationList();
+      List<KsmKeyLocationInfo> list =
+          info.getLatestVersionLocations().getLocationList();
       list.forEach(location -> {
         if (containerBlocks.containsKey(location.getContainerName())) {
           containerBlocks.get(location.getContainerName())

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java

@@ -387,7 +387,8 @@ public class TestOzoneRpcClient {
     OzoneProtos.ReplicationFactor replicationFactor =
         OzoneProtos.ReplicationFactor.valueOf(factor.getValue());
     KsmKeyInfo keyInfo = keySpaceManager.lookupKey(keyArgs);
-    for (KsmKeyLocationInfo info: keyInfo.getKeyLocationList()) {
+    for (KsmKeyLocationInfo info:
+        keyInfo.getLatestVersionLocations().getLocationList()) {
       Pipeline pipeline =
           storageContainerLocationClient.getContainer(info.getContainerName());
       if ((pipeline.getFactor() != replicationFactor) ||

+ 254 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKsmBlockVersioning.java

@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the versioning of blocks from KSM side.
+ */
+public class TestKsmBlockVersioning {
+  private static MiniOzoneCluster cluster = null;
+  private static UserArgs userArgs;
+  private static OzoneConfiguration conf;
+  private static KeySpaceManager keySpaceManager;
+  private static StorageHandler storageHandler;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = new MiniOzoneClassicCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+    keySpaceManager = cluster.getKeySpaceManager();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAllocateCommit() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(1000)
+        .build();
+
+    // 1st update, version 0
+    OpenKeySession openKey = keySpaceManager.openKey(keyArgs);
+    keySpaceManager.commitKey(keyArgs, openKey.getId());
+
+    KsmKeyInfo keyInfo = keySpaceManager.lookupKey(keyArgs);
+    KsmKeyLocationInfoGroup highestVersion =
+        checkVersions(keyInfo.getKeyLocationVersions());
+    assertEquals(0, highestVersion.getVersion());
+    assertEquals(1, highestVersion.getLocationList().size());
+
+    // 2nd update, version 1
+    openKey = keySpaceManager.openKey(keyArgs);
+    //KsmKeyLocationInfo locationInfo =
+    //    keySpaceManager.allocateBlock(keyArgs, openKey.getId());
+    keySpaceManager.commitKey(keyArgs, openKey.getId());
+
+    keyInfo = keySpaceManager.lookupKey(keyArgs);
+    highestVersion = checkVersions(keyInfo.getKeyLocationVersions());
+    assertEquals(1, highestVersion.getVersion());
+    assertEquals(2, highestVersion.getLocationList().size());
+
+    // 3rd update, version 2
+    openKey = keySpaceManager.openKey(keyArgs);
+    // this block will be appended to the latest version of version 2.
+    keySpaceManager.allocateBlock(keyArgs, openKey.getId());
+    keySpaceManager.commitKey(keyArgs, openKey.getId());
+
+    keyInfo = keySpaceManager.lookupKey(keyArgs);
+    highestVersion = checkVersions(keyInfo.getKeyLocationVersions());
+    assertEquals(2, highestVersion.getVersion());
+    assertEquals(4, highestVersion.getLocationList().size());
+  }
+
+  private KsmKeyLocationInfoGroup checkVersions(
+      List<KsmKeyLocationInfoGroup> versions) {
+    KsmKeyLocationInfoGroup currentVersion = null;
+    for (KsmKeyLocationInfoGroup version : versions) {
+      if (currentVersion != null) {
+        assertEquals(currentVersion.getVersion() + 1, version.getVersion());
+        for (KsmKeyLocationInfo info : currentVersion.getLocationList()) {
+          boolean found = false;
+          // all the blocks from the previous version must present in the next
+          // version
+          for (KsmKeyLocationInfo info2 : version.getLocationList()) {
+            if (info.getBlockID().equals(info2.getBlockID())) {
+              found = true;
+              break;
+            }
+          }
+          assertTrue(found);
+        }
+      }
+      currentVersion = version;
+    }
+    return currentVersion;
+  }
+
+  @Test
+  public void testReadLatestVersion() throws Exception {
+
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(1000)
+        .build();
+
+    String dataString = RandomStringUtils.randomAlphabetic(100);
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    // this write will create 1st version with one block
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    byte[] data = new byte[dataString.length()];
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    KsmKeyInfo keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
+    assertEquals(dataString, DFSUtil.bytes2String(data));
+    assertEquals(0, keyInfo.getLatestVersionLocations().getVersion());
+    assertEquals(1,
+        keyInfo.getLatestVersionLocations().getLocationList().size());
+
+    // this write will create 2nd version, 2nd version will contain block from
+    // version 1, and add a new block
+    dataString = RandomStringUtils.randomAlphabetic(10);
+    data = new byte[dataString.length()];
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
+    assertEquals(dataString, DFSUtil.bytes2String(data));
+    assertEquals(1, keyInfo.getLatestVersionLocations().getVersion());
+    assertEquals(2,
+        keyInfo.getLatestVersionLocations().getLocationList().size());
+
+    dataString = RandomStringUtils.randomAlphabetic(200);
+    data = new byte[dataString.length()];
+    try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+      stream.write(dataString.getBytes());
+    }
+    try (InputStream in = storageHandler.newKeyReader(keyArgs)) {
+      in.read(data);
+    }
+    keyInfo = keySpaceManager.lookupKey(ksmKeyArgs);
+    assertEquals(dataString, DFSUtil.bytes2String(data));
+    assertEquals(2, keyInfo.getLatestVersionLocations().getVersion());
+    assertEquals(3,
+        keyInfo.getLatestVersionLocations().getLocationList().size());
+  }
+}

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java

@@ -618,7 +618,8 @@ public class TestKeys {
       // Memorize chunks that has been created,
       // so we can verify actual deletions at DN side later.
       for (KsmKeyInfo keyInfo : createdKeys) {
-        List<KsmKeyLocationInfo> locations = keyInfo.getKeyLocationList();
+        List<KsmKeyLocationInfo> locations =
+            keyInfo.getLatestVersionLocations().getLocationList();
         for (KsmKeyLocationInfo location : locations) {
           String containerName = location.getContainerName();
           KeyData keyData = new KeyData(containerName, location.getBlockID());