Browse Source

HDFS-12123. Ozone: OzoneClient: Abstraction of OzoneClient and default implementation. Contributed by Nandakumar.

Anu Engineer 8 years ago
parent
commit
ab3d510c1d

+ 117 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneBucket.java

@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+
+import java.util.List;
+
+/**
+ * A class that encapsulates OzoneBucket.
+ */
+public class OzoneBucket {
+
+  /**
+   * Name of the volume in which the bucket belongs to.
+   */
+  private final String volumeName;
+  /**
+   * Name of the bucket.
+   */
+  private final String bucketName;
+  /**
+   * Bucket ACLs.
+   */
+  private final List<OzoneAcl> acls;
+
+  /**
+   * Type of storage to be used for this bucket.
+   * [RAM_DISK, SSD, DISK, ARCHIVE]
+   */
+  private final StorageType storageType;
+
+  /**
+   * Bucket Version flag.
+   */
+  private final Versioning versioning;
+
+
+  /**
+   * Constructs OzoneBucket from KsmBucketInfo.
+   *
+   * @param ksmBucketInfo
+   */
+  public OzoneBucket(KsmBucketInfo ksmBucketInfo) {
+    this.volumeName = ksmBucketInfo.getVolumeName();
+    this.bucketName = ksmBucketInfo.getBucketName();
+    this.acls = ksmBucketInfo.getAcls();
+    this.storageType = ksmBucketInfo.getStorageType();
+    this.versioning = ksmBucketInfo.getIsVersionEnabled() ?
+        Versioning.ENABLED : Versioning.DISABLED;
+  }
+
+  /**
+   * Returns Volume Name.
+   *
+   * @return volumeName
+   */
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  /**
+   * Returns Bucket Name.
+   *
+   * @return bucketName
+   */
+  public String getBucketName() {
+    return bucketName;
+  }
+
+  /**
+   * Returns ACL's associated with the Bucket.
+   *
+   * @return acls
+   */
+  public List<OzoneAcl> getAcls() {
+    return acls;
+  }
+
+  /**
+   * Returns StorageType of the Bucket.
+   *
+   * @return storageType
+   */
+  public StorageType getStorageType() {
+    return storageType;
+  }
+
+  /**
+   * Returns Versioning associated with the Bucket.
+   *
+   * @return versioning
+   */
+  public Versioning getVersioning() {
+    return versioning;
+  }
+
+}

+ 386 - 222
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClient.java

@@ -17,234 +17,398 @@
 
 
 package org.apache.hadoop.ozone;
 package org.apache.hadoop.ozone;
 
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
-import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
-import org.apache.hadoop.ozone.web.handlers.BucketArgs;
-import org.apache.hadoop.ozone.web.handlers.KeyArgs;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
-import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.io.Closeable;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.io.OzoneInputStream;
+import org.apache.hadoop.ozone.io.OzoneOutputStream;
+
 import java.io.IOException;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.List;
 
 
- /**
-  * OzoneClient can connect to a Ozone Object Store and
-  * perform basic operations.  It uses StorageHandler to
-  * connect to KSM.
-  */
-public class OzoneClient implements Closeable {
-
-  private final StorageHandler storageHandler;
-  private final UserGroupInformation ugi;
-  private final String hostName;
-  private final OzoneAcl.OzoneACLRights userAclRights;
-
-  public OzoneClient() throws IOException {
-    this(new OzoneConfiguration());
-  }
-
-   /**
-    * Creates OzoneClient object with the given configuration.
-    * @param conf
-    * @throws IOException
-    */
-  public OzoneClient(Configuration conf) throws IOException {
-    this.storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
-    this.ugi = UserGroupInformation.getCurrentUser();
-    this.hostName = OzoneUtils.getHostName();
-    this.userAclRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
-        KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
-  }
-
-   /**
-    * Creates a new Volume.
-    *
-    * @param volumeName Name of the Volume
-    * @throws IOException
-    * @throws OzoneException
-    */
-  public void createVolume(String volumeName)
-      throws IOException, OzoneException {
-    createVolume(volumeName, ugi.getUserName());
-  }
-
-   /**
-    * Creates a new Volume.
-    *
-    * @param volumeName Name of the Volume
-    * @param owner Owner to be set for Volume
-    * @throws IOException
-    * @throws OzoneException
-    */
-  public void createVolume(String volumeName, String owner)
-      throws IOException, OzoneException {
-    createVolume(volumeName, owner, null);
-  }
-
-   /**
-    * Creates a new Volume.
-    *
-    * @param volumeName Name of the Volume
-    * @param owner Owner to be set for Volume
-    * @param quota Volume Quota
-    * @throws IOException
-    * @throws OzoneException
-    */
-  public void createVolume(String volumeName, String owner, String quota)
-      throws IOException, OzoneException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(owner);
-    OzoneUtils.verifyResourceName(volumeName);
-
-    String requestId = OzoneUtils.getRequestID();
-    //since we are reusing UserArgs which is used for REST call
-    // request, info, headers are null.
-    UserArgs userArgs = new UserArgs(owner, requestId, hostName,
-        null, null, null);
-    userArgs.setGroups(ugi.getGroupNames());
-
-    VolumeArgs volumeArgs = new VolumeArgs(volumeName, userArgs);
-    //current user is set as admin for this volume
-    volumeArgs.setAdminName(ugi.getUserName());
-    if (quota != null) {
-      volumeArgs.setQuota(quota);
-    }
-    storageHandler.createVolume(volumeArgs);
-  }
-
-   /**
-    * Creates a new Bucket in the Volume.
-    *
-    * @param volumeName Name of the Volume
-    * @param bucketName Name of the Bucket
-    * @throws IOException
-    * @throws OzoneException
-    */
-  public void createBucket(String volumeName, String bucketName)
-      throws IOException, OzoneException {
-    createBucket(volumeName, bucketName,
-        OzoneConsts.Versioning.NOT_DEFINED, StorageType.DEFAULT);
-  }
-
-   /**
-    * Creates a new Bucket in the Volume.
-    *
-    * @param volumeName
-    * @param bucketName
-    * @param versioning
-    * @throws IOException
-    * @throws OzoneException
-    */
-  public void createBucket(String volumeName, String bucketName,
-                           OzoneConsts.Versioning versioning)
-      throws IOException, OzoneException {
-    createBucket(volumeName, bucketName, versioning,
-        StorageType.DEFAULT);
-  }
-
-   /**
-    * Creates a new Bucket in the Volume.
-    *
-    * @param volumeName Name of the Volume
-    * @param bucketName Name of the Bucket
-    * @param storageType StorageType for the Bucket
-    * @throws IOException
-    * @throws OzoneException
-    */
-  public void createBucket(String volumeName, String bucketName,
-                           StorageType storageType)
-      throws IOException, OzoneException {
-    createBucket(volumeName, bucketName, OzoneConsts.Versioning.NOT_DEFINED,
-        storageType);
-  }
-
-  public void createBucket(String volumeName, String bucketName,
+/**
+ * OzoneClient can connect to a Ozone Cluster and
+ * perform basic operations.
+ */
+public interface OzoneClient {
+
+  /**
+   * Creates a new Volume.
+   *
+   * @param volumeName Name of the Volume
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName)
+      throws IOException;
+
+  /**
+   * Creates a new Volume, with owner set.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner Owner to be set for Volume
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName, String owner)
+      throws IOException;
+
+  /**
+   * Creates a new Volume, with owner and quota set.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner Owner to be set for Volume
+   * @param acls ACLs to be added to the Volume
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName, String owner,
+                    OzoneAcl... acls)
+      throws IOException;
+
+  /**
+   * Creates a new Volume, with owner and quota set.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner Owner to be set for Volume
+   * @param quota Volume Quota
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName, String owner,
+                    long quota)
+      throws IOException;
+
+  /**
+   * Creates a new Volume, with owner and quota set.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner Owner to be set for Volume
+   * @param quota Volume Quota
+   * @param acls ACLs to be added to the Volume
+   *
+   * @throws IOException
+   */
+  void createVolume(String volumeName, String owner,
+                    long quota, OzoneAcl... acls)
+      throws IOException;
+
+  /**
+   * Sets the owner of the volume.
+   *
+   * @param volumeName Name of the Volume
+   * @param owner to be set for the Volume
+   *
+   * @throws IOException
+   */
+  void setVolumeOwner(String volumeName, String owner) throws IOException;
+
+  /**
+   * Set Volume Quota.
+   *
+   * @param volumeName Name of the Volume
+   * @param quota Quota to be set for the Volume
+   *
+   * @throws IOException
+   */
+  void setVolumeQuota(String volumeName, long quota)
+      throws IOException;
+
+  /**
+   * Returns {@link OzoneVolume}.
+   *
+   * @param volumeName Name of the Volume
+   *
+   * @return KsmVolumeArgs
+   *
+   * @throws OzoneVolume
+   * */
+  OzoneVolume getVolumeDetails(String volumeName)
+      throws IOException;
+
+  /**
+   * Checks if a Volume exists and the user with a role specified has access
+   * to the Volume.
+   *
+   * @param volumeName Name of the Volume
+   * @param acl requested acls which needs to be checked for access
+   *
+   * @return Boolean - True if the user with a role can access the volume.
+   * This is possible for owners of the volume and admin users
+   *
+   * @throws IOException
+   */
+  boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
+      throws IOException;
+
+  /**
+   * Deletes an Empty Volume.
+   *
+   * @param volumeName Name of the Volume
+   *
+   * @throws IOException
+   */
+  void deleteVolume(String volumeName) throws IOException;
+
+  /**
+   * Returns the List of Volumes owned by current user.
+   *
+   * @param volumePrefix Volume prefix to match
+   *
+   * @return KsmVolumeArgs Iterator
+   *
+   * @throws IOException
+   */
+  Iterator<OzoneVolume> listVolumes(String volumePrefix)
+      throws IOException;
+
+  /**
+   * Returns the List of Volumes owned by the specific user.
+   *
+   * @param volumePrefix Volume prefix to match
+   * @param user User Name
+   *
+   * @return KsmVolumeArgs Iterator
+   *
+   * @throws IOException
+   */
+  Iterator<OzoneVolume> listVolumes(String volumePrefix, String user)
+      throws IOException;
+
+  /**
+   * Creates a new Bucket in the Volume.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName)
+      throws IOException;
+
+  /**
+   * Creates a new Bucket in the Volume, with versioning set.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param versioning Bucket versioning
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName,
+                    Versioning versioning)
+      throws IOException;
+
+  /**
+   * Creates a new Bucket in the Volume, with storage type set.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param storageType StorageType for the Bucket
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName,
+                    StorageType storageType)
+      throws IOException;
+
+  /**
+   * Creates a new Bucket in the Volume, with ACLs set.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param acls OzoneAcls for the Bucket
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName,
                            OzoneAcl... acls)
                            OzoneAcl... acls)
-      throws IOException, OzoneException {
-    createBucket(volumeName, bucketName, OzoneConsts.Versioning.NOT_DEFINED,
-        StorageType.DEFAULT, acls);
-  }
+      throws IOException;
 
 
-  public void createBucket(String volumeName, String bucketName,
+
+  /**
+   * Creates a new Bucket in the Volume, with versioning
+   * storage type and ACLs set.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param storageType StorageType for the Bucket
+   *
+   * @throws IOException
+   */
+  void createBucket(String volumeName, String bucketName,
                            OzoneConsts.Versioning versioning,
                            OzoneConsts.Versioning versioning,
                            StorageType storageType, OzoneAcl... acls)
                            StorageType storageType, OzoneAcl... acls)
-      throws IOException, OzoneException {
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    OzoneUtils.verifyResourceName(bucketName);
-
-    List<OzoneAcl> listOfAcls = new ArrayList<>();
-
-    String userName = ugi.getUserName();
-    String requestId = OzoneUtils.getRequestID();
-    String[] groups = ugi.getGroupNames();
-
-    UserArgs userArgs = new UserArgs(userName, requestId, hostName,
-        null, null, null);
-    userArgs.setGroups(groups);
-
-    BucketArgs bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
-    bucketArgs.setVersioning(versioning);
-    bucketArgs.setStorageType(storageType);
-
-    //Adding current user's ACL to the ACL list, for now this doesn't check
-    //whether the "acls" argument passed to this method already has ACL for
-    //current user. This has to be fixed.
-    OzoneAcl userAcl = new OzoneAcl(OzoneAcl.OzoneACLType.USER, userName,
-        userAclRights);
-    listOfAcls.add(userAcl);
-    //Should we also add ACL of current user's groups?
-    if(acls != null && acls.length > 0) {
-      listOfAcls.addAll(Arrays.asList(acls));
-    }
-
-    bucketArgs.setAddAcls(listOfAcls);
-    storageHandler.createBucket(bucketArgs);
-  }
-
-   /**
-    * Adds a new Key to the Volume/Bucket.
-    *
-    * @param volumeName Name of the Volume
-    * @param bucketName Name of the Bucket
-    * @param keyName Key name
-    * @param value The Value
-    * @throws IOException
-    * @throws OzoneException
-    */
-  public void putKey(String volumeName, String bucketName,
-                     String keyName, byte[] value)
-      throws IOException, OzoneException {
-
-    Preconditions.checkNotNull(volumeName);
-    Preconditions.checkNotNull(bucketName);
-    Preconditions.checkNotNull(keyName);
-    String requestId = OzoneUtils.getRequestID();
-    UserArgs userArgs = new UserArgs(ugi.getUserName(), requestId, hostName,
-        null, null, null);
-    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
-    keyArgs.setSize(value.length);
-    OutputStream outStream = storageHandler.newKeyWriter(keyArgs);
-    outStream.write(value);
-    outStream.close();
-  }
-
-   /**
-    * Close and release the resources.
-    */
-  @Override
-  public void close() {
-    storageHandler.close();
-  }
+      throws IOException;
+
+  /**
+   * Adds or Removes ACLs from a Bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void addBucketAcls(String volumeName, String bucketName,
+                     List<OzoneAcl> addAcls)
+      throws IOException;
+
+  /**
+   * Adds or Removes ACLs from a Bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void removeBucketAcls(String volumeName, String bucketName,
+                        List<OzoneAcl> removeAcls)
+      throws IOException;
+
+
+  /**
+   * Enables or disables Bucket Versioning.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void setBucketVersioning(String volumeName, String bucketName,
+                           Versioning versioning)
+      throws IOException;
+
+  /**
+   * Sets the Storage Class of a Bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void setBucketStorageType(String volumeName, String bucketName,
+                            StorageType storageType)
+      throws IOException;
+
+  /**
+   * Deletes a bucket if it is empty.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @throws IOException
+   */
+  void deleteBucket(String volumeName, String bucketName)
+      throws IOException;
+
+  /**
+   * true if the bucket exists and user has read access
+   * to the bucket else throws Exception.
+   *
+   * @param volumeName Name of the Volume
+   *
+   * @throws IOException
+   */
+  void checkBucketAccess(String volumeName, String bucketName)
+      throws IOException;
+
+    /**
+     * Returns {@link OzoneBucket}.
+     *
+     * @param volumeName Name of the Volume
+     * @param bucketName Name of the Bucket
+     *
+     * @return OzoneBucket
+     *
+     * @throws IOException
+     */
+  OzoneBucket getBucketDetails(String volumeName, String bucketName)
+        throws IOException;
+
+  /**
+   * Returns the List of Buckets in the Volume.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketPrefix Bucket prefix to match
+   *
+   * @return KsmVolumeArgs Iterator
+   *
+   * @throws IOException
+   */
+  Iterator<OzoneBucket> listBuckets(String volumeName, String bucketPrefix)
+      throws IOException;
+
+  /**
+   * Writes a key in an existing bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param size Size of the data
+   *
+   * @return OutputStream
+   *
+   */
+  OzoneOutputStream createKey(String volumeName, String bucketName,
+                              String keyName, long size)
+      throws IOException;
+
+  /**
+   * Reads a key from an existing bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @return LengthInputStream
+   *
+   * @throws IOException
+   */
+  OzoneInputStream getKey(String volumeName, String bucketName, String keyName)
+      throws IOException;
+
+
+  /**
+   * Deletes an existing key.
+   *
+   * @param volumeName Name of the Volume
+   *
+   * @throws IOException
+   */
+  void deleteKey(String volumeName, String bucketName, String keyName)
+      throws IOException;
+
+
+  /**
+   * Returns list of {@link OzoneKey} in Volume/Bucket.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   *
+   * @return OzoneKey
+   *
+   * @throws IOException
+   */
+  List<OzoneKey> listKeys(String volumeName, String bucketName,
+                            String keyPrefix)
+      throws IOException;
+
+
+  /**
+   * Get OzoneKey.
+   *
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Key name
+   *
+   * @return OzoneKey
+   *
+   * @throws IOException
+   */
+  OzoneKey getkeyDetails(String volumeName, String bucketName,
+                        String keyName)
+      throws IOException;
+
+  /**
+   * Close and release the resources.
+   */
+  void close() throws IOException;
 }
 }

+ 505 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java

@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ksm.protocolPB
+    .KeySpaceManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ksm.protocolPB
+    .KeySpaceManagerProtocolPB;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.io.OzoneInputStream;
+import org.apache.hadoop.ozone.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.protocolPB
+    .StorageContainerLocationProtocolPB;
+import org.apache.hadoop.scm.storage.ChunkOutputStream;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Ozone Client Implementation, it connects to KSM, SCM and DataNode
+ * to execute client calls. This uses RPC protocol for communication
+ * with the servers.
+ */
+public class OzoneClientImpl implements OzoneClient, Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneClient.class);
+
+  private final StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private final KeySpaceManagerProtocolClientSideTranslatorPB
+      keySpaceManagerClient;
+  private final XceiverClientManager xceiverClientManager;
+  private final int chunkSize;
+
+
+  private final UserGroupInformation ugi;
+  private final OzoneAcl.OzoneACLRights userRights;
+  private final OzoneAcl.OzoneACLRights groupRights;
+
+  /**
+   * Creates OzoneClientImpl instance with new OzoneConfiguration.
+   *
+   * @throws IOException
+   */
+  public OzoneClientImpl() throws IOException {
+    this(new OzoneConfiguration());
+  }
+
+   /**
+    * Creates OzoneClientImpl instance with the given configuration.
+    *
+    * @param conf
+    *
+    * @throws IOException
+    */
+  public OzoneClientImpl(Configuration conf) throws IOException {
+    Preconditions.checkNotNull(conf);
+    this.ugi = UserGroupInformation.getCurrentUser();
+    this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
+        KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
+    this.groupRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS,
+        KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
+
+    long scmVersion =
+        RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
+    InetSocketAddress scmAddress =
+        OzoneClientUtils.getScmAddressForClients(conf);
+    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    this.storageContainerLocationClient =
+        new StorageContainerLocationProtocolClientSideTranslatorPB(
+            RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
+                scmAddress, UserGroupInformation.getCurrentUser(), conf,
+                NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+
+    long ksmVersion =
+        RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
+    InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
+    RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    this.keySpaceManagerClient =
+        new KeySpaceManagerProtocolClientSideTranslatorPB(
+            RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
+                ksmAddress, UserGroupInformation.getCurrentUser(), conf,
+                NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+
+    this.xceiverClientManager = new XceiverClientManager(conf);
+
+    int configuredChunkSize = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
+        ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
+    if(configuredChunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
+      LOG.warn("The chunk size ({}) is not allowed to be more than"
+              + " the maximum size ({}),"
+              + " resetting to the maximum size.",
+          configuredChunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
+      chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
+    } else {
+      chunkSize = configuredChunkSize;
+    }
+  }
+
+  @Override
+  public void createVolume(String volumeName)
+      throws IOException {
+    createVolume(volumeName, ugi.getUserName());
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner)
+      throws IOException {
+
+    createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES,
+        (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           OzoneAcl... acls)
+      throws IOException {
+    createVolume(volumeName, owner, OzoneConsts.MAX_QUOTA_IN_BYTES, acls);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           long quota)
+      throws IOException {
+    createVolume(volumeName, owner, quota, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createVolume(String volumeName, String owner,
+                           long quota, OzoneAcl... acls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(owner);
+    Preconditions.checkNotNull(quota);
+    Preconditions.checkState(quota >= 0);
+    OzoneAcl userAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.USER,
+            owner, userRights);
+    KsmVolumeArgs.Builder builder = KsmVolumeArgs.newBuilder();
+    builder.setAdminName(ugi.getUserName())
+        .setOwnerName(owner)
+        .setVolume(volumeName)
+        .setQuotaInBytes(quota)
+        .addOzoneAcls(KSMPBHelper.convertOzoneAcl(userAcl));
+
+    List<OzoneAcl> listOfAcls = new ArrayList<>();
+
+    //Group ACLs of the User
+    List<String> userGroups = Arrays.asList(UserGroupInformation
+        .createRemoteUser(owner).getGroupNames());
+    userGroups.stream().forEach((group) -> listOfAcls.add(
+        new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights)));
+
+    //ACLs passed as argument
+    if(acls != null) {
+      listOfAcls.addAll(Arrays.asList(acls));
+    }
+
+    //Remove duplicates and set
+    for (OzoneAcl ozoneAcl :
+        listOfAcls.stream().distinct().collect(Collectors.toList())) {
+      builder.addOzoneAcls(KSMPBHelper.convertOzoneAcl(ozoneAcl));
+    }
+
+    LOG.info("Creating Volume: {}, with {} as owner and quota set to {} bytes.",
+        volumeName, owner, quota);
+    keySpaceManagerClient.createVolume(builder.build());
+  }
+
+  @Override
+  public void setVolumeOwner(String volumeName, String owner)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(owner);
+    keySpaceManagerClient.setOwner(volumeName, owner);
+  }
+
+  @Override
+  public void setVolumeQuota(String volumeName, long quota)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(quota);
+    Preconditions.checkState(quota >= 0);
+    keySpaceManagerClient.setQuota(volumeName, quota);
+  }
+
+  @Override
+  public OzoneVolume getVolumeDetails(String volumeName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    KsmVolumeArgs volumeArgs =
+        keySpaceManagerClient.getVolumeInfo(volumeName);
+    return new OzoneVolume(volumeArgs);
+  }
+
+  @Override
+  public boolean checkVolumeAccess(String volumeName, OzoneAcl acl)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    return keySpaceManagerClient.checkVolumeAccess(volumeName,
+        KSMPBHelper.convertOzoneAcl(acl));
+  }
+
+  @Override
+  public void deleteVolume(String volumeName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    keySpaceManagerClient.deleteVolume(volumeName);
+  }
+
+  @Override
+  public Iterator<OzoneVolume> listVolumes(String volumePrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public Iterator<OzoneVolume> listVolumes(String volumePrefix,
+                                             String user)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        StorageType.DEFAULT, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           Versioning versioning)
+      throws IOException {
+    createBucket(volumeName, bucketName, versioning,
+        StorageType.DEFAULT, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           StorageType storageType)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        storageType, (OzoneAcl[])null);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           OzoneAcl... acls)
+      throws IOException {
+    createBucket(volumeName, bucketName, Versioning.NOT_DEFINED,
+        StorageType.DEFAULT, acls);
+  }
+
+  @Override
+  public void createBucket(String volumeName, String bucketName,
+                           Versioning versioning, StorageType storageType,
+                           OzoneAcl... acls)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    Preconditions.checkNotNull(versioning);
+    Preconditions.checkNotNull(storageType);
+
+    KsmBucketInfo.Builder builder = KsmBucketInfo.newBuilder();
+    builder.setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setStorageType(storageType)
+        .setIsVersionEnabled(getBucketVersioningProtobuf(
+        versioning));
+
+    String owner = ugi.getUserName();
+    final List<OzoneAcl> listOfAcls = new ArrayList<>();
+
+    //User ACL
+    OzoneAcl userAcl =
+        new OzoneAcl(OzoneAcl.OzoneACLType.USER,
+            owner, userRights);
+    listOfAcls.add(userAcl);
+
+    //Group ACLs of the User
+    List<String> userGroups = Arrays.asList(UserGroupInformation
+        .createRemoteUser(owner).getGroupNames());
+    userGroups.stream().forEach((group) -> listOfAcls.add(
+        new OzoneAcl(OzoneAcl.OzoneACLType.GROUP, group, groupRights)));
+
+    //ACLs passed as argument
+    if(acls != null) {
+      Arrays.stream(acls).forEach((acl) -> listOfAcls.add(acl));
+    }
+
+    //Remove duplicates and set
+    builder.setAcls(listOfAcls.stream().distinct()
+        .collect(Collectors.toList()));
+    LOG.info("Creating Bucket: {}/{}, with Versioning {} and " +
+        "Storage Type set to {}", volumeName, bucketName, versioning,
+        storageType);
+    keySpaceManagerClient.createBucket(builder.build());
+  }
+
+  /**
+   * Converts OzoneConts.Versioning enum to boolean.
+   *
+   * @param version
+   * @return corresponding boolean value
+   */
+  private boolean getBucketVersioningProtobuf(
+      Versioning version) {
+    if(version != null) {
+      switch(version) {
+      case ENABLED:
+        return true;
+      case NOT_DEFINED:
+      case DISABLED:
+      default:
+        return false;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void addBucketAcls(String volumeName, String bucketName,
+                            List<OzoneAcl> addAcls)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void removeBucketAcls(String volumeName, String bucketName,
+                               List<OzoneAcl> removeAcls)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void setBucketVersioning(String volumeName, String bucketName,
+                                  Versioning versioning)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void setBucketStorageType(String volumeName, String bucketName,
+                                   StorageType storageType)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void deleteBucket(String volumeName, String bucketName)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void checkBucketAccess(String volumeName, String bucketName)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneBucket getBucketDetails(String volumeName,
+                                        String bucketName)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public Iterator<OzoneBucket> listBuckets(String volumeName,
+                                            String bucketPrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneOutputStream createKey(String volumeName, String bucketName,
+                                     String keyName, long size)
+      throws IOException {
+    String requestId = UUID.randomUUID().toString();
+    KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .build();
+
+    String containerKey = buildContainerKey(volumeName, bucketName, keyName);
+    KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
+    // TODO: the following createContainer and key writes may fail, in which
+    // case we should revert the above allocateKey to KSM.
+    String containerName = keyInfo.getContainerName();
+    XceiverClientSpi xceiverClient = getContainer(containerName);
+    if (keyInfo.getShouldCreateContainer()) {
+      LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
+          volumeName, bucketName, keyName);
+      ContainerProtocolCalls.createContainer(xceiverClient, requestId);
+    }
+    // establish a connection to the container to write the key
+    ChunkOutputStream outputStream = new ChunkOutputStream(containerKey,
+        keyName, xceiverClientManager, xceiverClient, requestId, chunkSize);
+    return new OzoneOutputStream(outputStream);
+  }
+
+  /**
+   * Creates a container key from any number of components by combining all
+   * components with a delimiter.
+   *
+   * @param parts container key components
+   * @return container key
+   */
+  private static String buildContainerKey(String... parts) {
+    return '/' + StringUtils.join('/', parts);
+  }
+
+  private XceiverClientSpi getContainer(String containerName)
+      throws IOException {
+    Pipeline pipeline =
+        storageContainerLocationClient.getContainer(containerName);
+    return xceiverClientManager.acquireClient(pipeline);
+  }
+
+  @Override
+  public OzoneInputStream getKey(String volumeName, String bucketName,
+                                 String keyName)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void deleteKey(String volumeName, String bucketName,
+                        String keyName)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public List<OzoneKey> listKeys(String volumeName, String bucketName,
+                                   String keyPrefix)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public OzoneKey getkeyDetails(String volumeName, String bucketName,
+                                  String keyName)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
+  public void close() throws IOException {
+    if(xceiverClientManager != null) {
+      xceiverClientManager.close();
+    }
+  }
+}

+ 120 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java

@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+
+/**
+ * A class that encapsulates OzoneKey.
+ */
+public class OzoneKey {
+
+  /**
+   * Name of the Volume the Key belongs to.
+   */
+  private final String volumeName;
+  /**
+   * Name of the Bucket the Key belongs to.
+   */
+  private final String bucketName;
+  /**
+   * Name of the Key.
+   */
+  private final String keyName;
+  /**
+   * Name of the Container the Key resides in.
+   */
+  private final String containerName;
+  /**
+   * Name of the block id SCM assigned for the key.
+   */
+  private final String blockID;
+  /**
+   * Size of the data.
+   */
+  private final long dataSize;
+
+  /**
+   * Constructs OzoneKey from KsmKeyInfo.
+   *
+   * @param ksmKeyInfo
+   */
+  public OzoneKey(KsmKeyInfo ksmKeyInfo) {
+    this.volumeName = ksmKeyInfo.getVolumeName();
+    this.bucketName = ksmKeyInfo.getBucketName();
+    this.keyName = ksmKeyInfo.getKeyName();
+    this.containerName = ksmKeyInfo.getContainerName();
+    this.blockID = ksmKeyInfo.getBlockID();
+    this.dataSize = ksmKeyInfo.getDataSize();
+  }
+
+  /**
+   * Returns Volume Name associated with the Key.
+   *
+   * @return volumeName
+   */
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  /**
+   * Returns Bucket Name associated with the Key.
+   *
+   * @return bucketName
+   */
+  public String getBucketName(){
+    return bucketName;
+  }
+
+  /**
+   * Returns the Key Name.
+   *
+   * @return keyName
+   */
+  public String getKeyName() {
+    return keyName;
+  }
+
+  /**
+   * Returns Container Name associated with the Key.
+   *
+   * @return containerName
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Returns BlockID associated with the Key.
+   *
+   * @return blockID
+   */
+  public String getBlockID() {
+    return blockID;
+  }
+
+  /**
+   * Returns the size of the data.
+   *
+   * @return dataSize
+   */
+  public long getDataSize() {
+    return dataSize;
+  }
+}

+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneVolume.java

@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.ksm.helpers.KsmOzoneAclMap;
+import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
+
+/**
+ * A class that encapsulates OzoneVolume.
+ */
+public class OzoneVolume {
+
+  /**
+   * Admin Name of the Volume.
+   */
+  private final String adminName;
+  /**
+   * Owner of the Volume.
+   */
+  private final String ownerName;
+  /**
+   * Name of the Volume.
+   */
+  private final String volumeName;
+  /**
+   * Quota allocated for the Volume.
+   */
+  private final long quotaInBytes;
+  /**
+   * Volume ACLs.
+   */
+  private final KsmOzoneAclMap aclMap;
+
+  /**
+   * Constructs OzoneVolume from KsmVolumeArgs.
+   *
+   * @param ksmVolumeArgs
+   */
+  public OzoneVolume(KsmVolumeArgs ksmVolumeArgs) {
+    this.adminName = ksmVolumeArgs.getAdminName();
+    this.ownerName = ksmVolumeArgs.getOwnerName();
+    this.volumeName = ksmVolumeArgs.getVolume();
+    this.quotaInBytes = ksmVolumeArgs.getQuotaInBytes();
+    this.aclMap = ksmVolumeArgs.getAclMap();
+  }
+
+  /**
+   * Returns Volume's admin name.
+   *
+   * @return adminName
+   */
+  public String getAdminName() {
+    return adminName;
+  }
+
+  /**
+   * Returns Volume's owner name.
+   *
+   * @return ownerName
+   */
+  public String getOwnerName() {
+    return ownerName;
+  }
+
+  /**
+   * Returns Volume name.
+   *
+   * @return volumeName
+   */
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  /**
+   * Returns Quota allocated for the Volume in bytes.
+   *
+   * @return quotaInBytes
+   */
+  public long getQuota() {
+    return quotaInBytes;
+  }
+
+  /**
+   * Returns OzoneAcl list associated with the Volume.
+   *
+   * @return aclMap
+   */
+  public KsmOzoneAclMap getAclMap() {
+    return aclMap;
+  }
+}

+ 52 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.io;
+
+import org.apache.hadoop.scm.storage.ChunkInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * OzoneInputStream is used to read data from Ozone.
+ * It uses SCM's {@link ChunkInputStream} for reading the data.
+ */
+public class OzoneInputStream extends InputStream {
+
+  private final ChunkInputStream inputStream;
+
+  /**
+   * Constructs OzoneInputStream with ChunkInputStream.
+   *
+   * @param inputStream
+   */
+  public OzoneInputStream(ChunkInputStream inputStream) {
+    this.inputStream = inputStream;
+  }
+
+  @Override
+  public int read() throws IOException {
+    return inputStream.read();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    inputStream.close();
+  }
+
+}

+ 62 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.io;
+
+import org.apache.hadoop.scm.storage.ChunkOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * OzoneOutputStream is used to write data into Ozone.
+ * It uses SCM's {@link ChunkOutputStream} for writing the data.
+ */
+public class OzoneOutputStream extends OutputStream {
+
+  private final ChunkOutputStream outputStream;
+
+  /**
+   * Constructs OzoneOutputStream with ChunkOutputStream.
+   *
+   * @param outputStream
+   */
+  public OzoneOutputStream(ChunkOutputStream outputStream) {
+    this.outputStream = outputStream;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    outputStream.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputStream.write(b, off, len);
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    outputStream.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    //commitKey can be done here, if needed.
+    outputStream.close();
+  }
+}

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/package-info.java

@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.io;
+
+/**
+ * This package contains Ozone I/O classes.
+ */

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClient.java → hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestOzoneClientImpl.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone;
 package org.apache.hadoop.ozone;
 
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ozone.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.junit.AfterClass;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assert;
@@ -31,10 +32,10 @@ import java.util.UUID;
 /**
 /**
  * This class is to test all the public facing APIs of Ozone Client.
  * This class is to test all the public facing APIs of Ozone Client.
  */
  */
-public class TestOzoneClient {
+public class TestOzoneClientImpl {
 
 
   private static MiniOzoneCluster cluster = null;
   private static MiniOzoneCluster cluster = null;
-  private static OzoneClient ozClient = null;
+  private static OzoneClientImpl ozClient = null;
 
 
   /**
   /**
    * Create a MiniDFSCluster for testing.
    * Create a MiniDFSCluster for testing.
@@ -51,7 +52,7 @@ public class TestOzoneClient {
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
     cluster = new MiniOzoneCluster.Builder(conf)
     cluster = new MiniOzoneCluster.Builder(conf)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
-    ozClient = new OzoneClient(conf);
+    ozClient = new OzoneClientImpl(conf);
   }
   }
 
 
   @Test
   @Test
@@ -76,7 +77,8 @@ public class TestOzoneClient {
   public void testCreateVolumeWithQuota()
   public void testCreateVolumeWithQuota()
       throws IOException, OzoneException {
       throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
     String volumeName = UUID.randomUUID().toString();
-    ozClient.createVolume(volumeName, "test", "10GB");
+    ozClient.createVolume(volumeName, "test",
+        10000000000L);
   }
   }
 
 
   @Test
   @Test
@@ -173,7 +175,9 @@ public class TestOzoneClient {
     String value = "sample value";
     String value = "sample value";
     ozClient.createVolume(volumeName);
     ozClient.createVolume(volumeName);
     ozClient.createBucket(volumeName, bucketName);
     ozClient.createBucket(volumeName, bucketName);
-    ozClient.putKey(volumeName, bucketName, keyName, value.getBytes());
+    OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
+        keyName, value.getBytes().length);
+    out.write(value.getBytes());
     //Assert has to be done.
     //Assert has to be done.
   }
   }