Browse Source

HDDS-1461. Optimize listStatus api in OzoneFileSystem (#782)

Lokesh Jain 6 years ago
parent
commit
c1d7d68c78
24 changed files with 773 additions and 179 deletions
  1. 4 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
  2. 17 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
  3. 18 1
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
  4. 8 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
  5. 13 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  6. 1 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
  7. 2 1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
  8. 89 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java
  9. 6 5
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java
  10. 14 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
  11. 40 7
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
  12. 19 1
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  13. 141 38
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  14. 17 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
  15. 4 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
  16. 25 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  17. 4 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java
  18. 26 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
  19. 211 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
  20. 14 0
      hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
  21. 27 123
      hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java
  22. 4 0
      hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
  23. 41 1
      hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java
  24. 28 1
      hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java

+ 4 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java

@@ -298,4 +298,8 @@ public class RDBStore implements DBStore {
   public File getDbLocation() {
     return dbLocation;
   }
+
+  public CodecRegistry getCodecRegistry() {
+    return codecRegistry;
+  }
 }

+ 17 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java

@@ -528,6 +528,23 @@ public class OzoneBucket extends WithMetadata {
             recursive);
   }
 
+  /**
+   * List the status for a file or a directory and its contents.
+   *
+   * @param keyName    Absolute path of the entry to be listed
+   * @param recursive  For a directory if true all the descendants of a
+   *                   particular directory are listed
+   * @param startKey   Key from which listing needs to start. If startKey exists
+   *                   its status is included in the final list.
+   * @param numEntries Number of entries to list from the start key
+   * @return list of file status
+   */
+  public List<OzoneFileStatus> listStatus(String keyName, boolean recursive,
+      String startKey, long numEntries) throws IOException {
+    return proxy
+        .listStatus(volumeName, name, keyName, recursive, startKey, numEntries);
+  }
+
   /**
    * An Iterator to iterate over {@link OzoneKey} list.
    */

+ 18 - 1
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -604,4 +604,21 @@ public interface ClientProtocol {
   OzoneOutputStream createFile(String volumeName, String bucketName,
       String keyName, long size, ReplicationType type, ReplicationFactor factor,
       boolean overWrite, boolean recursive) throws IOException;
+
+  /**
+   * List the status for a file or a directory and its contents.
+   *
+   * @param volumeName Volume name
+   * @param bucketName Bucket name
+   * @param keyName    Absolute path of the entry to be listed
+   * @param recursive  For a directory if true all the descendants of a
+   *                   particular directory are listed
+   * @param startKey   Key from which listing needs to start. If startKey exists
+   *                   its status is included in the final list.
+   * @param numEntries Number of entries to list from the start key
+   * @return list of file status
+   */
+  List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
+      String keyName, boolean recursive, String startKey, long numEntries)
+      throws IOException;
 }

+ 8 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java

@@ -1113,4 +1113,12 @@ public class RestClient implements ClientProtocol {
     throw new UnsupportedOperationException(
         "Ozone REST protocol does not " + "support this operation.");
   }
+
+  @Override
+  public List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
+      String keyName, boolean recursive, String startKey, long numEntries)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Ozone REST protocol does not " + "support this operation.");
+  }
 }

+ 13 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -993,6 +993,19 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
         factor);
   }
 
+  @Override
+  public List<OzoneFileStatus> listStatus(String volumeName, String bucketName,
+      String keyName, boolean recursive, String startKey, long numEntries)
+      throws IOException {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .build();
+    return ozoneManagerClient
+        .listStatus(keyArgs, recursive, startKey, numEntries);
+  }
+
   private OzoneInputStream createInputStream(OmKeyInfo keyInfo,
       String requestId) throws IOException {
     LengthInputStream lengthInputStream = KeyInputStream

+ 1 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java

@@ -189,6 +189,7 @@ public final class OmUtils {
     case ListMultiPartUploadParts:
     case GetFileStatus:
     case LookupFile:
+    case ListStatus:
       return true;
     case CreateVolume:
     case SetVolumeProperty:

+ 2 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java

@@ -58,7 +58,8 @@ public enum OMAction implements AuditAction {
   GET_FILE_STATUS,
   CREATE_DIRECTORY,
   CREATE_FILE,
-  LOOKUP_FILE;
+  LOOKUP_FILE,
+  LIST_STATUS;
 
   @Override
   public String getAction() {

+ 89 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFSUtils.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.fs.Path;
+
+import java.nio.file.Paths;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+
+/**
+ * Utility class for OzoneFileSystem.
+ */
+public final class OzoneFSUtils {
+
+  private OzoneFSUtils() {}
+
+  /**
+   * Returns string representation of path after removing the leading slash.
+   */
+  public static String pathToKey(Path path) {
+    return path.toString().substring(1);
+  }
+
+  /**
+   * Returns string representation of the input path parent. The function adds
+   * a trailing slash if it does not exist and returns an empty string if the
+   * parent is root.
+   */
+  public static String getParent(String keyName) {
+    java.nio.file.Path parentDir = Paths.get(keyName).getParent();
+    if (parentDir == null) {
+      return "";
+    }
+    return addTrailingSlashIfNeeded(parentDir.toString());
+  }
+
+  /**
+   * The function returns immediate child of given ancestor in a particular
+   * descendant. For example if ancestor is /a/b and descendant is /a/b/c/d/e
+   * the function should return /a/b/c/. If the descendant itself is the
+   * immediate child then it is returned as is without adding a trailing slash.
+   * This is done to distinguish files from a directory as in ozone files do
+   * not carry a trailing slash.
+   */
+  public static String getImmediateChild(String descendant, String ancestor) {
+    ancestor =
+        !ancestor.isEmpty() ? addTrailingSlashIfNeeded(ancestor) : ancestor;
+    if (!descendant.startsWith(ancestor)) {
+      return null;
+    }
+    java.nio.file.Path descendantPath = Paths.get(descendant);
+    java.nio.file.Path ancestorPath = Paths.get(ancestor);
+    int ancestorPathNameCount =
+        ancestor.isEmpty() ? 0 : ancestorPath.getNameCount();
+    if (descendantPath.getNameCount() - ancestorPathNameCount > 1) {
+      return addTrailingSlashIfNeeded(
+          ancestor + descendantPath.getName(ancestorPathNameCount));
+    }
+    return descendant;
+  }
+
+  public static String addTrailingSlashIfNeeded(String key) {
+    if (!key.endsWith(OZONE_URI_DELIMITER)) {
+      return key + OZONE_URI_DELIMITER;
+    } else {
+      return key;
+    }
+  }
+
+  public static boolean isFile(String keyName) {
+    return !keyName.endsWith(OZONE_URI_DELIMITER);
+  }
+}

+ 6 - 5
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OzoneFileStatus.java

@@ -18,11 +18,11 @@
 
 package org.apache.hadoop.ozone.om.helpers;
 
-import org.apache.hadoop.fs.FSProtos.FileStatusProto;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.protocolPB.PBHelper;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneFileStatusProto;
 
 import java.io.IOException;
 import java.net.URI;
@@ -53,13 +53,14 @@ public class OzoneFileStatus extends FileStatus {
     super(0, true, 0, 0, 0, getPath(keyName));
   }
 
-  public FileStatusProto getProtobuf() throws IOException {
-    return PBHelper.convert(this);
+  public OzoneFileStatusProto getProtobuf() throws IOException {
+    return OzoneFileStatusProto.newBuilder().setStatus(PBHelper.convert(this))
+        .build();
   }
 
-  public static OzoneFileStatus getFromProtobuf(FileStatusProto response)
+  public static OzoneFileStatus getFromProtobuf(OzoneFileStatusProto response)
       throws IOException {
-    return new OzoneFileStatus(PBHelper.convert(response));
+    return new OzoneFileStatus(PBHelper.convert(response.getStatus()));
   }
 
   public static Path getPath(String keyName) {

+ 14 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java

@@ -450,5 +450,19 @@ public interface OzoneManagerProtocol
    *                     invalid arguments
    */
   OmKeyInfo lookupFile(OmKeyArgs keyArgs) throws IOException;
+
+  /**
+   * List the status for a file or a directory and its contents.
+   *
+   * @param keyArgs    Key args
+   * @param recursive  For a directory if true all the descendants of a
+   *                   particular directory are listed
+   * @param startKey   Key from which listing needs to start. If startKey exists
+   *                   its status is included in the final list.
+   * @param numEntries Number of entries to list from the start key
+   * @return list of file status
+   */
+  List<OzoneFileStatus> listStatus(OmKeyArgs keyArgs, boolean recursive,
+      String startKey, long numEntries) throws IOException;
 }
 

+ 40 - 7
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -55,7 +55,13 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneFileStatusProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupFileResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusRequest;
@@ -1281,14 +1287,13 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setBucketName(args.getBucketName())
         .setKeyName(args.getKeyName())
         .build();
-    OzoneManagerProtocolProtos.LookupFileRequest lookupFileRequest =
-        OzoneManagerProtocolProtos.LookupFileRequest.newBuilder()
+    LookupFileRequest lookupFileRequest = LookupFileRequest.newBuilder()
             .setKeyArgs(keyArgs)
             .build();
     OMRequest omRequest = createOMRequest(Type.LookupFile)
         .setLookupFileRequest(lookupFileRequest)
         .build();
-    OzoneManagerProtocolProtos.LookupFileResponse resp =
+    LookupFileResponse resp =
         handleError(submitRequest(omRequest)).getLookupFileResponse();
     return OmKeyInfo.getFromProtobuf(resp.getKeyInfo());
   }
@@ -1304,8 +1309,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setType(args.getType())
         .setFactor(args.getFactor())
         .build();
-    OzoneManagerProtocolProtos.CreateFileRequest createFileRequest =
-        OzoneManagerProtocolProtos.CreateFileRequest.newBuilder()
+    CreateFileRequest createFileRequest = CreateFileRequest.newBuilder()
             .setKeyArgs(keyArgs)
             .setIsOverwrite(overWrite)
             .setIsRecursive(recursive)
@@ -1313,9 +1317,38 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     OMRequest omRequest = createOMRequest(Type.CreateFile)
         .setCreateFileRequest(createFileRequest)
         .build();
-    OzoneManagerProtocolProtos.CreateFileResponse resp =
+    CreateFileResponse resp =
         handleError(submitRequest(omRequest)).getCreateFileResponse();
     return new OpenKeySession(resp.getID(),
         OmKeyInfo.getFromProtobuf(resp.getKeyInfo()), resp.getOpenVersion());
   }
+
+  @Override
+  public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
+      String startKey, long numEntries) throws IOException {
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(args.getVolumeName())
+        .setBucketName(args.getBucketName())
+        .setKeyName(args.getKeyName())
+        .build();
+    ListStatusRequest listStatusRequest =
+        ListStatusRequest.newBuilder()
+            .setKeyArgs(keyArgs)
+            .setRecursive(recursive)
+            .setStartKey(startKey)
+            .setNumEntries(numEntries)
+            .build();
+    OMRequest omRequest = createOMRequest(Type.ListStatus)
+        .setListStatusRequest(listStatusRequest)
+        .build();
+    ListStatusResponse listStatusResponse =
+        handleError(submitRequest(omRequest)).getListStatusResponse();
+    List<OzoneFileStatus> statusList =
+        new ArrayList<>(listStatusResponse.getStatusesCount());
+    for (OzoneFileStatusProto fileStatus : listStatusResponse
+        .getStatusesList()) {
+      statusList.add(OzoneFileStatus.getFromProtobuf(fileStatus));
+    }
+    return statusList;
+  }
 }

+ 19 - 1
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -85,6 +85,7 @@ enum Type {
   CreateDirectory = 71;
   CreateFile = 72;
   LookupFile = 73;
+  ListStatus = 74;
 }
 
 message OMRequest {
@@ -141,6 +142,7 @@ message OMRequest {
   optional CreateDirectoryRequest           createDirectoryRequest         = 71;
   optional CreateFileRequest                createFileRequest              = 72;
   optional LookupFileRequest                lookupFileRequest              = 73;
+  optional ListStatusRequest                listStatusRequest              = 74;
 }
 
 message OMResponse {
@@ -200,6 +202,7 @@ message OMResponse {
   optional CreateDirectoryResponse            createDirectoryResponse      = 71;
   optional CreateFileResponse                 createFileResponse           = 72;
   optional LookupFileResponse                 lookupFileResponse           = 73;
+  optional ListStatusResponse                 listStatusResponse           = 74;
 }
 
 enum Status {
@@ -561,12 +564,16 @@ message KeyInfo {
     optional FileEncryptionInfoProto fileEncryptionInfo = 12;
 }
 
+message OzoneFileStatusProto {
+    required hadoop.fs.FileStatusProto status = 1;
+}
+
 message GetFileStatusRequest {
     required KeyArgs keyArgs = 1;
 }
 
 message GetFileStatusResponse {
-    required hadoop.fs.FileStatusProto status = 1;
+    required OzoneFileStatusProto status = 1;
 }
 
 message CreateDirectoryRequest {
@@ -599,6 +606,17 @@ message LookupFileResponse {
     optional KeyInfo keyInfo = 1;
 }
 
+message ListStatusRequest {
+    required KeyArgs keyArgs = 1;
+    required bool recursive = 2;
+    required string startKey = 3;
+    required uint64 numEntries = 4;
+}
+
+message ListStatusResponse {
+    repeated OzoneFileStatusProto statuses = 1;
+}
+
 message CreateKeyRequest {
     required KeyArgs keyArgs = 1;
 }

+ 141 - 38
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -32,8 +32,8 @@ import java.util.concurrent.TimeUnit;
 import java.security.GeneralSecurityException;
 import java.security.PrivilegedExceptionAction;
 
+import com.google.common.base.Strings;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
@@ -67,6 +67,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -85,6 +86,10 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.db.BatchOperation;
 import org.apache.hadoop.utils.db.DBStore;
+import org.apache.hadoop.utils.db.CodecRegistry;
+import org.apache.hadoop.utils.db.RDBStore;
+import org.apache.hadoop.utils.db.TableIterator;
+import org.apache.hadoop.utils.db.Table;
 
 import com.google.common.base.Preconditions;
 
@@ -1360,10 +1365,10 @@ public class KeyManagerImpl implements KeyManager {
       // Check if this is the root of the filesystem.
       if (keyName.length() == 0) {
         validateBucket(volumeName, bucketName);
-        return new OzoneFileStatus(keyName);
+        return new OzoneFileStatus(OZONE_URI_DELIMITER);
       }
 
-      //Check if the key is a file.
+      // Check if the key is a file.
       String fileKeyBytes = metadataManager.getOzoneKey(
           volumeName, bucketName, keyName);
       OmKeyInfo fileKeyInfo = metadataManager.getKeyTable().get(fileKeyBytes);
@@ -1372,7 +1377,7 @@ public class KeyManagerImpl implements KeyManager {
         return new OzoneFileStatus(fileKeyInfo, scmBlockSize, false);
       }
 
-      String dirKey = addTrailingSlashIfNeeded(keyName);
+      String dirKey = OzoneFSUtils.addTrailingSlashIfNeeded(keyName);
       String dirKeyBytes = metadataManager.getOzoneKey(
           volumeName, bucketName, dirKey);
       OmKeyInfo dirKeyInfo = metadataManager.getKeyTable().get(dirKeyBytes);
@@ -1390,7 +1395,7 @@ public class KeyManagerImpl implements KeyManager {
           " bucket:" + bucketName + " key:" + keyName + " with error no " +
           "such file exists:");
       throw new OMException("Unable to get file status: volume: " +
-          volumeName + "bucket: " + bucketName + "key: " + keyName,
+          volumeName + " bucket: " + bucketName + " key: " + keyName,
           ResultCodes.FILE_NOT_FOUND);
     } finally {
       metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
@@ -1413,47 +1418,50 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
 
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
-      metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
-
-      // verify bucket exists
-      OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
 
       // Check if this is the root of the filesystem.
       if (keyName.length() == 0) {
         return;
       }
 
-      verifyNoFilesInPath(volumeName, bucketName, Paths.get(keyName), false);
-      String dir = addTrailingSlashIfNeeded(keyName);
-      String dirDbKey =
-          metadataManager.getOzoneKey(volumeName, bucketName, dir);
-      FileEncryptionInfo encInfo = getFileEncryptionInfo(bucketInfo);
+      Path keyPath = Paths.get(keyName);
+      OzoneFileStatus status =
+          verifyNoFilesInPath(volumeName, bucketName, keyPath, false);
+      if (status != null && OzoneFSUtils.pathToKey(status.getPath())
+          .equals(keyName)) {
+        // if directory already exists
+        return;
+      }
       OmKeyInfo dirDbKeyInfo =
-          createDirectoryKeyInfo(volumeName, bucketName, dir, new ArrayList<>(),
-              ReplicationFactor.ONE, ReplicationType.RATIS, encInfo);
+          createDirectoryKey(volumeName, bucketName, keyName);
+      String dirDbKey = metadataManager
+          .getOzoneKey(volumeName, bucketName, dirDbKeyInfo.getKeyName());
       metadataManager.getKeyTable().put(dirDbKey, dirDbKeyInfo);
-
     } finally {
       metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
 
-  private OmKeyInfo createDirectoryKeyInfo(String volumeName, String bucketName,
-      String keyName, List<OmKeyLocationInfo> locations,
-      ReplicationFactor factor, ReplicationType type,
-      FileEncryptionInfo encInfo) {
+  private OmKeyInfo createDirectoryKey(String volumeName, String bucketName,
+      String keyName) throws IOException {
+    // verify bucket exists
+    OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
+
+    String dir = OzoneFSUtils.addTrailingSlashIfNeeded(keyName);
+    FileEncryptionInfo encInfo = getFileEncryptionInfo(bucketInfo);
     return new OmKeyInfo.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
-        .setKeyName(keyName)
+        .setKeyName(dir)
         .setOmKeyLocationInfos(Collections.singletonList(
-            new OmKeyLocationInfoGroup(0, locations)))
+            new OmKeyLocationInfoGroup(0, new ArrayList<>())))
         .setCreationTime(Time.now())
         .setModificationTime(Time.now())
         .setDataSize(0)
-        .setReplicationType(type)
-        .setReplicationFactor(factor)
+        .setReplicationType(ReplicationType.RATIS)
+        .setReplicationFactor(ReplicationFactor.ONE)
         .setFileEncryptionInfo(encInfo)
         .build();
   }
@@ -1482,9 +1490,8 @@ public class KeyManagerImpl implements KeyManager {
     String keyName = args.getKeyName();
     OpenKeySession keySession;
 
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
-      metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
-
       OzoneFileStatus fileStatus;
       try {
         fileStatus = getFileStatus(args);
@@ -1531,8 +1538,8 @@ public class KeyManagerImpl implements KeyManager {
     String bucketName = args.getBucketName();
     String keyName = args.getKeyName();
 
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
     try {
-      metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
       OzoneFileStatus fileStatus = getFileStatus(args);
       if (fileStatus.isFile()) {
         return fileStatus.getKeyInfo();
@@ -1546,6 +1553,105 @@ public class KeyManagerImpl implements KeyManager {
         ResultCodes.NOT_A_FILE);
   }
 
+  /**
+   * List the status for a file or a directory and its contents.
+   *
+   * @param args       Key args
+   * @param recursive  For a directory if true all the descendants of a
+   *                   particular directory are listed
+   * @param startKey   Key from which listing needs to start. If startKey exists
+   *                   its status is included in the final list.
+   * @param numEntries Number of entries to list from the start key
+   * @return list of file status
+   */
+  public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
+      String startKey, long numEntries) throws IOException {
+    Preconditions.checkNotNull(args, "Key args can not be null");
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+
+    List<OzoneFileStatus> fileStatusList = new ArrayList<>();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    try {
+      if (Strings.isNullOrEmpty(startKey)) {
+        OzoneFileStatus fileStatus = getFileStatus(args);
+        if (fileStatus.isFile()) {
+          return Collections.singletonList(fileStatus);
+        }
+        startKey = OzoneFSUtils.addTrailingSlashIfNeeded(keyName);
+      }
+
+      String seekKeyInDb =
+          metadataManager.getOzoneKey(volumeName, bucketName, startKey);
+      String keyInDb = OzoneFSUtils.addTrailingSlashIfNeeded(
+          metadataManager.getOzoneKey(volumeName, bucketName, keyName));
+      TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+          iterator = metadataManager.getKeyTable().iterator();
+      iterator.seek(seekKeyInDb);
+
+      if (!iterator.hasNext()) {
+        return Collections.emptyList();
+      }
+
+      if (iterator.key().equals(keyInDb)) {
+        // skip the key which needs to be listed
+        iterator.next();
+      }
+
+      while (iterator.hasNext() && numEntries - fileStatusList.size() > 0) {
+        String entryInDb = iterator.key();
+        OmKeyInfo value = iterator.value().getValue();
+        if (entryInDb.startsWith(keyInDb)) {
+          String entryKeyName = value.getKeyName();
+          if (recursive) {
+            // for recursive list all the entries
+            fileStatusList.add(new OzoneFileStatus(value, scmBlockSize,
+                !OzoneFSUtils.isFile(entryKeyName)));
+            iterator.next();
+          } else {
+            // get the child of the directory to list from the entry. For
+            // example if directory to list is /a and entry is /a/b/c where
+            // c is a file. The immediate child is b which is a directory. c
+            // should not be listed as child of a.
+            String immediateChild = OzoneFSUtils
+                .getImmediateChild(entryKeyName, keyName);
+            boolean isFile = OzoneFSUtils.isFile(immediateChild);
+            if (isFile) {
+              fileStatusList
+                  .add(new OzoneFileStatus(value, scmBlockSize, !isFile));
+              iterator.next();
+            } else {
+              // if entry is a directory
+              fileStatusList.add(new OzoneFileStatus(immediateChild));
+              // skip the other descendants of this child directory.
+              iterator.seek(
+                  getNextGreaterString(volumeName, bucketName, immediateChild));
+            }
+          }
+        } else {
+          break;
+        }
+      }
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+    return fileStatusList;
+  }
+
+  private String getNextGreaterString(String volumeName, String bucketName,
+      String keyPrefix) throws IOException {
+    // Increment the last character of the string and return the new ozone key.
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(keyPrefix),
+        "Key prefix is null or empty");
+    CodecRegistry codecRegistry =
+        ((RDBStore) metadataManager.getStore()).getCodecRegistry();
+    byte[] keyPrefixInBytes = codecRegistry.asRawData(keyPrefix);
+    keyPrefixInBytes[keyPrefixInBytes.length - 1]++;
+    String nextPrefix = codecRegistry.asObject(keyPrefixInBytes, String.class);
+    return metadataManager.getOzoneKey(volumeName, bucketName, nextPrefix);
+  }
+
   /**
    * Verify that none of the parent path exists as file in the filesystem.
    *
@@ -1555,6 +1661,8 @@ public class KeyManagerImpl implements KeyManager {
    *                           directory for the ozone filesystem.
    * @param directoryMustExist throws exception if true and given path does not
    *                           exist as directory
+   * @return OzoneFileStatus of the first directory found in path in reverse
+   * order
    * @throws OMException if ancestor exists as file in the filesystem
    *                     if directoryMustExist flag is true and parent does
    *                     not exist
@@ -1562,8 +1670,9 @@ public class KeyManagerImpl implements KeyManager {
    * @throws IOException if there is error in the db
    *                     invalid arguments
    */
-  private void verifyNoFilesInPath(String volumeName, String bucketName,
-      Path path, boolean directoryMustExist) throws IOException {
+  private OzoneFileStatus verifyNoFilesInPath(String volumeName,
+      String bucketName, Path path, boolean directoryMustExist)
+      throws IOException {
     OmKeyArgs.Builder argsBuilder = new OmKeyArgs.Builder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName);
@@ -1580,7 +1689,7 @@ public class KeyManagerImpl implements KeyManager {
                   + "bucket: " + bucketName + "key: " + keyName,
               ResultCodes.FILE_ALREADY_EXISTS);
         } else if (fileStatus.isDirectory()) {
-          break;
+          return fileStatus;
         }
       } catch (OMException ex) {
         if (ex.getResult() != ResultCodes.FILE_NOT_FOUND) {
@@ -1594,6 +1703,7 @@ public class KeyManagerImpl implements KeyManager {
       }
       path = path.getParent();
     }
+    return null;
   }
 
   private FileEncryptionInfo getFileEncryptionInfo(OmBucketInfo bucketInfo)
@@ -1617,11 +1727,4 @@ public class KeyManagerImpl implements KeyManager {
     return encInfo;
   }
 
-  private String addTrailingSlashIfNeeded(String key) {
-    if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
-      return key + OZONE_URI_DELIMITER;
-    } else {
-      return key;
-    }
-  }
 }

+ 17 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java

@@ -71,6 +71,7 @@ public class OMMetrics {
   private @Metric MutableCounterLong numCreateDirectory;
   private @Metric MutableCounterLong numCreateFile;
   private @Metric MutableCounterLong numLookupFile;
+  private @Metric MutableCounterLong numListStatus;
 
   // Failure Metrics
   private @Metric MutableCounterLong numVolumeCreateFails;
@@ -107,6 +108,7 @@ public class OMMetrics {
   private @Metric MutableCounterLong numCreateDirectoryFails;
   private @Metric MutableCounterLong numCreateFileFails;
   private @Metric MutableCounterLong numLookupFileFails;
+  private @Metric MutableCounterLong numListStatusFails;
 
   // Metrics for total number of volumes, buckets and keys
 
@@ -333,6 +335,16 @@ public class OMMetrics {
     numLookupFileFails.incr();
   }
 
+  public void incNumListStatus() {
+    numKeyOps.incr();
+    numFSOps.incr();
+    numListStatus.incr();
+  }
+
+  public void incNumListStatusFails() {
+    numListStatusFails.incr();
+  }
+
   public void incNumListMultipartUploadPartFails() {
     numListMultipartUploadPartFails.incr();
   }
@@ -638,6 +650,11 @@ public class OMMetrics {
     return numGetFileStatus.value();
   }
 
+  @VisibleForTesting
+  public long getNumListStatus() {
+    return numListStatus.value();
+  }
+
   @VisibleForTesting
   public long getNumVolumeListFails() {
     return numVolumeListFails.value();

+ 4 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java

@@ -350,7 +350,10 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     // TODO : Throw if the Bucket is null?
     builder.append(OM_KEY_PREFIX).append(bucket);
     if (StringUtil.isNotBlank(key)) {
-      builder.append(OM_KEY_PREFIX).append(key);
+      builder.append(OM_KEY_PREFIX);
+      if (!key.equals(OM_KEY_PREFIX)) {
+        builder.append(key);
+      }
     }
     return builder.toString();
   }

+ 25 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -2946,6 +2946,31 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  @Override
+  public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
+      String startKey, long numEntries) throws IOException {
+    if(isAclEnabled) {
+      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
+          args.getVolumeName(), args.getBucketName(), args.getKeyName());
+    }
+    boolean auditSuccess = true;
+    try {
+      metrics.incNumListStatus();
+      return keyManager.listStatus(args, recursive, startKey, numEntries);
+    } catch (Exception ex) {
+      metrics.incNumListStatusFails();
+      auditSuccess = false;
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction.LIST_STATUS,
+          (args == null) ? null : args.toAuditMap(), ex));
+      throw ex;
+    } finally {
+      if(auditSuccess){
+        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+            OMAction.LIST_STATUS, (args == null) ? null : args.toAuditMap()));
+      }
+    }
+  }
+
   /**
    * Startup options.
    */

+ 4 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/fs/OzoneManagerFS.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Ozone Manager FileSystem interface.
@@ -37,4 +38,7 @@ public interface OzoneManagerFS {
       boolean isRecursive) throws IOException;
 
   OmKeyInfo lookupFile(OmKeyArgs args) throws IOException;
+
+  List<OzoneFileStatus> listStatus(OmKeyArgs keyArgs, boolean recursive,
+      String startKey, long numEntries) throws IOException;
 }

+ 26 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -347,6 +348,11 @@ public class OzoneManagerRequestHandler implements RequestHandler {
             lookupFile(request.getLookupFileRequest());
         responseBuilder.setLookupFileResponse(lookupFileResponse);
         break;
+      case ListStatus:
+        OzoneManagerProtocolProtos.ListStatusResponse listStatusResponse =
+            listStatus(request.getListStatusRequest());
+        responseBuilder.setListStatusResponse(listStatusResponse);
+        break;
       default:
         responseBuilder.setSuccess(false);
         responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@ -1031,4 +1037,24 @@ public class OzoneManagerRequestHandler implements RequestHandler {
   protected OzoneManagerServerProtocol getOzoneManagerServerProtocol() {
     return impl;
   }
+
+  private OzoneManagerProtocolProtos.ListStatusResponse listStatus(
+      OzoneManagerProtocolProtos.ListStatusRequest request) throws IOException {
+    KeyArgs keyArgs = request.getKeyArgs();
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName())
+        .build();
+    List<OzoneFileStatus> statuses =
+        impl.listStatus(omKeyArgs, request.getRecursive(),
+            request.getStartKey(), request.getNumEntries());
+    OzoneManagerProtocolProtos.ListStatusResponse.Builder
+        listStatusResponseBuilder =
+        OzoneManagerProtocolProtos.ListStatusResponse.newBuilder();
+    for (OzoneFileStatus status : statuses) {
+      listStatusResponseBuilder.addStatuses(status.getProtobuf());
+    }
+    return listStatusResponseBuilder.build();
+  }
 }

+ 211 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java

@@ -22,6 +22,13 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
@@ -45,6 +52,7 @@ import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
@@ -114,6 +122,22 @@ public class TestKeyManagerImpl {
     FileUtils.deleteDirectory(dir);
   }
 
+  @After
+  public void cleanupTest() throws IOException {
+    List<OzoneFileStatus> fileStatuses = keyManager
+        .listStatus(createBuilder().setKeyName("").build(), true, "", 100000);
+    for (OzoneFileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isFile()) {
+        keyManager.deleteKey(
+            createKeyArgs(fileStatus.getPath().toString().substring(1)));
+      } else {
+        keyManager.deleteKey(createKeyArgs(OzoneFSUtils
+            .addTrailingSlashIfNeeded(
+                fileStatus.getPath().toString().substring(1))));
+      }
+    }
+  }
+
   private static void createBucket(String volumeName, String bucketName)
       throws IOException {
     OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
@@ -331,6 +355,193 @@ public class TestKeyManagerImpl {
     }
   }
 
+  private OmKeyArgs createKeyArgs(String toKeyName) {
+    return createBuilder().setKeyName(toKeyName).build();
+  }
+
+  @Test
+  public void testListStatus() throws IOException {
+    String superDir = RandomStringUtils.randomAlphabetic(5);
+
+    int numDirectories = 5;
+    int numFiles = 5;
+    // set of directory descendants of root
+    Set<String> directorySet = new TreeSet<>();
+    // set of file descendants of root
+    Set<String> fileSet = new TreeSet<>();
+    createDepthTwoDirectory(superDir, numDirectories, numFiles, directorySet,
+        fileSet);
+    // set of all descendants of root
+    Set<String> children = new TreeSet<>(directorySet);
+    children.addAll(fileSet);
+    // number of entries in the filesystem
+    int numEntries = directorySet.size() + fileSet.size();
+
+    OmKeyArgs rootDirArgs = createKeyArgs("");
+    List<OzoneFileStatus> fileStatuses =
+        keyManager.listStatus(rootDirArgs, true, "", 100);
+    // verify the number of status returned is same as number of entries
+    Assert.assertEquals(numEntries, fileStatuses.size());
+
+    fileStatuses = keyManager.listStatus(rootDirArgs, false, "", 100);
+    // the number of immediate children of root is 1
+    Assert.assertEquals(1, fileStatuses.size());
+
+    // if startKey is the first descendant of the root then listStatus should
+    // return all the entries.
+    String startKey = children.iterator().next();
+    fileStatuses = keyManager.listStatus(rootDirArgs, true,
+        startKey.substring(0, startKey.length() - 1), 100);
+    Assert.assertEquals(numEntries, fileStatuses.size());
+
+    for (String directory : directorySet) {
+      // verify status list received for each directory with recursive flag set
+      // to false
+      OmKeyArgs dirArgs = createKeyArgs(directory);
+      fileStatuses = keyManager.listStatus(dirArgs, false, "", 100);
+      verifyFileStatus(directory, fileStatuses, directorySet, fileSet, false);
+
+      // verify status list received for each directory with recursive flag set
+      // to true
+      fileStatuses = keyManager.listStatus(dirArgs, true, "", 100);
+      verifyFileStatus(directory, fileStatuses, directorySet, fileSet, true);
+
+      // verify list status call with using the startKey parameter and
+      // recursive flag set to false. After every call to listStatus use the
+      // latest received file status as the startKey until no more entries are
+      // left to list.
+      List<OzoneFileStatus> tempFileStatus = null;
+      Set<OzoneFileStatus> tmpStatusSet = new HashSet<>();
+      do {
+        tempFileStatus = keyManager.listStatus(dirArgs, false,
+            tempFileStatus != null ? OzoneFSUtils.pathToKey(
+                tempFileStatus.get(tempFileStatus.size() - 1).getPath()) : null,
+            2);
+        tmpStatusSet.addAll(tempFileStatus);
+      } while (tempFileStatus.size() == 2);
+      verifyFileStatus(directory, new ArrayList<>(tmpStatusSet), directorySet,
+          fileSet, false);
+
+      // verify list status call with using the startKey parameter and
+      // recursive flag set to true. After every call to listStatus use the
+      // latest received file status as the startKey until no more entries are
+      // left to list.
+      tempFileStatus = null;
+      tmpStatusSet = new HashSet<>();
+      do {
+        tempFileStatus = keyManager.listStatus(dirArgs, true,
+            tempFileStatus != null ? OzoneFSUtils.pathToKey(
+                tempFileStatus.get(tempFileStatus.size() - 1).getPath()) : null,
+            2);
+        tmpStatusSet.addAll(tempFileStatus);
+      } while (tempFileStatus.size() == 2);
+      verifyFileStatus(directory, new ArrayList<>(tmpStatusSet), directorySet,
+          fileSet, true);
+    }
+  }
+
+  /**
+   * Creates a depth two directory.
+   *
+   * @param superDir       Super directory to create
+   * @param numDirectories number of directory children
+   * @param numFiles       number of file children
+   * @param directorySet   set of descendant directories for the super directory
+   * @param fileSet        set of descendant files for the super directory
+   */
+  private void createDepthTwoDirectory(String superDir, int numDirectories,
+      int numFiles, Set<String> directorySet, Set<String> fileSet)
+      throws IOException {
+    // create super directory
+    OmKeyArgs superDirArgs = createKeyArgs(superDir);
+    keyManager.createDirectory(superDirArgs);
+    directorySet.add(superDir);
+
+    // add directory children to super directory
+    Set<String> childDirectories =
+        createDirectories(superDir, new HashMap<>(), numDirectories);
+    directorySet.addAll(childDirectories);
+    // add file to super directory
+    fileSet.addAll(createFiles(superDir, new HashMap<>(), numFiles));
+
+    // for each child directory create files and directories
+    for (String child : childDirectories) {
+      fileSet.addAll(createFiles(child, new HashMap<>(), numFiles));
+      directorySet
+          .addAll(createDirectories(child, new HashMap<>(), numDirectories));
+    }
+  }
+
+  private void verifyFileStatus(String directory,
+      List<OzoneFileStatus> fileStatuses, Set<String> directorySet,
+      Set<String> fileSet, boolean recursive) {
+
+    for (OzoneFileStatus fileStatus : fileStatuses) {
+      String keyName = OzoneFSUtils.pathToKey(fileStatus.getPath());
+      String parent = Paths.get(keyName).getParent().toString();
+      if (!recursive) {
+        // if recursive is false, verify all the statuses have the input
+        // directory as parent
+        Assert.assertEquals(parent, directory);
+      }
+      // verify filestatus is present in directory or file set accordingly
+      if (fileStatus.isDirectory()) {
+        Assert.assertTrue(directorySet.contains(keyName));
+      } else {
+        Assert.assertTrue(fileSet.contains(keyName));
+      }
+    }
+
+    // count the number of entries which should be present in the directory
+    int numEntries = 0;
+    Set<String> entrySet = new TreeSet<>(directorySet);
+    entrySet.addAll(fileSet);
+    for (String entry : entrySet) {
+      if (OzoneFSUtils.getParent(entry)
+          .startsWith(OzoneFSUtils.addTrailingSlashIfNeeded(directory))) {
+        if (recursive) {
+          numEntries++;
+        } else if (OzoneFSUtils.getParent(entry)
+            .equals(OzoneFSUtils.addTrailingSlashIfNeeded(directory))) {
+          numEntries++;
+        }
+      }
+    }
+    // verify the number of entries match the status list size
+    Assert.assertEquals(fileStatuses.size(), numEntries);
+  }
+
+  private Set<String> createDirectories(String parent,
+      Map<String, List<String>> directoryMap, int numDirectories)
+      throws IOException {
+    Set<String> keyNames = new TreeSet<>();
+    for (int i = 0; i < numDirectories; i++) {
+      String keyName = parent + "/" + RandomStringUtils.randomAlphabetic(5);
+      OmKeyArgs keyArgs = createBuilder().setKeyName(keyName).build();
+      keyManager.createDirectory(keyArgs);
+      keyNames.add(keyName);
+    }
+    directoryMap.put(parent, new ArrayList<>(keyNames));
+    return keyNames;
+  }
+
+  private List<String> createFiles(String parent,
+      Map<String, List<String>> fileMap, int numFiles) throws IOException {
+    List<String> keyNames = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      String keyName = parent + "/" + RandomStringUtils.randomAlphabetic(5);
+      OmKeyArgs keyArgs = createBuilder().setKeyName(keyName).build();
+      OpenKeySession keySession = keyManager.createFile(keyArgs, false, false);
+      keyArgs.setLocationInfoList(
+          keySession.getKeyInfo().getLatestVersionLocations()
+              .getLocationList());
+      keyManager.commitKey(keyArgs, keySession.getId());
+      keyNames.add(keyName);
+    }
+    fileMap.put(parent, keyNames);
+    return keyNames;
+  }
+
   private OmKeyArgs.Builder createBuilder() {
     return new OmKeyArgs.Builder()
         .setBucketName(BUCKET_NAME)

+ 14 - 0
hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -253,6 +254,19 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
     return new IteratorAdapter(bucket.listKeys(pathKey));
   }
 
+  public List<OzoneFileStatus> listStatus(String keyName, boolean recursive,
+      String startKey, long numEntries) throws IOException {
+    try {
+      incrementCounter(Statistic.OBJECTS_LIST);
+      return bucket.listStatus(keyName, recursive, startKey, numEntries);
+    } catch (OMException e) {
+      if (e.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
+        throw new FileNotFoundException(e.getMessage());
+      }
+      throw e;
+    }
+  }
+
   @Override
   public Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
       throws IOException {

+ 27 - 123
hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneFileSystem.java

@@ -22,17 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.LinkedList;
 import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -46,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
@@ -494,130 +491,37 @@ public class BasicOzoneFileSystem extends FileSystem {
     }
   }
 
-  private class ListStatusIterator extends OzoneListingIterator {
-    // _fileStatuses_ maintains a list of file(s) which is either the input
-    // path itself or a child of the input directory path.
-    private List<FileStatus> fileStatuses = new ArrayList<>(LISTING_PAGE_SIZE);
-    // _subDirStatuses_ maintains a list of sub-dirs of the input directory
-    // path.
-    private Map<Path, FileStatus> subDirStatuses =
-        new HashMap<>(LISTING_PAGE_SIZE);
-    private Path f; // the input path
-
-    ListStatusIterator(Path f) throws IOException {
-      super(f);
-      this.f = f;
-    }
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    incrementCounter(Statistic.INVOCATION_LIST_STATUS);
+    statistics.incrementReadOps(1);
+    LOG.trace("listStatus() path:{}", f);
+    int numEntries = LISTING_PAGE_SIZE;
+    LinkedList<OzoneFileStatus> statuses = new LinkedList<>();
+    List<OzoneFileStatus> tmpStatusList;
+    String startKey = "";
 
-    /**
-     * Add the key to the listStatus result if the key corresponds to the
-     * input path or is an immediate child of the input path.
-     *
-     * @param key key to be processed
-     * @return always returns true
-     * @throws IOException
-     */
-    @Override
-    boolean processKey(String key) throws IOException {
-      Path keyPath = new Path(OZONE_URI_DELIMITER + key);
-      if (key.equals(getPathKey())) {
-        if (pathIsDirectory()) {
-          // if input path is a directory, we add the sub-directories and
-          // files under this directory.
-          return true;
-        } else {
-          addFileStatus(keyPath);
-          return true;
-        }
-      }
-      // Left with only subkeys now
-      // We add only the immediate child files and sub-dirs i.e. we go only
-      // upto one level down the directory tree structure.
-      if (pathToKey(keyPath.getParent()).equals(pathToKey(f))) {
-        // This key is an immediate child. Can be file or directory
-        if (key.endsWith(OZONE_URI_DELIMITER)) {
-          // Key is a directory
-          addSubDirStatus(keyPath);
+    do {
+      tmpStatusList =
+          adapter.listStatus(pathToKey(f), false, startKey, numEntries);
+      if (!tmpStatusList.isEmpty()) {
+        if (startKey.isEmpty()) {
+          statuses.addAll(tmpStatusList);
         } else {
-          addFileStatus(keyPath);
-        }
-      } else {
-        // This key is not the immediate child of the input directory. So we
-        // traverse the parent tree structure of this key until we get the
-        // immediate child of the input directory.
-        Path immediateChildPath = getImmediateChildPath(keyPath.getParent());
-        if (immediateChildPath != null) {
-          addSubDirStatus(immediateChildPath);
+          statuses.addAll(tmpStatusList.subList(1, tmpStatusList.size()));
         }
+        startKey = pathToKey(statuses.getLast().getPath());
       }
-      return true;
-    }
+      // listStatus returns entries numEntries in size if available.
+      // Any lesser number of entries indicate that the required entries have
+      // exhausted.
+    } while (tmpStatusList.size() == numEntries);
 
-    /**
-     * Adds the FileStatus of keyPath to final result of listStatus.
-     *
-     * @param filePath path to the file
-     * @throws FileNotFoundException
-     */
-    void addFileStatus(Path filePath) throws IOException {
-      fileStatuses.add(getFileStatus(filePath));
+    for (OzoneFileStatus status : statuses) {
+      status.makeQualified(uri, status.getPath().makeQualified(uri, workingDir),
+          getUsername(), getUsername());
     }
-
-    /**
-     * Adds the FileStatus of the subdir to final result of listStatus, if not
-     * already included.
-     *
-     * @param dirPath path to the dir
-     * @throws FileNotFoundException
-     */
-    void addSubDirStatus(Path dirPath) throws IOException {
-      // Check if subdir path is already included in statuses.
-      if (!subDirStatuses.containsKey(dirPath)) {
-        subDirStatuses.put(dirPath, getFileStatus(dirPath));
-      }
-    }
-
-    /**
-     * Traverse the parent directory structure of keyPath to determine the
-     * which parent/ grand-parent/.. is the immediate child of the input path f.
-     *
-     * @param keyPath path whose parent directory structure should be traversed.
-     * @return immediate child path of the input path f.
-     */
-    Path getImmediateChildPath(Path keyPath) {
-      Path path = keyPath;
-      Path parent = path.getParent();
-      while (parent != null) {
-        if (pathToKey(parent).equals(pathToKey(f))) {
-          return path;
-        }
-        path = parent;
-        parent = path.getParent();
-      }
-      return null;
-    }
-
-    /**
-     * Return the result of listStatus operation. If the input path is a
-     * file, return the status for only that file. If the input path is a
-     * directory, return the statuses for all the child files and sub-dirs.
-     */
-    FileStatus[] getStatuses() {
-      List<FileStatus> result = Stream.concat(
-          fileStatuses.stream(), subDirStatuses.values().stream())
-          .collect(Collectors.toList());
-      return result.toArray(new FileStatus[result.size()]);
-    }
-  }
-
-  @Override
-  public FileStatus[] listStatus(Path f) throws IOException {
-    incrementCounter(Statistic.INVOCATION_LIST_STATUS);
-    statistics.incrementReadOps(1);
-    LOG.trace("listStatus() path:{}", f);
-    ListStatusIterator iterator = new ListStatusIterator(f);
-    iterator.iterate();
-    return iterator.getStatuses();
+    return statuses.toArray(new FileStatus[0]);
   }
 
   @Override

+ 4 - 0
hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java

@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.Iterator;
+import java.util.List;
 
 /**
  * Lightweight adapter to separate hadoop/ozone classes.
@@ -52,6 +53,9 @@ public interface OzoneClientAdapter {
 
   Iterator<BasicKeyInfo> listKeys(String pathKey);
 
+  List<OzoneFileStatus> listStatus(String keyName, boolean recursive,
+      String startKey, long numEntries) throws IOException;
+
   Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
       throws IOException;
 

+ 41 - 1
hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java

@@ -20,12 +20,15 @@ package org.apache.hadoop.fs.ozone;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.junit.Assert;
@@ -104,6 +107,8 @@ public class TestOzoneFileInterfaces {
 
   private OzoneFSStorageStatistics statistics;
 
+  private OMMetrics omMetrics;
+
   public TestOzoneFileInterfaces(boolean setDefaultFs,
       boolean useAbsolutePath) {
     this.setDefaultFs = setDefaultFs;
@@ -147,6 +152,7 @@ public class TestOzoneFileInterfaces {
     }
     o3fs = (OzoneFileSystem) fs;
     statistics = (OzoneFSStorageStatistics) o3fs.getOzoneFSOpsCountStatistics();
+    omMetrics = cluster.getOzoneManager().getMetrics();
   }
 
   @After
@@ -246,11 +252,45 @@ public class TestOzoneFileInterfaces {
     assertEquals(1, statusList.length);
     assertEquals(status, statusList[0]);
 
-    FileStatus statusRoot = fs.getFileStatus(createPath("/"));
+    fs.getFileStatus(createPath("/"));
     assertTrue("Root dir (/) is not a directory.", status.isDirectory());
     assertEquals(0, status.getLen());
   }
 
+  @Test
+  public void testListStatus() throws IOException {
+    List<Path> paths = new ArrayList<>();
+    String dirPath = RandomStringUtils.randomAlphanumeric(5);
+    Path path = createPath("/" + dirPath);
+    paths.add(path);
+    assertTrue("Makedirs returned with false for the path " + path,
+        fs.mkdirs(path));
+
+    long listObjects = statistics.getLong(Statistic.OBJECTS_LIST.getSymbol());
+    long omListStatus = omMetrics.getNumListStatus();
+    FileStatus[] statusList = fs.listStatus(createPath("/"));
+    assertEquals(1, statusList.length);
+    assertEquals(++listObjects,
+        statistics.getLong(Statistic.OBJECTS_LIST.getSymbol()).longValue());
+    assertEquals(++omListStatus, omMetrics.getNumListStatus());
+    assertEquals(fs.getFileStatus(path), statusList[0]);
+
+    dirPath = RandomStringUtils.randomAlphanumeric(5);
+    path = createPath("/" + dirPath);
+    paths.add(path);
+    assertTrue("Makedirs returned with false for the path " + path,
+        fs.mkdirs(path));
+
+    statusList = fs.listStatus(createPath("/"));
+    assertEquals(2, statusList.length);
+    assertEquals(++listObjects,
+        statistics.getLong(Statistic.OBJECTS_LIST.getSymbol()).longValue());
+    assertEquals(++omListStatus, omMetrics.getNumListStatus());
+    for (Path p : paths) {
+      assertTrue(Arrays.asList(statusList).contains(fs.getFileStatus(p)));
+    }
+  }
+
   @Test
   public void testOzoneManagerFileSystemInterface() throws IOException {
     String dirPath = RandomStringUtils.randomAlphanumeric(5);

+ 28 - 1
hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java

@@ -44,6 +44,9 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
 import org.junit.rules.Timeout;
 
 import static org.junit.Assert.assertEquals;
@@ -172,7 +175,7 @@ public class TestOzoneFileSystem {
   public void testListStatus() throws Exception {
     Path parent = new Path("/testListStatus");
     Path file1 = new Path(parent, "key1");
-    Path file2 = new Path(parent, "key1/key2");
+    Path file2 = new Path(parent, "key2");
     ContractTestUtils.touch(fs, file1);
     ContractTestUtils.touch(fs, file2);
 
@@ -219,6 +222,30 @@ public class TestOzoneFileSystem {
     assertFalse(fileStatus2.equals(dir12.toString()));
   }
 
+  /**
+   * Tests listStatus operation on root directory.
+   */
+  @Test
+  public void testListStatusOnLargeDirectory() throws Exception {
+    Path root = new Path("/");
+    Set<String> paths = new TreeSet<>();
+    int numDirs = 5111;
+    for(int i = 0; i < numDirs; i++) {
+      Path p = new Path(root, String.valueOf(i));
+      fs.mkdirs(p);
+      paths.add(p.getName());
+    }
+
+    FileStatus[] fileStatuses = o3fs.listStatus(root);
+    assertEquals(
+        "Total directories listed do not match the existing directories",
+        numDirs, fileStatuses.length);
+
+    for (int i=0; i < numDirs; i++) {
+      assertTrue(paths.contains(fileStatuses[i].getPath().getName()));
+    }
+  }
+
   /**
    * Tests listStatus on a path with subdirs.
    */