
HDFS-7496. Fix FsVolume removal race conditions on the DataNode by reference-counting the volume instances (lei via cmccabe)

Colin Patrick Mccabe 10 years ago
parent
commit
b7f4a3156c
24 changed files with 717 additions and 220 deletions
  1. +19 -39
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  2. +10 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  3. +49 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
  4. +7 -6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  5. +48 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeReference.java
  6. +10 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
  7. +5 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
  8. +11 -5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
  9. +180 -104
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  10. +120 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
  11. +62 -14
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
  12. +11 -8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
  13. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
  14. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
  15. +21 -14
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  16. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
  17. +28 -5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
  18. +8 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
  19. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
  20. +11 -11
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  21. +7 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
  22. +94 -0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
  23. +5 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
  24. +7 -5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
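
Before the per-file diffs, a sketch of the pattern this commit introduces: every consumer of a volume obtains a closeable reference before doing I/O and releases it when done, so hot-swap removal can refuse new references and wait for the count to drain to zero. The block below is a minimal, hypothetical stand-in (the real pieces are FsVolumeSpi.obtainReference(), FsVolumeReference, and CloseableReferenceCount in the diffs that follow); note it has a check-then-act race between the closed flag and the increment, which the real counter avoids by packing both into one atomic (see the sketch after the FsVolumeImpl diff).

    import java.io.Closeable;
    import java.nio.channels.ClosedChannelException;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical, simplified model of a reference-counted volume.
    class Volume {
      private final AtomicInteger refCount = new AtomicInteger(0);
      private volatile boolean closed = false;

      // Pin the volume before I/O; fails once removal has started.
      Closeable obtainReference() throws ClosedChannelException {
        if (closed) {
          throw new ClosedChannelException();
        }
        refCount.incrementAndGet();
        return () -> refCount.decrementAndGet(); // close() releases the pin
      }

      // Removal path: refuse new references, then wait out in-flight I/O.
      void closeAndWait() throws InterruptedException {
        closed = true;
        while (refCount.get() > 0) {
          Thread.sleep(500);
        }
      }
    }

    // Usage, mirroring the FsVolumeReference javadoc below:
    //   try (Closeable ref = volume.obtainReference()) { /* do I/O */ }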

+ 19 - 39
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -26,7 +26,6 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -49,10 +48,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -125,6 +122,8 @@ class BlockReceiver implements Closeable {
 
   private boolean syncOnClose;
   private long restartBudget;
+  /** The reference to the volume that the block receiver writes to. */
+  private final ReplicaHandler replicaHandler;
 
   /**
    * for replaceBlock response
@@ -179,48 +178,50 @@ class BlockReceiver implements Closeable {
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaInfo = datanode.data.createTemporary(storageType, block);
+        replicaHandler = datanode.data.createTemporary(storageType, block);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
+          replicaHandler = datanode.data.createRbw(storageType, block, allowLazyPersist);
           datanode.notifyNamenodeReceivingBlock(
-              block, replicaInfo.getStorageUuid());
+              block, replicaHandler.getReplica().getStorageUuid());
           break;
         case PIPELINE_SETUP_STREAMING_RECOVERY:
-          replicaInfo = datanode.data.recoverRbw(
+          replicaHandler = datanode.data.recoverRbw(
               block, newGs, minBytesRcvd, maxBytesRcvd);
           block.setGenerationStamp(newGs);
           break;
         case PIPELINE_SETUP_APPEND:
-          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
+          replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
           if (datanode.blockScanner != null) { // remove from block scanner
             datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
           datanode.notifyNamenodeReceivingBlock(
-              block, replicaInfo.getStorageUuid());
+              block, replicaHandler.getReplica().getStorageUuid());
           break;
         case PIPELINE_SETUP_APPEND_RECOVERY:
-          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
+          replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
           if (datanode.blockScanner != null) { // remove from block scanner
             datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
                 block.getLocalBlock());
           }
           block.setGenerationStamp(newGs);
           datanode.notifyNamenodeReceivingBlock(
-              block, replicaInfo.getStorageUuid());
+              block, replicaHandler.getReplica().getStorageUuid());
           break;
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:
           // this is a transfer destination
-          replicaInfo = datanode.data.createTemporary(storageType, block);
+          replicaHandler =
+              datanode.data.createTemporary(storageType, block);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);
         }
       }
+      replicaInfo = replicaHandler.getReplica();
       this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
         datanode.getDnConf().dropCacheBehindWrites :
           cachingStrategy.getDropBehind();
@@ -339,6 +340,9 @@ class BlockReceiver implements Closeable {
     finally{
       IOUtils.closeStream(out);
     }
+    if (replicaHandler != null) {
+      IOUtils.cleanup(null, replicaHandler);
+    }
     if (measuredFlushTime) {
       datanode.metrics.addFlushNanos(flushTotalNanos);
     }
@@ -950,15 +954,12 @@ class BlockReceiver implements Closeable {
     //
     byte[] buf = new byte[sizePartialChunk];
     byte[] crcbuf = new byte[checksumSize];
-    ReplicaInputStreams instr = null;
-    try { 
-      instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
+    try (ReplicaInputStreams instr =
+        datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
       IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
 
      // open meta file and read in crc value computed earlier
       IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
-    } finally {
-      IOUtils.closeStream(instr);
     }
 
     // compute crc of partial chunk from data read in the block file.
@@ -1244,28 +1245,7 @@ class BlockReceiver implements Closeable {
 
           if (lastPacketInBlock) {
             // Finalize the block and close the block file
-            try {
-              finalizeBlock(startTime);
-            } catch (ReplicaNotFoundException e) {
-              // Verify that the exception is due to volume removal.
-              FsVolumeSpi volume;
-              synchronized (datanode.data) {
-                volume = datanode.data.getVolume(block);
-              }
-              if (volume == null) {
-                // ReplicaInfo has been removed due to the corresponding data
-                // volume has been removed. Don't need to check disk error.
-                LOG.info(myString
-                    + ": BlockReceiver is interrupted because the block pool "
-                    + block.getBlockPoolId() + " has been removed.", e);
-                sendAckUpstream(ack, expected, totalAckTimeNanos, 0,
-                    Status.OOB_INTERRUPTED);
-                running = false;
-                receiverThread.interrupt();
-                continue;
-              }
-              throw e;
-            }
+            finalizeBlock(startTime);
           }
 
           sendAckUpstream(ack, expected, totalAckTimeNanos,

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -143,6 +144,8 @@ class BlockSender implements java.io.Closeable {
   
   /** The file descriptor of the block being sent */
   private FileDescriptor blockInFd;
+  /** The reference to the volume where the block is located */
+  private FsVolumeReference volumeRef;
 
   // Cache-management related fields
   private final long readaheadLength;
@@ -257,6 +260,9 @@ class BlockSender implements java.io.Closeable {
       this.transferToAllowed = datanode.getDnConf().transferToAllowed &&
         (!is32Bit || length <= Integer.MAX_VALUE);
 
+      // Obtain a reference before reading data
+      this.volumeRef = datanode.data.getVolume(block).obtainReference();
+
       /* 
        * (corruptChecksumOK, meta_file_exist): operation
        * True,   True: will verify checksum  
@@ -420,6 +426,10 @@ class BlockSender implements java.io.Closeable {
       blockIn = null;
       blockInFd = null;
     }
+    if (volumeRef != null) {
+      IOUtils.cleanup(null, volumeRef);
+      volumeRef = null;
+    }
     // throw IOException if there is any
     if(ioe!= null) {
       throw ioe;

+ 49 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This class pairs a replica being actively written with a reference to the
+ * fs volume where this replica is located.
+ */
+public class ReplicaHandler implements Closeable {
+  private final ReplicaInPipelineInterface replica;
+  private final FsVolumeReference volumeReference;
+
+  public ReplicaHandler(
+      ReplicaInPipelineInterface replica, FsVolumeReference reference) {
+    this.replica = replica;
+    this.volumeReference = reference;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.volumeReference != null) {
+      volumeReference.close();
+    }
+  }
+
+  public ReplicaInPipelineInterface getReplica() {
+    return replica;
+  }
+}
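
For context, a hedged sketch of how the write path consumes this class, mirroring the BlockReceiver changes above: the handler stays open for the lifetime of the write and is closed afterwards to drop the volume reference. The stand-in types below are hypothetical simplifications of ReplicaInPipelineInterface and FsVolumeReference.

    import java.io.Closeable;
    import java.io.IOException;

    public class ReplicaHandlerUsage {
      interface Replica { long getBytesOnDisk(); }  // hypothetical stand-in
      interface VolumeRef extends Closeable {}      // hypothetical stand-in

      static class Handler implements Closeable {
        private final Replica replica;
        private final VolumeRef ref;
        Handler(Replica replica, VolumeRef ref) { this.replica = replica; this.ref = ref; }
        Replica getReplica() { return replica; }
        @Override public void close() throws IOException { if (ref != null) ref.close(); }
      }

      public static void main(String[] args) throws IOException {
        VolumeRef ref = () -> System.out.println("volume reference released");
        Handler handler = new Handler(() -> 0L, ref);
        try {
          // ... receive packets and write to handler.getReplica() ...
          System.out.println("bytes on disk: " + handler.getReplica().getBytesOnDisk());
        } finally {
          handler.close(); // what BlockReceiver's close() does via IOUtils.cleanup
        }
      }
    }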

+ 7 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -198,7 +199,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface createTemporary(StorageType storageType,
+  public ReplicaHandler createTemporary(StorageType storageType,
       ExtendedBlock b) throws IOException;
 
   /**
@@ -208,7 +209,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface createRbw(StorageType storageType,
+  public ReplicaHandler createRbw(StorageType storageType,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
   /**
@@ -221,7 +222,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, 
+  public ReplicaHandler recoverRbw(ExtendedBlock b,
       long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException;
 
   /**
@@ -241,7 +242,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   * @return the meta info of the replica which is being written to
    * @throws IOException
    */
-  public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS,
+  public ReplicaHandler append(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException;
 
   /**
@@ -254,8 +255,8 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @return the meta info of the replica which is being written to
    * @throws IOException
    */
-  public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS,
-      long expectedBlockLen) throws IOException;
+  public ReplicaHandler recoverAppend(
+      ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException;
   
   /**
    * Recover a failed pipeline close

+ 48 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeReference.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This is the interface for holding a reference count as an AutoCloseable resource.
+ * It increases the reference count by one in the constructor, and decreases
+ * the reference count by one in {@link #close()}.
+ *
+ * <pre>
+ *  {@code
+ *    try (FsVolumeReference ref = volume.obtainReference()) {
+ *      // Do IOs on the volume
+ *      volume.createRbw(...);
+ *      ...
+ *    }
+ *  }
+ * </pre>
+ */
+public interface FsVolumeReference extends Closeable {
+  /**
+   * Decrease the reference count of the volume.
+   * @throws IOException never actually thrown; required by {@link Closeable}.
+   */
+  @Override
+  public void close() throws IOException;
+
+  /** Returns the underlying volume object */
+  public FsVolumeSpi getVolume();
+}

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.hdfs.StorageType;
 
@@ -26,6 +27,15 @@ import org.apache.hadoop.hdfs.StorageType;
  * This is an interface for the underlying volume.
  */
 public interface FsVolumeSpi {
+  /**
+   * Obtain a reference object that increases the reference count of the
+   * volume by one.
+   *
+   * It is the caller's responsibility to close the {@link FsVolumeReference}
+   * to decrease the reference count on the volume.
+   */
+  FsVolumeReference obtainReference() throws ClosedChannelException;
+
   /** @return the StorageUuid of the volume */
   public String getStorageID();
 

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java

@@ -30,9 +30,12 @@ import org.apache.hadoop.io.IOUtils;
 public class ReplicaInputStreams implements Closeable {
   private final InputStream dataIn;
   private final InputStream checksumIn;
+  private final FsVolumeReference volumeRef;
 
   /** Create an object with a data input stream and a checksum input stream. */
-  public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd) {
+  public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd,
+      FsVolumeReference volumeRef) {
+    this.volumeRef = volumeRef;
     this.dataIn = new FileInputStream(dataFd);
     this.checksumIn = new FileInputStream(checksumFd);
   }
@@ -51,5 +54,6 @@ public class ReplicaInputStreams implements Closeable {
   public void close() {
     IOUtils.closeStream(dataIn);
     IOUtils.closeStream(checksumIn);
+    IOUtils.cleanup(null, volumeRef);
   }
 }

+ 11 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.FileDescriptor;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -31,7 +32,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 
@@ -200,13 +203,13 @@ class FsDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile,
+  void deleteAsync(FsVolumeReference volumeRef, File blockFile, File metaFile,
       ExtendedBlock block, String trashDirectory) {
     LOG.info("Scheduling " + block.getLocalBlock()
         + " file " + blockFile + " for deletion");
     ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
-        volume, blockFile, metaFile, block, trashDirectory);
-    execute(volume.getCurrentDir(), deletionTask);
+        volumeRef, blockFile, metaFile, block, trashDirectory);
+    execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
   }
   
   /** A task for deleting a block file and its associated meta file, as well
@@ -216,15 +219,17 @@ class FsDatasetAsyncDiskService {
    *  files are deleted immediately.
    */
   class ReplicaFileDeleteTask implements Runnable {
+    final FsVolumeReference volumeRef;
     final FsVolumeImpl volume;
     final File blockFile;
     final File metaFile;
     final ExtendedBlock block;
     final String trashDirectory;
     
-    ReplicaFileDeleteTask(FsVolumeImpl volume, File blockFile,
+    ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile,
         File metaFile, ExtendedBlock block, String trashDirectory) {
-      this.volume = volume;
+      this.volumeRef = volumeRef;
+      this.volume = (FsVolumeImpl) volumeRef.getVolume();
       this.blockFile = blockFile;
       this.metaFile = metaFile;
       this.block = block;
@@ -281,6 +286,7 @@ class FsDatasetAsyncDiskService {
         LOG.info("Deleted " + block.getBlockPoolId() + " "
             + block.getLocalBlock() + " file " + blockFile);
       }
+      IOUtils.cleanup(null, volumeRef);
     }
   }
 }
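
One detail worth calling out in this file: deleteAsync now takes an already-obtained FsVolumeReference and hands ownership to the task, whose run() releases it (the IOUtils.cleanup call above) only after the files are deleted, so the volume stays pinned for the life of the async deletion. A hedged, self-contained sketch of that hand-off, with placeholder names:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncOwnership {
      public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // The caller obtains the reference...
        Closeable volumeRef = () -> System.out.println("volume reference released");
        executor.execute(() -> {
          try {
            System.out.println("deleting block and meta files...");
          } finally {
            // ...but the task is responsible for releasing it when done.
            try { volumeRef.close(); } catch (IOException ignored) { }
          }
        });
        executor.shutdown();
      }
    }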

+ 180 - 104
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
@@ -82,6 +84,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -137,22 +140,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FsDatasetSpi
   public StorageReport[] getStorageReports(String bpid)
       throws IOException {
-    StorageReport[] reports;
+    List<StorageReport> reports;
     synchronized (statsLock) {
       List<FsVolumeImpl> curVolumes = getVolumes();
-      reports = new StorageReport[curVolumes.size()];
-      int i = 0;
+      reports = new ArrayList<>(curVolumes.size());
       for (FsVolumeImpl volume : curVolumes) {
-        reports[i++] = new StorageReport(volume.toDatanodeStorage(),
-                                         false,
-                                         volume.getCapacity(),
-                                         volume.getDfsUsed(),
-                                         volume.getAvailable(),
-                                         volume.getBlockPoolUsed(bpid));
+        try (FsVolumeReference ref = volume.obtainReference()) {
+          StorageReport sr = new StorageReport(volume.toDatanodeStorage(),
+              false,
+              volume.getCapacity(),
+              volume.getDfsUsed(),
+              volume.getAvailable(),
+              volume.getBlockPoolUsed(bpid));
+          reports.add(sr);
+        } catch (ClosedChannelException e) {
+          continue;
+        }
       }
     }
 
-    return reports;
+    return reports.toArray(new StorageReport[reports.size()]);
   }
 
   @Override
@@ -622,17 +629,24 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, 
                           long blkOffset, long ckoff) throws IOException {
     ReplicaInfo info = getReplicaInfo(b);
-    File blockFile = info.getBlockFile();
-    RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
-    if (blkOffset > 0) {
-      blockInFile.seek(blkOffset);
-    }
-    File metaFile = info.getMetaFile();
-    RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
-    if (ckoff > 0) {
-      metaInFile.seek(ckoff);
+    FsVolumeReference ref = info.getVolume().obtainReference();
+    try {
+      File blockFile = info.getBlockFile();
+      RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
+      if (blkOffset > 0) {
+        blockInFile.seek(blkOffset);
+      }
+      File metaFile = info.getMetaFile();
+      RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
+      if (ckoff > 0) {
+        metaInFile.seek(ckoff);
+      }
+      return new ReplicaInputStreams(
+          blockInFile.getFD(), metaInFile.getFD(), ref);
+    } catch (IOException e) {
+      IOUtils.cleanup(null, ref);
+      throw e;
     }
-    return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
   }
 
   static File moveBlockFiles(Block b, File srcfile, File destdir)
@@ -732,26 +746,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + replicaInfo.getVolume().getStorageType());
     }
 
-    FsVolumeImpl targetVolume = volumes.getNextVolume(targetStorageType,
-        block.getNumBytes());
-    File oldBlockFile = replicaInfo.getBlockFile();
-    File oldMetaFile = replicaInfo.getMetaFile();
+    try (FsVolumeReference volumeRef = volumes.getNextVolume(
+        targetStorageType, block.getNumBytes())) {
+      File oldBlockFile = replicaInfo.getBlockFile();
+      File oldMetaFile = replicaInfo.getMetaFile();
+      FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
+      // Copy files to temp dir first
+      File[] blockFiles = copyBlockFiles(block.getBlockId(),
+          block.getGenerationStamp(), oldMetaFile, oldBlockFile,
+          targetVolume.getTmpDir(block.getBlockPoolId()),
+          replicaInfo.isOnTransientStorage());
 
-    // Copy files to temp dir first
-    File[] blockFiles = copyBlockFiles(block.getBlockId(),
-        block.getGenerationStamp(), oldMetaFile, oldBlockFile,
-        targetVolume.getTmpDir(block.getBlockPoolId()),
-        replicaInfo.isOnTransientStorage());
+      ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
+          replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
+          targetVolume, blockFiles[0].getParentFile(), 0);
+      newReplicaInfo.setNumBytes(blockFiles[1].length());
+      // Finalize the copied files
+      newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
 
-    ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
-        replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
-        targetVolume, blockFiles[0].getParentFile(), 0);
-    newReplicaInfo.setNumBytes(blockFiles[1].length());
-    // Finalize the copied files
-    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-
-    removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
-        oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+      removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
+          oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+    }
 
     // Replace the old block if any to reschedule the scanning.
     datanode.getBlockScanner().addBlock(block);
@@ -870,7 +885,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
 
   @Override  // FsDatasetSpi
-  public synchronized ReplicaInPipeline append(ExtendedBlock b,
+  public synchronized ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
     // If the block was successfully finalized because all packets
     // were successfully processed at the Datanode but the ack for
@@ -895,8 +910,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           " expected length is " + expectedBlockLen);
     }
 
-    return append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
-        b.getNumBytes());
+    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
+    ReplicaBeingWritten replica = null;
+    try {
+      replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
+          b.getNumBytes());
+    } catch (IOException e) {
+      IOUtils.cleanup(null, ref);
+      throw e;
+    }
+    return new ReplicaHandler(replica, ref);
   }
   
   /** Append to a finalized replica
@@ -1017,22 +1040,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     
     return replicaInfo;
   }
-  
+
   @Override  // FsDatasetSpi
-  public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b,
-      long newGS, long expectedBlockLen) throws IOException {
+  public synchronized ReplicaHandler recoverAppend(
+      ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
     LOG.info("Recover failed append to " + b);
 
     ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
 
-    // change the replica's state/gs etc.
-    if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
-      return append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo, newGS, 
-          b.getNumBytes());
-    } else { //RBW
-      bumpReplicaGS(replicaInfo, newGS);
-      return (ReplicaBeingWritten)replicaInfo;
+    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
+    ReplicaBeingWritten replica;
+    try {
+      // change the replica's state/gs etc.
+      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+        replica = append(b.getBlockPoolId(), (FinalizedReplica) replicaInfo,
+                         newGS, b.getNumBytes());
+      } else { //RBW
+        bumpReplicaGS(replicaInfo, newGS);
+        replica = (ReplicaBeingWritten) replicaInfo;
+      }
+    } catch (IOException e) {
+      IOUtils.cleanup(null, ref);
+      throw e;
     }
+    return new ReplicaHandler(replica, ref);
   }
 
   @Override // FsDatasetSpi
@@ -1080,8 +1111,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline createRbw(StorageType storageType,
-      ExtendedBlock b, boolean allowLazyPersist) throws IOException {
+  public synchronized ReplicaHandler createRbw(
+      StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+      throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
         b.getBlockId());
     if (replicaInfo != null) {
@@ -1090,15 +1122,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       " and thus cannot be created.");
     }
     // create a new block
-    FsVolumeImpl v;
+    FsVolumeReference ref;
     while (true) {
       try {
         if (allowLazyPersist) {
           // First try to place the block on a transient volume.
-          v = volumes.getNextTransientVolume(b.getNumBytes());
+          ref = volumes.getNextTransientVolume(b.getNumBytes());
           datanode.getMetrics().incrRamDiskBlocksWrite();
         } else {
-          v = volumes.getNextVolume(storageType, b.getNumBytes());
+          ref = volumes.getNextVolume(storageType, b.getNumBytes());
         }
       } catch (DiskOutOfSpaceException de) {
         if (allowLazyPersist) {
@@ -1110,18 +1142,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
       break;
     }
+    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
     // create an rbw file to hold block in the designated volume
-    File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+    File f;
+    try {
+      f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+    } catch (IOException e) {
+      IOUtils.cleanup(null, ref);
+      throw e;
+    }
+
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-
-    return newReplicaInfo;
+    return new ReplicaHandler(newReplicaInfo, ref);
   }
-  
+
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
-      long newGS, long minBytesRcvd, long maxBytesRcvd)
+  public synchronized ReplicaHandler recoverRbw(
+      ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
     LOG.info("Recover RBW replica " + b);
 
@@ -1160,20 +1199,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           minBytesRcvd + ", " + maxBytesRcvd + "].");
     }
 
-    // Truncate the potentially corrupt portion.
-    // If the source was client and the last node in the pipeline was lost,
-    // any corrupt data written after the acked length can go unnoticed. 
-    if (numBytes > bytesAcked) {
-      final File replicafile = rbw.getBlockFile();
-      truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
-      rbw.setNumBytes(bytesAcked);
-      rbw.setLastChecksumAndDataLen(bytesAcked, null);
-    }
+    FsVolumeReference ref = rbw.getVolume().obtainReference();
+    try {
+      // Truncate the potentially corrupt portion.
+      // If the source was client and the last node in the pipeline was lost,
+      // any corrupt data written after the acked length can go unnoticed.
+      if (numBytes > bytesAcked) {
+        final File replicafile = rbw.getBlockFile();
+        truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+        rbw.setNumBytes(bytesAcked);
+        rbw.setLastChecksumAndDataLen(bytesAcked, null);
+      }
 
-    // bump the replica's generation stamp to newGS
-    bumpReplicaGS(rbw, newGS);
-    
-    return rbw;
+      // bump the replica's generation stamp to newGS
+      bumpReplicaGS(rbw, newGS);
+    } catch (IOException e) {
+      IOUtils.cleanup(null, ref);
+      throw e;
+    }
+    return new ReplicaHandler(rbw, ref);
   }
   
   @Override // FsDatasetSpi
@@ -1238,8 +1282,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline createTemporary(StorageType storageType,
-      ExtendedBlock b) throws IOException {
+  public synchronized ReplicaHandler createTemporary(
+      StorageType storageType, ExtendedBlock b) throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
     if (replicaInfo != null) {
       if (replicaInfo.getGenerationStamp() < b.getGenerationStamp()
@@ -1254,14 +1298,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             " and thus cannot be created.");
       }
     }
-    
-    FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
+
+    FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes());
+    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
     // create a temporary file to hold block in the designated volume
-    File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+    File f;
+    try {
+      f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+    } catch (IOException e) {
+      IOUtils.cleanup(null, ref);
+      throw e;
+    }
+
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile(), 0);
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    return newReplicaInfo;
+    return new ReplicaHandler(newReplicaInfo, ref);
   }
 
   /**
@@ -1644,10 +1696,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Delete the block asynchronously to make sure we can do it fast enough.
       // It's ok to unlink the block file before the uncache operation
       // finishes.
-      asyncDiskService.deleteAsync(v, f,
-          FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
-          new ExtendedBlock(bpid, invalidBlks[i]),
-          dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+      try {
+        asyncDiskService.deleteAsync(v.obtainReference(), f,
+            FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
+            new ExtendedBlock(bpid, invalidBlks[i]),
+            dataStorage.getTrashDirectoryForBlockFile(bpid, f));
+      } catch (ClosedChannelException e) {
+        LOG.warn("Volume " + v + " is closed, ignoring the deletion task for " +
+            "block " + invalidBlks[i]);
+      }
     }
     if (!errors.isEmpty()) {
       StringBuilder b = new StringBuilder("Failed to delete ")
@@ -2282,14 +2339,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS)
       throws IOException {
     String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
-    FsVolumeImpl v = volumes.getNextVolume(
-        replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes());
-    final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
-    final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
-    final File dstBlockFile = new File(destDir, blockFileName);
-    final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
-    return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
-        dstMetaFile, dstBlockFile, true);
+    try (FsVolumeReference ref = volumes.getNextVolume(
+        replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes())) {
+      FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+      final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
+      final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
+      final File dstBlockFile = new File(destDir, blockFileName);
+      final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
+      return copyBlockFiles(replicaInfo.getMetaFile(),
+          replicaInfo.getBlockFile(),
+          dstMetaFile, dstBlockFile, true);
+    }
   }
 
   @Override // FsDatasetSpi
@@ -2345,9 +2405,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (FsVolumeImpl volume : getVolumes()) {
       long used = 0;
       long free = 0;
-      try {
+      try (FsVolumeReference ref = volume.obtainReference()) {
         used = volume.getDfsUsed();
         free = volume.getAvailable();
+      } catch (ClosedChannelException e) {
+        continue;
       } catch (IOException e) {
         LOG.warn(e.getMessage());
         used = 0;
@@ -2379,15 +2441,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     List<FsVolumeImpl> curVolumes = getVolumes();
     if (!force) {
       for (FsVolumeImpl volume : curVolumes) {
-        if (!volume.isBPDirEmpty(bpid)) {
-          LOG.warn(bpid + " has some block files, cannot delete unless forced");
-          throw new IOException("Cannot delete block pool, "
-              + "it contains some block files");
+        try (FsVolumeReference ref = volume.obtainReference()) {
+          if (!volume.isBPDirEmpty(bpid)) {
+            LOG.warn(bpid + " has some block files, cannot delete unless forced");
+            throw new IOException("Cannot delete block pool, "
+                + "it contains some block files");
+          }
+        } catch (ClosedChannelException e) {
+          // ignore.
         }
       }
     }
     for (FsVolumeImpl volume : curVolumes) {
-      volume.deleteBPDirectories(bpid, force);
+      try (FsVolumeReference ref = volume.obtainReference()) {
+        volume.deleteBPDirectories(bpid, force);
+      } catch (ClosedChannelException e) {
+        // ignore.
+      }
     }
   }
   
@@ -2620,6 +2690,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
      */
     private boolean saveNextReplica() {
       RamDiskReplica block = null;
+      FsVolumeReference targetReference;
       FsVolumeImpl targetVolume;
       ReplicaInfo replicaInfo;
       boolean succeeded = false;
@@ -2637,8 +2708,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             if (replicaInfo != null &&
                 replicaInfo.getVolume().isTransientStorage()) {
               // Pick a target volume to persist the block.
-              targetVolume = volumes.getNextVolume(
+              targetReference = volumes.getNextVolume(
                   StorageType.DEFAULT, replicaInfo.getNumBytes());
+              targetVolume = (FsVolumeImpl) targetReference.getVolume();
 
               ramDiskReplicaTracker.recordStartLazyPersist(
                   block.getBlockPoolId(), block.getBlockId(), targetVolume);
@@ -2654,7 +2726,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                   block.getBlockPoolId(), block.getBlockId(),
                   replicaInfo.getGenerationStamp(), block.getCreationTime(),
                   replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
-                  targetVolume);
+                  targetReference);
             }
           }
         }
@@ -2678,9 +2750,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Don't worry about fragmentation for now. We don't expect more than one
       // transient volume per DN.
       for (FsVolumeImpl v : getVolumes()) {
-        if (v.isTransientStorage()) {
-          capacity += v.getCapacity();
-          free += v.getAvailable();
+        try (FsVolumeReference ref = v.obtainReference()) {
+          if (v.isTransientStorage()) {
+            capacity += v.getCapacity();
+            free += v.getAvailable();
+          }
+        } catch (ClosedChannelException e) {
+          // ignore.
         }
       }
 

+ 120 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -31,6 +32,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
@@ -40,8 +43,10 @@ import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -62,6 +67,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final File currentDir;    // <StorageDirectory>/current
   private final DF usage;           
   private final long reserved;
+  private CloseableReferenceCount reference = new CloseableReferenceCount();
 
   // Disk space reserved for open blocks.
   private AtomicLong reservedForRbw;
@@ -99,6 +105,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
     if (storageType.isTransient()) {
       return null;
     }
+    if (dataset.datanode == null) {
+      // FsVolumeImpl is used in test.
+      return null;
+    }
 
     final int maxNumThreads = dataset.datanode.getConf().getInt(
         DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
@@ -116,7 +126,114 @@ public class FsVolumeImpl implements FsVolumeSpi {
     executor.allowCoreThreadTimeOut(true);
     return executor;
   }
-  
+
+  private void printReferenceTraceInfo(String op) {
+    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    for (StackTraceElement ste : stack) {
+      switch (ste.getMethodName()) {
+      case "getDfsUsed":
+      case "getBlockPoolUsed":
+      case "getAvailable":
+      case "getVolumeMap":
+        return;
+      default:
+        break;
+      }
+    }
+    FsDatasetImpl.LOG.trace("Reference count: " + op + " " + this + ": " +
+        this.reference.getReferenceCount());
+    FsDatasetImpl.LOG.trace(
+        Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
+  }
+
+  /**
+   * Increase the reference count. The caller must increase the reference count
+   * before issuing IOs.
+   *
+   * @throws ClosedChannelException if the volume is already closed.
+   */
+  private void reference() throws ClosedChannelException {
+    this.reference.reference();
+    if (FsDatasetImpl.LOG.isTraceEnabled()) {
+      printReferenceTraceInfo("incr");
+    }
+  }
+
+  /**
+   * Decrease the reference count.
+   */
+  private void unreference() {
+    if (FsDatasetImpl.LOG.isTraceEnabled()) {
+      printReferenceTraceInfo("desc");
+    }
+    if (FsDatasetImpl.LOG.isDebugEnabled()) {
+      if (reference.getReferenceCount() <= 0) {
+        FsDatasetImpl.LOG.debug("Decrease reference count <= 0 on " + this +
+          Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
+      }
+    }
+    checkReference();
+    this.reference.unreference();
+  }
+
+  private static class FsVolumeReferenceImpl implements FsVolumeReference {
+    private final FsVolumeImpl volume;
+
+    FsVolumeReferenceImpl(FsVolumeImpl volume) throws ClosedChannelException {
+      this.volume = volume;
+      volume.reference();
+    }
+
+    /**
+     * Decreases the reference count.
+     * @throws IOException never actually thrown; required by {@link Closeable}.
+     */
+    @Override
+    public void close() throws IOException {
+      volume.unreference();
+    }
+
+    @Override
+    public FsVolumeSpi getVolume() {
+      return this.volume;
+    }
+  }
+
+  @Override
+  public FsVolumeReference obtainReference() throws ClosedChannelException {
+    return new FsVolumeReferenceImpl(this);
+  }
+
+  private void checkReference() {
+    Preconditions.checkState(reference.getReferenceCount() > 0);
+  }
+
+  /**
+   * Close this volume and wait for all other threads to release their
+   * references to it.
+   * @throws IOException if the volume is already closed or the wait is interrupted.
+   */
+  void closeAndWait() throws IOException {
+    try {
+      this.reference.setClosed();
+    } catch (ClosedChannelException e) {
+      throw new IOException("The volume has already been closed.", e);
+    }
+    final int SLEEP_MILLIS = 500;
+    while (this.reference.getReferenceCount() > 0) {
+      if (FsDatasetImpl.LOG.isDebugEnabled()) {
+        FsDatasetImpl.LOG.debug(String.format(
+            "The reference count for %s is %d, wait to be 0.",
+            this, reference.getReferenceCount()));
+      }
+      try {
+        Thread.sleep(SLEEP_MILLIS);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
   File getCurrentDir() {
     return currentDir;
   }
@@ -250,6 +367,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
    * the block is finalized.
    */
   File createTmpFile(String bpid, Block b) throws IOException {
+    checkReference();
     return getBlockPoolSlice(bpid).createTmpFile(b);
   }
 
@@ -282,6 +400,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
    * the block is finalized.
    */
   File createRbwFile(String bpid, Block b) throws IOException {
+    checkReference();
     reserveSpaceForRbw(b.getNumBytes());
     return getBlockPoolSlice(bpid).createRbwFile(b);
   }
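
The code above leans on org.apache.hadoop.util.CloseableReferenceCount for reference(), unreference(), setClosed(), and getReferenceCount(). As an illustration of the semantics those calls rely on, here is a simplified reimplementation (not the hadoop-common class): a single atomic packs a closed bit with the count, so a close racing with reference() can never admit a new reference after the volume is marked closed.

    import java.nio.channels.ClosedChannelException;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative only: the closed bit and the count share one atomic.
    class RefCount {
      private static final int CLOSED_BIT = 1 << 30;
      private final AtomicInteger state = new AtomicInteger(0);

      void reference() throws ClosedChannelException {
        int s = state.incrementAndGet();
        if ((s & CLOSED_BIT) != 0) {
          state.decrementAndGet(); // roll back; the volume is closing
          throw new ClosedChannelException();
        }
      }

      void unreference() {
        state.decrementAndGet();
      }

      void setClosed() throws ClosedChannelException {
        while (true) {
          int s = state.get();
          if ((s & CLOSED_BIT) != 0) {
            throw new ClosedChannelException(); // already closed
          }
          if (state.compareAndSet(s, s | CLOSED_BIT)) {
            return;
          }
        }
      }

      int getReferenceCount() {
        return state.get() & ~CLOSED_BIT;
      }
    }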

+ 62 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -28,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -58,6 +60,21 @@ class FsVolumeList {
     return Collections.unmodifiableList(Arrays.asList(volumes.get()));
   }
 
+  private FsVolumeReference chooseVolume(List<FsVolumeImpl> list, long blockSize)
+      throws IOException {
+    while (true) {
+      FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize);
+      try {
+        return volume.obtainReference();
+      } catch (ClosedChannelException e) {
+        FsDatasetImpl.LOG.warn("Chose a closed volume: " + volume);
+        // blockChooser.chooseVolume throws DiskOutOfSpaceException when the list
+        // is empty, indicating that all volumes are closed.
+        list.remove(volume);
+      }
+    }
+  }
+
   /** 
    * Get next volume.
    *
@@ -65,7 +82,7 @@ class FsVolumeList {
    * @param storageType the desired {@link StorageType} 
    * @return next volume to store the block in.
    */
-  FsVolumeImpl getNextVolume(StorageType storageType, long blockSize)
+  FsVolumeReference getNextVolume(StorageType storageType, long blockSize)
       throws IOException {
     // Get a snapshot of currently available volumes.
     final FsVolumeImpl[] curVolumes = volumes.get();
@@ -75,7 +92,7 @@ class FsVolumeList {
         list.add(v);
       }
     }
-    return blockChooser.chooseVolume(list, blockSize);
+    return chooseVolume(list, blockSize);
   }
 
   /**
@@ -84,7 +101,7 @@ class FsVolumeList {
    * @param blockSize free space needed on the volume
    * @return next volume to store the block in.
    */
-  FsVolumeImpl getNextTransientVolume(long blockSize) throws IOException {
+  FsVolumeReference getNextTransientVolume(long blockSize) throws IOException {
     // Get a snapshot of currently available volumes.
     final List<FsVolumeImpl> curVolumes = getVolumes();
     final List<FsVolumeImpl> list = new ArrayList<>(curVolumes.size());
@@ -93,13 +110,17 @@ class FsVolumeList {
         list.add(v);
       }
     }
-    return blockChooser.chooseVolume(list, blockSize);
+    return chooseVolume(list, blockSize);
   }
 
   long getDfsUsed() throws IOException {
     long dfsUsed = 0L;
     for (FsVolumeImpl v : volumes.get()) {
-      dfsUsed += v.getDfsUsed();
+      try(FsVolumeReference ref = v.obtainReference()) {
+        dfsUsed += v.getDfsUsed();
+      } catch (ClosedChannelException e) {
+        // ignore.
+      }
     }
     return dfsUsed;
   }
@@ -107,7 +128,11 @@ class FsVolumeList {
   long getBlockPoolUsed(String bpid) throws IOException {
     long dfsUsed = 0L;
     for (FsVolumeImpl v : volumes.get()) {
-      dfsUsed += v.getBlockPoolUsed(bpid);
+      try (FsVolumeReference ref = v.obtainReference()) {
+        dfsUsed += v.getBlockPoolUsed(bpid);
+      } catch (ClosedChannelException e) {
+        // ignore.
+      }
     }
     return dfsUsed;
   }
@@ -115,7 +140,11 @@ class FsVolumeList {
   long getCapacity() {
     long capacity = 0L;
     for (FsVolumeImpl v : volumes.get()) {
-      capacity += v.getCapacity();
+      try (FsVolumeReference ref = v.obtainReference()) {
+        capacity += v.getCapacity();
+      } catch (IOException e) {
+        // ignore.
+      }
     }
     return capacity;
   }
@@ -123,7 +152,11 @@ class FsVolumeList {
   long getRemaining() throws IOException {
     long remaining = 0L;
     for (FsVolumeSpi vol : volumes.get()) {
-      remaining += vol.getAvailable();
+      try (FsVolumeReference ref = vol.obtainReference()) {
+        remaining += vol.getAvailable();
+      } catch (ClosedChannelException e) {
+        // ignore
+      }
     }
     return remaining;
   }
@@ -139,7 +172,7 @@ class FsVolumeList {
     for (final FsVolumeImpl v : volumes.get()) {
       Thread t = new Thread() {
         public void run() {
-          try {
+          try (FsVolumeReference ref = v.obtainReference()) {
             FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
                 bpid + " on volume " + v + "...");
             long startTime = Time.monotonicNow();
@@ -147,6 +180,9 @@ class FsVolumeList {
             long timeTaken = Time.monotonicNow() - startTime;
             FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
                 + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
+          } catch (ClosedChannelException e) {
+            FsDatasetImpl.LOG.info("The volume " + v + " was closed while " +
+                "adding replicas, ignored.");
           } catch (IOException ioe) {
             FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
                 "from " + v + ". Will throw later.", ioe);
@@ -189,16 +225,21 @@ class FsVolumeList {
 
       for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
         final FsVolumeImpl fsv = i.next();
-        try {
+        try (FsVolumeReference ref = fsv.obtainReference()) {
           fsv.checkDirs();
         } catch (DiskErrorException e) {
-          FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ",e);
+          FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
           if (removedVols == null) {
-            removedVols = new ArrayList<FsVolumeImpl>(1);
+            removedVols = new ArrayList<>(1);
           }
           removedVols.add(fsv);
           removeVolume(fsv);
           numFailedVolumes++;
+        } catch (ClosedChannelException e) {
+          FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
+            "reference count on closed volume", e);
+        } catch (IOException e) {
+          FsDatasetImpl.LOG.error("Unexpected IOException", e);
         }
       }
       
@@ -221,7 +262,6 @@ class FsVolumeList {
    * @param newVolume the instance of new FsVolumeImpl.
    */
   void addVolume(FsVolumeImpl newVolume) {
-    // Make a copy of volumes to add new volumes.
     while (true) {
       final FsVolumeImpl[] curVolumes = volumes.get();
       final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
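
The volume array itself is published lock-free: addVolume copies the current array, appends to the copy, and swaps it in with compareAndSet, retrying on contention so readers always iterate an immutable snapshot. A generic sketch of that copy-on-write loop; the field and method names here are assumptions:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Copy-on-write container mirroring FsVolumeList's atomic volume array.
public class CowArraySketch<V> {
  private final AtomicReference<V[]> volumes;

  CowArraySketch(V[] initial) {
    volumes = new AtomicReference<>(initial);
  }

  void addVolume(V newVolume) {
    while (true) {
      final V[] cur = volumes.get();
      final List<V> list = new ArrayList<>(Arrays.asList(cur));
      list.add(newVolume);
      // Publish the copy only if nobody swapped the array meanwhile;
      // on contention, loop and retry against the fresh snapshot.
      if (volumes.compareAndSet(cur, list.toArray(Arrays.copyOf(cur, 0)))) {
        break;
      }
    }
  }

  V[] snapshot() {
    return volumes.get();  // readers iterate a stable array without locks
  }
}
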
@@ -252,6 +292,12 @@ class FsVolumeList {
       if (volumeList.remove(target)) {
         if (volumes.compareAndSet(curVolumes,
             volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+          try {
+            target.closeAndWait();
+          } catch (IOException e) {
+            FsDatasetImpl.LOG.warn(
+                "Error occurred while waiting for volume to close: " + target, e);
+          }
           target.shutdown();
           FsDatasetImpl.LOG.info("Removed volume: " + target);
           break;
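
removeVolume may now only shut a volume down after closeAndWait() returns, which presumably marks the volume closed (so obtainReference() starts failing) and then blocks until every outstanding reference has been released. A monitor-based sketch of that handshake, assuming wait/notify draining; the real FsVolumeImpl may use an atomic counter and polling instead:

// Sketch of the close-then-drain handshake; names are assumptions.
class CloseAndWaitSketch {
  private int refCount = 0;
  private boolean closed = false;

  synchronized boolean obtainReference() {
    if (closed) {
      return false;        // reject new references once closing started
    }
    refCount++;
    return true;
  }

  synchronized void release() {
    refCount--;
    if (refCount == 0 && closed) {
      notifyAll();         // wake closeAndWait()
    }
  }

  synchronized void closeAndWait() throws InterruptedException {
    closed = true;         // obtainReference() now fails
    while (refCount > 0) {
      wait();              // drain in-flight readers and writers
    }
    // No thread holds a reference any more; shutdown() is now safe.
  }
}
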
@@ -298,7 +344,7 @@ class FsVolumeList {
     for (final FsVolumeImpl v : volumes.get()) {
       Thread t = new Thread() {
         public void run() {
-          try {
+          try (FsVolumeReference ref = v.obtainReference()) {
             FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
                 " on volume " + v + "...");
             long startTime = Time.monotonicNow();
@@ -306,6 +352,8 @@ class FsVolumeList {
             long timeTaken = Time.monotonicNow() - startTime;
             FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
                 " on " + v + ": " + timeTaken + "ms");
+          } catch (ClosedChannelException e) {
+            // ignore.
           } catch (IOException ioe) {
             FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                 ". Will throw later.", ioe);

+ 11 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java

@@ -21,7 +21,8 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -175,13 +177,14 @@ class RamDiskAsyncLazyPersistService {
   void submitLazyPersistTask(String bpId, long blockId,
       long genStamp, long creationTime,
       File metaFile, File blockFile,
-      FsVolumeImpl targetVolume) throws IOException {
+      FsVolumeReference target) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
           + bpId + " block id: " + blockId);
     }
 
-    File lazyPersistDir  = targetVolume.getLazyPersistDir(bpId);
+    FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
+    File lazyPersistDir = volume.getLazyPersistDir(bpId);
     if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
       FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
       throw new IOException("LazyWriter fail to find or create lazy persist dir: "
@@ -190,8 +193,8 @@ class RamDiskAsyncLazyPersistService {
 
     ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
         bpId, blockId, genStamp, creationTime, blockFile, metaFile,
-        targetVolume, lazyPersistDir);
-    execute(targetVolume.getCurrentDir(), lazyPersistTask);
+        target, lazyPersistDir);
+    execute(volume.getCurrentDir(), lazyPersistTask);
   }
 
   class ReplicaLazyPersistTask implements Runnable {
@@ -201,13 +204,13 @@ class RamDiskAsyncLazyPersistService {
     final long creationTime;
     final File blockFile;
     final File metaFile;
-    final FsVolumeImpl targetVolume;
+    final FsVolumeReference targetVolume;
     final File lazyPersistDir;
 
     ReplicaLazyPersistTask(String bpId, long blockId,
         long genStamp, long creationTime,
         File blockFile, File metaFile,
-        FsVolumeImpl targetVolume, File lazyPersistDir) {
+        FsVolumeReference targetVolume, File lazyPersistDir) {
       this.bpId = bpId;
       this.blockId = blockId;
       this.genStamp = genStamp;
@@ -230,14 +233,14 @@ class RamDiskAsyncLazyPersistService {
     public void run() {
       boolean succeeded = false;
       final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset();
-      try {
+      try (FsVolumeReference ref = this.targetVolume) {
         // No FsDatasetImpl lock for the file copy
         File targetFiles[] = FsDatasetImpl.copyBlockFiles(
             blockId, genStamp, metaFile, blockFile, lazyPersistDir, true);
 
         // Lock FsDataSetImpl during onCompleteLazyPersist callback
         dataset.onCompleteLazyPersist(bpId, blockId,
-                creationTime, targetFiles, targetVolume);
+                creationTime, targetFiles, (FsVolumeImpl)ref.getVolume());
         succeeded = true;
       } catch (Exception e){
         FsDatasetImpl.LOG.warn(

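The lazy-persist change illustrates the ownership handoff: the caller obtains the FsVolumeReference, passes it into the queued task, and the task, not the caller, releases it via try-with-resources around its body. A minimal sketch of the handoff, with a plain ExecutorService standing in for the per-volume thread pools:

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RefHandoffSketch {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    // Stand-in for the FsVolumeReference obtained by the submitter.
    Closeable volumeRef = () -> System.out.println("reference released");

    // Ownership moves with the task: the volume stays pinned until the
    // task's try-with-resources closes the reference inside run().
    pool.submit(() -> {
      try (Closeable ref = volumeRef) {
        System.out.println("copying block files to the lazy-persist dir...");
      } catch (IOException e) {
        // the sketch's close() never throws
      }
    });
    pool.shutdown();
  }
}
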
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -220,7 +220,7 @@ enum Status {
   CHECKSUM_OK = 6;
   ERROR_UNSUPPORTED = 7;
   OOB_RESTART = 8;            // Quick restart
-  OOB_INTERRUPTED = 9;        // Interrupted
+  OOB_RESERVED1 = 9;          // Reserved
   OOB_RESERVED2 = 10;         // Reserved
   OOB_RESERVED3 = 11;         // Reserved
   IN_PROGRESS = 12;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java

@@ -97,7 +97,7 @@ public class TestWriteBlockGetsBlockLengthHint {
      * correctly propagate the hint to FsDatasetSpi.
      */
     @Override
-    public synchronized ReplicaInPipelineInterface createRbw(
+    public synchronized ReplicaHandler createRbw(
         StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
         throws IOException {
       assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));

+ 21 - 14
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -22,6 +22,7 @@ import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -43,8 +44,8 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -147,7 +148,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
         oStream = null;
       }
     }
-
+    
     @Override
     public String getStorageUuid() {
       return storage.getStorageUuid();
@@ -431,6 +432,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       this.storage = storage;
     }
 
+    @Override
+    public FsVolumeReference obtainReference() throws ClosedChannelException {
+      return null;
+    }
+
     @Override
     public String getStorageID() {
       return storage.getStorageUuid();
@@ -780,8 +786,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
-      long newGS, long expectedBlockLen) throws IOException {
+  public synchronized ReplicaHandler append(
+      ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null || !binfo.isFinalized()) {
@@ -789,12 +795,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
           + " is not valid, and cannot be appended to.");
     }
     binfo.unfinalizeBlock();
-    return binfo;
+    return new ReplicaHandler(binfo, null);
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
-      long newGS, long expectedBlockLen) throws IOException {
+  public synchronized ReplicaHandler recoverAppend(
+      ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if (binfo == null) {
@@ -807,7 +813,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     map.remove(b);
     binfo.theBlock.setGenerationStamp(newGS);
     map.put(binfo.theBlock, binfo);
-    return binfo;
+    return new ReplicaHandler(binfo, null);
   }
 
   @Override // FsDatasetSpi
@@ -829,8 +835,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
   
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
-      long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
+  public synchronized ReplicaHandler recoverRbw(
+      ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
+      throws IOException {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = map.get(b.getLocalBlock());
     if ( binfo == null) {
@@ -844,18 +851,18 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     map.remove(b);
     binfo.theBlock.setGenerationStamp(newGS);
     map.put(binfo.theBlock, binfo);
-    return binfo;
+    return new ReplicaHandler(binfo, null);
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface createRbw(
+  public synchronized ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b,
       boolean allowLazyPersist) throws IOException {
     return createTemporary(storageType, b);
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface createTemporary(
+  public synchronized ReplicaHandler createTemporary(
       StorageType storageType, ExtendedBlock b) throws IOException {
     if (isValidBlock(b)) {
           throw new ReplicaAlreadyExistsException("Block " + b + 
@@ -868,7 +875,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
     BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true);
     map.put(binfo.theBlock, binfo);
-    return binfo;
+    return new ReplicaHandler(binfo, null);
   }
 
   synchronized InputStream getBlockInputStream(ExtendedBlock b

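These SimulatedFSDataset changes show the new return type in action: ReplicaHandler pairs the replica with the volume reference that pins it (null here, since the simulated dataset has nothing to pin). A sketch of the shape the call sites imply, assuming the class implements Closeable and releases the reference on close():

import java.io.Closeable;
import java.io.IOException;

// Shape implied by the call sites above: a replica plus the (possibly
// null) volume reference that keeps its volume alive while the pipeline
// writes. An assumption-level sketch, not the actual ReplicaHandler source.
class ReplicaHandlerSketch implements Closeable {
  private final Object replica;       // stands in for the replica interface
  private Closeable volumeReference;  // stands in for FsVolumeReference

  ReplicaHandlerSketch(Object replica, Closeable volumeReference) {
    this.replica = replica;
    this.volumeReference = volumeReference;
  }

  Object getReplica() {
    return replica;
  }

  @Override
  public void close() throws IOException {
    if (volumeReference != null) {
      volumeReference.close();        // un-pin the volume
      volumeReference = null;
    }
  }
}
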
+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -562,7 +562,7 @@ public class TestBlockRecovery {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
-        StorageType.DEFAULT, block, false);
+        StorageType.DEFAULT, block, false).getReplica();
     ReplicaOutputStreams streams = null;
     try {
       streams = replicaInfo.createStreams(true,

+ 28 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java

@@ -56,6 +56,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -568,7 +570,7 @@ public class TestDataNodeHotSwapVolumes {
   @Test(timeout=180000)
   public void testRemoveVolumeBeingWritten()
       throws InterruptedException, TimeoutException, ReconfigurationException,
-      IOException {
+      IOException, BrokenBarrierException {
     // test against removing volumes on the different DataNode on the pipeline.
     for (int i = 0; i < 3; i++) {
       testRemoveVolumeBeingWrittenForDatanode(i);
@@ -582,7 +584,7 @@ public class TestDataNodeHotSwapVolumes {
    */
   private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
       throws IOException, ReconfigurationException, TimeoutException,
-      InterruptedException {
+      InterruptedException, BrokenBarrierException {
     // Starts DFS cluster with 3 DataNodes to form a pipeline.
     startDFSCluster(1, 3);
 
@@ -599,11 +601,27 @@ public class TestDataNodeHotSwapVolumes {
     out.write(writeBuf);
     out.hflush();
 
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+
     List<String> oldDirs = getDataDirs(dn);
-    String newDirs = oldDirs.get(1);  // Remove the first volume.
-    dn.reconfigurePropertyImpl(
-        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+    final String newDirs = oldDirs.get(1);  // Remove the first volume.
+    final List<Exception> exceptions = new ArrayList<>();
+    Thread reconfigThread = new Thread() {
+      public void run() {
+        try {
+          barrier.await();
+          dn.reconfigurePropertyImpl(
+              DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
+        } catch (ReconfigurationException |
+            InterruptedException |
+            BrokenBarrierException e) {
+          exceptions.add(e);
+        }
+      }
+    };
+    reconfigThread.start();
 
+    barrier.await();
     rb.nextBytes(writeBuf);
     out.write(writeBuf);
     out.hflush();
@@ -614,5 +632,10 @@ public class TestDataNodeHotSwapVolumes {
     // Read the content back
     byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
     assertEquals(BLOCK_SIZE, content.length);
+
+    reconfigThread.join();
+    if (!exceptions.isEmpty()) {
+      throw new IOException(exceptions.get(0));
+    }
   }
 }

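The reworked test uses a two-party CyclicBarrier to make the volume removal race the in-flight write deterministically: both threads block at await() and are released together, so the reconfiguration runs while the pipeline is mid-write. A condensed sketch of just the synchronization, with the DataNode calls replaced by prints:

import java.util.concurrent.CyclicBarrier;

public class BarrierRaceSketch {
  public static void main(String[] args) throws Exception {
    final CyclicBarrier barrier = new CyclicBarrier(2);

    Thread reconfig = new Thread(() -> {
      try {
        barrier.await();                       // line up with the writer
        System.out.println("removing volume"); // dn.reconfigurePropertyImpl(...)
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
    reconfig.start();

    barrier.await();                           // both threads released together
    System.out.println("writing next packet"); // out.write(...); out.hflush();
    reconfig.join();
  }
}
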
+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.LinkedList;
 import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
@@ -539,7 +541,12 @@ public class TestDirectoryScanner {
     public String[] getBlockPoolList() {
       return new String[0];
     }
-    
+
+    @Override
+    public FsVolumeReference obtainReference() throws ClosedChannelException {
+      return null;
+    }
+
     @Override
     public long getAvailable() throws IOException {
       return 0;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -67,7 +67,7 @@ public class TestSimulatedFSDataset {
       // we pass expected len as zero, - fsdataset should use the sizeof actual
       // data written
       ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
-          StorageType.DEFAULT, b, false);
+          StorageType.DEFAULT, b, false).getReplica();
       ReplicaOutputStreams out = bInfo.createStreams(true,
           DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
       try {

+ 11 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -144,25 +144,25 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   @Override
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
       long ckoff) throws IOException {
-    return new ReplicaInputStreams(FileDescriptor.in, FileDescriptor.in);
+    return new ReplicaInputStreams(FileDescriptor.in, FileDescriptor.in, null);
   }
 
   @Override
-  public ReplicaInPipelineInterface createTemporary(StorageType t, ExtendedBlock b)
+  public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
       throws IOException {
-    return new ExternalReplicaInPipeline();
+    return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
   @Override
-  public ReplicaInPipelineInterface createRbw(StorageType t, ExtendedBlock b, boolean tf)
+  public ReplicaHandler createRbw(StorageType t, ExtendedBlock b, boolean tf)
       throws IOException {
-    return new ExternalReplicaInPipeline();
+    return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
   @Override
-  public ReplicaInPipelineInterface recoverRbw(ExtendedBlock b, long newGS,
+  public ReplicaHandler recoverRbw(ExtendedBlock b, long newGS,
       long minBytesRcvd, long maxBytesRcvd) throws IOException {
-    return new ExternalReplicaInPipeline();
+    return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
   @Override
@@ -172,15 +172,15 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public ReplicaInPipelineInterface append(ExtendedBlock b, long newGS,
+  public ReplicaHandler append(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
-    return new ExternalReplicaInPipeline();
+    return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
   @Override
-  public ReplicaInPipelineInterface recoverAppend(ExtendedBlock b, long newGS,
+  public ReplicaHandler recoverAppend(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
-    return new ExternalReplicaInPipeline();
+    return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
   }
 
   @Override

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java

@@ -20,10 +20,17 @@ package org.apache.hadoop.hdfs.server.datanode.extdataset;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+
 import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
 public class ExternalVolumeImpl implements FsVolumeSpi {
+  @Override
+  public FsVolumeReference obtainReference() throws ClosedChannelException {
+    return null;
+  }
 
   @Override
   public String[] getBlockPoolList() {

+ 94 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
+
+public class FsVolumeListTest {
+
+  private final Configuration conf = new Configuration();
+  private VolumeChoosingPolicy<FsVolumeImpl> blockChooser =
+      new RoundRobinVolumeChoosingPolicy<>();
+  private FsDatasetImpl dataset = null;
+  private String baseDir;
+
+  @Before
+  public void setUp() {
+    dataset = mock(FsDatasetImpl.class);
+    baseDir = new FileSystemTestHelper().getTestRootDir();
+  }
+
+  @Test
+  public void testGetNextVolumeWithClosedVolume() throws IOException {
+    FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    List<FsVolumeImpl> volumes = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      File curDir = new File(baseDir, "nextvolume-" + i);
+      curDir.mkdirs();
+      FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
+          conf, StorageType.DEFAULT);
+      volume.setCapacityForTesting(1024 * 1024 * 1024);
+      volumes.add(volume);
+      volumeList.addVolume(volume);
+    }
+
+    // Close the second volume.
+    volumes.get(1).closeAndWait();
+    for (int i = 0; i < 10; i++) {
+      try (FsVolumeReference ref =
+          volumeList.getNextVolume(StorageType.DEFAULT, 128)) {
+        // The closed volume (volumes.get(1)) will not be chosen.
+        assertNotEquals(ref.getVolume(), volumes.get(1));
+      }
+    }
+  }
+
+  @Test
+  public void testCheckDirsWithClosedVolume() throws IOException {
+    FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+    List<FsVolumeImpl> volumes = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      File curDir = new File(baseDir, "volume-" + i);
+      curDir.mkdirs();
+      FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir,
+          conf, StorageType.DEFAULT);
+      volumes.add(volume);
+      volumeList.addVolume(volume);
+    }
+
+    // Close the 2nd volume.
+    volumes.get(1).closeAndWait();
+    // checkDirs() should ignore the 2nd volume since it is closed.
+    volumeList.checkDirs();
+  }
+}

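The first test pins down the expected chooser behavior: when the selected volume's obtainReference() throws ClosedChannelException, getNextVolume presumably excludes that volume and retries rather than surfacing the error. A sketch of such a retry loop over a simplified chooser; the retry strategy is an assumption drawn from the test, not the FsVolumeList source:

import java.io.Closeable;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;

interface RefVolume {
  Closeable obtainReference() throws ClosedChannelException;
}

public class NextVolumeSketch {
  // Pick from the candidates, dropping any volume whose reference cannot
  // be obtained; the caller is responsible for closing the reference.
  static Closeable getNextVolume(List<RefVolume> volumes)
      throws ClosedChannelException {
    List<RefVolume> candidates = new ArrayList<>(volumes);
    while (!candidates.isEmpty()) {
      RefVolume chosen = candidates.get(0);  // stand-in for the policy
      try {
        return chosen.obtainReference();
      } catch (ClosedChannelException e) {
        candidates.remove(chosen);           // closed: exclude and retry
      }
    }
    throw new ClosedChannelException();      // no usable volume left
  }

  public static void main(String[] args) throws ClosedChannelException {
    RefVolume open = () -> () -> { };        // reference always available
    RefVolume closedVol = () -> { throw new ClosedChannelException(); };
    // The closed volume is skipped and the open one is chosen.
    System.out.println(getNextVolume(List.of(closedVol, open)) != null);
  }
}
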
+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -160,14 +161,16 @@ public class TestFsDatasetImpl {
     assertEquals(actualVolumes, expectedVolumes);
   }
 
-  @Test
+  @Test(timeout = 30000)
   public void testRemoveVolumes() throws IOException {
     // Feed FsDataset with block metadata.
     final int NUM_BLOCKS = 100;
     for (int i = 0; i < NUM_BLOCKS; i++) {
       String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
       ExtendedBlock eb = new ExtendedBlock(bpid, i);
-      dataset.createRbw(StorageType.DEFAULT, eb, false);
+      try (ReplicaHandler replica =
+          dataset.createRbw(StorageType.DEFAULT, eb, false)) {
+      }
     }
     final String[] dataDirs =
         conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");

+ 7 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
@@ -148,7 +149,8 @@ public class TestWriteToReplica {
     };
     
     ReplicaMap replicasMap = dataSet.volumeMap;
-    FsVolumeImpl vol = dataSet.volumes.getNextVolume(StorageType.DEFAULT, 0);
+    FsVolumeImpl vol = (FsVolumeImpl) dataSet.volumes
+        .getNextVolume(StorageType.DEFAULT, 0).getVolume();
     ReplicaInfo replicaInfo = new FinalizedReplica(
         blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
     replicasMap.add(bpid, replicaInfo);
@@ -157,10 +159,10 @@ public class TestWriteToReplica {
     
     replicasMap.add(bpid, new ReplicaInPipeline(
         blocks[TEMPORARY].getBlockId(),
-        blocks[TEMPORARY].getGenerationStamp(), vol, 
+        blocks[TEMPORARY].getGenerationStamp(), vol,
         vol.createTmpFile(bpid, blocks[TEMPORARY].getLocalBlock()).getParentFile(), 0));
     
-    replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol, 
+    replicaInfo = new ReplicaBeingWritten(blocks[RBW].getLocalBlock(), vol,
         vol.createRbwFile(bpid, blocks[RBW].getLocalBlock()).getParentFile(), null);
     replicasMap.add(bpid, replicaInfo);
     replicaInfo.getBlockFile().createNewFile();
@@ -489,8 +491,8 @@ public class TestWriteToReplica {
     long newGenStamp = blocks[NON_EXISTENT].getGenerationStamp() * 10;
     blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
     try {
-      ReplicaInPipeline replicaInfo =
-                dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+      ReplicaInPipelineInterface replicaInfo =
+          dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
       Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
       Assert.assertTrue(
           replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());