Explorar el Código

HDDS-793. Support custom key/value annotations on volume/bucket/key. Contributed by Elek, Marton.

Xiaoyu Yao hace 6 años
padre
commit
9fc7df8afb
Se han modificado 42 ficheros con 681 adiciones y 153 borrados
  1. 28 2
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
  2. 12 5
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
  3. 9 1
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java
  4. 17 2
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java
  5. 25 3
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java
  6. 3 1
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
  7. 9 4
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
  8. 22 13
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  9. 54 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/KeyValueUtil.java
  10. 14 4
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketArgs.java
  11. 52 6
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
  12. 26 2
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
  13. 57 8
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
  14. 33 35
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java
  15. 44 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/WithMetadata.java
  16. 6 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
  17. 6 0
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  18. 46 0
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
  19. 52 0
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
  20. 21 0
      hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/package-info.java
  21. 5 4
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java
  22. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
  23. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
  24. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
  25. 60 17
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
  26. 3 2
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
  27. 26 19
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  28. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
  29. 4 3
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
  30. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
  31. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestContainerReportWithKeys.java
  32. 3 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java
  33. 8 5
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
  34. 2 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
  35. 1 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  36. 2 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  37. 2 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/keys/PutKeyHandler.java
  38. 4 2
      hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
  39. 3 2
      hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
  40. 6 3
      hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
  41. 2 1
      hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectHead.java
  42. 2 1
      hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java

+ 28 - 2
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java

@@ -21,7 +21,9 @@ package org.apache.hadoop.ozone.client;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.ozone.OzoneAcl;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * This class encapsulates the arguments that are
@@ -43,6 +45,11 @@ public final class BucketArgs {
    */
   private StorageType storageType;
 
+  /**
+   * Custom key/value metadata.
+   */
+  private Map<String, String> metadata;
+
   /**
    * Private constructor, constructed via builder.
    * @param versioning Bucket version flag.
@@ -50,10 +57,11 @@ public final class BucketArgs {
    * @param acls list of ACLs.
    */
   private BucketArgs(Boolean versioning, StorageType storageType,
-                     List<OzoneAcl> acls) {
+                     List<OzoneAcl> acls, Map<String, String> metadata) {
     this.acls = acls;
     this.versioning = versioning;
     this.storageType = storageType;
+    this.metadata = metadata;
   }
 
   /**
@@ -80,6 +88,15 @@ public final class BucketArgs {
     return acls;
   }
 
+  /**
+   * Custom metadata for the buckets.
+   *
+   * @return key value map
+   */
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
   /**
    * Returns new builder class that builds a OmBucketInfo.
    *
@@ -96,6 +113,11 @@ public final class BucketArgs {
     private Boolean versioning;
     private StorageType storageType;
     private List<OzoneAcl> acls;
+    private Map<String, String> metadata;
+
+    public Builder() {
+      metadata = new HashMap<>();
+    }
 
     public BucketArgs.Builder setVersioning(Boolean versionFlag) {
       this.versioning = versionFlag;
@@ -112,12 +134,16 @@ public final class BucketArgs {
       return this;
     }
 
+    public BucketArgs.Builder addMetadata(String key, String value) {
+      this.metadata.put(key, value);
+      return this;
+    }
     /**
      * Constructs the BucketArgs.
      * @return instance of BucketArgs.
      */
     public BucketArgs build() {
-      return new BucketArgs(versioning, storageType, acls);
+      return new BucketArgs(versioning, storageType, acls, metadata);
     }
   }
 }

+ 12 - 5
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java

@@ -32,8 +32,10 @@ import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.WithMetadata;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +44,7 @@ import java.util.NoSuchElementException;
 /**
  * A class that encapsulates OzoneBucket.
  */
-public class OzoneBucket {
+public class OzoneBucket extends WithMetadata {
 
   /**
    * The proxy used for connecting to the cluster and perform
@@ -107,7 +109,8 @@ public class OzoneBucket {
   public OzoneBucket(Configuration conf, ClientProtocol proxy,
                      String volumeName, String bucketName,
                      List<OzoneAcl> acls, StorageType storageType,
-                     Boolean versioning, long creationTime) {
+                     Boolean versioning, long creationTime,
+                     Map<String, String> metadata) {
     Preconditions.checkNotNull(proxy, "Client proxy is not set.");
     this.proxy = proxy;
     this.volumeName = volumeName;
@@ -123,6 +126,7 @@ public class OzoneBucket {
     this.defaultReplicationType = ReplicationType.valueOf(conf.get(
         OzoneConfigKeys.OZONE_REPLICATION_TYPE,
         OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT));
+    this.metadata = metadata;
   }
 
   @VisibleForTesting
@@ -248,7 +252,8 @@ public class OzoneBucket {
    */
   public OzoneOutputStream createKey(String key, long size)
       throws IOException {
-    return createKey(key, size, defaultReplicationType, defaultReplication);
+    return createKey(key, size, defaultReplicationType, defaultReplication,
+        new HashMap<>());
   }
 
   /**
@@ -262,9 +267,11 @@ public class OzoneBucket {
    */
   public OzoneOutputStream createKey(String key, long size,
                                      ReplicationType type,
-                                     ReplicationFactor factor)
+                                     ReplicationFactor factor,
+                                     Map<String, String> keyMetadata)
       throws IOException {
-    return proxy.createKey(volumeName, name, key, size, type, factor);
+    return proxy
+        .createKey(volumeName, name, key, size, type, factor, keyMetadata);
   }
 
   /**

+ 9 - 1
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneKeyDetails.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.client;
 import org.apache.hadoop.hdds.client.ReplicationType;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * A class that encapsulates OzoneKeyLocation.
@@ -32,6 +33,8 @@ public class OzoneKeyDetails extends OzoneKey {
    */
   private List<OzoneKeyLocation> ozoneKeyLocations;
 
+  private Map<String, String> metadata;
+
   /**
    * Constructs OzoneKeyDetails from OmKeyInfo.
    */
@@ -39,10 +42,11 @@ public class OzoneKeyDetails extends OzoneKey {
   public OzoneKeyDetails(String volumeName, String bucketName, String keyName,
                          long size, long creationTime, long modificationTime,
                          List<OzoneKeyLocation> ozoneKeyLocations,
-                         ReplicationType type) {
+                         ReplicationType type, Map<String, String> metadata) {
     super(volumeName, bucketName, keyName, size, creationTime,
         modificationTime, type);
     this.ozoneKeyLocations = ozoneKeyLocations;
+    this.metadata = metadata;
   }
 
   /**
@@ -52,6 +56,10 @@ public class OzoneKeyDetails extends OzoneKey {
     return ozoneKeyLocations;
   }
 
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
   /**
    * Set details of key location.
    * @param ozoneKeyLocations - details of key location

+ 17 - 2
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneVolume.java

@@ -19,8 +19,10 @@
 package org.apache.hadoop.ozone.client;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,6 +30,7 @@ import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.helpers.WithMetadata;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -35,7 +38,7 @@ import com.google.common.base.Preconditions;
 /**
  * A class that encapsulates OzoneVolume.
  */
-public class OzoneVolume {
+public class OzoneVolume extends WithMetadata {
 
   /**
    * The proxy used for connecting to the cluster and perform
@@ -81,11 +84,13 @@ public class OzoneVolume {
    * @param quotaInBytes Volume quota in bytes.
    * @param creationTime creation time of the volume
    * @param acls ACLs associated with the volume.
+   * @param metadata custom key value metadata.
    */
   @SuppressWarnings("parameternumber")
   public OzoneVolume(Configuration conf, ClientProtocol proxy, String name,
                      String admin, String owner, long quotaInBytes,
-                     long creationTime, List<OzoneAcl> acls) {
+                     long creationTime, List<OzoneAcl> acls,
+                     Map<String, String> metadata) {
     Preconditions.checkNotNull(proxy, "Client proxy is not set.");
     this.proxy = proxy;
     this.name = name;
@@ -95,6 +100,15 @@ public class OzoneVolume {
     this.creationTime = creationTime;
     this.acls = acls;
     this.listCacheSize = HddsClientUtils.getListCacheSize(conf);
+    this.metadata = metadata;
+  }
+
+  @SuppressWarnings("parameternumber")
+  public OzoneVolume(Configuration conf, ClientProtocol proxy, String name,
+                     String admin, String owner, long quotaInBytes,
+                     long creationTime, List<OzoneAcl> acls) {
+    this(conf, proxy, name, admin, owner, quotaInBytes, creationTime, acls,
+        new HashMap<>());
   }
 
   @VisibleForTesting
@@ -108,6 +122,7 @@ public class OzoneVolume {
     this.quotaInBytes = quotaInBytes;
     this.creationTime = creationTime;
     this.acls = acls;
+    this.metadata = new HashMap<>();
   }
 
   /**

+ 25 - 3
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/VolumeArgs.java

@@ -21,7 +21,9 @@ package org.apache.hadoop.ozone.client;
 import org.apache.hadoop.ozone.OzoneAcl;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * This class encapsulates the arguments that are
@@ -33,6 +35,7 @@ public final class VolumeArgs {
   private final String owner;
   private final String quota;
   private final List<OzoneAcl> acls;
+  private Map<String, String> metadata;
 
   /**
    * Private constructor, constructed via builder.
@@ -41,12 +44,16 @@ public final class VolumeArgs {
    * @param quota Volume Quota.
    * @param acls User to access rights map.
    */
-  private VolumeArgs(String admin, String owner,
-                        String quota, List<OzoneAcl> acls) {
+  private VolumeArgs(String admin,
+      String owner,
+      String quota,
+      List<OzoneAcl> acls,
+      Map<String, String> metadata) {
     this.admin = admin;
     this.owner = owner;
     this.quota = quota;
     this.acls = acls;
+    this.metadata = metadata;
   }
 
   /**
@@ -73,6 +80,15 @@ public final class VolumeArgs {
     return quota;
   }
 
+  /**
+   * Return custom key value map.
+   *
+   * @return metadata
+   */
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
   public List<OzoneAcl> getAcls() {
     return acls;
   }
@@ -93,6 +109,7 @@ public final class VolumeArgs {
     private String ownerName;
     private String volumeQuota;
     private List<OzoneAcl> listOfAcls;
+    private Map<String, String> metadata = new HashMap<>();
 
 
     public VolumeArgs.Builder setAdmin(String admin) {
@@ -110,6 +127,10 @@ public final class VolumeArgs {
       return this;
     }
 
+    public VolumeArgs.Builder addMetadata(String key, String value) {
+      metadata.put(key, value);
+      return this;
+    }
     public VolumeArgs.Builder setAcls(List<OzoneAcl> acls)
         throws IOException {
       this.listOfAcls = acls;
@@ -121,7 +142,8 @@ public final class VolumeArgs {
      * @return CreateVolumeArgs.
      */
     public VolumeArgs build() {
-      return new VolumeArgs(adminName, ownerName, volumeQuota, listOfAcls);
+      return new VolumeArgs(adminName, ownerName, volumeQuota, listOfAcls,
+          metadata);
     }
   }
 

+ 3 - 1
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java

@@ -262,12 +262,14 @@ public interface ClientProtocol {
    * @param bucketName Name of the Bucket
    * @param keyName Name of the Key
    * @param size Size of the data
+   * @param metadata custom key value metadata
    * @return {@link OzoneOutputStream}
    *
    */
   OzoneOutputStream createKey(String volumeName, String bucketName,
                               String keyName, long size, ReplicationType type,
-                              ReplicationFactor factor)
+                              ReplicationFactor factor,
+                              Map<String, String> metadata)
       throws IOException;
 
   /**

+ 9 - 4
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java

@@ -83,6 +83,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -572,7 +573,8 @@ public class RestClient implements ClientProtocol {
           bucketInfo.getAcls(),
           bucketInfo.getStorageType(),
           getBucketVersioningFlag(bucketInfo.getVersioning()),
-          HddsClientUtils.formatDateTime(bucketInfo.getCreatedOn()));
+          HddsClientUtils.formatDateTime(bucketInfo.getCreatedOn()),
+          new HashMap<>());
       EntityUtils.consume(response);
       return bucket;
     } catch (URISyntaxException | ParseException e) {
@@ -611,7 +613,8 @@ public class RestClient implements ClientProtocol {
         return new OzoneBucket(conf, this, volumeName,
             bucketInfo.getBucketName(), bucketInfo.getAcls(),
             bucketInfo.getStorageType(),
-            getBucketVersioningFlag(bucketInfo.getVersioning()), creationTime);
+            getBucketVersioningFlag(bucketInfo.getVersioning()), creationTime,
+            new HashMap<>());
       }).collect(Collectors.toList());
     } catch (URISyntaxException e) {
       throw new IOException(e);
@@ -631,7 +634,8 @@ public class RestClient implements ClientProtocol {
   @Override
   public OzoneOutputStream createKey(
       String volumeName, String bucketName, String keyName, long size,
-      ReplicationType type, ReplicationFactor factor)
+      ReplicationType type, ReplicationFactor factor,
+      Map<String, String> metadata)
       throws IOException {
     // TODO: Once ReplicationType and ReplicationFactor are supported in
     // OzoneHandler (in Datanode), set them in header.
@@ -865,7 +869,8 @@ public class RestClient implements ClientProtocol {
           HddsClientUtils.formatDateTime(keyInfo.getCreatedOn()),
           HddsClientUtils.formatDateTime(keyInfo.getModifiedOn()),
           ozoneKeyLocations, ReplicationType.valueOf(
-              keyInfo.getType().toString()));
+              keyInfo.getType().toString()),
+          new HashMap<>());
       EntityUtils.consume(response);
       return key;
     } catch (URISyntaxException | ParseException e) {

+ 22 - 13
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -247,6 +247,7 @@ public class RpcClient implements ClientProtocol {
     builder.setAdminName(admin);
     builder.setOwnerName(owner);
     builder.setQuotaInBytes(quota);
+    builder.addAllMetadata(volArgs.getMetadata());
 
     //Remove duplicates and add ACLs
     for (OzoneAcl ozoneAcl :
@@ -290,7 +291,8 @@ public class RpcClient implements ClientProtocol {
         volume.getQuotaInBytes(),
         volume.getCreationTime(),
         volume.getAclMap().ozoneAclGetProtobuf().stream().
-            map(OMPBHelper::convertOzoneAcl).collect(Collectors.toList()));
+            map(OMPBHelper::convertOzoneAcl).collect(Collectors.toList()),
+        volume.getMetadata());
   }
 
   @Override
@@ -341,7 +343,8 @@ public class RpcClient implements ClientProtocol {
         volume.getQuotaInBytes(),
         volume.getCreationTime(),
         volume.getAclMap().ozoneAclGetProtobuf().stream().
-            map(OMPBHelper::convertOzoneAcl).collect(Collectors.toList())))
+            map(OMPBHelper::convertOzoneAcl).collect(Collectors.toList()),
+        volume.getMetadata()))
         .collect(Collectors.toList());
   }
 
@@ -380,6 +383,7 @@ public class RpcClient implements ClientProtocol {
     builder.setVolumeName(volumeName)
         .setBucketName(bucketName)
         .setIsVersionEnabled(isVersionEnabled)
+        .addAllMetadata(bucketArgs.getMetadata())
         .setStorageType(storageType)
         .setAcls(listOfAcls.stream().distinct().collect(Collectors.toList()));
 
@@ -510,17 +514,18 @@ public class RpcClient implements ClientProtocol {
   public OzoneBucket getBucketDetails(
       String volumeName, String bucketName) throws IOException {
     HddsClientUtils.verifyResourceName(volumeName, bucketName);
-    OmBucketInfo bucketArgs =
+    OmBucketInfo bucketInfo =
         ozoneManagerClient.getBucketInfo(volumeName, bucketName);
     return new OzoneBucket(
         conf,
         this,
-        bucketArgs.getVolumeName(),
-        bucketArgs.getBucketName(),
-        bucketArgs.getAcls(),
-        bucketArgs.getStorageType(),
-        bucketArgs.getIsVersionEnabled(),
-        bucketArgs.getCreationTime());
+        bucketInfo.getVolumeName(),
+        bucketInfo.getBucketName(),
+        bucketInfo.getAcls(),
+        bucketInfo.getStorageType(),
+        bucketInfo.getIsVersionEnabled(),
+        bucketInfo.getCreationTime(),
+        bucketInfo.getMetadata());
   }
 
   @Override
@@ -538,14 +543,16 @@ public class RpcClient implements ClientProtocol {
         bucket.getAcls(),
         bucket.getStorageType(),
         bucket.getIsVersionEnabled(),
-        bucket.getCreationTime()))
+        bucket.getCreationTime(),
+        bucket.getMetadata()))
         .collect(Collectors.toList());
   }
 
   @Override
   public OzoneOutputStream createKey(
       String volumeName, String bucketName, String keyName, long size,
-      ReplicationType type, ReplicationFactor factor)
+      ReplicationType type, ReplicationFactor factor,
+      Map<String, String> metadata)
       throws IOException {
     HddsClientUtils.verifyResourceName(volumeName, bucketName);
     HddsClientUtils.checkNotNull(keyName, type, factor);
@@ -557,6 +564,7 @@ public class RpcClient implements ClientProtocol {
         .setDataSize(size)
         .setType(HddsProtos.ReplicationType.valueOf(type.toString()))
         .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
+        .addAllMetadata(metadata)
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
@@ -669,7 +677,7 @@ public class RpcClient implements ClientProtocol {
     return new OzoneKeyDetails(keyInfo.getVolumeName(), keyInfo.getBucketName(),
         keyInfo.getKeyName(), keyInfo.getDataSize(), keyInfo.getCreationTime(),
         keyInfo.getModificationTime(), ozoneKeyLocations, ReplicationType
-        .valueOf(keyInfo.getType().toString()));
+        .valueOf(keyInfo.getType().toString()), keyInfo.getMetadata());
   }
 
   @Override
@@ -728,7 +736,8 @@ public class RpcClient implements ClientProtocol {
         bucket.getAcls(),
         bucket.getStorageType(),
         bucket.getIsVersionEnabled(),
-        bucket.getCreationTime()))
+        bucket.getCreationTime(),
+        bucket.getMetadata()))
         .collect(Collectors.toList());
   }
 

+ 54 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/KeyValueUtil.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;
+
+/**
+ * Convert from/to hdds KeyValue protobuf structure.
+ */
+public final class KeyValueUtil {
+  private KeyValueUtil() {
+  }
+
+  /**
+   * Parse Key,Value map data from protobuf representation.
+   */
+  public static Map<String, String> getFromProtobuf(List<KeyValue> metadata) {
+    return metadata.stream()
+        .collect(Collectors.toMap(KeyValue::getKey,
+            KeyValue::getValue));
+  }
+
+  /**
+   * Encode key value map to protobuf.
+   */
+  public static List<KeyValue> toProtobuf(Map<String, String> keyValueMap) {
+    List<KeyValue> metadataList = new LinkedList<>();
+    for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
+      metadataList.add(KeyValue.newBuilder().setKey(entry.getKey()).
+          setValue(entry.getValue()).build());
+    }
+    return metadataList;
+  }
+}

+ 14 - 4
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketArgs.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.om.helpers;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +35,7 @@ import com.google.common.base.Preconditions;
 /**
  * A class that encapsulates Bucket Arguments.
  */
-public final class OmBucketArgs implements Auditable {
+public final class OmBucketArgs extends WithMetadata implements Auditable {
   /**
    * Name of the volume in which the bucket belongs to.
    */
@@ -72,13 +73,15 @@ public final class OmBucketArgs implements Auditable {
    */
   private OmBucketArgs(String volumeName, String bucketName,
                        List<OzoneAcl> addAcls, List<OzoneAcl> removeAcls,
-                       Boolean isVersionEnabled, StorageType storageType) {
+      Boolean isVersionEnabled, StorageType storageType,
+      Map<String, String> metadata) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.addAcls = addAcls;
     this.removeAcls = removeAcls;
     this.isVersionEnabled = isVersionEnabled;
     this.storageType = storageType;
+    this.metadata = metadata;
   }
 
   /**
@@ -167,6 +170,7 @@ public final class OmBucketArgs implements Auditable {
     private List<OzoneAcl> removeAcls;
     private Boolean isVersionEnabled;
     private StorageType storageType;
+    private Map<String, String> metadata;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -193,6 +197,11 @@ public final class OmBucketArgs implements Auditable {
       return this;
     }
 
+    public Builder addMetadata(Map<String, String> metadataMap) {
+      this.metadata = metadataMap;
+      return this;
+    }
+
     public Builder setStorageType(StorageType storage) {
       this.storageType = storage;
       return this;
@@ -206,7 +215,7 @@ public final class OmBucketArgs implements Auditable {
       Preconditions.checkNotNull(volumeName);
       Preconditions.checkNotNull(bucketName);
       return new OmBucketArgs(volumeName, bucketName, addAcls,
-          removeAcls, isVersionEnabled, storageType);
+          removeAcls, isVersionEnabled, storageType, metadata);
     }
   }
 
@@ -249,6 +258,7 @@ public final class OmBucketArgs implements Auditable {
         bucketArgs.hasIsVersionEnabled() ?
             bucketArgs.getIsVersionEnabled() : null,
         bucketArgs.hasStorageType() ? StorageType.valueOf(
-            bucketArgs.getStorageType()) : null);
+            bucketArgs.getStorageType()) : null,
+        KeyValueUtil.getFromProtobuf(bucketArgs.getMetadataList()));
   }
 }

+ 52 - 6
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java

@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.ozone.om.helpers;
 
+
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.StorageType;
@@ -35,7 +38,7 @@ import com.google.common.base.Preconditions;
 /**
  * A class that encapsulates Bucket Info.
  */
-public final class OmBucketInfo implements Auditable {
+public final class OmBucketInfo extends WithMetadata implements Auditable {
   /**
    * Name of the volume in which the bucket belongs to.
    */
@@ -71,15 +74,20 @@ public final class OmBucketInfo implements Auditable {
    * @param storageType - Storage type to be used.
    * @param creationTime - Bucket creation time.
    */
-  private OmBucketInfo(String volumeName, String bucketName,
-                       List<OzoneAcl> acls, boolean isVersionEnabled,
-                       StorageType storageType, long creationTime) {
+  private OmBucketInfo(String volumeName,
+                       String bucketName,
+                       List<OzoneAcl> acls,
+                       boolean isVersionEnabled,
+                       StorageType storageType,
+                       long creationTime,
+                       Map<String, String> metadata) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.acls = acls;
     this.isVersionEnabled = isVersionEnabled;
     this.storageType = storageType;
     this.creationTime = creationTime;
+    this.metadata = metadata;
   }
 
   /**
@@ -165,12 +173,14 @@ public final class OmBucketInfo implements Auditable {
     private Boolean isVersionEnabled;
     private StorageType storageType;
     private long creationTime;
+    private Map<String, String> metadata;
 
     public Builder() {
       //Default values
       this.acls = new LinkedList<>();
       this.isVersionEnabled = false;
       this.storageType = StorageType.DISK;
+      this.metadata = new HashMap<>();
     }
 
     public Builder setVolumeName(String volume) {
@@ -203,6 +213,16 @@ public final class OmBucketInfo implements Auditable {
       return this;
     }
 
+    public Builder addMetadata(String key, String value) {
+      metadata.put(key, value);
+      return this;
+    }
+
+    public Builder addAllMetadata(Map<String, String> additionalMetadata) {
+      metadata.putAll(additionalMetadata);
+      return this;
+    }
+
     /**
      * Constructs the OmBucketInfo.
      * @return instance of OmBucketInfo.
@@ -215,7 +235,8 @@ public final class OmBucketInfo implements Auditable {
       Preconditions.checkNotNull(storageType);
 
       return new OmBucketInfo(volumeName, bucketName, acls,
-          isVersionEnabled, storageType, creationTime);
+          isVersionEnabled, storageType, creationTime, metadata
+      );
     }
   }
 
@@ -231,6 +252,7 @@ public final class OmBucketInfo implements Auditable {
         .setIsVersionEnabled(isVersionEnabled)
         .setStorageType(storageType.toProto())
         .setCreationTime(creationTime)
+        .addAllMetadata(KeyValueUtil.toProtobuf(metadata))
         .build();
   }
 
@@ -247,6 +269,30 @@ public final class OmBucketInfo implements Auditable {
             OMPBHelper::convertOzoneAcl).collect(Collectors.toList()),
         bucketInfo.getIsVersionEnabled(),
         StorageType.valueOf(bucketInfo.getStorageType()),
-        bucketInfo.getCreationTime());
+        bucketInfo.getCreationTime(),
+        KeyValueUtil.getFromProtobuf(bucketInfo.getMetadataList()));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OmBucketInfo that = (OmBucketInfo) o;
+    return creationTime == that.creationTime &&
+        volumeName.equals(that.volumeName) &&
+        bucketName.equals(that.bucketName) &&
+        Objects.equals(acls, that.acls) &&
+        Objects.equals(isVersionEnabled, that.isVersionEnabled) &&
+        storageType == that.storageType &&
+        Objects.equals(metadata, that.metadata);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(volumeName, bucketName);
   }
 }

+ 26 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.Auditable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,12 +43,14 @@ public final class OmKeyArgs implements Auditable {
   private final boolean isMultipartKey;
   private final String multipartUploadID;
   private final int multipartUploadPartNumber;
+  private Map<String, String> metadata;
 
   @SuppressWarnings("parameternumber")
   private OmKeyArgs(String volumeName, String bucketName, String keyName,
       long dataSize, ReplicationType type, ReplicationFactor factor,
       List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
-      String uploadID, int partNumber) {
+      String uploadID, int partNumber,
+      Map<String, String> metadataMap) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -58,6 +61,7 @@ public final class OmKeyArgs implements Auditable {
     this.isMultipartKey = isMultipart;
     this.multipartUploadID = uploadID;
     this.multipartUploadPartNumber = partNumber;
+    this.metadata = metadataMap;
   }
 
   public boolean getIsMultipartKey() {
@@ -100,6 +104,14 @@ public final class OmKeyArgs implements Auditable {
     dataSize = size;
   }
 
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
+  public void setMetadata(Map<String, String> metadata) {
+    this.metadata = metadata;
+  }
+
   public void setLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
     this.locationInfoList = locationInfoList;
   }
@@ -146,6 +158,7 @@ public final class OmKeyArgs implements Auditable {
     private boolean isMultipartKey;
     private String multipartUploadID;
     private int multipartUploadPartNumber;
+    private Map<String, String> metadata = new HashMap<>();
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -197,10 +210,21 @@ public final class OmKeyArgs implements Auditable {
       return this;
     }
 
+    public Builder addMetadata(String key, String value) {
+      this.metadata.put(key, value);
+      return this;
+    }
+
+    public Builder addAllMetadata(Map<String, String> metadatamap) {
+      this.metadata.putAll(metadatamap);
+      return this;
+    }
+
     public OmKeyArgs build() {
       return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
           factor, locationInfoList, isMultipartKey, multipartUploadID,
-          multipartUploadPartNumber);
+          multipartUploadPartNumber, metadata);
     }
+
   }
 }

+ 57 - 8
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java

@@ -19,7 +19,10 @@ package org.apache.hadoop.ozone.om.helpers;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -33,7 +36,7 @@ import com.google.common.base.Preconditions;
  * This is returned from OM to client, and client use class to talk to
  * datanode. Also, this is the metadata written to om.db on server side.
  */
-public final class OmKeyInfo {
+public final class OmKeyInfo extends WithMetadata {
   private final String volumeName;
   private final String bucketName;
   // name of key client specified
@@ -46,11 +49,12 @@ public final class OmKeyInfo {
   private HddsProtos.ReplicationFactor factor;
 
   @SuppressWarnings("parameternumber")
-  private OmKeyInfo(String volumeName, String bucketName, String keyName,
-                    List<OmKeyLocationInfoGroup> versions, long dataSize,
-                    long creationTime, long modificationTime,
-                    HddsProtos.ReplicationType type,
-                    HddsProtos.ReplicationFactor factor) {
+  OmKeyInfo(String volumeName, String bucketName, String keyName,
+      List<OmKeyLocationInfoGroup> versions, long dataSize,
+      long creationTime, long modificationTime,
+      HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor,
+      Map<String, String> metadata) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -71,6 +75,7 @@ public final class OmKeyInfo {
     this.modificationTime = modificationTime;
     this.factor = factor;
     this.type = type;
+    this.metadata = metadata;
   }
 
   public String getVolumeName() {
@@ -217,6 +222,12 @@ public final class OmKeyInfo {
     private HddsProtos.ReplicationType type;
     private HddsProtos.ReplicationFactor factor;
     private boolean isMultipartKey;
+    private Map<String, String> metadata;
+
+    public Builder() {
+      this.metadata = new HashMap<>();
+      omKeyLocationInfoGroups = new ArrayList<>();
+    }
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -269,10 +280,20 @@ public final class OmKeyInfo {
       return this;
     }
 
+    public Builder addMetadata(String key, String value) {
+      metadata.put(key, value);
+      return this;
+    }
+
+    public Builder addAllMetadata(Map<String, String> newMetadata) {
+      metadata.putAll(newMetadata);
+      return this;
+    }
+
     public OmKeyInfo build() {
       return new OmKeyInfo(
           volumeName, bucketName, keyName, omKeyLocationInfoGroups,
-          dataSize, creationTime, modificationTime, type, factor);
+          dataSize, creationTime, modificationTime, type, factor, metadata);
     }
   }
 
@@ -292,6 +313,7 @@ public final class OmKeyInfo {
         .setLatestVersion(latestVersion)
         .setCreationTime(creationTime)
         .setModificationTime(modificationTime)
+        .addAllMetadata(KeyValueUtil.toProtobuf(metadata))
         .build();
   }
 
@@ -307,7 +329,34 @@ public final class OmKeyInfo {
         keyInfo.getCreationTime(),
         keyInfo.getModificationTime(),
         keyInfo.getType(),
-        keyInfo.getFactor());
+        keyInfo.getFactor(),
+        KeyValueUtil.getFromProtobuf(keyInfo.getMetadataList()));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OmKeyInfo omKeyInfo = (OmKeyInfo) o;
+    return dataSize == omKeyInfo.dataSize &&
+        creationTime == omKeyInfo.creationTime &&
+        modificationTime == omKeyInfo.modificationTime &&
+        volumeName.equals(omKeyInfo.volumeName) &&
+        bucketName.equals(omKeyInfo.bucketName) &&
+        keyName.equals(omKeyInfo.keyName) &&
+        Objects
+            .equals(keyLocationVersions, omKeyInfo.keyLocationVersions) &&
+        type == omKeyInfo.type &&
+        factor == omKeyInfo.factor &&
+        Objects.equals(metadata, omKeyInfo.metadata);
   }
 
+  @Override
+  public int hashCode() {
+    return Objects.hash(volumeName, bucketName, keyName);
+  }
 }

+ 33 - 35
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmVolumeArgs.java

@@ -17,34 +17,29 @@
  */
 package org.apache.hadoop.ozone.om.helpers;
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.audit.Auditable;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.Auditable;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+
+import com.google.common.base.Preconditions;
 
 
 /**
  * A class that encapsulates the OmVolumeArgs Args.
  */
-public final class OmVolumeArgs implements Auditable{
+public final class OmVolumeArgs extends WithMetadata implements Auditable {
   private final String adminName;
   private final String ownerName;
   private final String volume;
   private final long creationTime;
   private final long quotaInBytes;
-  private final Map<String, String> keyValueMap;
   private final OmOzoneAclMap aclMap;
 
   /**
@@ -53,18 +48,18 @@ public final class OmVolumeArgs implements Auditable{
    * @param ownerName  - Volume owner's name
    * @param volume - volume name
    * @param quotaInBytes - Volume Quota in bytes.
-   * @param keyValueMap - keyValue map.
+   * @param metadata - metadata map for custom key/value data.
    * @param aclMap - User to access rights map.
    * @param creationTime - Volume creation time.
    */
   private OmVolumeArgs(String adminName, String ownerName, String volume,
-                       long quotaInBytes, Map<String, String> keyValueMap,
+                       long quotaInBytes, Map<String, String> metadata,
                        OmOzoneAclMap aclMap, long creationTime) {
     this.adminName = adminName;
     this.ownerName = ownerName;
     this.volume = volume;
     this.quotaInBytes = quotaInBytes;
-    this.keyValueMap = keyValueMap;
+    this.metadata = metadata;
     this.aclMap = aclMap;
     this.creationTime = creationTime;
   }
@@ -109,10 +104,6 @@ public final class OmVolumeArgs implements Auditable{
     return quotaInBytes;
   }
 
-  public Map<String, String> getKeyValueMap() {
-    return keyValueMap;
-  }
-
   public OmOzoneAclMap getAclMap() {
     return aclMap;
   }
@@ -145,14 +136,14 @@ public final class OmVolumeArgs implements Auditable{
     private String volume;
     private long creationTime;
     private long quotaInBytes;
-    private Map<String, String> keyValueMap;
+    private Map<String, String> metadata;
     private OmOzoneAclMap aclMap;
 
     /**
      * Constructs a builder.
      */
     public Builder() {
-      keyValueMap = new HashMap<>();
+      metadata = new HashMap<>();
       aclMap = new OmOzoneAclMap();
     }
 
@@ -182,7 +173,14 @@ public final class OmVolumeArgs implements Auditable{
     }
 
     public Builder addMetadata(String key, String value) {
-      keyValueMap.put(key, value); // overwrite if present.
+      metadata.put(key, value); // overwrite if present.
+      return this;
+    }
+
+    public Builder addAllMetadata(Map<String, String> additionalMetaData) {
+      if (additionalMetaData != null) {
+        metadata.putAll(additionalMetaData);
+      }
       return this;
     }
 
@@ -200,16 +198,13 @@ public final class OmVolumeArgs implements Auditable{
       Preconditions.checkNotNull(ownerName);
       Preconditions.checkNotNull(volume);
       return new OmVolumeArgs(adminName, ownerName, volume, quotaInBytes,
-          keyValueMap, aclMap, creationTime);
+          metadata, aclMap, creationTime);
     }
+
   }
 
   public VolumeInfo getProtobuf() {
-    List<KeyValue> metadataList = new LinkedList<>();
-    for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
-      metadataList.add(KeyValue.newBuilder().setKey(entry.getKey()).
-          setValue(entry.getValue()).build());
-    }
+
     List<OzoneAclInfo> aclList = aclMap.ozoneAclGetProtobuf();
 
     return VolumeInfo.newBuilder()
@@ -217,7 +212,7 @@ public final class OmVolumeArgs implements Auditable{
         .setOwnerName(ownerName)
         .setVolume(volume)
         .setQuotaInBytes(quotaInBytes)
-        .addAllMetadata(metadataList)
+        .addAllMetadata(KeyValueUtil.toProtobuf(metadata))
         .addAllVolumeAcls(aclList)
         .setCreationTime(
             creationTime == 0 ? System.currentTimeMillis() : creationTime)
@@ -225,14 +220,17 @@ public final class OmVolumeArgs implements Auditable{
   }
 
   public static OmVolumeArgs getFromProtobuf(VolumeInfo volInfo) {
-    Map<String, String> kvMap = volInfo.getMetadataList().stream()
-        .collect(Collectors.toMap(KeyValue::getKey,
-            KeyValue::getValue));
+
     OmOzoneAclMap aclMap =
         OmOzoneAclMap.ozoneAclGetFromProtobuf(volInfo.getVolumeAclsList());
 
-    return new OmVolumeArgs(volInfo.getAdminName(), volInfo.getOwnerName(),
-        volInfo.getVolume(), volInfo.getQuotaInBytes(), kvMap, aclMap,
+    return new OmVolumeArgs(
+        volInfo.getAdminName(),
+        volInfo.getOwnerName(),
+        volInfo.getVolume(),
+        volInfo.getQuotaInBytes(),
+        KeyValueUtil.getFromProtobuf(volInfo.getMetadataList()),
+        aclMap,
         volInfo.getCreationTime());
   }
 }

+ 44 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/WithMetadata.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mixin class to handle custom metadata.
+ */
+public class WithMetadata {
+
+  protected Map<String, String> metadata = new HashMap<>();
+
+  /**
+   * Custom key value metadata.
+   */
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
+  /**
+   * Set custom key value metadata.
+   */
+  public void setMetadata(Map<String, String> metadata) {
+    this.metadata = metadata;
+  }
+
+}

+ 6 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -652,6 +653,11 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
       keyArgs.setDataSize(args.getDataSize());
     }
 
+    if (args.getMetadata() != null && args.getMetadata().size() > 0) {
+      keyArgs.addAllMetadata(KeyValueUtil.toProtobuf(args.getMetadata()));
+    }
+    req.setKeyArgs(keyArgs.build());
+
     if (args.getMultipartUploadID() != null) {
       keyArgs.setMultipartUploadID(args.getMultipartUploadID());
     }

+ 6 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -317,6 +317,8 @@ message BucketInfo {
     required bool isVersionEnabled = 4 [default = false];
     required StorageTypeProto storageType = 5 [default = DISK];
     required uint64 creationTime = 6;
+    repeated hadoop.hdds.KeyValue metadata = 7;
+
 }
 
 enum StorageTypeProto {
@@ -333,6 +335,7 @@ message BucketArgs {
     repeated OzoneAclInfo removeAcls = 4;
     optional bool isVersionEnabled = 5;
     optional StorageTypeProto storageType = 6;
+    repeated hadoop.hdds.KeyValue metadata = 7;
 }
 
 message OzoneAclInfo {
@@ -409,6 +412,7 @@ message KeyArgs {
     optional bool isMultipartKey = 8;
     optional string multipartUploadID = 9;
     optional uint32 multipartNumber = 10;
+    repeated hadoop.hdds.KeyValue metadata = 11;
 }
 
 message KeyLocation {
@@ -436,6 +440,8 @@ message KeyInfo {
     required uint64 creationTime = 8;
     required uint64 modificationTime = 9;
     optional uint64 latestVersion = 10;
+    repeated hadoop.hdds.KeyValue metadata = 11;
+
 }
 
 message CreateKeyRequest {

+ 46 - 0
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.hdds.protocol.StorageType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test BucketInfo.
+ */
+public class TestOmBucketInfo {
+
+  @Test
+  public void protobufConversion() {
+    OmBucketInfo bucket = OmBucketInfo.newBuilder()
+        .setBucketName("bucket")
+        .setVolumeName("vol1")
+        .setCreationTime(1L)
+        .setIsVersionEnabled(false)
+        .setStorageType(StorageType.ARCHIVE)
+        .build();
+
+    OmBucketInfo afterSerialization =
+        OmBucketInfo.getFromProtobuf(bucket.getProtobuf());
+
+    Assert.assertEquals(bucket, afterSerialization);
+
+  }
+}

+ 52 - 0
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo.Builder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test OmKeyInfo.
+ */
+public class TestOmKeyInfo {
+
+  @Test
+  public void protobufConversion() {
+    OmKeyInfo key = new Builder()
+        .setKeyName("key1")
+        .setBucketName("bucket")
+        .setVolumeName("vol1")
+        .setCreationTime(123L)
+        .setModificationTime(123L)
+        .setDataSize(123L)
+        .setReplicationFactor(ReplicationFactor.THREE)
+        .setReplicationType(ReplicationType.RATIS)
+        .addMetadata("key1", "value1")
+        .addMetadata("key2", "value2")
+        .build();
+
+    OmKeyInfo keyAfterSerialization =
+        OmKeyInfo.getFromProtobuf(key.getProtobuf());
+
+    Assert.assertEquals(key, keyAfterSerialization);
+  }
+}

+ 21 - 0
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/package-info.java

@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+/**
+ * Unit tests of helpers.
+ */

+ 5 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java

@@ -46,6 +46,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 
@@ -350,7 +351,7 @@ public class TestOzoneRestClient {
 
       OzoneOutputStream out = bucket.createKey(keyName,
           value.getBytes().length, ReplicationType.STAND_ALONE,
-          ReplicationFactor.ONE);
+          ReplicationFactor.ONE, new HashMap<>());
       out.write(value.getBytes());
       out.close();
       OzoneKey key = bucket.getKey(keyName);
@@ -376,7 +377,7 @@ public class TestOzoneRestClient {
     OzoneBucket bucket = volume.getBucket(bucketName);
     OzoneOutputStream out = bucket.createKey(keyName,
         value.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
     OzoneKey key = bucket.getKey(keyName);
@@ -398,7 +399,7 @@ public class TestOzoneRestClient {
     OzoneBucket bucket = volume.getBucket(bucketName);
     OzoneOutputStream out = bucket.createKey(fromKeyName,
         value.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
     OzoneKey key = bucket.getKey(fromKeyName);
@@ -427,7 +428,7 @@ public class TestOzoneRestClient {
     String keyValue = RandomStringUtils.random(128);
     OzoneOutputStream out = bucket.createKey(keyName,
         keyValue.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
     out.write(keyValue.getBytes());
     out.close();
 

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java

@@ -37,6 +37,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -109,7 +110,7 @@ public class TestBCSID {
     OzoneOutputStream key =
         objectStore.getVolume(volumeName).getBucket(bucketName)
             .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE);
+                ReplicationFactor.ONE, new HashMap<>());
     key.write("ratis".getBytes());
     key.close();
 

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java

@@ -39,6 +39,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -117,7 +118,7 @@ public class TestContainerStateMachine {
     OzoneOutputStream key =
         objectStore.getVolume(volumeName).getBucket(bucketName)
             .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE);
+                ReplicationFactor.ONE, new HashMap<>());
     // First write and flush creates a container in the datanode
     key.write("ratis".getBytes());
     key.flush();

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java

@@ -39,6 +39,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -115,7 +116,7 @@ public class TestContainerStateMachineFailures {
     OzoneOutputStream key =
         objectStore.getVolume(volumeName).getBucket(bucketName)
             .createKey("ratis", 1024, ReplicationType.RATIS,
-                ReplicationFactor.ONE);
+                ReplicationFactor.ONE, new HashMap<>());
     // First write and flush creates a container in the datanode
     key.write("ratis".getBytes());
     key.flush();

+ 60 - 17
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +60,7 @@ import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneKeyLocation;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -200,6 +202,39 @@ public abstract class TestOzoneRpcClientAbstract {
     store.getVolume(volumeName);
   }
 
+  @Test
+  public void testCreateVolumeWithMetadata()
+      throws IOException, OzoneException {
+    String volumeName = UUID.randomUUID().toString();
+    VolumeArgs volumeArgs = VolumeArgs.newBuilder()
+        .addMetadata("key1", "val1")
+        .build();
+    store.createVolume(volumeName, volumeArgs);
+    OzoneVolume volume = store.getVolume(volumeName);
+
+    Assert.assertEquals("val1", volume.getMetadata().get("key1"));
+    Assert.assertEquals(volumeName, volume.getName());
+  }
+
+  @Test
+  public void testCreateBucketWithMetadata()
+      throws IOException, OzoneException {
+    long currentTime = Time.now();
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    BucketArgs args = BucketArgs.newBuilder()
+        .addMetadata("key1", "value1").build();
+    volume.createBucket(bucketName, args);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    Assert.assertEquals(bucketName, bucket.getName());
+    Assert.assertNotNull(bucket.getMetadata());
+    Assert.assertEquals("value1", bucket.getMetadata().get("key1"));
+
+  }
+
+  
   @Test
   public void testCreateBucket()
       throws IOException, OzoneException {
@@ -518,7 +553,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
       OzoneOutputStream out = bucket.createKey(keyName,
           value.getBytes().length, ReplicationType.STAND_ALONE,
-          ReplicationFactor.ONE);
+          ReplicationFactor.ONE, new HashMap<>());
       out.write(value.getBytes());
       out.close();
       OzoneKey key = bucket.getKey(keyName);
@@ -549,7 +584,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
     // create the initial key with size 0, write will allocate the first block.
     OzoneOutputStream out = bucket.createKey(keyName, 0,
-        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+        ReplicationType.STAND_ALONE, ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
@@ -587,7 +622,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
       OzoneOutputStream out = bucket.createKey(keyName,
           value.getBytes().length, ReplicationType.RATIS,
-          ReplicationFactor.ONE);
+          ReplicationFactor.ONE, new HashMap<>());
       out.write(value.getBytes());
       out.close();
       OzoneKey key = bucket.getKey(keyName);
@@ -622,7 +657,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
       OzoneOutputStream out = bucket.createKey(keyName,
           value.getBytes().length, ReplicationType.RATIS,
-          ReplicationFactor.THREE);
+          ReplicationFactor.THREE, new HashMap<>());
       out.write(value.getBytes());
       out.close();
       OzoneKey key = bucket.getKey(keyName);
@@ -656,7 +691,7 @@ public abstract class TestOzoneRpcClientAbstract {
 
     OzoneOutputStream out = bucket
         .createKey(keyName, value.getBytes().length, ReplicationType.RATIS,
-            ReplicationFactor.THREE);
+            ReplicationFactor.THREE, new HashMap<>());
     KeyOutputStream groupOutputStream =
         (KeyOutputStream) out.getOutputStream();
     XceiverClientManager manager = groupOutputStream.getXceiverClientManager();
@@ -757,7 +792,7 @@ public abstract class TestOzoneRpcClientAbstract {
     // create the initial key with size 0, write will allocate the first block.
     OzoneOutputStream out = bucket.createKey(keyName,
         keyValue.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
     out.write(keyValue.getBytes());
     out.close();
 
@@ -844,7 +879,7 @@ public abstract class TestOzoneRpcClientAbstract {
     // Write data into a key
     OzoneOutputStream out = bucket.createKey(keyName,
         value.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
 
@@ -925,7 +960,7 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneBucket bucket = volume.getBucket(bucketName);
     OzoneOutputStream out = bucket.createKey(keyName,
         value.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
     OzoneKey key = bucket.getKey(keyName);
@@ -947,7 +982,7 @@ public abstract class TestOzoneRpcClientAbstract {
     OzoneBucket bucket = volume.getBucket(bucketName);
     OzoneOutputStream out = bucket.createKey(fromKeyName,
         value.getBytes().length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes());
     out.close();
     OzoneKey key = bucket.getKey(fromKeyName);
@@ -1141,22 +1176,26 @@ public abstract class TestOzoneRpcClientAbstract {
       byte[] value = RandomStringUtils.randomAscii(10240).getBytes();
       OzoneOutputStream one = volAbucketA.createKey(
           keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          new HashMap<>());
       one.write(value);
       one.close();
       OzoneOutputStream two = volAbucketB.createKey(
           keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          new HashMap<>());
       two.write(value);
       two.close();
       OzoneOutputStream three = volBbucketA.createKey(
           keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          new HashMap<>());
       three.write(value);
       three.close();
       OzoneOutputStream four = volBbucketB.createKey(
           keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          new HashMap<>());
       four.write(value);
       four.close();
     }
@@ -1170,22 +1209,26 @@ public abstract class TestOzoneRpcClientAbstract {
       byte[] value = RandomStringUtils.randomAscii(10240).getBytes();
       OzoneOutputStream one = volAbucketA.createKey(
           keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          new HashMap<>());
       one.write(value);
       one.close();
       OzoneOutputStream two = volAbucketB.createKey(
           keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          new HashMap<>());
       two.write(value);
       two.close();
       OzoneOutputStream three = volBbucketA.createKey(
           keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          new HashMap<>());
       three.write(value);
       three.close();
       OzoneOutputStream four = volBbucketB.createKey(
           keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
-          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+          value.length, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+          new HashMap<>());
       four.write(value);
       four.close();
     }

+ 3 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java

@@ -51,6 +51,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.UUID;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
@@ -134,7 +135,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
 
       try (OzoneOutputStream out = bucket.createKey(keyName,
           value.getBytes().length, ReplicationType.STAND_ALONE,
-          ReplicationFactor.ONE)) {
+          ReplicationFactor.ONE, new HashMap<>())) {
         out.write(value.getBytes());
       }
 
@@ -177,7 +178,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
 
       try(OzoneOutputStream out = bucket.createKey(keyName,
           value.getBytes().length, ReplicationType.STAND_ALONE,
-          ReplicationFactor.ONE)) {
+          ReplicationFactor.ONE, new HashMap<>())) {
         LambdaTestUtils.intercept(IOException.class, "UNAUTHENTICATED: Fail " +
                 "to find any token ",
             () -> out.write(value.getBytes()));

+ 26 - 19
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -18,13 +18,33 @@
 
 package org.apache.hadoop.ozone.container;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.net.ServerSocket;
 import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.UUID;
+
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto.Builder;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -35,31 +55,18 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.security.token.Token;
+
+import com.google.common.base.Preconditions;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.BlockData;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-
 /**
  * Helpers for container tests.
  */
@@ -668,7 +675,7 @@ public final class ContainerTestHelper {
             org.apache.hadoop.hdds.client.ReplicationFactor.ONE :
             org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
     return objectStore.getVolume(volumeName).getBucket(bucketName)
-        .createKey(keyName, size, type, factor);
+        .createKey(keyName, size, type, factor, new HashMap<>());
   }
 
   public static void validateData(String keyName, byte[] data,

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java

@@ -58,6 +58,7 @@ import org.slf4j.event.Level;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Set;
 import java.util.List;
 import java.util.HashSet;
@@ -133,7 +134,7 @@ public class TestBlockDeletion {
     String keyName = UUID.randomUUID().toString();
 
     OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
-        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+        ReplicationType.STAND_ALONE, ReplicationFactor.ONE, new HashMap<>());
     for (int i = 0; i < 100; i++) {
       out.write(value.getBytes());
     }

+ 4 - 3
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java

@@ -44,6 +44,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
@@ -92,7 +93,7 @@ public class TestCloseContainerByPipeline {
   public void testIfCloseContainerCommandHandlerIsInvoked() throws Exception {
     OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
         .createKey("standalone", 1024, ReplicationType.RATIS,
-            ReplicationFactor.ONE);
+            ReplicationFactor.ONE, new HashMap<>());
     key.write("standalone".getBytes());
     key.close();
 
@@ -146,7 +147,7 @@ public class TestCloseContainerByPipeline {
 
     OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
         .createKey("standalone", 1024, ReplicationType.RATIS,
-            ReplicationFactor.ONE);
+            ReplicationFactor.ONE, new HashMap<>());
     key.write("standalone".getBytes());
     key.close();
 
@@ -195,7 +196,7 @@ public class TestCloseContainerByPipeline {
 
     OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
         .createKey("ratis", 1024, ReplicationType.RATIS,
-            ReplicationFactor.THREE);
+            ReplicationFactor.THREE, new HashMap<>());
     key.write("ratis".getBytes());
     key.close();
 

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java

@@ -41,6 +41,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.concurrent.TimeoutException;
 
 /**
@@ -66,7 +67,7 @@ public class TestCloseContainerHandler {
     objectStore.getVolume("test").createBucket("test");
     OzoneOutputStream key = objectStore.getVolume("test").getBucket("test")
         .createKey("test", 1024, ReplicationType.STAND_ALONE,
-            ReplicationFactor.ONE);
+            ReplicationFactor.ONE, new HashMap<>());
     key.write("test".getBytes());
     key.close();
 

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestContainerReportWithKeys.java

@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 
 /**
  * This class tests container report with DN container state info.
@@ -93,7 +94,7 @@ public class TestContainerReportWithKeys {
     OzoneOutputStream key =
         objectStore.getVolume(volumeName).getBucket(bucketName)
             .createKey(keyName, keySize, ReplicationType.STAND_ALONE,
-                ReplicationFactor.ONE);
+                ReplicationFactor.ONE, new HashMap<>());
     String dataString = RandomStringUtils.randomAlphabetic(keySize);
     key.write(dataString.getBytes());
     key.close();

+ 3 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java

@@ -47,6 +47,7 @@ import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
 
 /**
  * End-to-end testing of Ozone REST operations.
@@ -107,7 +108,8 @@ public class TestOzoneRestWithMiniCluster {
       throws IOException {
     try (
         OzoneOutputStream ozoneOutputStream = bucket
-            .createKey(keyName, 0, replicationType, replicationFactor);
+            .createKey(keyName, 0, replicationType, replicationFactor,
+                new HashMap<>());
         InputStream inputStream = IOUtils.toInputStream(keyData, UTF_8)) {
       IOUtils.copy(inputStream, ozoneOutputStream);
     }

+ 8 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java

@@ -80,6 +80,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -259,7 +260,8 @@ public class TestKeys {
 
       try (
           OzoneOutputStream ozoneOutputStream = bucket
-              .createKey(keyName, 0, replicationType, replicationFactor);
+              .createKey(keyName, 0, replicationType, replicationFactor,
+                  new HashMap<>());
           InputStream fileInputStream = new FileInputStream(file)) {
         IOUtils.copy(fileInputStream, ozoneOutputStream);
       }
@@ -291,7 +293,7 @@ public class TestKeys {
     String newkeyName = OzoneUtils.getRequestID().toLowerCase();
     OzoneOutputStream ozoneOutputStream = helperClient
         .createKey(helper.getVol().getName(), helper.getBucket().getName(),
-            newkeyName, 0, replicationType, replicationFactor);
+            newkeyName, 0, replicationType, replicationFactor, new HashMap<>());
     ozoneOutputStream.close();
     keyList = helperClient
         .listKeys(helper.getVol().getName(), helper.getBucket().getName(), null,
@@ -302,7 +304,7 @@ public class TestKeys {
     try {
       ozoneOutputStream = helperClient
           .createKey("invalid-volume", helper.getBucket().getName(), newkeyName,
-              0, replicationType, replicationFactor);
+              0, replicationType, replicationFactor, new HashMap<>());
       ozoneOutputStream.close();
       fail("Put key should have thrown"
           + " when using invalid volume name.");
@@ -314,7 +316,7 @@ public class TestKeys {
     try {
       ozoneOutputStream = helperClient
           .createKey(helper.getVol().getName(), "invalid-bucket", newkeyName, 0,
-              replicationType, replicationFactor);
+              replicationType, replicationFactor, new HashMap<>());
       ozoneOutputStream.close();
       fail("Put key should have thrown "
           + "when using invalid bucket name.");
@@ -488,7 +490,8 @@ public class TestKeys {
       String newkeyName = "list-key" + x;
       try (
           OzoneOutputStream ozoneOutputStream = helper.getBucket()
-              .createKey(newkeyName, 0, replicationType, replicationFactor);
+              .createKey(newkeyName, 0, replicationType, replicationFactor,
+                  new HashMap<>());
           InputStream fileInputStream = new FileInputStream(helper.getFile())) {
         IOUtils.copy(fileInputStream, ozoneOutputStream);
       }

+ 2 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java

@@ -107,6 +107,7 @@ public class BucketManagerImpl implements BucketManager {
           .setStorageType(bucketInfo.getStorageType())
           .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
           .setCreationTime(Time.now())
+          .addAllMetadata(bucketInfo.getMetadata())
           .build();
       metadataManager.getBucketTable().put(bucketKey,
           omBucketInfo);
@@ -182,6 +183,7 @@ public class BucketManagerImpl implements BucketManager {
       OmBucketInfo.Builder bucketInfoBuilder = OmBucketInfo.newBuilder();
       bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName())
           .setBucketName(oldBucketInfo.getBucketName());
+      bucketInfoBuilder.addAllMetadata(args.getMetadata());
 
       //Check ACLs to update
       if (args.getAddAcls() != null || args.getRemoveAcls() != null) {

+ 1 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -334,6 +334,7 @@ public class KeyManagerImpl implements KeyManager {
         // For this upload part we don't need to check in KeyTable. As this
         // is not an actual key, it is a part of the key.
         keyInfo = createKeyInfo(args, locations, factor, type, size);
+        //TODO args.getMetadata
         openVersion = 0;
       } else {
         keyInfo = metadataManager.getKeyTable().get(objectKey);

+ 2 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -321,6 +321,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private void saveOmMetrics() {
     try {
       boolean success;
+      Files.createDirectories(
+          getTempMetricsStorageFile().getParentFile().toPath());
       try (BufferedWriter writer = new BufferedWriter(
           new OutputStreamWriter(new FileOutputStream(
               getTempMetricsStorageFile()), "UTF-8"))) {

+ 2 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/web/ozShell/keys/PutKeyHandler.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.web.ozShell.keys;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
@@ -105,7 +106,7 @@ public class PutKeyHandler extends Handler {
     OzoneBucket bucket = vol.getBucket(bucketName);
     OzoneOutputStream outputStream = bucket
         .createKey(keyName, dataFile.length(), replicationType,
-            replicationFactor);
+            replicationFactor, new HashMap<>());
     FileInputStream fileInputStream = new FileInputStream(dataFile);
     IOUtils.copyBytes(fileInputStream, outputStream, (int) conf
         .getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, OZONE_SCM_CHUNK_SIZE_DEFAULT,

+ 4 - 2
hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java

@@ -207,7 +207,8 @@ public class OzoneFileSystem extends FileSystem {
     }
 
     OzoneOutputStream ozoneOutputStream =
-        bucket.createKey(key, 0, replicationType, replicationFactor);
+        bucket.createKey(key, 0, replicationType, replicationFactor,
+            new HashMap<>());
     // We pass null to FSDataOutputStream so it won't count writes that
     // are being buffered to a file
     return new FSDataOutputStream(
@@ -770,7 +771,8 @@ public class OzoneFileSystem extends FileSystem {
   private boolean createDirectory(String keyName) {
     try {
       LOG.trace("creating dir for key:{}", keyName);
-      bucket.createKey(keyName, 0, replicationType, replicationFactor).close();
+      bucket.createKey(keyName, 0, replicationType, replicationFactor,
+          new HashMap<>()).close();
       return true;
     } catch (IOException ioe) {
       LOG.error("create key failed for key:{}", keyName, ioe);

+ 3 - 2
hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java

@@ -41,6 +41,7 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -163,7 +164,7 @@ public class ObjectEndpoint extends EndpointBase {
       OzoneBucket bucket = getBucket(bucketName);
 
       output = bucket.createKey(keyPath, length, replicationType,
-          replicationFactor);
+          replicationFactor, new HashMap<>());
 
       if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
           .equals(headers.getHeaderString("x-amz-content-sha256"))) {
@@ -619,7 +620,7 @@ public class ObjectEndpoint extends EndpointBase {
       sourceInputStream = sourceOzoneBucket.readKey(sourceKey);
 
       destOutputStream = destOzoneBucket.createKey(destkey, sourceKeyLen,
-          replicationType, replicationFactor);
+          replicationType, replicationFactor, new HashMap<>());
 
       IOUtils.copy(sourceInputStream, destOutputStream);
 

+ 6 - 3
hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java

@@ -83,12 +83,15 @@ public class OzoneBucketStub extends OzoneBucket {
   @Override
   public OzoneOutputStream createKey(String key, long size) throws IOException {
     return createKey(key, size, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
   }
 
   @Override
   public OzoneOutputStream createKey(String key, long size,
-      ReplicationType type, ReplicationFactor factor) throws IOException {
+                                     ReplicationType type,
+                                     ReplicationFactor factor,
+                                     Map<String, String> metadata)
+      throws IOException {
     ByteArrayOutputStream byteArrayOutputStream =
         new ByteArrayOutputStream((int) size) {
           @Override
@@ -101,7 +104,7 @@ public class OzoneBucketStub extends OzoneBucket {
                 size,
                 System.currentTimeMillis(),
                 System.currentTimeMillis(),
-                new ArrayList<>(), type
+                new ArrayList<>(), type, metadata
             ));
             super.close();
           }

+ 2 - 1
hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectHead.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.ozone.s3.endpoint;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
 
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -71,7 +72,7 @@ public class TestObjectHead {
     String value = RandomStringUtils.randomAlphanumeric(32);
     OzoneOutputStream out = bucket.createKey("key1",
         value.getBytes(UTF_8).length, ReplicationType.STAND_ALONE,
-        ReplicationFactor.ONE);
+        ReplicationFactor.ONE, new HashMap<>());
     out.write(value.getBytes(UTF_8));
     out.close();
 

+ 2 - 1
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java

@@ -24,6 +24,7 @@ import java.io.PrintStream;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -592,7 +593,7 @@ public final class RandomKeyGenerator implements Callable<Void> {
                   key, bucket, volume);
               long keyCreateStart = System.nanoTime();
               OzoneOutputStream os =
-                  bucket.createKey(key, keySize, type, factor);
+                  bucket.createKey(key, keySize, type, factor, new HashMap<>());
               long keyCreationDuration = System.nanoTime() - keyCreateStart;
               histograms.get(FreonOps.KEY_CREATE.ordinal())
                   .update(keyCreationDuration);