浏览代码

HDFS-12572. Ozone: OzoneFileSystem: delete/list status/rename/mkdir APIs. Contributed by Mukul Kumar Singh.

Xiaoyu Yao 7 年之前
父节点
当前提交
7c88653735
共有 15 个文件被更改,包括 536 次插入78 次删除
  1. 2 2
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  2. 3 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
  3. 8 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
  4. 31 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java
  5. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneExceptionMapper.java
  6. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/BucketProcessTemplate.java
  7. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyProcessTemplate.java
  8. 5 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
  9. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStore.java
  10. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
  11. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
  12. 4 15
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
  13. 4 1
      hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java
  14. 452 43
      hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
  15. 15 0
      hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1461,13 +1461,13 @@
 
 <!-- Ozone file system properties -->
 <property>
-  <name>fs.ozfs.impl</name>
+  <name>fs.o3.impl</name>
   <value>org.apache.hadoop.fs.ozone.OzoneFileSystem</value>
   <description>The implementation class of the Ozone FileSystem.</description>
 </property>
 
 <property>
-  <name>fs.AbstractFileSystem.ozfs.impl</name>
+  <name>fs.AbstractFileSystem.o3.impl</name>
   <value>org.apache.hadoop.fs.ozone.OzFs</value>
   <description>The implementation class of the OzFs AbstractFileSystem.</description>
 </property>

+ 3 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java

@@ -99,6 +99,9 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
     // S3A properties are in a different subtree.
     xmlPrefixToSkipCompare.add("fs.s3a.");
 
+    // O3 properties are in a different subtree.
+    xmlPrefixToSkipCompare.add("fs.o3.");
+
     // WASB properties are in a different subtree.
     // - org.apache.hadoop.fs.azure.NativeAzureFileSystem
     xmlPrefixToSkipCompare.add("fs.wasb.impl");

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java

@@ -306,8 +306,10 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
       rangeResult = store.getSequentialRangeKVs(
           getBucketKey(volumeName, startBucket),
           maxNumOfBuckets + 1, filter);
-      //Remove start key from result.
-      rangeResult.remove(0);
+      if (!rangeResult.isEmpty()) {
+        //Remove start key from result.
+        rangeResult.remove(0);
+      }
     } else {
       rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
     }
@@ -350,8 +352,10 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
       rangeResult = store.getSequentialRangeKVs(
           getDBKeyBytes(volumeName, bucketName, startKey),
           maxKeys + 1, filter);
-      //Remove start key from result.
-      rangeResult.remove(0);
+      if (!rangeResult.isEmpty()) {
+        //Remove start key from result.
+        rangeResult.remove(0);
+      }
     } else {
       rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter);
     }

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java

@@ -39,6 +39,7 @@ import org.apache.http.entity.ContentType;
 import org.apache.http.entity.FileEntity;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.util.LinkedList;
@@ -506,6 +508,35 @@ public class OzoneBucket {
     }
   }
 
+  /**
+   * List keys in a bucket with the provided prefix, with paging results.
+   *
+   * @param prefix The prefix of the object keys
+   * @param maxResult max size per response
+   * @param prevKey the previous key for paging
+   */
+  public List<OzoneKey> listKeys(String prefix, int maxResult, String prevKey)
+      throws OzoneException {
+    HttpGet getRequest = null;
+    try {
+      final URI uri =  new URIBuilder(volume.getClient().getEndPointURI())
+          .setPath(OzoneConsts.KSM_KEY_PREFIX + getVolume().getVolumeName() +
+              OzoneConsts.KSM_KEY_PREFIX + getBucketName())
+          .setParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix)
+          .setParameter(Header.OZONE_LIST_QUERY_MAXKEYS,
+              String.valueOf(maxResult))
+          .setParameter(Header.OZONE_LIST_QUERY_PREVKEY, prevKey)
+          .build();
+      final OzoneRestClient client = getVolume().getClient();
+      getRequest = client.getHttpGet(uri.toString());
+      return executeListKeys(getRequest, HttpClientBuilder.create().build());
+    } catch (IOException | URISyntaxException e) {
+      throw new OzoneRestClientException(e.getMessage());
+    } finally {
+      OzoneClientUtils.releaseConnection(getRequest);
+    }
+  }
+
   /**
    * Execute list Key.
    *

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneExceptionMapper.java

@@ -36,7 +36,7 @@ public class OzoneExceptionMapper implements ExceptionMapper<OzoneException> {
 
   @Override
   public Response toResponse(OzoneException exception) {
-    LOG.info("Returning exception. ex: {}", exception.toJsonString());
+    LOG.debug("Returning exception. ex: {}", exception.toJsonString());
     MDC.clear();
     return Response.status((int)exception.getHttpCode())
       .entity(exception.toJsonString()).build();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/BucketProcessTemplate.java

@@ -94,7 +94,7 @@ public abstract class BucketProcessTemplate {
       BucketArgs args = new BucketArgs(volume, bucket, userArgs);
       MDC.put(OZONE_RESOURCE, args.getResourceName());
       Response response =  doProcess(args);
-      LOG.info("Success");
+      LOG.debug("Success");
       MDC.clear();
       return response;
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyProcessTemplate.java

@@ -89,7 +89,7 @@ public abstract class KeyProcessTemplate {
       KeyArgs args = new KeyArgs(volume, bucket, key, userArgs);
       MDC.put(OZONE_RESOURCE, args.getResourceName());
       Response response =  doProcess(args, is, request, headers, info);
-      LOG.info("Success");
+      LOG.debug("Success");
       MDC.clear();
       return response;
 

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java

@@ -298,8 +298,9 @@ public class LevelDBStore implements MetadataStore {
    * @param startKey a start key.
    * @param count max number of entries to return.
    * @param filters customized one or more {@link MetadataKeyFilter}.
-   * @return a list of entries found in the database.
-   * @throws IOException if an invalid startKey is given or other I/O errors.
+   * @return a list of entries found in the database or an empty list if the
+   * startKey is invalid.
+   * @throws IOException if there are I/O errors.
    * @throws IllegalArgumentException if count is less than 0.
    */
   private List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
@@ -321,7 +322,8 @@ public class LevelDBStore implements MetadataStore {
         dbIter.seekToFirst();
       } else {
         if (db.get(startKey) == null) {
-          throw new IOException("Invalid start key, not found in current db.");
+          // Key not found, return empty list
+          return result;
         }
         dbIter.seek(startKey);
       }

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStore.java

@@ -90,8 +90,9 @@ public interface MetadataStore extends Closeable{
    * @param startKey a start key.
    * @param count max number of entries to return.
    * @param filters customized one or more {@link MetadataKeyFilter}.
-   * @return a list of entries found in the database.
-   * @throws IOException if an invalid startKey is given or other I/O errors.
+   * @return a list of entries found in the database or an empty list if the
+   * startKey is invalid.
+   * @throws IOException if there are I/O errors.
    * @throws IllegalArgumentException if count is less than 0.
    */
   List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java

@@ -160,7 +160,8 @@ public class RocksDBStore implements MetadataStore {
         it.seekToFirst();
       } else {
         if(get(startKey) == null) {
-          throw new IOException("Invalid start key, not found in current db");
+          // Key not found, return empty list
+          return result;
         }
         it.seek(startKey);
       }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java

@@ -334,10 +334,10 @@ public class TestMetadataStore {
 
   @Test
   public void testInvalidStartKey() throws IOException {
-    // If startKey is invalid, throws an invalid key exception.
-    expectedException.expect(IOException.class);
-    expectedException.expectMessage("Invalid start key");
-    store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
+    // If startKey is invalid, the returned list should be empty.
+    List<Map.Entry<byte[], byte[]>> kvs =
+        store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
+    Assert.assertEquals(kvs.size(), 0);
   }
 
   @Test

+ 4 - 15
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

@@ -699,13 +699,8 @@ public class TestKeySpaceManager {
 
     // Provide an invalid bucket name as start key.
     listBucketArgs = new ListArgs(volArgs, null, 100, "unknown_bucket_name");
-    try {
-      storageHandler.listBuckets(listBucketArgs);
-      Assert.fail("Expecting an error when the given bucket name is invalid.");
-    } catch (Exception e) {
-      Assert.assertTrue(e instanceof IOException);
-      Assert.assertTrue(e.getMessage().contains(Status.INTERNAL_ERROR.name()));
-    }
+    ListBuckets buckets = storageHandler.listBuckets(listBucketArgs);
+    Assert.assertEquals(buckets.getBuckets().size(), 0);
 
     // Use all arguments.
     listBucketArgs = new ListArgs(volArgs, "b", 5, "bBucket_7");
@@ -824,14 +819,8 @@ public class TestKeySpaceManager {
 
     // Provide an invalid key name as start key.
     listKeyArgs = new ListArgs(bucketArgs, null, 100, "invalid_start_key");
-    try {
-      storageHandler.listKeys(listKeyArgs);
-      Assert.fail("Expecting an error when the given start"
-          + " key name is invalid.");
-    } catch (IOException e) {
-      GenericTestUtils.assertExceptionContains(
-          Status.INTERNAL_ERROR.name(), e);
-    }
+    ListKeys keys = storageHandler.listKeys(listKeyArgs);
+    Assert.assertEquals(keys.getKeyList().size(), 0);
 
     // Provide an invalid maxKeys argument.
     try {

+ 4 - 1
hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java

@@ -23,7 +23,7 @@ package org.apache.hadoop.fs.ozone;
  */
 public class Constants {
 
-  public static final String OZONE_URI_SCHEME = "ozfs";
+  public static final String OZONE_URI_SCHEME = "o3";
 
   public static final String OZONE_DEFAULT_USER = "hdfs";
 
@@ -39,6 +39,9 @@ public class Constants {
 
   public static final String OZONE_URI_DELIMITER = "/";
 
+  /** Page size for Ozone listing operation. */
+  public static final int LISTING_PAGE_SIZE = 1024;
+
   private Constants() {
 
   }

+ 452 - 43
hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java

@@ -18,14 +18,26 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.ParseException;
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Objects;
 
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.ozone.web.client.OzoneKey;
 import org.apache.hadoop.ozone.web.client.OzoneRestClient;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
@@ -33,16 +45,10 @@ import org.apache.http.client.utils.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ozone.web.client.OzoneBucket;
 import org.apache.hadoop.ozone.web.client.OzoneVolume;
@@ -55,6 +61,8 @@ import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME;
 import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
+import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
 
 /**
  * The Ozone Filesystem implementation.
@@ -181,10 +189,19 @@ public class OzoneFileSystem extends FileSystem {
           // path references a file and overwrite is disabled
           throw new FileAlreadyExistsException(f + " already exists");
         }
-        LOG.debug("Overwriting file {}", f);
-        //TODO: Delete the existing file here
+        LOG.trace("Overwriting file {}", f);
+        deleteObject(key);
       }
     } catch (FileNotFoundException ignored) {
+      // check if the parent directory needs to be created
+      Path parent = f.getParent();
+      try {
+        // create all the directories for the parent
+        FileStatus parentStatus = getFileStatus(parent);
+        LOG.trace("parent key:{} status:{}", key, parentStatus);
+      } catch (FileNotFoundException e) {
+        mkdirs(parent);
+      }
       // This exception needs to ignored as this means that the file currently
       // does not exists and a new file can thus be created.
     }
@@ -222,19 +239,221 @@ public class OzoneFileSystem extends FileSystem {
         + getClass().getSimpleName() + " FileSystem implementation");
   }
 
+  private class RenameIterator extends OzoneListingIterator {
+    private final String srcKey;
+    private final String dstKey;
+
+    RenameIterator(Path srcPath, Path dstPath)
+        throws IOException {
+      super(srcPath, true);
+      srcKey = pathToKey(srcPath);
+      dstKey = pathToKey(dstPath);
+      LOG.trace("rename from:{} to:{}", srcKey, dstKey);
+    }
+
+    boolean processKey(String key) throws IOException {
+      String newKeyName = dstKey.concat(key.substring(srcKey.length()));
+      return rename(key, newKeyName);
+    }
+
+    // TODO: currently rename work by copying the file, with changes in KSM,
+    // this operation can be made improved by renaming the keys in KSM directly.
+    private boolean rename(String src, String dst) throws IOException {
+      final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+      final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
+          LocalDirAllocator.SIZE_UNKNOWN, getConf());
+
+      try {
+        LOG.trace("rename by copying file from:{} to:{}", src, dst);
+        bucket.getKey(src, tmpFile.toPath());
+        bucket.putKey(dst, tmpFile);
+        return true;
+      } catch (OzoneException oe) {
+        String msg = String.format("Error when renaming key from:%s to:%s",
+            src, dst);
+        LOG.error(msg, oe);
+        throw new IOException(msg, oe);
+      } finally {
+        if (!tmpFile.delete()) {
+          LOG.warn("Can not delete tmpFile: " + tmpFile);
+        }
+      }
+    }
+  }
+
+  /**
+   * Check whether the source and destination path are valid and then perform
+   * rename by copying the data from source path to destination path.
+   *
+   * The rename operation is performed by copying data from source key
+   * to destination key. This is done by reading the source key data into a
+   * temporary file and then writing this temporary file to destination key.
+   * The temporary file is deleted after the rename operation.
+   * TODO: Optimize the operation by renaming keys in KSM.
+   *
+   * @param src source path for rename
+   * @param dst destination path for rename
+   * @return true if rename operation succeeded or
+   * if the src and dst have the same path and are of the same type
+   * @throws IOException on I/O errors or if the src/dst paths are invalid.
+   */
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
-    return false;
+    LOG.trace("rename() from:{} to:{}", src, dst);
+
+    if (src.isRoot()) {
+      // Cannot rename root of file system
+      LOG.trace("Cannot rename the root of a filesystem");
+      return false;
+    }
+
+    // Cannot rename a directory to its own subdirectory
+    Path parent = dst.getParent();
+    while (parent != null && !src.equals(parent)) {
+      parent = parent.getParent();
+    }
+    if (parent != null) {
+      return false;
+    }
+
+    // Check if the source exists
+    FileStatus srcStatus;
+    try {
+      srcStatus = getFileStatus(src);
+    } catch (FileNotFoundException fnfe) {
+      // source doesn't exist, return
+      return false;
+    }
+
+    // Check if the destination exists
+    FileStatus dstStatus;
+    try {
+      dstStatus = getFileStatus(dst);
+    } catch (FileNotFoundException fnde) {
+      dstStatus = null;
+    }
+
+    if (dstStatus == null) {
+      // If dst doesn't exist, check whether dst parent dir exists or not
+      // if the parent exists, the source can still be renamed to dst path
+      dstStatus = getFileStatus(dst.getParent());
+      if (!dstStatus.isDirectory()) {
+        throw new IOException(String.format(
+            "Failed to rename %s to %s, %s is a file", src, dst,
+            dst.getParent()));
+      }
+    } else {
+      // if dst exists and source and destination are same,
+      // check both the src and dst are of same type
+      if (srcStatus.getPath().equals(dstStatus.getPath())) {
+        return !srcStatus.isDirectory();
+      } else if (dstStatus.isDirectory()) {
+        // If dst is a directory, rename source as subpath of it.
+        // for example rename /source to /dst will lead to /dst/source
+        dst = new Path(dst, src.getName());
+        FileStatus[] statuses;
+        try {
+          statuses = listStatus(dst);
+        } catch (FileNotFoundException fnde) {
+          statuses = null;
+        }
+
+        if (statuses != null && statuses.length > 0) {
+          // If dst exists and not a directory not empty
+          throw new FileAlreadyExistsException(String.format(
+              "Failed to rename %s to %s, file already exists or not empty!",
+              src, dst));
+        }
+      } else {
+        // If dst is not a directory
+        throw new FileAlreadyExistsException(String.format(
+            "Failed to rename %s to %s, file already exists!", src, dst));
+      }
+    }
+
+    if (srcStatus.isDirectory()) {
+      if (dst.toString().startsWith(src.toString())) {
+        LOG.trace("Cannot rename a directory to a subdirectory of self");
+        return false;
+      }
+    }
+    RenameIterator iterator = new RenameIterator(src, dst);
+    iterator.iterate();
+    return src.equals(dst) || delete(src, true);
+  }
+
+  private class DeleteIterator extends OzoneListingIterator {
+    private boolean recursive;
+    DeleteIterator(Path f, boolean recursive)
+        throws IOException {
+      super(f, recursive);
+      this.recursive = recursive;
+    }
+
+    boolean processKey(String key) throws IOException {
+      if (key.equals("")) {
+        LOG.trace("Skipping deleting root directory");
+        return true;
+      } else {
+        LOG.trace("deleting key:" + key);
+        boolean succeed = deleteObject(key);
+        // if recursive delete is requested ignore the return value of
+        // deleteObject and issue deletes for other keys.
+        return recursive || succeed;
+      }
+    }
   }
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
-    return false;
+    LOG.trace("delete() path:{} recursive:{}", f, recursive);
+    try {
+      DeleteIterator iterator = new DeleteIterator(f, recursive);
+      return iterator.iterate();
+    } catch (FileNotFoundException e) {
+      LOG.error("Couldn't delete {} - does not exist", f);
+      return false;
+    }
+  }
+
+  private class ListStatusIterator extends OzoneListingIterator {
+    private  List<FileStatus> statuses = new ArrayList<>(LISTING_PAGE_SIZE);
+    private Path f;
+
+    ListStatusIterator(Path f) throws IOException  {
+      super(f, true);
+      this.f = f;
+    }
+
+    boolean processKey(String key) throws IOException {
+      Path keyPath = new Path(OZONE_URI_DELIMITER + key);
+      if (key.equals(getPathKey())) {
+        if (pathIsDirectory()) {
+          return true;
+        } else {
+          statuses.add(getFileStatus(keyPath));
+          return true;
+        }
+      }
+      // left with only subkeys now
+      if (keyPath.getParent().getName().equals(f.getName())) {
+        // skip keys which are for subdirectories of the directory
+        statuses.add(getFileStatus(keyPath));
+      }
+      return true;
+    }
+
+    FileStatus[] getStatuses() {
+      return statuses.toArray(new FileStatus[statuses.size()]);
+    }
   }
 
   @Override
   public FileStatus[] listStatus(Path f) throws IOException {
-    return null;
+    LOG.trace("listStatus() path:{}", f);
+    ListStatusIterator iterator = new ListStatusIterator(f);
+    iterator.iterate();
+    return iterator.getStatuses();
   }
 
   @Override
@@ -247,38 +466,67 @@ public class OzoneFileSystem extends FileSystem {
     return workingDir;
   }
 
-  @Override
-  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    return false;
-  }
-
-  private OzoneKey getKeyStatus(String keyName) {
-    try {
-      return bucket.getKeyInfo(keyName);
-    } catch (OzoneException e) {
-      LOG.trace("Key:{} does not exists", keyName);
-      return null;
-    }
+  /**
+   * Check whether the path is valid and then create directories.
+   * Directory is represented using a key with no value.
+   * All the non-existent parent directories are also created.
+   *
+   * @param path directory path to be created
+   * @return true if directory exists or created successfully.
+   * @throws IOException
+   */
+  private boolean mkdir(Path path) throws IOException {
+    Path fPart = path;
+    Path prevfPart = null;
+    do {
+      LOG.trace("validating path:{}", fPart);
+      try {
+        FileStatus fileStatus = getFileStatus(fPart);
+        if (fileStatus.isDirectory()) {
+          // If path exists and a directory, exit
+          break;
+        } else {
+          // Found a file here, rollback and delete newly created directories
+          LOG.trace("Found a file with same name as directory, path:{}", fPart);
+          if (prevfPart != null) {
+            delete(prevfPart, true);
+          }
+          throw new FileAlreadyExistsException(String.format(
+              "Can't make directory for path '%s', it is a file.", fPart));
+        }
+      } catch (FileNotFoundException fnfe) {
+        LOG.trace("creating directory for fpart:{}", fPart);
+        String key = pathToKey(fPart);
+        String dirKey = addTrailingSlashIfNeeded(key);
+        if (!createDirectory(dirKey)) {
+          // Directory creation failed here,
+          // rollback and delete newly created directories
+          LOG.trace("Directory creation failed, path:{}", fPart);
+          if (prevfPart != null) {
+            delete(prevfPart, true);
+          }
+          return false;
+        }
+      }
+      prevfPart = fPart;
+      fPart = fPart.getParent();
+    } while (fPart != null);
+    return true;
   }
 
-  private long getModifiedTime(String modifiedTime, String key) {
-    try {
-      return OzoneUtils.formatDate(modifiedTime);
-    } catch (ParseException pe) {
-      LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
-      return 0;
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    LOG.trace("mkdir() path:{} ", f);
+    String key = pathToKey(f);
+    if (StringUtils.isEmpty(key)) {
+      return false;
     }
-  }
-
-  private boolean isDirectory(OzoneKey key) {
-    LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(),
-        key.getObjectInfo().getSize());
-    return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER)
-        && (key.getObjectInfo().getSize() == 0);
+    return mkdir(f);
   }
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
+    LOG.trace("getFileStatus() path:{}", f);
     Path qualifiedPath = f.makeQualified(uri, workingDir);
     String key = pathToKey(qualifiedPath);
 
@@ -289,11 +537,10 @@ public class OzoneFileSystem extends FileSystem {
     }
 
     // consider this a file and get key status
-    OzoneKey meta = getKeyStatus(key);
-    if (meta == null && !key.endsWith(OZONE_URI_DELIMITER)) {
-      // if that fails consider this a directory
-      key += OZONE_URI_DELIMITER;
-      meta = getKeyStatus(key);
+    OzoneKey meta = getKeyInfo(key);
+    if (meta == null) {
+      key = addTrailingSlashIfNeeded(key);
+      meta = getKeyInfo(key);
     }
 
     if (meta == null) {
@@ -304,6 +551,7 @@ public class OzoneFileSystem extends FileSystem {
           getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
           qualifiedPath);
     } else {
+      //TODO: Fetch replication count from ratis config
       return new FileStatus(meta.getObjectInfo().getSize(), false, 1,
             getDefaultBlockSize(f),
           getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
@@ -311,13 +559,102 @@ public class OzoneFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Helper method to fetch the key metadata info.
+   * @param key key whose metadata information needs to be fetched
+   * @return metadata info of the key
+   */
+  private OzoneKey getKeyInfo(String key) {
+    try {
+      return bucket.getKeyInfo(key);
+    } catch (OzoneException e) {
+      LOG.trace("Key:{} does not exists", key);
+      return null;
+    }
+  }
+
+  /**
+   * Helper method to get the modified time of the key.
+   * @param key key to fetch the modified time
+   * @return last modified time of the key
+   */
+  private long getModifiedTime(String modifiedTime, String key) {
+    try {
+      return OzoneUtils.formatDate(modifiedTime);
+    } catch (ParseException pe) {
+      LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
+      return 0;
+    }
+  }
+
+  /**
+   * Helper method to check if an Ozone key is representing a directory.
+   * @param key key to be checked as a directory
+   * @return true if key is a directory, false otherwise
+   */
+  private boolean isDirectory(OzoneKey key) {
+    LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(),
+        key.getObjectInfo().getSize());
+    return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER)
+        && (key.getObjectInfo().getSize() == 0);
+  }
+
+  /**
+   * Helper method to list entries matching the key name in bucket.
+   * @param dirKey key prefix for listing the keys
+   * @param lastKey last iterated key
+   * @return List of Keys
+   */
+  List<OzoneKey> listKeys(String dirKey, String lastKey)
+      throws IOException {
+    LOG.trace("list keys dirKey:{} lastKey:{}", dirKey, lastKey);
+    try {
+      return bucket.listKeys(dirKey, LISTING_PAGE_SIZE, lastKey);
+    } catch (OzoneException oe) {
+      LOG.error("list keys failed dirKey:{} lastKey:{}", dirKey, lastKey, oe);
+      throw new IOException("List keys failed " + oe.getMessage());
+    }
+  }
+
+  /**
+   * Helper method to create an directory specified by key name in bucket.
+   * @param keyName key name to be created as directory
+   * @return true if the key is created, false otherwise
+   */
+  private boolean createDirectory(String keyName) {
+    try {
+      LOG.trace("creating dir for key:{}", keyName);
+      bucket.putKey(keyName, "");
+      return true;
+    } catch (OzoneException oe) {
+      LOG.error("create key failed for key:{}", keyName, oe);
+      return false;
+    }
+  }
+
+  /**
+   * Helper method to delete an object specified by key name in bucket.
+   * @param keyName key name to be deleted
+   * @return true if the key is deleted, false otherwise
+   */
+  private boolean deleteObject(String keyName) {
+    LOG.trace("issuing delete for key" + keyName);
+    try {
+      bucket.deleteKey(keyName);
+      return true;
+    } catch (OzoneException oe) {
+      LOG.error("delete key failed " + oe.getMessage());
+      return false;
+    }
+  }
+
   /**
    * Turn a path (relative or otherwise) into an Ozone key.
    *
    * @param path the path of the file.
    * @return the key of the object that represents the file.
    */
-  private String pathToKey(Path path) {
+  public String pathToKey(Path path) {
     Objects.requireNonNull(path, "Path can not be null!");
     if (!path.isAbsolute()) {
       path = new Path(workingDir, path);
@@ -328,6 +665,20 @@ public class OzoneFileSystem extends FileSystem {
     return key;
   }
 
+  /**
+   * Add trailing delimiter to path if it is already not present.
+   *
+   * @param key the ozone Key which needs to be appended
+   * @return delimiter appended key
+   */
+  String addTrailingSlashIfNeeded(String key) {
+    if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
+      return key + OZONE_URI_DELIMITER;
+    } else {
+      return key;
+    }
+  }
+
   @Override
   public String toString() {
     return "OzoneFileSystem{URI=" + uri + ", "
@@ -336,4 +687,62 @@ public class OzoneFileSystem extends FileSystem {
         + "statistics=" + statistics
         + "}";
   }
+
+  private abstract class OzoneListingIterator {
+    private final Path path;
+    private final boolean recursive;
+    private final FileStatus status;
+    private String pathKey;
+
+    OzoneListingIterator(Path path, boolean recursive)
+        throws IOException {
+      this.path = path;
+      this.recursive = recursive;
+      this.status = getFileStatus(path);
+      this.pathKey = pathToKey(path);
+      if (status.isDirectory()) {
+        this.pathKey = addTrailingSlashIfNeeded(pathKey);
+      }
+    }
+
+    abstract boolean processKey(String key) throws IOException;
+
+    // iterates all the keys in the particular path
+    boolean iterate() throws IOException {
+      LOG.trace("Iterating path {} - recursive {}", path, recursive);
+      if (status.isDirectory()) {
+        LOG.trace("Iterating directory:{}", pathKey);
+        String lastKey = pathKey;
+        while (true) {
+          List<OzoneKey> ozoneKeys = listKeys(pathKey, lastKey);
+          LOG.trace("number of sub keys:{}", ozoneKeys.size());
+          if (ozoneKeys.size() == 0) {
+            return processKey(pathKey);
+          } else {
+            if (!recursive) {
+              throw new PathIsNotEmptyDirectoryException(path.toString());
+            } else {
+              for (OzoneKey ozoneKey : ozoneKeys) {
+                lastKey = ozoneKey.getObjectInfo().getKeyName();
+                if (!processKey(lastKey)) {
+                  return false;
+                }
+              }
+            }
+          }
+        }
+      } else {
+        LOG.trace("iterating file:{}", path);
+        return processKey(pathKey);
+      }
+    }
+
+    String getPathKey() {
+      return pathKey;
+    }
+
+    boolean pathIsDirectory() {
+      return status.isDirectory();
+    }
+  }
 }

+ 15 - 0
hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java

@@ -123,4 +123,19 @@ public class TestOzoneFileInterfaces {
       Assert.assertEquals(out, data);
     }
   }
+
+  @Test
+  public void testDirectory() throws IOException {
+    String dirPath = RandomStringUtils.randomAlphanumeric(5);
+    Path path = new Path("/" + dirPath);
+    Assert.assertTrue(fs.mkdirs(path));
+
+    FileStatus status = fs.getFileStatus(path);
+    Assert.assertTrue(status.isDirectory());
+    Assert.assertEquals(status.getLen(), 0);
+
+    FileStatus[] statusList = fs.listStatus(new Path("/"));
+    Assert.assertEquals(statusList.length, 1);
+    Assert.assertEquals(statusList[0], status);
+  }
 }