
HDFS-10480. Add an admin command to list currently open files. Contributed by Manoj Govindassamy.

Andrew Wang, 8 years ago
Parent commit: fb68980959
22 changed files with 841 additions and 7 deletions
  1. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (+12 -0)
  2. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (+15 -0)
  3. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java (+15 -0)
  4. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (+12 -0)
  5. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java (+58 -0)
  6. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java (+59 -0)
  7. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (+23 -0)
  8. hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java (+18 -1)
  9. hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto (+18 -0)
  10. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (+4 -0)
  11. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (+20 -0)
  12. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+45 -0)
  13. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (+49 -2)
  14. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (+8 -0)
  15. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (+60 -0)
  16. hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (+10 -0)
  17. hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md (+2 -0)
  18. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (+38 -2)
  19. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java (+59 -0)
  20. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java (+10 -2)
  21. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java (+234 -0)
  22. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java (+72 -0)

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -128,6 +128,8 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
@@ -3025,4 +3027,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   Tracer getTracer() {
     return tracer;
   }
+
+  /**
+   * Get a remote iterator to the open files list managed by the NameNode.
+   *
+   * @throws IOException
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
+    checkOpen();
+    return new OpenFilesIterator(namenode, tracer);
+  }
 }

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -87,6 +87,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2881,4 +2882,18 @@ public class DistributedFileSystem extends FileSystem {
   public HdfsDataOutputStreamBuilder createFile(Path path) {
     return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
   }
+
+  /**
+   * Returns a RemoteIterator which can be used to list all open files
+   * currently managed by the NameNode. For large numbers of open files,
+   * the iterator will fetch the list in batches of the configured size.
+   * <p/>
+   * Since the list is fetched in batches, it does not represent a
+   * consistent snapshot of all open files.
+   * <p/>
+   * This method can only be called by HDFS superusers.
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
+    return dfs.listOpenFiles();
+  }
 }

+ 15 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 
@@ -560,4 +561,18 @@ public class HdfsAdmin {
     dfs.setPermission(trashPath, TRASH_PERMISSION);
   }
 
+  /**
+   * Returns a RemoteIterator which can be used to list all open files
+   * currently managed by the NameNode. For large numbers of open files,
+   * the iterator will fetch the list in batches of the configured size.
+   * <p/>
+   * Since the list is fetched in batches, it does not represent a
+   * consistent snapshot of all open files.
+   * <p/>
+   * This method can only be called by HDFS superusers.
+   */
+  public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
+    return dfs.listOpenFiles();
+  }
+
 }
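
To make the new client-facing API concrete, here is a minimal usage sketch; the same iteration works against DistributedFileSystem#listOpenFiles above. The NameNode URI is an assumption for illustration, and the call must run as an HDFS superuser.

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;

public class ListOpenFilesExample {
  public static void main(String[] args)
      throws IOException, URISyntaxException {
    Configuration conf = new Configuration();
    // Hypothetical NameNode URI; any reachable fs.defaultFS works here.
    HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://namenode:8020"), conf);
    // The iterator pages through the NameNode's list transparently, one
    // dfs.namenode.list.openfiles.num.responses-sized batch per RPC.
    RemoteIterator<OpenFileEntry> openFiles = admin.listOpenFiles();
    while (openFiles.hasNext()) {
      OpenFileEntry entry = openFiles.next();
      System.out.println(entry.getClientMachine() + "\t"
          + entry.getClientName() + "\t" + entry.getFilePath());
    }
  }
}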

+ 12 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -1606,4 +1606,16 @@ public interface ClientProtocol {
    */
   @Idempotent
   QuotaUsage getQuotaUsage(String path) throws IOException;
+
+  /**
+   * List open files in the system in batches. INode id is the cursor and the
+   * open files returned in a batch will have their INode ids greater than
+   * the cursor INode id. Open files can only be requested by a superuser,
+   * and the list across batches is not atomic.
+   *
+   * @param prevId the cursor INode id.
+   * @throws IOException
+   */
+  @Idempotent
+  BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
 }
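
For illustration, a minimal sketch of driving this cursor protocol by hand, mirroring the paging loop used by TestListOpenFiles later in this commit. The `namenode` parameter is assumed to be an already-constructed ClientProtocol proxy; the cursor starts at 0, below any valid INode id.

import java.io.IOException;

import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;

class OpenFilesPager {
  static void printAllOpenFiles(ClientProtocol namenode) throws IOException {
    long cursor = 0; // first batch starts at the smallest open-file INode id
    BatchedEntries<OpenFileEntry> batch;
    do {
      batch = namenode.listOpenFiles(cursor);
      for (int i = 0; i < batch.size(); i++) {
        OpenFileEntry entry = batch.get(i);
        System.out.println(entry.getId() + "\t" + entry.getFilePath());
        cursor = entry.getId(); // next batch returns ids strictly > cursor
      }
    } while (batch.hasMore());
  }
}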

+ 58 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFileEntry.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An open file entry for use by DFSAdmin commands.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class OpenFileEntry {
+  private final long id;
+  private final String filePath;
+  private final String clientName;
+  private final String clientMachine;
+
+  public OpenFileEntry(long id, String filePath,
+      String clientName, String clientMachine) {
+    this.id = id;
+    this.filePath = filePath;
+    this.clientName = clientName;
+    this.clientMachine = clientMachine;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public String getClientMachine() {
+    return clientMachine;
+  }
+
+  public String getClientName() {
+    return clientName;
+  }
+}

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/OpenFilesIterator.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+
+/**
+ * OpenFilesIterator is a remote iterator that iterates over the open files list
+ * managed by the NameNode. Since the list is retrieved in batches, it does not
+ * represent a consistent view of all open files.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class OpenFilesIterator extends
+    BatchedRemoteIterator<Long, OpenFileEntry> {
+  private final ClientProtocol namenode;
+  private final Tracer tracer;
+
+  public OpenFilesIterator(ClientProtocol namenode, Tracer tracer) {
+    super(HdfsConstants.GRANDFATHER_INODE_ID);
+    this.namenode = namenode;
+    this.tracer = tracer;
+  }
+
+  @Override
+  public BatchedEntries<OpenFileEntry> makeRequest(Long prevId)
+      throws IOException {
+    try (TraceScope ignored = tracer.newScope("listOpenFiles")) {
+      return namenode.listOpenFiles(prevId);
+    }
+  }
+
+  @Override
+  public Long elementToPrevKey(OpenFileEntry entry) {
+    return entry.getId();
+  }
+}

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -139,10 +140,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
@@ -1752,4 +1756,23 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
+      throws IOException {
+    ListOpenFilesRequestProto req =
+        ListOpenFilesRequestProto.newBuilder().setId(prevId).build();
+    try {
+      ListOpenFilesResponseProto response = rpcProxy.listOpenFiles(null, req);
+      List<OpenFileEntry> openFileEntries =
+          Lists.newArrayListWithCapacity(response.getEntriesCount());
+      for (OpenFilesBatchResponseProto p : response.getEntriesList()) {
+        openFileEntries.add(PBHelperClient.convert(p));
+      }
+      return new BatchedListEntries<>(openFileEntries, response.getHasMore());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
 }

+ 18 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java

@@ -91,6 +91,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -120,6 +121,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEdi
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsECBlockGroupsStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsBlocksStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
@@ -1253,6 +1255,21 @@ public class PBHelperClient {
         proto.getKeyName());
   }
 
+  public static OpenFilesBatchResponseProto convert(OpenFileEntry
+      openFileEntry) {
+    return OpenFilesBatchResponseProto.newBuilder()
+        .setId(openFileEntry.getId())
+        .setPath(openFileEntry.getFilePath())
+        .setClientName(openFileEntry.getClientName())
+        .setClientMachine(openFileEntry.getClientMachine())
+        .build();
+  }
+
+  public static OpenFileEntry convert(OpenFilesBatchResponseProto proto) {
+    return new OpenFileEntry(proto.getId(), proto.getPath(),
+        proto.getClientName(), proto.getClientMachine());
+  }
+
   public static AclStatus convert(GetAclStatusResponseProto e) {
     AclStatusProto r = e.getResult();
     AclStatus.Builder builder = new AclStatus.Builder();
@@ -2826,4 +2843,4 @@ public class PBHelperClient {
     }
     return ret;
   }
-}
+}

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto

@@ -777,6 +777,22 @@ message GetEditsFromTxidResponseProto {
   required EventsListProto eventsList = 1;
 }
 
+message ListOpenFilesRequestProto {
+  required int64 id = 1;
+}
+
+message OpenFilesBatchResponseProto {
+  required int64 id = 1;
+  required string path = 2;
+  required string clientName = 3;
+  required string clientMachine = 4;
+}
+
+message ListOpenFilesResponseProto {
+  repeated OpenFilesBatchResponseProto entries = 1;
+  required bool hasMore = 2;
+}
+
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
@@ -945,4 +961,6 @@ service ClientNamenodeProtocol {
       returns(GetErasureCodingCodecsResponseProto);
   rpc getQuotaUsage(GetQuotaUsageRequestProto)
       returns(GetQuotaUsageResponseProto);
+  rpc listOpenFiles(ListOpenFilesRequestProto)
+      returns(ListOpenFilesResponseProto);
 }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -873,6 +873,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
   public static final int    DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
   public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
+  public static final String DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES =
+      "dfs.namenode.list.openfiles.num.responses";
+  public static final int    DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT =
+      1000;
   public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms";
   public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
   public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -155,6 +156,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListOpenFilesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
@@ -1717,4 +1720,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public ListOpenFilesResponseProto listOpenFiles(RpcController controller,
+      ListOpenFilesRequestProto req) throws ServiceException {
+    try {
+      BatchedEntries<OpenFileEntry> entries = server.listOpenFiles(req.getId());
+      ListOpenFilesResponseProto.Builder builder =
+          ListOpenFilesResponseProto.newBuilder();
+      builder.setHasMore(entries.hasMore());
+      for (int i = 0; i < entries.size(); i++) {
+        builder.addEntries(PBHelperClient.convert(entries.get(i)));
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -91,6 +91,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
 import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.monotonicNow;
@@ -424,6 +425,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   /** Maximum time the lock is hold to release lease. */
   private final long maxLockHoldToReleaseLeaseMs;
 
+  // Batch size for open files response
+  private final int maxListOpenFilesResponses;
+
   // Scan interval is not configurable.
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
     TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
@@ -874,6 +878,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         inodeAttributeProvider = ReflectionUtils.newInstance(klass, conf);
         LOG.info("Using INode attribute provider: " + klass.getName());
       }
+      this.maxListOpenFilesResponses = conf.getInt(
+          DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES,
+          DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT
+      );
+      Preconditions.checkArgument(maxListOpenFilesResponses > 0,
+          DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES +
+              " must be a positive integer."
+      );
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -905,6 +917,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return maxLockHoldToReleaseLeaseMs;
   }
 
+  public int getMaxListOpenFilesResponses() {
+    return maxListOpenFilesResponses;
+  }
+
   void lockRetryCache() {
     if (retryCache != null) {
       retryCache.lock();
@@ -1714,6 +1730,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     blockManager.metaSave(out);
   }
 
+  /**
+   * List open files in the system in batches. prevId is the cursor INode id and
+   * the open files returned in a batch will have their INode ids greater than
+   * this cursor. Open files can only be requested by a superuser, and the
+   * list across batches does not represent a consistent view of all open files.
+   *
+   * @param prevId the cursor INode id.
+   * @throws IOException
+   */
+  BatchedListEntries<OpenFileEntry> listOpenFiles(long prevId)
+      throws IOException {
+    final String operationName = "listOpenFiles";
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.READ);
+    readLock();
+    BatchedListEntries<OpenFileEntry> batchedListEntries;
+    try {
+      checkOperation(OperationCategory.READ);
+      batchedListEntries = leaseManager.getUnderConstructionFiles(prevId);
+    } catch (AccessControlException e) {
+      logAuditEvent(false, operationName, null);
+      throw e;
+    } finally {
+      readUnlock(operationName);
+    }
+    logAuditEvent(true, operationName, null);
+    return batchedListEntries;
+  }
+
   private String metaSaveAsString() {
     StringWriter sw = new StringWriter();
     PrintWriter pw = new PrintWriter(sw);

+ 49 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java

@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -40,7 +39,9 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.util.Daemon;
@@ -94,7 +95,7 @@ public class LeaseManager {
         }
   });
   // INodeID -> Lease
-  private final HashMap<Long, Lease> leasesById = new HashMap<>();
+  private final TreeMap<Long, Lease> leasesById = new TreeMap<>();
 
   private Daemon lmthread;
   private volatile boolean shouldRunMonitor;
@@ -245,6 +246,52 @@ public class LeaseManager {
     return iipSet;
   }
 
+  /**
+   * Get a batch of under-construction files from the currently active leases.
+   * The file INodeID is the cursor used to fetch the next batch of results,
+   * and the batch size is configurable via the config key below. Since the list is
+   * fetched in batches, it does not represent a consistent view of all
+   * open files.
+   *
+   * @see org.apache.hadoop.hdfs.DFSConfigKeys#DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES
+   * @param prevId the INodeID cursor
+   * @throws IOException
+   */
+  public BatchedListEntries<OpenFileEntry> getUnderConstructionFiles(
+      final long prevId) throws IOException {
+    assert fsnamesystem.hasReadLock();
+    SortedMap<Long, Lease> remainingLeases;
+    synchronized (this) {
+      remainingLeases = leasesById.tailMap(prevId, false);
+    }
+    Collection<Long> inodeIds = remainingLeases.keySet();
+    final int numResponses = Math.min(
+        this.fsnamesystem.getMaxListOpenFilesResponses(), inodeIds.size());
+    final List<OpenFileEntry> openFileEntries =
+        Lists.newArrayListWithExpectedSize(numResponses);
+
+    int count = 0;
+    for (Long inodeId: inodeIds) {
+      final INodeFile inodeFile =
+          fsnamesystem.getFSDirectory().getInode(inodeId).asFile();
+      if (!inodeFile.isUnderConstruction()) {
+        LOG.warn("The file " + inodeFile.getFullPathName()
+            + " is not under construction but has lease.");
+        continue;
+      }
+      openFileEntries.add(new OpenFileEntry(
+          inodeFile.getId(), inodeFile.getFullPathName(),
+          inodeFile.getFileUnderConstructionFeature().getClientName(),
+          inodeFile.getFileUnderConstructionFeature().getClientMachine()));
+      count++;
+      if (count >= numResponses) {
+        break;
+      }
+    }
+    boolean hasMore = (numResponses < remainingLeases.size());
+    return new BatchedListEntries<>(openFileEntries, hasMore);
+  }
+
   /** @return the lease containing src */
   public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());}
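
The switch from HashMap to TreeMap above is what makes the INode-id cursor work: tailMap(prevId, false) yields the leases with ids strictly greater than the cursor, in sorted order. A self-contained sketch of that pagination pattern, with illustrative names only:

import java.util.SortedMap;
import java.util.TreeMap;

class CursorPagingDemo {
  public static void main(String[] args) {
    TreeMap<Long, String> leasesById = new TreeMap<>();
    for (long id = 1; id <= 7; id++) {
      leasesById.put(id, "lease-" + id);
    }
    int batchSize = 3;
    long cursor = 0;
    while (true) {
      // Entries with key > cursor, in ascending key order.
      SortedMap<Long, String> remaining = leasesById.tailMap(cursor, false);
      if (remaining.isEmpty()) {
        break;
      }
      int count = 0;
      for (Long id : remaining.keySet()) {
        System.out.println("batch entry: " + id);
        cursor = id; // advance the cursor past everything already returned
        if (++count >= batchSize) {
          break;
        }
      }
    }
  }
}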
 

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -1308,6 +1309,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     namesystem.metaSave(filename);
   }
 
+  @Override // ClientProtocol
+  public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
+      throws IOException {
+    checkNNStartup();
+    return namesystem.listOpenFiles(prevId);
+  }
+
   @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {

+ 60 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -49,7 +49,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.FsTracer;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.shell.Command;
 import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.fs.shell.PathData;
@@ -73,6 +75,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
@@ -455,6 +459,7 @@ public class DFSAdmin extends FsShell {
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
     "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
+    "\t[-listOpenFiles]\n" +
     "\t[-help [cmd]]\n";
 
   /**
@@ -881,6 +886,45 @@ public class DFSAdmin extends FsShell {
     return exitCode;
   }
 
+  /**
+   * Command to list all the open files currently managed by NameNode.
+   * Usage: hdfs dfsadmin -listOpenFiles
+   *
+   * @throws IOException
+   */
+  public int listOpenFiles() throws IOException {
+    DistributedFileSystem dfs = getDFS();
+    Configuration dfsConf = dfs.getConf();
+    URI dfsUri = dfs.getUri();
+    boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
+
+    RemoteIterator<OpenFileEntry> openFilesRemoteIterator;
+    if (isHaEnabled) {
+      ProxyAndInfo<ClientProtocol> proxy = NameNodeProxies.createNonHAProxy(
+          dfsConf, HAUtil.getAddressOfActive(getDFS()), ClientProtocol.class,
+          UserGroupInformation.getCurrentUser(), false);
+      openFilesRemoteIterator = new OpenFilesIterator(proxy.getProxy(),
+          FsTracer.get(dfsConf));
+    } else {
+      openFilesRemoteIterator = dfs.listOpenFiles();
+    }
+    printOpenFiles(openFilesRemoteIterator);
+    return 0;
+  }
+
+  private void printOpenFiles(RemoteIterator<OpenFileEntry> openFilesIterator)
+      throws IOException {
+    System.out.println(String.format("%-20s\t%-20s\t%s", "Client Host",
+          "Client Name", "Open File Path"));
+    while (openFilesIterator.hasNext()) {
+      OpenFileEntry openFileEntry = openFilesIterator.next();
+      System.out.println(String.format("%-20s\t%-20s\t%20s",
+          openFileEntry.getClientMachine(),
+          openFileEntry.getClientName(),
+          openFileEntry.getFilePath()));
+    }
+  }
+
   /**
    * Command to ask the namenode to set the balancer bandwidth for all of the
    * datanodes.
@@ -1138,6 +1182,10 @@ public class DFSAdmin extends FsShell {
         + "\tIf 'incremental' is specified, it will be an incremental\n"
         + "\tblock report; otherwise, it will be a full block report.\n";
 
+    String listOpenFiles = "-listOpenFiles\n"
+        + "\tList all open files currently managed by the NameNode along\n"
+        + "\twith client name and client machine accessing them.\n";
+
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
 
@@ -1203,6 +1251,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(evictWriters);
     } else if ("getDatanodeInfo".equalsIgnoreCase(cmd)) {
       System.out.println(getDatanodeInfo);
+    } else if ("listOpenFiles".equalsIgnoreCase(cmd)) {
+      System.out.println(listOpenFiles);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -1238,6 +1288,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(evictWriters);
       System.out.println(getDatanodeInfo);
       System.out.println(triggerBlockReport);
+      System.out.println(listOpenFiles);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -1879,6 +1930,8 @@ public class DFSAdmin extends FsShell {
     } else if ("-triggerBlockReport".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
           + " [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]");
+    } else if ("-listOpenFiles".equals(cmd)) {
+      System.err.println("Usage: hdfs dfsadmin [-listOpenFiles]");
     } else {
       System.err.println("Usage: hdfs dfsadmin");
       System.err.println("Note: Administrative commands can only be run as the HDFS superuser.");
@@ -2032,6 +2085,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-listOpenFiles".equals(cmd)) {
+      if (argv.length != 1) {
+        printUsage(cmd);
+        return exitCode;
+      }
     }
     
     // initialize DFSAdmin
@@ -2113,6 +2171,8 @@ public class DFSAdmin extends FsShell {
         exitCode = reconfig(argv, i);
       } else if ("-triggerBlockReport".equals(cmd)) {
         exitCode = triggerBlockReport(argv);
+      } else if ("-listOpenFiles".equals(cmd)) {
+        exitCode = listOpenFiles();
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2788,6 +2788,16 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.namenode.list.openfiles.num.responses</name>
+    <value>1000</value>
+    <description>
+      When listing open files, the maximum number of open files that will be
+      returned in a single batch. Fetching the list incrementally in batches
+      improves namenode performance.
+    </description>
+  </property>
+
 <property>
   <name>dfs.namenode.edekcacheloader.interval.ms</name>
   <value>1000</value>
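
The same knob can be set programmatically, as this commit's tests do. A small sketch (the batch-size value is illustrative): smaller batches shorten how long the FSNamesystem read lock is held per RPC, while larger batches mean fewer round trips.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class OpenFilesBatchConfig {
  // Returns a Configuration with the open-files listing batch size overridden.
  static Configuration withBatchSize(int batchSize) {
    Configuration conf = new Configuration();
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, batchSize);
    return conf;
  }
}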

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md

@@ -370,6 +370,7 @@ Usage:
         hdfs dfsadmin [-getDatanodeInfo <datanode_host:ipc_port>]
         hdfs dfsadmin [-metasave filename]
         hdfs dfsadmin [-triggerBlockReport [-incremental] <datanode_host:ipc_port>]
+        hdfs dfsadmin [-listOpenFiles]
         hdfs dfsadmin [-help [cmd]]
 
 | COMMAND\_OPTION | Description |
@@ -406,6 +407,7 @@ Usage:
 | `-getDatanodeInfo` \<datanode\_host:ipc\_port\> | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. |
 | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following<br/>1. Datanodes heart beating with Namenode<br/>2. Blocks waiting to be replicated<br/>3. Blocks currently being replicated<br/>4. Blocks waiting to be deleted |
| `-triggerBlockReport` `[-incremental]` \<datanode\_host:ipc\_port\> | Trigger a block report for the given datanode. If 'incremental' is specified, it will be an incremental block report; otherwise, it will be a full block report. |
+| `-listOpenFiles` | List all open files currently managed by the NameNode along with client name and client machine accessing them. |
 | `-help` [cmd] | Displays help for the given command or all commands if none is specified. |
 
Runs an HDFS dfsadmin client.
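
The new subcommand can also be driven programmatically through ToolRunner, the same way TestDFSAdmin and TestListOpenFiles in this commit invoke it; a minimal sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.ToolRunner;

class RunListOpenFiles {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Exit code 0 means the listing printed successfully.
    int exitCode = ToolRunner.run(new DFSAdmin(conf),
        new String[] {"-listOpenFiles"});
    System.exit(exitCode);
  }
}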

+ 38 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -62,8 +62,10 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -1863,8 +1865,8 @@ public class DFSTestUtil {
     }, 100, waitTime);
   }
 
- /**
-   * Change the length of a block at datanode dnIndex
+  /**
+   * Change the length of a block at datanode dnIndex.
    */
   public static boolean changeReplicaLength(MiniDFSCluster cluster,
       ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
@@ -2249,4 +2251,38 @@ public class DFSTestUtil {
       assertFalse("File in trash : " + trashPath, fs.exists(trashPath));
     }
   }
+
+  public static Map<Path, FSDataOutputStream> createOpenFiles(FileSystem fs,
+      String filePrefix, int numFilesToCreate) throws IOException {
+    final Map<Path, FSDataOutputStream> filesCreated = new HashMap<>();
+    final byte[] buffer = new byte[(int) (1024 * 1.75)];
+    final Random rand = new Random(0xFEED0BACL);
+    for (int i = 0; i < numFilesToCreate; i++) {
+      Path file = new Path("/" + filePrefix + "-" + i);
+      FSDataOutputStream stm = fs.create(file, true, 1024, (short) 1, 1024);
+      rand.nextBytes(buffer);
+      stm.write(buffer);
+      filesCreated.put(file, stm);
+    }
+    return filesCreated;
+  }
+
+  public static HashSet<Path> closeOpenFiles(
+      HashMap<Path, FSDataOutputStream> openFilesMap,
+      int numFilesToClose) throws IOException {
+    HashSet<Path> closedFiles = new HashSet<>();
+    for (Iterator<Entry<Path, FSDataOutputStream>> it =
+         openFilesMap.entrySet().iterator(); it.hasNext();) {
+      Entry<Path, FSDataOutputStream> entry = it.next();
+      LOG.info("Closing file: " + entry.getKey());
+      entry.getValue().close();
+      closedFiles.add(entry.getKey());
+      it.remove();
+      numFilesToClose--;
+      if (numFilesToClose == 0) {
+        break;
+      }
+    }
+    return closedFiles;
+  }
 }

+ 59 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHdfsAdmin.java

@@ -18,12 +18,14 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -31,11 +33,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.junit.After;
 import org.junit.Assert;
@@ -49,11 +54,15 @@ public class TestHdfsAdmin {
   private static final Path TEST_PATH = new Path("/test");
   private static final short REPL = 1;
   private static final int SIZE = 128;
+  private static final int OPEN_FILES_BATCH_SIZE = 5;
   private final Configuration conf = new Configuration();
   private MiniDFSCluster cluster;
 
   @Before
   public void setUpCluster() throws IOException {
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES,
+        OPEN_FILES_BATCH_SIZE);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     cluster.waitActive();
   }
@@ -205,4 +214,54 @@ public class TestHdfsAdmin {
     Assert.assertNotNull("should not return null for an encrypted cluster",
         hdfsAdmin.getKeyProvider());
   }
+
+  @Test(timeout = 120000L)
+  public void testListOpenFiles() throws IOException {
+    HashSet<Path> closedFileSet = new HashSet<>();
+    HashMap<Path, FSDataOutputStream> openFileMap = new HashMap<>();
+    FileSystem fs = FileSystem.get(conf);
+    verifyOpenFiles(closedFileSet, openFileMap);
+
+    int numClosedFiles = OPEN_FILES_BATCH_SIZE * 4;
+    int numOpenFiles = (OPEN_FILES_BATCH_SIZE * 3) + 1;
+    for (int i = 0; i < numClosedFiles; i++) {
+      Path filePath = new Path("/closed-file-" + i);
+      DFSTestUtil.createFile(fs, filePath, SIZE, REPL, 0);
+      closedFileSet.add(filePath);
+    }
+    verifyOpenFiles(closedFileSet, openFileMap);
+
+    openFileMap.putAll(
+        DFSTestUtil.createOpenFiles(fs, "open-file-1", numOpenFiles));
+    verifyOpenFiles(closedFileSet, openFileMap);
+
+    closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap,
+        openFileMap.size() / 2));
+    verifyOpenFiles(closedFileSet, openFileMap);
+
+    openFileMap.putAll(
+        DFSTestUtil.createOpenFiles(fs, "open-file-2", 10));
+    verifyOpenFiles(closedFileSet, openFileMap);
+
+    while(openFileMap.size() > 0) {
+      closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFileMap, 1));
+      verifyOpenFiles(closedFileSet, openFileMap);
+    }
+  }
+
+  private void verifyOpenFiles(HashSet<Path> closedFiles,
+      HashMap<Path, FSDataOutputStream> openFileMap) throws IOException {
+    HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+    HashSet<Path> openFiles = new HashSet<>(openFileMap.keySet());
+    RemoteIterator<OpenFileEntry> openFilesRemoteItr =
+        hdfsAdmin.listOpenFiles();
+    while (openFilesRemoteItr.hasNext()) {
+      String filePath = openFilesRemoteItr.next().getFilePath();
+      assertFalse(filePath + " should not be listed under open files!",
+          closedFiles.contains(filePath));
+      assertTrue(filePath + " is not listed under open files!",
+          openFiles.remove(new Path(filePath)));
+    }
+    assertTrue("Not all open files are listed!", openFiles.isEmpty());
+  }
 }

+ 10 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java

@@ -40,6 +40,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -190,6 +191,7 @@ public class TestLeaseManager {
   @Test (timeout = 60000)
   public void testInodeWithLeases() throws Exception {
     FSNamesystem fsNamesystem = makeMockFsNameSystem();
+    when(fsNamesystem.getMaxListOpenFilesResponses()).thenReturn(1024);
     FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
     LeaseManager lm = new LeaseManager(fsNamesystem);
     Set<Long> iNodeIds = new HashSet<>(Arrays.asList(
@@ -208,6 +210,7 @@ public class TestLeaseManager {
 
     for (Long iNodeId : iNodeIds) {
       INodeFile iNodeFile = stubInodeFile(iNodeId);
+      iNodeFile.toUnderConstruction("hbase", "gce-100");
       iNodeFile.setParent(rootInodeDirectory);
       when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
       lm.addLease("holder_" + iNodeId, iNodeId);
@@ -230,6 +233,7 @@ public class TestLeaseManager {
   @Test (timeout = 240000)
   public void testInodeWithLeasesAtScale() throws Exception {
     FSNamesystem fsNamesystem = makeMockFsNameSystem();
+    when(fsNamesystem.getMaxListOpenFilesResponses()).thenReturn(4096);
     FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
     LeaseManager lm = new LeaseManager(fsNamesystem);
 
@@ -275,7 +279,7 @@ public class TestLeaseManager {
 
   private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager,
       final FSDirectory fsDirectory, INodeDirectory ancestorDirectory,
-      int scale) {
+      int scale) throws IOException {
     verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
 
     Set<Long> iNodeIds = new HashSet<>();
@@ -284,6 +288,7 @@ public class TestLeaseManager {
     }
     for (Long iNodeId : iNodeIds) {
       INodeFile iNodeFile = stubInodeFile(iNodeId);
+      iNodeFile.toUnderConstruction("hbase", "gce-100");
       iNodeFile.setParent(ancestorDirectory);
       when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
       leaseManager.addLease("holder_" + iNodeId, iNodeId);
@@ -386,13 +391,16 @@ public class TestLeaseManager {
 
   private void verifyINodeLeaseCounts(final LeaseManager leaseManager,
       INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount,
-      int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) {
+      int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount)
+      throws IOException {
     assertEquals(iNodeIdWithLeaseCount,
         leaseManager.getINodeIdWithLeases().size());
     assertEquals(iNodeWithLeaseCount,
         leaseManager.getINodeWithLeases().size());
     assertEquals(iNodeUnderAncestorLeaseCount,
         leaseManager.getINodeWithLeases(ancestorDirectory).size());
+    assertEquals(iNodeIdWithLeaseCount,
+        leaseManager.getUnderConstructionFiles(0).size());
   }
 
   private Map<String, INode> createINodeTree(INodeDirectory parentDir,

+ 234 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListOpenFiles.java

@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verify open files listing.
+ */
+public class TestListOpenFiles {
+  private static final int NUM_DATA_NODES = 3;
+  private static final int BATCH_SIZE = 5;
+  private static MiniDFSCluster cluster = null;
+  private static DistributedFileSystem fs = null;
+  private static NamenodeProtocols nnRpc = null;
+  private static final Log LOG = LogFactory.getLog(TestListOpenFiles.class);
+
+  @Before
+  public void setUp() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).
+        numDataNodes(NUM_DATA_NODES).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    nnRpc = cluster.getNameNodeRpc();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (fs != null) {
+      fs.close();
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000L)
+  public void testListOpenFilesViaNameNodeRPC() throws Exception {
+    HashMap<Path, FSDataOutputStream> openFiles = new HashMap<>();
+    createFiles(fs, "closed", 10);
+    verifyOpenFiles(openFiles);
+
+    BatchedEntries<OpenFileEntry> openFileEntryBatchedEntries =
+        nnRpc.listOpenFiles(0);
+    assertTrue("Open files list should be empty!",
+        openFileEntryBatchedEntries.size() == 0);
+
+    openFiles.putAll(
+        DFSTestUtil.createOpenFiles(fs, "open-1", 1));
+    verifyOpenFiles(openFiles);
+
+    openFiles.putAll(
+        DFSTestUtil.createOpenFiles(fs, "open-2",
+        (BATCH_SIZE * 2 + BATCH_SIZE / 2)));
+    verifyOpenFiles(openFiles);
+
+    DFSTestUtil.closeOpenFiles(openFiles, openFiles.size() / 2);
+    verifyOpenFiles(openFiles);
+
+    openFiles.putAll(
+        DFSTestUtil.createOpenFiles(fs, "open-3", (BATCH_SIZE * 5)));
+    verifyOpenFiles(openFiles);
+
+    while(openFiles.size() > 0) {
+      DFSTestUtil.closeOpenFiles(openFiles, 1);
+      verifyOpenFiles(openFiles);
+    }
+  }
+
+  private void verifyOpenFiles(Map<Path, FSDataOutputStream> openFiles)
+      throws IOException {
+    HashSet<Path> remainingFiles = new HashSet<>(openFiles.keySet());
+    OpenFileEntry lastEntry = null;
+    BatchedEntries<OpenFileEntry> batchedEntries;
+    do {
+      if (lastEntry == null) {
+        batchedEntries = nnRpc.listOpenFiles(0);
+      } else {
+        batchedEntries = nnRpc.listOpenFiles(lastEntry.getId());
+      }
+      assertTrue("Incorrect open files list size!",
+          batchedEntries.size() <= BATCH_SIZE);
+      for (int i = 0; i < batchedEntries.size(); i++) {
+        lastEntry = batchedEntries.get(i);
+        String filePath = lastEntry.getFilePath();
+        LOG.info("OpenFile: " + filePath);
+        assertTrue("Unexpected open file: " + filePath,
+            remainingFiles.remove(new Path(filePath)));
+      }
+    } while (batchedEntries.hasMore());
+    assertTrue(remainingFiles.size() + " open files not listed!",
+        remainingFiles.size() == 0);
+  }
+
+  private Set<Path> createFiles(FileSystem fileSystem, String fileNamePrefix,
+      int numFilesToCreate) throws IOException {
+    HashSet<Path> files = new HashSet<>();
+    for (int i = 0; i < numFilesToCreate; i++) {
+      Path filePath = new Path(fileNamePrefix + "-" + i);
+      DFSTestUtil.createFile(fileSystem, filePath, 1024, (short) 3, 1);
+      files.add(filePath);
+    }
+    return files;
+  }
+
+  /**
+   * Verify dfsadmin -listOpenFiles command in HA mode.
+   */
+  @Test(timeout = 120000)
+  public void testListOpenFilesInHA() throws Exception {
+    fs.close();
+    cluster.shutdown();
+    HdfsConfiguration haConf = new HdfsConfiguration();
+    haConf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, BATCH_SIZE);
+    MiniDFSCluster haCluster =
+        new MiniDFSCluster.Builder(haConf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0)
+        .build();
+    try {
+      HATestUtil.setFailoverConfigurations(haCluster, haConf);
+      FileSystem fileSystem = HATestUtil.configureFailoverFs(haCluster, haConf);
+
+      List<ClientProtocol> namenodes =
+          HAUtil.getProxiesForAllNameNodesInNameservice(haConf,
+              HATestUtil.getLogicalHostname(haCluster));
+      haCluster.transitionToActive(0);
+      assertTrue(HAUtil.isAtLeastOneActive(namenodes));
+
+      final byte[] data = new byte[1024];
+      ThreadLocalRandom.current().nextBytes(data);
+      DFSTestUtil.createOpenFiles(fileSystem, "ha-open-file",
+          ((BATCH_SIZE * 4) + (BATCH_SIZE / 2)));
+
+      final DFSAdmin dfsAdmin = new DFSAdmin(haConf);
+      final AtomicBoolean failoverCompleted = new AtomicBoolean(false);
+      final AtomicBoolean listOpenFilesError = new AtomicBoolean(false);
+      final int listingIntervalMsec = 250;
+      Thread clientThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          while(!failoverCompleted.get()) {
+            try {
+              assertEquals(0, ToolRunner.run(dfsAdmin,
+                  new String[] {"-listOpenFiles"}));
+              // Sleep for some time to avoid
+              // flooding logs with listing.
+              Thread.sleep(listingIntervalMsec);
+            } catch (Exception e) {
+              listOpenFilesError.set(true);
+              LOG.info("Error listing open files: ", e);
+              break;
+            }
+          }
+        }
+      });
+      clientThread.start();
+
+      // Let client list open files for few
+      // times before the NN failover.
+      Thread.sleep(listingIntervalMsec * 2);
+
+      LOG.info("Shutting down Active NN0!");
+      haCluster.shutdownNameNode(0);
+      LOG.info("Transitioning NN1 to Active!");
+      haCluster.transitionToActive(1);
+      failoverCompleted.set(true);
+
+      assertEquals(0, ToolRunner.run(dfsAdmin,
+          new String[] {"-listOpenFiles"}));
+      assertFalse("Client Error!", listOpenFilesError.get());
+
+      clientThread.join();
+    } finally {
+      if (haCluster != null) {
+        haCluster.shutdown();
+      }
+    }
+  }
+}

+ 72 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationUtil;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -60,6 +61,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Scanner;
 import java.util.concurrent.TimeoutException;
@@ -593,6 +596,75 @@ public class TestDFSAdmin {
     }
   }
 
+  @Test(timeout = 300000L)
+  public void testListOpenFiles() throws Exception {
+    redirectStream();
+
+    final Configuration dfsConf = new HdfsConfiguration();
+    dfsConf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    dfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 5);
+    final Path baseDir = new Path(
+        PathUtils.getTestDir(getClass()).getAbsolutePath(),
+        GenericTestUtils.getMethodName());
+    dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
+
+    final int numDataNodes = 3;
+    final int numClosedFiles = 25;
+    final int numOpenFiles = 15;
+
+    try(MiniDFSCluster miniCluster = new MiniDFSCluster
+        .Builder(dfsConf)
+        .numDataNodes(numDataNodes).build()) {
+      final short replFactor = 1;
+      final long fileLength = 512L;
+      final FileSystem fs = miniCluster.getFileSystem();
+      final Path parentDir = new Path("/tmp/files/");
+
+      fs.mkdirs(parentDir);
+      HashSet<Path> closedFileSet = new HashSet<>();
+      for (int i = 0; i < numClosedFiles; i++) {
+        Path file = new Path(parentDir, "closed-file-" + i);
+        DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
+        closedFileSet.add(file);
+      }
+
+      HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>();
+      for (int i = 0; i < numOpenFiles; i++) {
+        Path file = new Path(parentDir, "open-file-" + i);
+        DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
+        FSDataOutputStream outputStream = fs.append(file);
+        openFilesMap.put(file, outputStream);
+      }
+
+      final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);
+      assertEquals(0, ToolRunner.run(dfsAdmin,
+          new String[]{"-listOpenFiles"}));
+      verifyOpenFilesListing(closedFileSet, openFilesMap);
+
+      for (int count = 0; count < numOpenFiles; count++) {
+        closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFilesMap, 1));
+        resetStream();
+        assertEquals(0, ToolRunner.run(dfsAdmin,
+            new String[]{"-listOpenFiles"}));
+        verifyOpenFilesListing(closedFileSet, openFilesMap);
+      }
+    }
+  }
+
+  private void verifyOpenFilesListing(HashSet<Path> closedFileSet,
+      HashMap<Path, FSDataOutputStream> openFilesMap) {
+    final String outStr = scanIntoString(out);
+    LOG.info("dfsadmin -listOpenFiles output: \n" + out);
+    for (Path closedFilePath : closedFileSet) {
+      assertThat(outStr, not(containsString(closedFilePath.toString() + "\n")));
+    }
+    for (Path openFilePath : openFilesMap.keySet()) {
+      assertThat(outStr, is(containsString(openFilePath.toString() + "\n")));
+    }
+  }
+
   private void verifyNodesAndCorruptBlocks(
       final int numDn,
       final int numLiveDn,