
HDFS-11337. (HDFS-10958 backport). Add instrumentation hooks around Datanode disk IO.

Arpit Agarwal, 8 years ago
Commit 954dae26cd
36 files changed, 1832 insertions and 439 deletions
  1. +7 -35  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
  2. +12 -17  hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
  3. +27 -0  hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
  4. +2 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  5. +6 -8  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  6. +7 -3  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  7. +107 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
  8. +14 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  9. +5 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
  10. +6 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
  11. +13 -6  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
  12. +67 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
  13. +3 -2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
  14. +97 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
  15. +1031 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
  16. +11 -8  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
  17. +1 -2  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
  18. +31 -45  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
  19. +3 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
  20. +7 -4  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
  21. +24 -49  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
  22. +59 -73  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
  23. +19 -5  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
  24. +165 -132  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  25. +18 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
  26. +50 -35  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
  27. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
  28. +17 -7  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  29. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
  30. +5 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
  31. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
  32. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  33. +3 -3  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
  34. +6 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
  35. +3 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
  36. +2 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java

+ 7 - 35
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

@@ -742,49 +742,21 @@ public class NativeIO {
   }
 
   /**
-   * Create a FileInputStream that shares delete permission on the
-   * file opened, i.e. other process can delete the file the
-   * FileInputStream is reading. Only Windows implementation uses
-   * the native interface.
-   */
-  public static FileInputStream getShareDeleteFileInputStream(File f)
-      throws IOException {
-    if (!Shell.WINDOWS) {
-      // On Linux the default FileInputStream shares delete permission
-      // on the file opened.
-      //
-      return new FileInputStream(f);
-    } else {
-      // Use Windows native interface to create a FileInputStream that
-      // shares delete permission on the file opened.
-      //
-      FileDescriptor fd = Windows.createFile(
-          f.getAbsolutePath(),
-          Windows.GENERIC_READ,
-          Windows.FILE_SHARE_READ |
-              Windows.FILE_SHARE_WRITE |
-              Windows.FILE_SHARE_DELETE,
-          Windows.OPEN_EXISTING);
-      return new FileInputStream(fd);
-    }
-  }
-
-  /**
-   * Create a FileInputStream that shares delete permission on the
+   * Create a FileDescriptor that shares delete permission on the
    * file opened at a given offset, i.e. other process can delete
-   * the file the FileInputStream is reading. Only Windows implementation
+   * the file the FileDescriptor is reading. Only Windows implementation
    * uses the native interface.
    */
-  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
-      throws IOException {
+  public static FileDescriptor getShareDeleteFileDescriptor(
+      File f, long seekOffset) throws IOException {
     if (!Shell.WINDOWS) {
       RandomAccessFile rf = new RandomAccessFile(f, "r");
       if (seekOffset > 0) {
         rf.seek(seekOffset);
       }
-      return new FileInputStream(rf.getFD());
+      return rf.getFD();
     } else {
-      // Use Windows native interface to create a FileInputStream that
+      // Use Windows native interface to create a FileDescriptor that
       // shares delete permission on the file opened, and set it to the
       // given offset.
       //
@@ -797,7 +769,7 @@ public class NativeIO {
           NativeIO.Windows.OPEN_EXISTING);
       if (seekOffset > 0)
         NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
-      return new FileInputStream(fd);
+      return fd;
     }
   }
 

+ 12 - 17
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java

@@ -31,7 +31,6 @@ import java.nio.channels.FileChannel;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -79,18 +78,15 @@ public class BlockMetadataHeader {
 
   /**
    * Read the checksum header from the meta file.
+   * inputStream must be closed by the caller.
    * @return the data checksum obtained from the header.
    */
-  public static DataChecksum readDataChecksum(File metaFile, int bufSize)
+  public static DataChecksum readDataChecksum(
+      FileInputStream inputStream, int bufSize, File metaFile)
       throws IOException {
-    DataInputStream in = null;
-    try {
-      in = new DataInputStream(new BufferedInputStream(
-        new FileInputStream(metaFile), bufSize));
-      return readDataChecksum(in, metaFile);
-    } finally {
-      IOUtils.closeStream(in);
-    }
+    DataInputStream in = new DataInputStream(new BufferedInputStream(
+        inputStream, bufSize));
+    return readDataChecksum(in, metaFile);
   }
 
   /**
@@ -111,6 +107,7 @@ public class BlockMetadataHeader {
 
   /**
    * Read the header without changing the position of the FileChannel.
+   * This is used by the client for short-circuit reads.
    *
    * @param fc The FileChannel to read.
    * @return the Metadata Header.
@@ -144,18 +141,16 @@ public class BlockMetadataHeader {
 
   /**
    * Reads header at the top of metadata file and returns the header.
+   * Closes the input stream after reading the header.
    *
    * @return metadata header for the block
    * @throws IOException
    */
-  public static BlockMetadataHeader readHeader(File file) throws IOException {
-    DataInputStream in = null;
-    try {
-      in = new DataInputStream(new BufferedInputStream(
-                               new FileInputStream(file)));
+  public static BlockMetadataHeader readHeader(
+      FileInputStream fis) throws IOException {
+    try (DataInputStream in = new DataInputStream(
+        new BufferedInputStream(fis))) {
       return readHeader(in);
-    } finally {
-      IOUtils.closeStream(in);
     }
   }
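
Note the lifecycle change that comes with these new signatures: readDataChecksum no longer opens or closes the meta file itself, while readHeader(FileInputStream) closes the stream it is handed. A hedged caller-side sketch (the buffer size is an arbitrary illustrative value):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.util.DataChecksum;

public class MetaChecksumReadExample {
  static DataChecksum readChecksum(File metaFile) throws IOException {
    // The caller now owns the stream and must close it.
    try (FileInputStream fis = new FileInputStream(metaFile)) {
      // metaFile is forwarded to the existing readDataChecksum(in, metaFile) overload.
      return BlockMetadataHeader.readDataChecksum(fis, 2048, metaFile);
    }
  }
}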
 

+ 27 - 0
hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml

@@ -73,6 +73,33 @@
        <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
      </Match>
 
+     <!--
+       This class exposes stream constructors. The newly created streams are not
+       supposed to be closed in the constructor. Ignore the OBL warning.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileOutputStream" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
+     <!--
+       This class exposes stream constructors. The newly created streams are not
+       supposed to be closed in the constructor. Ignore the OBL warning.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileInputStream" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
+     <!--
+       This class exposes stream constructors. The newly created streams are not
+       supposed to be closed in the constructor. Ignore the OBL warning.
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedRandomAccessFile" />
+       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+     </Match>
+
      <!--
       lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
       See the comments in BackupImage for justification.

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -646,6 +646,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
   public static final String  DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
+  public static final String DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY =
+      "dfs.datanode.fileio.events.class";
   public static final String  DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold";
   public static final long    DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
   public static final String  DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";
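
The new key names a FileIoEvents implementation that the FileIoProvider constructor (added later in this patch) loads through conf.getClass. A minimal sketch of setting it programmatically, using the CountingFileIoEvents class this patch also adds; the equivalent hdfs-site.xml entry would set dfs.datanode.fileio.events.class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.CountingFileIoEvents;
import org.apache.hadoop.hdfs.server.datanode.FileIoEvents;

public class FileIoEventsConfig {
  public static Configuration withCountingHooks() {
    // Equivalent to setting dfs.datanode.fileio.events.class in hdfs-site.xml.
    Configuration conf = new Configuration();
    conf.setClass(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
        CountingFileIoEvents.class, FileIoEvents.class);
    return conf;
  }
}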

+ 6 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -244,8 +244,7 @@ class BlockReceiver implements Closeable {
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      streams = replicaInfo.createStreams(isCreate, requestedChecksum,
-          datanodeSlowLogThresholdMs);
+      streams = replicaInfo.createStreams(isCreate, requestedChecksum);
       assert streams != null : "null streams!";
 
       // read checksum meta information
@@ -400,9 +399,8 @@ class BlockReceiver implements Closeable {
       checksumOut.flush();
       long flushEndNanos = System.nanoTime();
       if (isSync) {
-        long fsyncStartNanos = flushEndNanos;
         streams.syncChecksumOut();
-        datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
+        datanode.metrics.addFsyncNanos(System.nanoTime() - flushEndNanos);
       }
       flushTotalNanos += flushEndNanos - flushStartNanos;
     }
@@ -703,8 +701,10 @@ class BlockReceiver implements Closeable {
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
           
           // Write data to disk.
-          long duration = streams.writeToDisk(dataBuf.array(),
+          long begin = Time.monotonicNow();
+          streams.writeDataToDisk(dataBuf.array(),
               startByteToDisk, numBytesToDisk);
+          long duration = Time.monotonicNow() - begin;
 
           if (duration > maxWriteToDiskMs) {
             maxWriteToDiskMs = duration;
@@ -1029,9 +1029,7 @@ class BlockReceiver implements Closeable {
    * will be overwritten.
    */
   private void adjustCrcFilePosition() throws IOException {
-    if (streams.getDataOut() != null) {
-      streams.flushDataOut();
-    }
+    streams.flushDataOut();
     if (checksumOut != null) {
       checksumOut.flush();
     }

+ 7 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -166,6 +166,7 @@ class BlockSender implements java.io.Closeable {
   private final boolean dropCacheBehindAllReads;
   
   private long lastCacheDropOffset;
+  private final FileIoProvider fileIoProvider;
   
   @VisibleForTesting
   static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
@@ -197,6 +198,7 @@ class BlockSender implements java.io.Closeable {
     InputStream blockIn = null;
     DataInputStream checksumIn = null;
     FsVolumeReference volumeRef = null;
+    this.fileIoProvider = datanode.getFileIoProvider();
     try {
       this.block = block;
       this.corruptChecksumOk = corruptChecksumOk;
@@ -411,7 +413,8 @@ class BlockSender implements java.io.Closeable {
         DataNode.LOG.debug("replica=" + replica);
       }
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
-      ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
+      ris = new ReplicaInputStreams(
+          blockIn, checksumIn, volumeRef, fileIoProvider);
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
       org.apache.commons.io.IOUtils.closeQuietly(blockIn);
@@ -579,8 +582,9 @@ class BlockSender implements java.io.Closeable {
         FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
         LongWritable waitTime = new LongWritable();
         LongWritable transferTime = new LongWritable();
-        sockOut.transferToFully(fileCh, blockInPosition, dataLen,
-            waitTime, transferTime);
+        fileIoProvider.transferToSocketFully(
+            ris.getVolumeRef().getVolume(), sockOut, fileCh, blockInPosition,
+            dataLen, waitTime, transferTime);
         datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
         datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
         blockInPosition += dataLen;

+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link FileIoEvents} that simply counts the number of operations.
+ * Not meant to be used outside of testing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CountingFileIoEvents implements FileIoEvents {
+  private final Map<OPERATION, Counts> counts;
+
+  private static class Counts {
+    private final AtomicLong successes = new AtomicLong(0);
+    private final AtomicLong failures = new AtomicLong(0);
+
+    @JsonProperty("Successes")
+    public long getSuccesses() {
+      return successes.get();
+    }
+
+    @JsonProperty("Failures")
+    public long getFailures() {
+      return failures.get();
+    }
+  }
+
+  public CountingFileIoEvents() {
+    counts = new HashMap<>();
+    for (OPERATION op : OPERATION.values()) {
+      counts.put(op, new Counts());
+    }
+  }
+
+  @Override
+  public long beforeMetadataOp(
+      @Nullable FsVolumeSpi volume, OPERATION op) {
+    return 0;
+  }
+
+  @Override
+  public void afterMetadataOp(
+      @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
+    counts.get(op).successes.incrementAndGet();
+  }
+
+  @Override
+  public long beforeFileIo(
+      @Nullable FsVolumeSpi volume, OPERATION op, long len) {
+    return 0;
+  }
+
+  @Override
+  public void afterFileIo(
+      @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
+    counts.get(op).successes.incrementAndGet();
+  }
+
+  @Override
+  public void onFailure(
+      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+    counts.get(op).failures.incrementAndGet();
+
+  }
+
+  @Override
+  public String getStatistics() {
+    ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      return objectMapper.writeValueAsString(counts);
+    } catch (JsonProcessingException e) {
+      // Failed to serialize. Don't log the exception call stack.
+      FileIoProvider.LOG.error("Failed to serialize statistics" + e);
+      return null;
+    }
+  }
+}
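
To see how these counters accumulate, here is a hedged usage sketch (not part of the patch) that routes a small write through a FileIoProvider configured with CountingFileIoEvents and prints the resulting JSON; the temporary file and the write itself are purely illustrative:

import java.io.File;
import java.io.FileOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.CountingFileIoEvents;
import org.apache.hadoop.hdfs.server.datanode.FileIoEvents;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;

public class CountingFileIoEventsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setClass(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
        CountingFileIoEvents.class, FileIoEvents.class);
    FileIoProvider provider = new FileIoProvider(conf);

    File f = File.createTempFile("fileio-demo", ".dat");
    // The volume argument may be null ("null if unavailable" per the javadoc).
    try (FileOutputStream fos = provider.getFileOutputStream(null, f)) {
      fos.write(42);              // intercepted by the wrapped stream as a WRITE
      provider.flush(null, fos);  // recorded as a FLUSH
    }
    // Prints per-operation success/failure counts serialized as JSON.
    System.out.println(provider.getStatistics());
  }
}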

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -300,6 +300,7 @@ public class DataNode extends ReconfigurableBase
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
   private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
+  private final FileIoProvider fileIoProvider;
 
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -409,6 +410,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
+    this.fileIoProvider = new FileIoProvider(conf);
     this.fileDescriptorPassingDisabledReason = null;
     this.maxNumberOfBlocksToLog = 0;
     this.confVersion = null;
@@ -434,6 +436,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
+    this.fileIoProvider = new FileIoProvider(conf);
     this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -615,6 +618,10 @@ public class DataNode extends ReconfigurableBase
         PipelineAck.ECN.SUPPORTED;
   }
 
+  public FileIoProvider getFileIoProvider() {
+    return fileIoProvider;
+  }
+
   /**
    * Contains the StorageLocations for changed data volumes.
    */
@@ -2943,7 +2950,13 @@ public class DataNode extends ReconfigurableBase
   public synchronized String getClusterId() {
     return clusterId;
   }
-  
+
+
+  @Override // DataNodeMXBean
+  public String getFileIoProviderStatistics() {
+    return fileIoProvider.getStatistics();
+  }
+
   public void refreshNamenodes(Configuration conf) throws IOException {
     blockPoolManager.refreshNamenodes(conf);
   }

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java

@@ -105,4 +105,9 @@ public interface DataNodeMXBean {
    * Gets the network error counts on a per-Datanode basis.
    */
   public Map<String, Map<String, Long>> getDatanodeNetworkCounts();
+
+  /**
+   * Gets the {@link FileIoProvider} statistics.
+   */
+  String getFileIoProviderStatistics();
 }
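
Since DataNodeMXBean is exposed over JMX, the new attribute can also be read remotely. A hedged sketch, assuming the conventional DataNode bean name and standard MXBean attribute naming (getFileIoProviderStatistics surfacing as "FileIoProviderStatistics"); both names are assumptions here, not verified against this patch:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class FileIoStatsJmxExample {
  public static String fetchStatistics() throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    // Assumed bean name; follows the usual Hadoop DataNode registration.
    ObjectName dn = new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
    return (String) mbs.getAttribute(dn, "FileIoProviderStatistics");
  }
}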

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -1395,6 +1395,12 @@ public class DataStorage extends Storage {
     bpStorageMap.remove(bpId);
   }
 
+  /**
+   * Prefer FileIoProvider#fullydelete.
+   * @param dir
+   * @return
+   */
+  @Deprecated
   public static boolean fullyDelete(final File dir) {
     boolean result = FileUtil.fullyDelete(dir);
     return result;

+ 13 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 
 /** Provide utility methods for Datanode. */
@@ -55,15 +56,17 @@ public class DatanodeUtil {
    * @throws IOException 
    * if the file already exists or if the file cannot be created.
    */
-  public static File createTmpFile(Block b, File f) throws IOException {
-    if (f.exists()) {
+  public static File createFileWithExistsCheck(
+      FsVolumeSpi volume, Block b, File f,
+      FileIoProvider fileIoProvider) throws IOException {
+    if (fileIoProvider.exists(volume, f)) {
       throw new IOException("Failed to create temporary file for " + b
           + ".  File " + f + " should not be present, but is.");
     }
     // Create the zero-length temp file
     final boolean fileCreated;
     try {
-      fileCreated = f.createNewFile();
+      fileCreated = fileIoProvider.createFile(volume, f);
     } catch (IOException ioe) {
       throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
     }
@@ -92,13 +95,17 @@ public class DatanodeUtil {
    * @return true if there are no files
    * @throws IOException if unable to list subdirectories
    */
-  public static boolean dirNoFilesRecursive(File dir) throws IOException {
-    File[] contents = dir.listFiles();
+  public static boolean dirNoFilesRecursive(
+      FsVolumeSpi volume, File dir,
+      FileIoProvider fileIoProvider) throws IOException {
+    File[] contents = fileIoProvider.listFiles(volume, dir);
     if (contents == null) {
       throw new IOException("Cannot list contents of " + dir);
     }
     for (File f : contents) {
-      if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+      if (!f.isDirectory() ||
+          (f.isDirectory() && !dirNoFilesRecursive(
+              volume, f, fileIoProvider))) {
         return false;
       }
     }

+ 67 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * The default implementation of {@link FileIoEvents} that do nothing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class DefaultFileIoEvents implements FileIoEvents {
+  @Override
+  public long beforeMetadataOp(
+      @Nullable FsVolumeSpi volume, OPERATION op) {
+    return 0;
+  }
+
+  @Override
+  public void afterMetadataOp(
+      @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
+  }
+
+  @Override
+  public long beforeFileIo(
+      @Nullable FsVolumeSpi volume, OPERATION op, long len) {
+    return 0;
+  }
+
+  @Override
+  public void afterFileIo(
+      @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
+  }
+
+  @Override
+  public void onFailure(
+      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+  }
+
+  @Override
+  public @Nullable String getStatistics() {
+    // null is valid JSON.
+    return null;
+  }
+}

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
@@ -847,10 +846,12 @@ public class DirectoryScanner implements Runnable {
         throws InterruptedException {
 
       throttle();
+      final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
 
       List <String> fileNames;
       try {
-        fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
+        fileNames = fileIoProvider.listDirectory(
+            volume, dir, BlockDirFilter.INSTANCE);
       } catch (IOException ioe) {
         LOG.warn("Exception occured while compiling report: ", ioe);
         // Initiate a check on disk failure.

+ 97 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java

@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * The following hooks can be implemented for instrumentation/fault
+ * injection.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface FileIoEvents {
+
+  /**
+   * Invoked before a filesystem metadata operation.
+   *
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @return  timestamp at which the operation was started. 0 if
+   *          unavailable.
+   */
+  long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
+
+  /**
+   * Invoked after a filesystem metadata operation has completed.
+   *
+   * @param volume  target volume for the operation.  Null if unavailable.
+   * @param op  type of operation.
+   * @param begin  timestamp at which the operation was started. 0
+   *               if unavailable.
+   */
+  void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin);
+
+  /**
+   * Invoked before a read/write/flush/channel transfer operation.
+   *
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param len  length of the file IO. 0 for flush.
+   * @return  timestamp at which the operation was started. 0 if
+   *          unavailable.
+   */
+  long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len);
+
+
+  /**
+   * Invoked after a read/write/flush/channel transfer operation
+   * has completed.
+   *
+   * @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param len   of the file IO. 0 for flush.
+   * @return  timestamp at which the operation was started. 0 if
+   *          unavailable.
+   */
+  void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+                   long begin, long len);
+
+  /**
+   * Invoked if an operation fails with an exception.
+   *  @param volume  target volume for the operation. Null if unavailable.
+   * @param op  type of operation.
+   * @param e  Exception encountered during the operation.
+   * @param begin  time at which the operation was started.
+   */
+  void onFailure(
+      @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
+
+  /**
+   * Return statistics as a JSON string.
+   * @return
+   */
+  @Nullable String getStatistics();
+}
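
As an illustration of how these hooks compose, below is a hedged sketch (not part of this patch) of a FileIoEvents implementation that logs operations slower than a fixed threshold; the 300 ms threshold and the logger are illustrative assumptions:

package org.apache.hadoop.hdfs.server.datanode;

import javax.annotation.Nullable;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowIoLoggingFileIoEvents implements FileIoEvents {
  private static final Logger LOG =
      LoggerFactory.getLogger(SlowIoLoggingFileIoEvents.class);
  private static final long THRESHOLD_MS = 300;   // illustrative value

  @Override
  public long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op) {
    return Time.monotonicNow();
  }

  @Override
  public void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin) {
    logIfSlow(volume, op, begin);
  }

  @Override
  public long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
      long len) {
    return Time.monotonicNow();
  }

  @Override
  public void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin, long len) {
    logIfSlow(volume, op, begin);
  }

  @Override
  public void onFailure(@Nullable FsVolumeSpi volume, OPERATION op,
      Exception e, long begin) {
    LOG.warn("{} failed on volume {}", op, volume, e);
  }

  @Override
  public @Nullable String getStatistics() {
    return null;   // no statistics maintained, as in DefaultFileIoEvents
  }

  private void logIfSlow(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin) {
    final long elapsedMs = Time.monotonicNow() - begin;
    if (elapsedMs > THRESHOLD_MS) {
      LOG.warn("Slow {} on volume {}: {} ms", op, volume, elapsedMs);
    }
  }
}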

+ 1031 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java

@@ -0,0 +1,1031 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.CopyOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*;
+
+/**
+ * This class abstracts out various file IO operations performed by the
+ * DataNode and invokes event hooks before and after each file IO.
+ *
+ * Behavior can be injected into these events by implementing
+ * {@link FileIoEvents} and replacing the default implementation
+ * with {@link DFSConfigKeys#DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY}.
+ *
+ * Most functions accept an optional {@link FsVolumeSpi} parameter for
+ * instrumentation/logging.
+ *
+ * Some methods may look redundant, especially the multiple variations of
+ * move/rename/list. They exist to retain behavior compatibility for existing
+ * code.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FileIoProvider {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      FileIoProvider.class);
+
+  private final FileIoEvents eventHooks;
+
+  /**
+   * @param conf  Configuration object. May be null. When null,
+   *              the event handlers are no-ops.
+   */
+  public FileIoProvider(@Nullable Configuration conf) {
+    if (conf != null) {
+      final Class<? extends FileIoEvents> clazz = conf.getClass(
+          DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
+          DefaultFileIoEvents.class,
+          FileIoEvents.class);
+      eventHooks = ReflectionUtils.newInstance(clazz, conf);
+    } else {
+      eventHooks = new DefaultFileIoEvents();
+    }
+  }
+
+  /**
+   * Lists the types of file system operations. Passed to the
+   * IO hooks so implementations can choose behavior based on
+   * specific operations.
+   */
+  public enum OPERATION {
+    OPEN,
+    EXISTS,
+    LIST,
+    DELETE,
+    MOVE,
+    MKDIRS,
+    TRANSFER,
+    SYNC,
+    FADVISE,
+    READ,
+    WRITE,
+    FLUSH,
+    NATIVE_COPY
+  }
+
+  /**
+   * Retrieve statistics from the underlying {@link FileIoEvents}
+   * implementation as a JSON string, if it maintains them.
+   * @return statistics as a JSON string. May be null.
+   */
+  public @Nullable String getStatistics() {
+    return eventHooks.getStatistics();
+  }
+
+  /**
+   * See {@link Flushable#flush()}.
+   *
+   * @param  volume target volume. null if unavailable.
+   * @throws IOException
+   */
+  public void flush(
+      @Nullable FsVolumeSpi volume, Flushable f) throws IOException {
+    final long begin = eventHooks.beforeFileIo(volume, FLUSH, 0);
+    try {
+      f.flush();
+      eventHooks.afterFileIo(volume, FLUSH, begin, 0);
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, FLUSH, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Sync the given {@link FileOutputStream}.
+   *
+   * @param  volume target volume. null if unavailable.
+   * @throws IOException
+   */
+  public void sync(
+      @Nullable FsVolumeSpi volume, FileOutputStream fos) throws IOException {
+    final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+    try {
+      fos.getChannel().force(true);
+      eventHooks.afterFileIo(volume, SYNC, begin, 0);
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, SYNC, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Call sync_file_range on the given file descriptor.
+   *
+   * @param  volume target volume. null if unavailable.
+   * @throws IOException
+   */
+  public void syncFileRange(
+      @Nullable FsVolumeSpi volume, FileDescriptor outFd,
+      long offset, long numBytes, int flags) throws NativeIOException {
+    final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+    try {
+      NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
+      eventHooks.afterFileIo(volume, SYNC, begin, 0);
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, SYNC, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Call posix_fadvise on the given file descriptor.
+   *
+   * @param  volume target volume. null if unavailable.
+   * @throws IOException
+   */
+  public void posixFadvise(
+      @Nullable FsVolumeSpi volume, String identifier, FileDescriptor outFd,
+      long offset, long length, int flags) throws NativeIOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, FADVISE);
+    try {
+      NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+          identifier, outFd, offset, length, flags);
+      eventHooks.afterMetadataOp(volume, FADVISE, begin);
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, FADVISE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Delete a file.
+   * @param volume  target volume. null if unavailable.
+   * @param f  File to delete.
+   * @return  true if the file was successfully deleted.
+   */
+  public boolean delete(@Nullable FsVolumeSpi volume, File f) {
+    final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+    try {
+      boolean deleted = f.delete();
+      eventHooks.afterMetadataOp(volume, DELETE, begin);
+      return deleted;
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, DELETE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Delete a file, first checking to see if it exists.
+   * @param volume  target volume. null if unavailable.
+   * @param f  File to delete
+   * @return  true if the file was successfully deleted or if it never
+   *          existed.
+   */
+  public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) {
+    final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+    try {
+      boolean deleted = !f.exists() || f.delete();
+      eventHooks.afterMetadataOp(volume, DELETE, begin);
+      if (!deleted) {
+        LOG.warn("Failed to delete file {}", f);
+      }
+      return deleted;
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, DELETE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Transfer data from a FileChannel to a SocketOutputStream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param sockOut  SocketOutputStream to write the data.
+   * @param fileCh  FileChannel from which to read data.
+   * @param position  position within the channel where the transfer begins.
+   * @param count  number of bytes to transfer.
+   * @param waitTime  returns the nanoseconds spent waiting for the socket
+   *                  to become writable.
+   * @param transferTime  returns the nanoseconds spent transferring data.
+   * @throws IOException
+   */
+  public void transferToSocketFully(
+      @Nullable FsVolumeSpi volume, SocketOutputStream sockOut,
+      FileChannel fileCh, long position, int count,
+      LongWritable waitTime, LongWritable transferTime) throws IOException {
+    final long begin = eventHooks.beforeFileIo(volume, TRANSFER, count);
+    try {
+      sockOut.transferToFully(fileCh, position, count,
+          waitTime, transferTime);
+      eventHooks.afterFileIo(volume, TRANSFER, begin, count);
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, TRANSFER, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Create a file.
+   * @param volume  target volume. null if unavailable.
+   * @param f  File to be created.
+   * @return  true if the file does not exist and was successfully created.
+   *          false if the file already exists.
+   * @throws IOException
+   */
+  public boolean createFile(
+      @Nullable FsVolumeSpi volume, File f) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    try {
+      boolean created = f.createNewFile();
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return created;
+    } catch (Exception e) {
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Create a FileInputStream using
+   * {@link FileInputStream#FileInputStream(File)}.
+   *
+   * Wraps the created input stream to intercept read calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @return  FileInputStream to the given file.
+   * @throws  FileNotFoundException
+   */
+  public FileInputStream getFileInputStream(
+      @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    FileInputStream fis = null;
+    try {
+      fis = new WrappedFileInputStream(volume, f);
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return fis;
+    } catch(Exception e) {
+      org.apache.commons.io.IOUtils.closeQuietly(fis);
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Create a FileOutputStream using
+   * {@link FileOutputStream#FileOutputStream(File, boolean)}.
+   *
+   * Wraps the created output stream to intercept write calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @param append  if true, then bytes will be written to the end of the
+   *                file rather than the beginning.
+   * @param  FileOutputStream to the given file object.
+   * @throws FileNotFoundException
+   */
+  public FileOutputStream getFileOutputStream(
+      @Nullable FsVolumeSpi volume, File f,
+      boolean append) throws FileNotFoundException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    FileOutputStream fos = null;
+    try {
+      fos = new WrappedFileOutputStream(volume, f, append);
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return fos;
+    } catch(Exception e) {
+      org.apache.commons.io.IOUtils.closeQuietly(fos);
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Create a FileOutputStream using
+   * {@link FileOutputStream#FileOutputStream(File, boolean)}.
+   *
+   * Wraps the created output stream to intercept write calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @return  FileOutputStream to the given file object.
+   * @throws  FileNotFoundException
+   */
+  public FileOutputStream getFileOutputStream(
+      @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
+    return getFileOutputStream(volume, f, false);
+  }
+
+  /**
+   * Create a FileOutputStream using
+   * {@link FileOutputStream#FileOutputStream(FileDescriptor)}.
+   *
+   * Wraps the created output stream to intercept write calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @return  FileOutputStream to the given file object.
+   * @throws  FileNotFoundException
+   */
+  public FileOutputStream getFileOutputStream(
+      @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+    return new WrappedFileOutputStream(volume, fd);
+  }
+
+  /**
+   * Create a FileInputStream using
+   * {@link NativeIO#getShareDeleteFileDescriptor}.
+   * Wraps the created input stream to intercept input calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @param offset  the offset position, measured in bytes from the
+   *                beginning of the file, at which to set the file
+   *                pointer.
+   * @return FileOutputStream to the given file object.
+   * @throws FileNotFoundException
+   */
+  public FileInputStream getShareDeleteFileInputStream(
+      @Nullable FsVolumeSpi volume, File f,
+      long offset) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    FileInputStream fis = null;
+    try {
+      fis = new WrappedFileInputStream(volume,
+          NativeIO.getShareDeleteFileDescriptor(f, offset));
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return fis;
+    } catch(Exception e) {
+      org.apache.commons.io.IOUtils.closeQuietly(fis);
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Create a FileInputStream using
+   * {@link FileInputStream#FileInputStream(File)} and position
+   * it at the given offset.
+   *
+   * Wraps the created input stream to intercept read calls
+   * before delegating to the wrapped stream.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @param offset  the offset position, measured in bytes from the
+   *                beginning of the file, at which to set the file
+   *                pointer.
+   * @throws FileNotFoundException
+   */
+  public FileInputStream openAndSeek(
+      @Nullable FsVolumeSpi volume, File f, long offset) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    FileInputStream fis = null;
+    try {
+      fis = new WrappedFileInputStream(volume,
+          FsDatasetUtil.openAndSeek(f, offset));
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return fis;
+    } catch(Exception e) {
+      org.apache.commons.io.IOUtils.closeQuietly(fis);
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Create a RandomAccessFile using
+   * {@link RandomAccessFile#RandomAccessFile(File, String)}.
+   *
+   * Wraps the created input stream to intercept IO calls
+   * before delegating to the wrapped RandomAccessFile.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param f  File object.
+   * @param mode  See {@link RandomAccessFile} for a description
+   *              of the mode string.
+   * @return RandomAccessFile representing the given file.
+   * @throws FileNotFoundException
+   */
+  public RandomAccessFile getRandomAccessFile(
+      @Nullable FsVolumeSpi volume, File f,
+      String mode) throws FileNotFoundException {
+    final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+    RandomAccessFile raf = null;
+    try {
+      raf = new WrappedRandomAccessFile(volume, f, mode);
+      eventHooks.afterMetadataOp(volume, OPEN, begin);
+      return raf;
+    } catch(Exception e) {
+      org.apache.commons.io.IOUtils.closeQuietly(raf);
+      eventHooks.onFailure(volume, OPEN, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Delete the given directory using {@link FileUtil#fullyDelete(File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  directory to be deleted.
+   * @return true on success false on failure.
+   */
+  public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) {
+    final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+    try {
+      boolean deleted = FileUtil.fullyDelete(dir);
+      eventHooks.afterMetadataOp(volume, DELETE, begin);
+      return deleted;
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, DELETE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Move the src file to the target using
+   * {@link FileUtil#replaceFile(File, File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source path.
+   * @param target  target path.
+   * @throws IOException
+   */
+  public void replaceFile(
+      @Nullable FsVolumeSpi volume, File src, File target) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      FileUtil.replaceFile(src, target);
+      eventHooks.afterMetadataOp(volume, MOVE, begin);
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, MOVE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Move the src file to the target using
+   * {@link Storage#rename(File, File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source path.
+   * @param target  target path.
+   * @throws IOException
+   */
+  public void rename(
+      @Nullable FsVolumeSpi volume, File src, File target)
+      throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      Storage.rename(src, target);
+      eventHooks.afterMetadataOp(volume, MOVE, begin);
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, MOVE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Move the src file to the target using
+   * {@link FileUtils#moveFile(File, File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source path.
+   * @param target  target path.
+   * @throws IOException
+   */
+  public void moveFile(
+      @Nullable FsVolumeSpi volume, File src, File target)
+      throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      FileUtils.moveFile(src, target);
+      eventHooks.afterMetadataOp(volume, MOVE, begin);
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, MOVE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Move the src file to the target using
+   * {@link Files#move(Path, Path, CopyOption...)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source path.
+   * @param target  target path.
+   * @param options  See {@link Files#move} for a description
+   *                of the options.
+   * @throws IOException
+   */
+  public void move(
+      @Nullable FsVolumeSpi volume, Path src, Path target,
+      CopyOption... options) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      Files.move(src, target, options);
+      eventHooks.afterMetadataOp(volume, MOVE, begin);
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, MOVE, e, begin);
+      throw e;
+    }
+  }
+
+
+  /**
+   * Move the src file to the target using
+   * {@link NativeIO#renameTo(File, File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  source file.
+   * @param target  target file.
+   * @param options  See {@link Files#move} for a description
+   *                of the options.
+   * @throws IOException
+   */
+  public void renameTo(
+      @Nullable FsVolumeSpi volume, File src, File target)
+      throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+    try {
+      NativeIO.renameTo(src, target);
+      eventHooks.afterMetadataOp(volume, MOVE, begin);
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, MOVE, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * See {@link Storage#nativeCopyFileUnbuffered(File, File, boolean)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param src  an existing file to copy, must not be {@code null}
+   * @param target  the new file, must not be {@code null}
+   * @param preserveFileDate  true if the file date of the copy
+   *                         should be the same as the original
+   * @throws IOException
+   */
+  public void nativeCopyFileUnbuffered(
+      @Nullable FsVolumeSpi volume, File src, File target,
+      boolean preserveFileDate) throws IOException {
+    final long length = src.length();
+    final long begin = eventHooks.beforeFileIo(volume, NATIVE_COPY, length);
+    try {
+      Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
+      eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, NATIVE_COPY, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * See {@link File#mkdirs()}.
+   *
+   * @param volume target volume. null if unavailable.
+   * @param dir  directory to be created.
+   * @return  true only if the directory was created. false if
+   *          the directory already exists.
+   * @throws IOException if a directory with the given name does
+   *                     not exist and could not be created.
+   */
+  public boolean mkdirs(
+      @Nullable FsVolumeSpi volume, File dir) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
+    boolean created = false;
+    boolean isDirectory;
+    try {
+      created = dir.mkdirs();
+      isDirectory = !created && dir.isDirectory();
+      eventHooks.afterMetadataOp(volume, MKDIRS, begin);
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, MKDIRS, e, begin);
+      throw e;
+    }
+
+    if (!created && !isDirectory) {
+      throw new IOException("Mkdirs failed to create " + dir);
+    }
+    return created;
+  }
+
+  /**
+   * Create the target directory using {@link File#mkdirs()} only if
+   * it doesn't exist already.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  directory to be created.
+   * @throws IOException  if the directory could not created
+   */
+  public void mkdirsWithExistsCheck(
+      @Nullable FsVolumeSpi volume, File dir) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
+    boolean succeeded = false;
+    try {
+      succeeded = dir.isDirectory() || dir.mkdirs();
+      eventHooks.afterMetadataOp(volume, MKDIRS, begin);
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, MKDIRS, e, begin);
+      throw e;
+    }
+
+    if (!succeeded) {
+      throw new IOException("Mkdirs failed to create " + dir);
+    }
+  }
+
+  /**
+   * Get a listing of the given directory using
+   * {@link FileUtil#listFiles(File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param dir  Directory to be listed.
+   * @return  array of file objects representing the directory entries.
+   * @throws IOException
+   */
+  public File[] listFiles(
+      @Nullable FsVolumeSpi volume, File dir) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    try {
+      File[] children = FileUtil.listFiles(dir);
+      eventHooks.afterMetadataOp(volume, LIST, begin);
+      return children;
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, LIST, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Get a listing of the given directory using
+   * {@link FileUtil#listFiles(File)}.
+   *
+   * @param volume  target volume. null if unavailable.
+   * @param   Driectory to be listed.
+   * @return  array of strings representing the directory entries.
+   * @throws IOException
+   */
+  public String[] list(
+      @Nullable FsVolumeSpi volume, File dir) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    try {
+      String[] children = FileUtil.list(dir);
+      eventHooks.afterMetadataOp(volume, LIST, begin);
+      return children;
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, LIST, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Get a listing of the given directory using
+   * {@link IOUtils#listDirectory(File, FilenameFilter)}.
+   *
+   * @param volume target volume. null if unavailable.
+   * @param dir Directory to list.
+   * @param filter {@link FilenameFilter} to filter the directory entries.
+   * @throws IOException
+   */
+  public List<String> listDirectory(
+      @Nullable FsVolumeSpi volume, File dir,
+      FilenameFilter filter) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    try {
+      List<String> children = IOUtils.listDirectory(dir, filter);
+      eventHooks.afterMetadataOp(volume, LIST, begin);
+      return children;
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, LIST, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Retrieves the number of links to the specified file.
+   *
+   * @param volume target volume. null if unavailable.
+   * @param f file whose link count is being queried.
+   * @return number of hard-links to the given file, including the
+   *         given path itself.
+   * @throws IOException
+   */
+  public int getHardLinkCount(
+      @Nullable FsVolumeSpi volume, File f) throws IOException {
+    final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+    try {
+      int count = HardLink.getLinkCount(f);
+      eventHooks.afterMetadataOp(volume, LIST, begin);
+      return count;
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, LIST, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * Check for file existence using {@link File#exists()}.
+   *
+   * @param volume target volume. null if unavailable.
+   * @param f file object.
+   * @return true if the file exists.
+   */
+  public boolean exists(@Nullable FsVolumeSpi volume, File f) {
+    final long begin = eventHooks.beforeMetadataOp(volume, EXISTS);
+    try {
+      boolean exists = f.exists();
+      eventHooks.afterMetadataOp(volume, EXISTS, begin);
+      return exists;
+    } catch(Exception e) {
+      eventHooks.onFailure(volume, EXISTS, e, begin);
+      throw e;
+    }
+  }
+
+  /**
+   * A thin wrapper over {@link FileInputStream} that allows
+   * instrumenting disk IO.
+   */
+  private final class WrappedFileInputStream extends FileInputStream {
+    private @Nullable final FsVolumeSpi volume;
+
+    /**
+     * {@inheritDoc}.
+     */
+    private WrappedFileInputStream(@Nullable FsVolumeSpi volume, File f)
+        throws FileNotFoundException {
+      super(f);
+      this.volume = volume;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    private WrappedFileInputStream(
+        @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+      super(fd);
+      this.volume = volume;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public int read() throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+      try {
+        int b = super.read();
+        eventHooks.afterFileIo(volume, READ, begin, 1);
+        return b;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public int read(@Nonnull byte[] b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+      try {
+        int numBytesRead = super.read(b);
+        eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+        return numBytesRead;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public int read(@Nonnull byte[] b, int off, int len) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, len);
+      try {
+        int numBytesRead = super.read(b, off, len);
+        eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+        return numBytesRead;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * A thin wrapper over {@link FileOutputStream} that allows
+   * instrumenting disk IO.
+   */
+  private final class WrappedFileOutputStream extends FileOutputStream {
+    private @Nullable final FsVolumeSpi volume;
+
+    /**
+     * {@inheritDoc}.
+     */
+    private WrappedFileOutputStream(
+        @Nullable FsVolumeSpi volume, File f,
+        boolean append) throws FileNotFoundException {
+      super(f, append);
+      this.volume = volume;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    private WrappedFileOutputStream(
+        @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+      super(fd);
+      this.volume = volume;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public void write(int b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+      try {
+        super.write(b);
+        eventHooks.afterFileIo(volume, WRITE, begin, 1);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public void write(@Nonnull byte[] b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+      try {
+        super.write(b);
+        eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public void write(@Nonnull byte[] b, int off, int len) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+      try {
+        super.write(b, off, len);
+        eventHooks.afterFileIo(volume, WRITE, begin, len);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * A thin wrapper over {@link RandomAccessFile} that allows
+   * instrumenting disk IO.
+   */
+  private final class WrappedRandomAccessFile extends RandomAccessFile {
+    private @Nullable final FsVolumeSpi volume;
+
+    public WrappedRandomAccessFile(
+        @Nullable FsVolumeSpi volume, File f, String mode)
+        throws FileNotFoundException {
+      super(f, mode);
+      this.volume = volume;
+    }
+
+    @Override
+    public int read() throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+      try {
+        int b = super.read();
+        eventHooks.afterFileIo(volume, READ, begin, 1);
+        return b;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, len);
+      try {
+        int numBytesRead = super.read(b, off, len);
+        eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+        return numBytesRead;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+      try {
+        int numBytesRead = super.read(b);
+        eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+        return numBytesRead;
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, READ, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+      try {
+        super.write(b);
+        eventHooks.afterFileIo(volume, WRITE, begin, 1);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public void write(@Nonnull byte[] b) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+      try {
+        super.write(b);
+        eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+      try {
+        super.write(b, off, len);
+        eventHooks.afterFileIo(volume, WRITE, begin, len);
+      } catch(Exception e) {
+        eventHooks.onFailure(volume, WRITE, e, begin);
+        throw e;
+      }
+    }
+  }
+}
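
Every wrapper above follows the same three-step pattern: ask the hook for a begin timestamp, perform the raw IO, then report success (afterFileIo) or failure (onFailure). Below is a minimal, self-contained sketch of that pattern; the IoHooks interface and TimedInputStream class are illustrative stand-ins, not classes from this patch.

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Illustrative stand-in for the hook interface used above: before()
    // returns a begin timestamp that is handed back to after()/onFailure().
    interface IoHooks {
      long before(long lenHint);
      void after(long begin, long actualLen);
      void onFailure(Exception e, long begin);
    }

    // Minimal instrumented stream following the same shape as
    // WrappedFileInputStream: time every read and report failures.
    class TimedInputStream extends FilterInputStream {
      private final IoHooks hooks;

      TimedInputStream(InputStream in, IoHooks hooks) {
        super(in);
        this.hooks = hooks;
      }

      @Override
      public int read(byte[] b, int off, int len) throws IOException {
        final long begin = hooks.before(len);
        try {
          int n = super.read(b, off, len);
          hooks.after(begin, n);   // n may be -1 at EOF, mirroring the patch
          return n;
        } catch (IOException e) {
          hooks.onFailure(e, begin);
          throw e;
        }
      }
    }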

+ 11 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java

@@ -234,7 +234,7 @@ public class ReplicaInPipeline extends ReplicaInfo
   
   @Override // ReplicaInPipelineInterface
   public ReplicaOutputStreams createStreams(boolean isCreate, 
-      DataChecksum requestedChecksum, long slowLogThresholdMs)
+      DataChecksum requestedChecksum)
       throws IOException {
     File blockFile = getBlockFile();
     File metaFile = getMetaFile();
@@ -251,7 +251,8 @@ public class ReplicaInPipeline extends ReplicaInfo
     // may differ from requestedChecksum for appends.
     final DataChecksum checksum;
 
-    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    RandomAccessFile metaRAF = getFileIoProvider().getRandomAccessFile(
+        getVolume(), metaFile, "rw");
 
     if (!isCreate) {
       // For append or recovery, we must enforce the existing checksum.
@@ -293,17 +294,19 @@ public class ReplicaInPipeline extends ReplicaInfo
     FileOutputStream blockOut = null;
     FileOutputStream crcOut = null;
     try {
-      blockOut = new FileOutputStream(
-          new RandomAccessFile( blockFile, "rw" ).getFD() );
-      crcOut = new FileOutputStream(metaRAF.getFD());
+      blockOut = getFileIoProvider().getFileOutputStream(
+          getVolume(), new RandomAccessFile(blockFile, "rw").getFD());
+      crcOut = getFileIoProvider().getFileOutputStream(
+          getVolume(), metaRAF.getFD());
       if (!isCreate) {
         blockOut.getChannel().position(blockDiskSize);
         crcOut.getChannel().position(crcDiskSize);
       }
       return new ReplicaOutputStreams(blockOut, crcOut, checksum,
-          getVolume().isTransientStorage(), slowLogThresholdMs);
+          getVolume(), getFileIoProvider());
     } catch (IOException e) {
       IOUtils.closeStream(blockOut);
+      IOUtils.closeStream(crcOut);
       IOUtils.closeStream(metaRAF);
       throw e;
     }
@@ -314,11 +317,11 @@ public class ReplicaInPipeline extends ReplicaInfo
     File blockFile = getBlockFile();
     File restartMeta = new File(blockFile.getParent()  +
         File.pathSeparator + "." + blockFile.getName() + ".restart");
-    if (restartMeta.exists() && !restartMeta.delete()) {
+    if (!getFileIoProvider().deleteWithExistsCheck(getVolume(), restartMeta)) {
       DataNode.LOG.warn("Failed to delete restart meta file: " +
           restartMeta.getPath());
     }
-    return new FileOutputStream(restartMeta);
+    return getFileIoProvider().getFileOutputStream(getVolume(), restartMeta);
   }
 
   @Override
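
Besides routing stream creation through FileIoProvider, the createStreams() hunk above also closes crcOut in the catch block, so a failure while positioning the channels no longer leaks the checksum file descriptor. A self-contained sketch of that acquire-both-or-clean-up pattern (the PairedStreams class and its file arguments are illustrative only):

    import java.io.Closeable;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    // If opening the second stream fails, the first one must be closed
    // explicitly or its file descriptor leaks.
    class PairedStreams implements Closeable {
      private final FileOutputStream dataOut;
      private final FileOutputStream checksumOut;

      PairedStreams(File dataFile, File checksumFile) throws IOException {
        FileOutputStream d = null;
        FileOutputStream c = null;
        try {
          d = new FileOutputStream(dataFile, true);
          c = new FileOutputStream(checksumFile, true);
          this.dataOut = d;
          this.checksumOut = c;
        } catch (IOException e) {
          closeQuietly(d);   // clean up whatever was already opened
          closeQuietly(c);
          throw e;
        }
      }

      private static void closeQuietly(Closeable c) {
        if (c != null) {
          try { c.close(); } catch (IOException ignored) { }
        }
      }

      @Override
      public void close() throws IOException {
        try { dataOut.close(); } finally { checksumOut.close(); }
      }
    }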

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java

@@ -69,12 +69,11 @@ public interface ReplicaInPipelineInterface extends Replica {
    * 
    * @param isCreate if it is for creation
    * @param requestedChecksum the checksum the writer would prefer to use
-   * @param slowLogThresholdMs threshold in ms to log slow io operation
    * @return output streams for writing
    * @throws IOException if any error occurs
    */
   public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum, long slowLogThresholdMs)
+      DataChecksum requestedChecksum)
       throws IOException;
 
   /**

+ 31 - 45
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java

@@ -20,17 +20,12 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.LightWeightResizableGSet;
@@ -67,6 +62,10 @@ abstract public class ReplicaInfo extends Block
   
   private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
+  /** This is used by some tests and FsDatasetUtil#computeChecksum. */
+  private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
+      new FileIoProvider(null);
+
   /**
    * Constructor
    * @param block a block
@@ -125,7 +124,18 @@ abstract public class ReplicaInfo extends Block
   public FsVolumeSpi getVolume() {
     return volume;
   }
-  
+
+  /**
+   * Get the {@link FileIoProvider} for disk IO operations.
+   */
+  public FileIoProvider getFileIoProvider() {
+    // In tests and when invoked via FsDatasetUtil#computeChecksum, the
+    // target volume for this replica may be unknown and hence null.
+    // Use the DEFAULT_FILE_IO_PROVIDER with no-op hooks.
+    return (volume != null) ? volume.getFileIoProvider()
+        : DEFAULT_FILE_IO_PROVIDER;
+  }
+
   /**
    * Set the volume where this replica is located on disk
    */
@@ -227,30 +237,25 @@ abstract public class ReplicaInfo extends Block
    * be recovered (especially on Windows) on datanode restart.
    */
   private void breakHardlinks(File file, Block b) throws IOException {
-    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
-    try {
-      FileInputStream in = new FileInputStream(file);
-      try {
-        FileOutputStream out = new FileOutputStream(tmpFile);
-        try {
-          copyBytes(in, out, 16 * 1024);
-        } finally {
-          out.close();
-        }
-      } finally {
-        in.close();
+    final FileIoProvider fileIoProvider = getFileIoProvider();
+    final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+        getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
+    try (FileInputStream in = fileIoProvider.getFileInputStream(
+        getVolume(), file)) {
+      try (FileOutputStream out = fileIoProvider.getFileOutputStream(
+          getVolume(), tmpFile)) {
+        IOUtils.copyBytes(in, out, 16 * 1024);
       }
       if (file.length() != tmpFile.length()) {
         throw new IOException("Copy of file " + file + " size " + file.length()+
-                              " into file " + tmpFile +
-                              " resulted in a size of " + tmpFile.length());
+            " into file " + tmpFile +
+            " resulted in a size of " + tmpFile.length());
       }
-      replaceFile(tmpFile, file);
+      fileIoProvider.replaceFile(getVolume(), tmpFile, file);
     } catch (IOException e) {
-      boolean done = tmpFile.delete();
-      if (!done) {
+      if (!fileIoProvider.delete(getVolume(), tmpFile)) {
         DataNode.LOG.info("detachFile failed to delete temporary file " +
-                          tmpFile);
+            tmpFile);
       }
       throw e;
     }
@@ -319,26 +324,7 @@ abstract public class ReplicaInfo extends Block
     this.next = next;
   }
 
-  public static boolean fullyDelete(final File dir) {
-    boolean result = DataStorage.fullyDelete(dir);
-    return result;
-  }
-
-  public static int getHardLinkCount(File fileName) throws IOException {
-    int linkCount = HardLink.getLinkCount(fileName);
-    return linkCount;
-  }
-
-  public static void rename(File from, File to) throws IOException {
-    Storage.rename(from, to);
-  }
-
-  private void copyBytes(InputStream in, OutputStream out, int
-    buffSize) throws IOException{
-    IOUtils.copyBytes(in, out, buffSize);
-  }
-
-  private void replaceFile(File src, File target) throws IOException {
-    FileUtil.replaceFile(src, target);
+  int getHardLinkCount(File fileName) throws IOException {
+    return getFileIoProvider().getHardLinkCount(getVolume(), fileName);
   }
 }
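
breakHardlinks() now copies the shared file through instrumented streams with try-with-resources, verifies the copied length, replaces the original, and deletes the temporary copy on failure. A rough, self-contained sketch of that copy-then-replace sequence (HardlinkBreaker, the ".tmp" suffix and the use of java.nio Files.move are illustrative assumptions; the patch itself goes through FileIoProvider and DatanodeUtil helpers):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    final class HardlinkBreaker {
      // Copy the file to a temporary file, verify the size, then replace
      // the original so it no longer shares storage with its hard links.
      static void breakHardlink(File file) throws IOException {
        File tmp = new File(file.getParent(), file.getName() + ".tmp");
        try (InputStream in = new FileInputStream(file);
             OutputStream out = new FileOutputStream(tmp)) {
          byte[] buf = new byte[16 * 1024];   // same buffer size as the patch
          for (int n; (n = in.read(buf)) != -1; ) {
            out.write(buf, 0, n);
          }
        }
        try {
          if (file.length() != tmp.length()) {
            throw new IOException("Copy of " + file + " has wrong length");
          }
          Files.move(tmp.toPath(), file.toPath(),
              StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
          if (!tmp.delete()) {
            System.err.println("Failed to delete temporary file " + tmp);
          }
          throw e;
        }
      }
    }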

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java

@@ -24,6 +24,7 @@ import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 
@@ -207,4 +208,6 @@ public interface FsVolumeSpi
    */
   class VolumeCheckContext {
   }
+
+  FileIoProvider getFileIoProvider();
 }

+ 7 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java

@@ -24,8 +24,8 @@ import java.io.InputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.slf4j.Logger;
 
@@ -38,12 +38,15 @@ public class ReplicaInputStreams implements Closeable {
   private InputStream dataIn;
   private InputStream checksumIn;
   private FsVolumeReference volumeRef;
+  private final FileIoProvider fileIoProvider;
   private FileDescriptor dataInFd = null;
 
   /** Create an object with a data input stream and a checksum input stream. */
-  public ReplicaInputStreams(InputStream dataStream,
-      InputStream checksumStream, FsVolumeReference volumeRef) {
+  public ReplicaInputStreams(
+      InputStream dataStream, InputStream checksumStream,
+      FsVolumeReference volumeRef, FileIoProvider fileIoProvider) {
     this.volumeRef = volumeRef;
+    this.fileIoProvider = fileIoProvider;
     this.dataIn = dataStream;
     this.checksumIn = checksumStream;
     if (dataIn instanceof FileInputStream) {
@@ -103,7 +106,7 @@ public class ReplicaInputStreams implements Closeable {
   public void dropCacheBehindReads(String identifier, long offset, long len,
       int flags) throws NativeIOException {
     assert this.dataInFd != null : "null dataInFd!";
-    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+    fileIoProvider.posixFadvise(getVolumeRef().getVolume(),
         identifier, dataInFd, offset, len, flags);
   }
 

+ 24 - 49
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java

@@ -20,15 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 import java.io.Closeable;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
+import java.io.Flushable;
 import java.io.OutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 
 /**
@@ -43,21 +43,22 @@ public class ReplicaOutputStreams implements Closeable {
   /** Stream to checksum. */
   private final OutputStream checksumOut;
   private final DataChecksum checksum;
-  private final boolean isTransientStorage;
-  private final long slowLogThresholdMs;
+  private final FsVolumeSpi volume;
+  private final FileIoProvider fileIoProvider;
 
   /**
    * Create an object with a data output stream, a checksum output stream
    * and a checksum.
    */
-  public ReplicaOutputStreams(OutputStream dataOut,
-      OutputStream checksumOut, DataChecksum checksum,
-      boolean isTransientStorage, long slowLogThresholdMs) {
+  public ReplicaOutputStreams(
+      OutputStream dataOut, OutputStream checksumOut, DataChecksum checksum,
+      FsVolumeSpi volume, FileIoProvider fileIoProvider) {
+
     this.dataOut = dataOut;
     this.checksum = checksum;
-    this.slowLogThresholdMs = slowLogThresholdMs;
-    this.isTransientStorage = isTransientStorage;
     this.checksumOut = checksumOut;
+    this.volume = volume;
+    this.fileIoProvider = fileIoProvider;
 
     try {
       if (this.dataOut instanceof FileOutputStream) {
@@ -93,7 +94,7 @@ public class ReplicaOutputStreams implements Closeable {
 
   /** @return is writing to a transient storage? */
   public boolean isTransientStorage() {
-    return isTransientStorage;
+    return volume.isTransientStorage();
   }
 
   @Override
@@ -112,7 +113,7 @@ public class ReplicaOutputStreams implements Closeable {
    */
   public void syncDataOut() throws IOException {
     if (dataOut instanceof FileOutputStream) {
-      sync((FileOutputStream)dataOut);
+      fileIoProvider.sync(volume, (FileOutputStream) dataOut);
     }
   }
   
@@ -121,7 +122,7 @@ public class ReplicaOutputStreams implements Closeable {
    */
   public void syncChecksumOut() throws IOException {
     if (checksumOut instanceof FileOutputStream) {
-      sync((FileOutputStream)checksumOut);
+      fileIoProvider.sync(volume, (FileOutputStream) checksumOut);
     }
   }
 
@@ -129,60 +130,34 @@ public class ReplicaOutputStreams implements Closeable {
    * Flush the data stream if it supports it.
    */
   public void flushDataOut() throws IOException {
-    flush(dataOut);
+    if (dataOut != null) {
+      fileIoProvider.flush(volume, dataOut);
+    }
   }
 
   /**
    * Flush the checksum stream if it supports it.
    */
   public void flushChecksumOut() throws IOException {
-    flush(checksumOut);
-  }
-
-  private void flush(OutputStream dos) throws IOException {
-    long begin = Time.monotonicNow();
-    dos.flush();
-    long duration = Time.monotonicNow() - begin;
-    LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
-    if (duration > slowLogThresholdMs) {
-      LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
-          slowLogThresholdMs);
+    if (checksumOut != null) {
+      fileIoProvider.flush(volume, checksumOut);
     }
   }
 
-  private void sync(FileOutputStream fos) throws IOException {
-    long begin = Time.monotonicNow();
-    fos.getChannel().force(true);
-    long duration = Time.monotonicNow() - begin;
-    LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
-    if (duration > slowLogThresholdMs) {
-      LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
-          slowLogThresholdMs);
-    }
-  }
-
-  public long writeToDisk(byte[] b, int off, int len) throws IOException {
-    long begin = Time.monotonicNow();
+  public void writeDataToDisk(byte[] b, int off, int len)
+      throws IOException {
     dataOut.write(b, off, len);
-    long duration = Time.monotonicNow() - begin;
-    LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
-    if (duration > slowLogThresholdMs) {
-      LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
-          "(threshold={} ms)", duration, slowLogThresholdMs);
-    }
-    return duration;
   }
 
   public void syncFileRangeIfPossible(long offset, long nbytes,
       int flags) throws NativeIOException {
-    assert this.outFd != null : "null outFd!";
-    NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
+    fileIoProvider.syncFileRange(
+        volume, outFd, offset, nbytes, flags);
   }
 
   public void dropCacheBehindWrites(String identifier,
       long offset, long len, int flags) throws NativeIOException {
-    assert this.outFd != null : "null outFd!";
-    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
-        identifier, outFd, offset, len, flags);
+    fileIoProvider.posixFadvise(
+        volume, identifier, outFd, offset, len, flags);
   }
 }
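
The slowLogThresholdMs bookkeeping disappears from flush()/sync()/writeToDisk() because timing is now the job of the FileIoProvider hooks. For reference, a self-contained sketch of the kind of threshold-based warning a hook implementation could reproduce (SlowIoTimer and its Op interface are illustrative, not part of the patch):

    import java.util.concurrent.TimeUnit;

    final class SlowIoTimer {
      interface Op { void run() throws Exception; }

      private final long thresholdMs;

      SlowIoTimer(long thresholdMs) {
        this.thresholdMs = thresholdMs;
      }

      // Run the operation and warn only when it exceeds the threshold.
      void time(String name, Op op) throws Exception {
        final long begin = System.nanoTime();
        try {
          op.run();
        } finally {
          long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
          if (ms > thresholdMs) {
            System.err.println("Slow " + name + " took " + ms
                + " ms (threshold=" + thresholdMs + " ms)");
          }
        }
      }
    }

For example, new SlowIoTimer(300).time("flush", out::flush) would warn only when a flush takes longer than 300 ms.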

+ 59 - 73
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -32,19 +32,18 @@ import java.util.Iterator;
 import java.util.Scanner;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -63,7 +62,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.Timer;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
 
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.
@@ -95,6 +93,7 @@ class BlockPoolSlice {
   private final long cachedDfsUsedCheckTime;
   private final Timer timer;
   private final int maxDataLength;
+  private final FileIoProvider fileIoProvider;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final GetSpaceUsed dfsUsage;
@@ -112,6 +111,7 @@ class BlockPoolSlice {
       Configuration conf, Timer timer) throws IOException {
     this.bpid = bpid;
     this.volume = volume;
+    this.fileIoProvider = volume.getFileIoProvider();
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
     this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
@@ -146,25 +146,20 @@ class BlockPoolSlice {
     //
     this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
     if (tmpDir.exists()) {
-      DataStorage.fullyDelete(tmpDir);
+      fileIoProvider.fullyDelete(volume, tmpDir);
     }
     this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
     final boolean supportAppends = conf.getBoolean(
         DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
         DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
     if (rbwDir.exists() && !supportAppends) {
-      FileUtil.fullyDelete(rbwDir);
-    }
-    if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
-      if (!rbwDir.isDirectory()) {
-        throw new IOException("Mkdirs failed to create " + rbwDir.toString());
-      }
-    }
-    if (!tmpDir.mkdirs()) {
-      if (!tmpDir.isDirectory()) {
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
-      }
+      fileIoProvider.fullyDelete(volume, rbwDir);
     }
+
+    // create the rbw and tmp directories if they don't exist.
+    fileIoProvider.mkdirs(volume, rbwDir);
+    fileIoProvider.mkdirs(volume, tmpDir);
+
     // Use cached value initially if available. Or the following call will
     // block until the initial du command completes.
     this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
@@ -271,7 +266,7 @@ class BlockPoolSlice {
    */
   void saveDfsUsed() {
     File outFile = new File(currentDir, DU_CACHE_FILE);
-    if (outFile.exists() && !outFile.delete()) {
+    if (!fileIoProvider.deleteWithExistsCheck(volume, outFile)) {
       FsDatasetImpl.LOG.warn("Failed to delete old dfsUsed file in " +
         outFile.getParent());
     }
@@ -279,10 +274,10 @@ class BlockPoolSlice {
     try {
       long used = getDfsUsed();
       try (Writer out = new OutputStreamWriter(
-          new FileOutputStream(outFile), "UTF-8")) {
+          fileIoProvider.getFileOutputStream(volume, outFile), "UTF-8")) {
         // mtime is written last, so that truncated writes won't be valid.
         out.write(Long.toString(used) + " " + Long.toString(timer.now()));
-        out.flush();
+        fileIoProvider.flush(volume, out);
       }
     } catch (IOException ioe) {
       // If write failed, the volume might be bad. Since the cache file is
@@ -297,7 +292,8 @@ class BlockPoolSlice {
    */
   File createTmpFile(Block b) throws IOException {
     File f = new File(tmpDir, b.getBlockName());
-    File tmpFile = DatanodeUtil.createTmpFile(b, f);
+    File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+        volume, b, f, fileIoProvider);
     // If any exception during creation, its expected that counter will not be
     // incremented, So no need to decrement
     incrNumBlocks();
@@ -310,7 +306,8 @@ class BlockPoolSlice {
    */
   File createRbwFile(Block b) throws IOException {
     File f = new File(rbwDir, b.getBlockName());
-    File rbwFile = DatanodeUtil.createTmpFile(b, f);
+    File rbwFile = DatanodeUtil.createFileWithExistsCheck(
+        volume, b, f, fileIoProvider);
     // If any exception during creation, its expected that counter will not be
     // incremented, So no need to decrement
     incrNumBlocks();
@@ -319,12 +316,9 @@ class BlockPoolSlice {
 
   File addBlock(Block b, File f) throws IOException {
     File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
-    if (!blockDir.exists()) {
-      if (!blockDir.mkdirs()) {
-        throw new IOException("Failed to mkdirs " + blockDir);
-      }
-    }
-    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
+    fileIoProvider.mkdirsWithExistsCheck(volume, blockDir);
+    File blockFile = ((FsDatasetImpl) volume.getDataset()).moveBlockFiles(
+        volume, b, f, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     if (dfsUsage instanceof CachingGetSpaceUsed) {
       ((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
@@ -342,9 +336,9 @@ class BlockPoolSlice {
     final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
     final File targetBlockFile = new File(blockDir, blockFile.getName());
     final File targetMetaFile = new File(blockDir, metaFile.getName());
-    FileUtils.moveFile(blockFile, targetBlockFile);
+    fileIoProvider.moveFile(volume, blockFile, targetBlockFile);
     FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
-    FileUtils.moveFile(metaFile, targetMetaFile);
+    fileIoProvider.moveFile(volume, metaFile, targetMetaFile);
     FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
     return targetBlockFile;
   }
@@ -387,16 +381,13 @@ class BlockPoolSlice {
     File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
     if (blockFile.exists()) {
       // If the original block file still exists, then no recovery is needed.
-      if (!unlinkedTmp.delete()) {
+      if (!fileIoProvider.delete(volume, unlinkedTmp)) {
         throw new IOException("Unable to cleanup unlinked tmp file " +
             unlinkedTmp);
       }
       return null;
     } else {
-      if (!unlinkedTmp.renameTo(blockFile)) {
-        throw new IOException("Unable to rename unlinked tmp file " +
-            unlinkedTmp);
-      }
+      fileIoProvider.rename(volume, unlinkedTmp, blockFile);
       return blockFile;
     }
   }
@@ -409,7 +400,7 @@ class BlockPoolSlice {
    */
   private int moveLazyPersistReplicasToFinalized(File source)
       throws IOException {
-    File files[] = FileUtil.listFiles(source);
+    File[] files = fileIoProvider.listFiles(volume, source);
     int numRecovered = 0;
     for (File file : files) {
       if (file.isDirectory()) {
@@ -424,24 +415,25 @@ class BlockPoolSlice {
 
         if (blockFile.exists()) {
 
-          if (!targetDir.exists() && !targetDir.mkdirs()) {
+          try {
+            fileIoProvider.mkdirsWithExistsCheck(volume, targetDir);
+          } catch(IOException ioe) {
             LOG.warn("Failed to mkdirs " + targetDir);
             continue;
           }
 
           final File targetMetaFile = new File(targetDir, metaFile.getName());
           try {
-            ReplicaInfo.rename(metaFile, targetMetaFile);
+            fileIoProvider.rename(volume, metaFile, targetMetaFile);
           } catch (IOException e) {
             LOG.warn("Failed to move meta file from "
                 + metaFile + " to " + targetMetaFile, e);
             continue;
-
           }
 
           final File targetBlockFile = new File(targetDir, blockFile.getName());
           try {
-            ReplicaInfo.rename(blockFile, targetBlockFile);
+            fileIoProvider.rename(volume, blockFile, targetBlockFile);
           } catch (IOException e) {
             LOG.warn("Failed to move block file from "
                 + blockFile + " to " + targetBlockFile, e);
@@ -458,7 +450,7 @@ class BlockPoolSlice {
       }
     }
 
-    FileUtil.fullyDelete(source);
+    fileIoProvider.fullyDelete(volume, source);
     return numRecovered;
   }
 
@@ -491,7 +483,7 @@ class BlockPoolSlice {
           loadRwr = false;
         }
         sc.close();
-        if (!restartMeta.delete()) {
+        if (!fileIoProvider.delete(volume, restartMeta)) {
           FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
               restartMeta.getPath());
         }
@@ -547,7 +539,7 @@ class BlockPoolSlice {
                         final RamDiskReplicaTracker lazyWriteReplicaMap,
                         boolean isFinalized)
       throws IOException {
-    File files[] = FileUtil.listFiles(dir);
+    File[] files = fileIoProvider.listFiles(volume, dir);
     for (File file : files) {
       if (file.isDirectory()) {
         addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
@@ -560,8 +552,9 @@ class BlockPoolSlice {
           continue;
         }
       }
-      if (!Block.isBlockFilename(file))
+      if (!Block.isBlockFilename(file)) {
         continue;
+      }
 
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
           files, file);
@@ -650,11 +643,11 @@ class BlockPoolSlice {
   private void deleteReplica(final ReplicaInfo replicaToDelete) {
     // Delete the files on disk. Failure here is okay.
     final File blockFile = replicaToDelete.getBlockFile();
-    if (!blockFile.delete()) {
+    if (!fileIoProvider.delete(volume, blockFile)) {
       LOG.warn("Failed to delete block file " + blockFile);
     }
     final File metaFile = replicaToDelete.getMetaFile();
-    if (!metaFile.delete()) {
+    if (!fileIoProvider.delete(volume, metaFile)) {
       LOG.warn("Failed to delete meta file " + metaFile);
     }
   }
@@ -681,7 +674,8 @@ class BlockPoolSlice {
         return 0;
       }
       try (DataInputStream checksumIn = new DataInputStream(
-          new BufferedInputStream(new FileInputStream(metaFile),
+          new BufferedInputStream(
+              fileIoProvider.getFileInputStream(volume, metaFile),
               ioFileBufferSize))) {
         // read and handle the common header here. For now just a version
         final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
@@ -694,9 +688,10 @@ class BlockPoolSlice {
         if (numChunks == 0) {
           return 0;
         }
-        try (InputStream blockIn = new FileInputStream(blockFile);
+        try (InputStream blockIn = fileIoProvider.getFileInputStream(
+                 volume, blockFile);
              ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
-                 checksumIn, volume.obtainReference())) {
+                 checksumIn, volume.obtainReference(), fileIoProvider)) {
           ris.skipChecksumFully((numChunks - 1) * checksumSize);
           long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
           ris.skipDataFully(lastChunkStartPos);
@@ -715,7 +710,8 @@ class BlockPoolSlice {
           // truncate if extra bytes are present without CRC
           if (blockFile.length() > validFileLength) {
             try (RandomAccessFile blockRAF =
-                     new RandomAccessFile(blockFile, "rw")) {
+                     fileIoProvider.getRandomAccessFile(
+                         volume, blockFile, "rw")) {
               // truncate blockFile
               blockRAF.setLength(validFileLength);
             }
@@ -767,12 +763,14 @@ class BlockPoolSlice {
     }
     FileInputStream inputStream = null;
     try {
-      inputStream = new FileInputStream(replicaFile);
+      inputStream = fileIoProvider.getFileInputStream(volume, replicaFile);
       BlockListAsLongs blocksList =
           BlockListAsLongs.readFrom(inputStream, maxDataLength);
-      Iterator<BlockReportReplica> iterator = blocksList.iterator();
-      while (iterator.hasNext()) {
-        BlockReportReplica replica = iterator.next();
+      if (blocksList == null) {
+        return false;
+      }
+
+      for (BlockReportReplica replica : blocksList) {
         switch (replica.getState()) {
         case FINALIZED:
           addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
@@ -809,7 +807,7 @@ class BlockPoolSlice {
       return false;
     }
     finally {
-      if (!replicaFile.delete()) {
+      if (!fileIoProvider.delete(volume, replicaFile)) {
         LOG.info("Failed to delete replica cache file: " +
             replicaFile.getPath());
       }
@@ -823,41 +821,29 @@ class BlockPoolSlice {
         blocksListToPersist.getNumberOfBlocks()== 0) {
       return;
     }
-    File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
-    if (tmpFile.exists() && !tmpFile.delete()) {
-      LOG.warn("Failed to delete tmp replicas file in " +
-        tmpFile.getPath());
-      return;
-    }
-    File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
-    if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
-      LOG.warn("Failed to delete replicas file in " +
-          replicaCacheFile.getPath());
+    final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
+    final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+    if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) ||
+        !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) {
       return;
     }
 
     FileOutputStream out = null;
     try {
-      out = new FileOutputStream(tmpFile);
+      out = fileIoProvider.getFileOutputStream(volume, tmpFile);
       blocksListToPersist.writeTo(out);
       out.close();
       // Renaming the tmp file to replicas
-      Files.move(tmpFile, replicaCacheFile);
+      fileIoProvider.moveFile(volume, tmpFile, replicaCacheFile);
     } catch (Exception e) {
       // If write failed, the volume might be bad. Since the cache file is
       // not critical, log the error, delete both the files (tmp and cache)
       // and continue.
       LOG.warn("Failed to write replicas to cache ", e);
-      if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
-        LOG.warn("Failed to delete replicas file: " +
-            replicaCacheFile.getPath());
-      }
+      fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile);
     } finally {
       IOUtils.closeStream(out);
-      if (tmpFile.exists() && !tmpFile.delete()) {
-        LOG.warn("Failed to delete tmp file in " +
-            tmpFile.getPath());
-      }
+      fileIoProvider.deleteWithExistsCheck(volume, tmpFile);
     }
   }
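
saveReplicas() keeps its write-to-a-".tmp"-file-then-move approach, now via fileIoProvider.moveFile and deleteWithExistsCheck. A self-contained sketch of that write-then-rename pattern (AtomicCacheWriter and the explicit force() call are illustrative assumptions, not code from the patch):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    final class AtomicCacheWriter {
      // Write the new contents to a ".tmp" file, then rename it over the
      // final name so readers never observe a partially written cache.
      static void write(File cacheFile, byte[] contents) throws IOException {
        File tmp = new File(cacheFile.getParent(),
            cacheFile.getName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
          out.write(contents);
          out.getChannel().force(true);   // push the bytes to disk first
        }
        try {
          Files.move(tmp.toPath(), cacheFile.toPath(),
              StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
          tmp.delete();   // best-effort cleanup, as in the patch
          throw e;
        }
      }
    }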
 

+ 19 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -232,6 +234,7 @@ class FsDatasetAsyncDiskService {
     final File metaFile;
     final ExtendedBlock block;
     final String trashDirectory;
+    private final FileIoProvider fileIoProvider;
     
     ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile,
         File metaFile, ExtendedBlock block, String trashDirectory) {
@@ -241,6 +244,7 @@ class FsDatasetAsyncDiskService {
       this.metaFile = metaFile;
       this.block = block;
       this.trashDirectory = trashDirectory;
+      this.fileIoProvider = volume.getFileIoProvider();
     }
 
     @Override
@@ -252,13 +256,17 @@ class FsDatasetAsyncDiskService {
     }
 
     private boolean deleteFiles() {
-      return blockFile.delete() && (metaFile.delete() || !metaFile.exists());
+      return fileIoProvider.delete(volume, blockFile) &&
+          (fileIoProvider.delete(volume, metaFile) ||
+           !fileIoProvider.exists(volume, metaFile));
     }
 
     private boolean moveFiles() {
       File trashDirFile = new File(trashDirectory);
-      if (!trashDirFile.exists() && !trashDirFile.mkdirs()) {
-        LOG.error("Failed to create trash directory " + trashDirectory);
+      try {
+        fileIoProvider.mkdirsWithExistsCheck(
+            volume, trashDirFile);
+      } catch (IOException e) {
         return false;
       }
 
@@ -269,8 +277,14 @@ class FsDatasetAsyncDiskService {
 
       File newBlockFile = new File(trashDirectory, blockFile.getName());
       File newMetaFile = new File(trashDirectory, metaFile.getName());
-      return (blockFile.renameTo(newBlockFile) &&
-              metaFile.renameTo(newMetaFile));
+
+      try {
+        fileIoProvider.renameTo(volume, blockFile, newBlockFile);
+        fileIoProvider.renameTo(volume, metaFile, newMetaFile);
+        return true;
+      } catch(IOException ioe) {
+        return false;
+      }
     }
 
     @Override
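
The async delete task now reports trash-move failures by catching the IOException from renameTo and returning false, instead of checking boolean renameTo() results. A self-contained sketch of that move-to-trash step under the same convention (TrashMover and the use of java.nio Files.move are illustrative):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    final class TrashMover {
      // Create the trash directory if needed, then move both files into
      // it; report failure as a boolean rather than an exception.
      static boolean moveToTrash(File blockFile, File metaFile, File trashDir) {
        if (!trashDir.isDirectory() && !trashDir.mkdirs()) {
          return false;
        }
        try {
          Files.move(blockFile.toPath(),
              new File(trashDir, blockFile.getName()).toPath());
          Files.move(metaFile.toPath(),
              new File(trashDir, metaFile.getName()).toPath());
          return true;
        } catch (IOException e) {
          return false;
        }
      }
    }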

+ 165 - 132
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -17,38 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,13 +29,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -74,11 +45,10 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -101,6 +71,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetricHelper;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -115,6 +86,7 @@ import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -124,9 +96,34 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -195,10 +192,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
     try(AutoCloseableLock lock = datasetLock.acquire()) {
-      File blockfile = getFile(bpid, blkid, false);
+      File blockfile = null;
+
+      ReplicaInfo info = volumeMap.get(bpid, blkid);
+      if (info != null) {
+        blockfile = info.getBlockFile();
+      }
       if (blockfile == null) {
         return null;
       }
+
       final File metafile = FsDatasetUtil.findMetaFile(blockfile);
       final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
       return new Block(blkid, blockfile.length(), gs);
@@ -233,17 +236,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
     File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
+    FsVolumeSpi volume = null;
+
     if (meta == null || !meta.exists()) {
       return null;
     }
+
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final ReplicaInfo replicaInfo = getReplicaInfo(b);
+      if (replicaInfo != null) {
+        volume = replicaInfo.getVolume();
+      }
+    }
+
     if (isNativeIOAvailable) {
       return new LengthInputStream(
-          NativeIO.getShareDeleteFileInputStream(meta),
+          datanode.getFileIoProvider().getShareDeleteFileInputStream(
+              volume, meta, 0),
           meta.length());
     }
-    return new LengthInputStream(new FileInputStream(meta), meta.length());
+    return new LengthInputStream(
+        datanode.getFileIoProvider().getFileInputStream(volume, meta),
+        meta.length());
   }
-    
+
   final DataNode datanode;
   final DataStorage dataStorage;
   private final FsVolumeList volumes;
@@ -758,42 +774,49 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return f;
   }
 
-  /**
-   * Return the File associated with a block, without first
-   * checking that it exists. This should be used when the
-   * next operation is going to open the file for read anyway,
-   * and thus the exists check is redundant.
-   *
-   * @param touch if true then update the last access timestamp of the
-   *              block. Currently used for blocks on transient storage.
-   */
-  private File getBlockFileNoExistsCheck(ExtendedBlock b,
-                                         boolean touch)
-      throws IOException {
-    final File f;
-    try(AutoCloseableLock lock = datasetLock.acquire()) {
-      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
+  @Override // FsDatasetSpi
+  public InputStream getBlockInputStream(ExtendedBlock b,
+      long seekOffset) throws IOException {
+    ReplicaInfo info;
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
     }
-    if (f == null) {
-      throw new IOException("Block " + b + " is not valid");
+
+    final File blockFile = info != null ? info.getBlockFile() : null;
+
+    if (blockFile != null && info.getVolume().isTransientStorage()) {
+      ramDiskReplicaTracker.touch(b.getBlockPoolId(), b.getBlockId());
+      datanode.getMetrics().incrRamDiskBlocksReadHits();
+    }
+
+    if(blockFile != null &&
+        datanode.getFileIoProvider().exists(
+            info.getVolume(), blockFile)) {
+      return getDataInputStream(info, seekOffset);
+    } else {
+      throw new IOException("Block " + b + " is not valid. " +
+          "Expected block file at " + blockFile + " does not exist.");
     }
-    return f;
   }
 
-  @Override // FsDatasetSpi
-  public InputStream getBlockInputStream(ExtendedBlock b,
-      long seekOffset) throws IOException {
-    File blockFile = getBlockFileNoExistsCheck(b, true);
-    if (isNativeIOAvailable) {
-      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+  private InputStream getDataInputStream(
+      ReplicaInfo info, long seekOffset) throws IOException {
+    FileInputStream fis;
+    final File blockFile = info.getBlockFile();
+    final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
+    if (NativeIO.isAvailable()) {
+      fis = fileIoProvider.getShareDeleteFileInputStream(
+          info.getVolume(), blockFile, seekOffset);
     } else {
       try {
-        return openAndSeek(blockFile, seekOffset);
+        fis = fileIoProvider.openAndSeek(
+            info.getVolume(), blockFile, seekOffset);
       } catch (FileNotFoundException fnfe) {
-        throw new IOException("Block " + b + " is not valid. " +
-            "Expected block file at " + blockFile + " does not exist.");
+        throw new IOException("Expected block file at " + blockFile +
+            " does not exist.");
       }
     }
+    return fis;
   }
 
   /**
@@ -846,53 +869,40 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
     try(AutoCloseableLock lock = datasetLock.acquire()) {
-      ReplicaInfo info = getReplicaInfo(b);
+      final ReplicaInfo info = getReplicaInfo(b);
+      final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
       FsVolumeReference ref = info.getVolume().obtainReference();
+      InputStream blockInStream = null;
+      InputStream metaInStream = null;
       try {
-        InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
-        try {
-          InputStream metaInStream =
-              openAndSeek(info.getMetaFile(), metaOffset);
-          return new ReplicaInputStreams(blockInStream, metaInStream, ref);
-        } catch (IOException e) {
-          IOUtils.cleanup(null, blockInStream);
-          throw e;
-        }
+        blockInStream = fileIoProvider.openAndSeek(
+            info.getVolume(), info.getBlockFile(), blkOffset);
+        metaInStream = fileIoProvider.openAndSeek(
+                  info.getVolume(), info.getMetaFile(), metaOffset);
+        return new ReplicaInputStreams(
+            blockInStream, metaInStream, ref, fileIoProvider);
       } catch (IOException e) {
-        IOUtils.cleanup(null, ref);
+        IOUtils.cleanup(null, ref, blockInStream);
         throw e;
       }
     }
   }
 
-  private static FileInputStream openAndSeek(File file, long offset)
-      throws IOException {
-    RandomAccessFile raf = null;
-    try {
-      raf = new RandomAccessFile(file, "r");
-      if (offset > 0) {
-        raf.seek(offset);
-      }
-      return new FileInputStream(raf.getFD());
-    } catch(IOException ioe) {
-      IOUtils.cleanup(null, raf);
-      throw ioe;
-    }
-  }
-
-  static File moveBlockFiles(Block b, File srcfile, File destdir)
-      throws IOException {
+  File moveBlockFiles(
+      FsVolumeSpi volume, Block b, File srcfile,
+      File destdir) throws IOException {
     final File dstfile = new File(destdir, b.getBlockName());
     final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
     final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
     try {
-      NativeIO.renameTo(srcmeta, dstmeta);
+      fileIoProvider.renameTo(volume, srcmeta, dstmeta);
     } catch (IOException e) {
       throw new IOException("Failed to move meta file for " + b
           + " from " + srcmeta + " to " + dstmeta, e);
     }
     try {
-      NativeIO.renameTo(srcfile, dstfile);
+      fileIoProvider.renameTo(volume, srcfile, dstfile);
     } catch (IOException e) {
       throw new IOException("Failed to move block file for " + b
           + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
@@ -1031,8 +1041,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   static void computeChecksum(File srcMeta, File dstMeta,
       File blockFile, int smallBufferSize, final Configuration conf)
       throws IOException {
-    final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
-        DFSUtilClient.getIoFileBufferSize(conf));
+    DataChecksum checksum;
+
+    try (FileInputStream fis = new FileInputStream(srcMeta)) {
+      checksum = BlockMetadataHeader.readDataChecksum(fis,
+          DFSUtilClient.getIoFileBufferSize(conf), srcMeta);
+    }
+
     final byte[] data = new byte[1 << 16];
     final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
 
@@ -1051,7 +1066,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       int offset = 0;
       try (InputStream dataIn = isNativeIOAvailable ?
-          NativeIO.getShareDeleteFileInputStream(blockFile) :
+          new FileInputStream(NativeIO.getShareDeleteFileDescriptor(
+              blockFile, 0)) :
           new FileInputStream(blockFile)) {
 
         for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
@@ -1078,7 +1094,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  static private void truncateBlock(File blockFile, File metaFile,
+  private void truncateBlock(FsVolumeSpi volume, File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     LOG.info("truncateBlock: blockFile=" + blockFile
         + ", metaFile=" + metaFile
@@ -1093,7 +1109,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + ") to newlen (=" + newlen + ")");
     }
 
-    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
+    DataChecksum dcs;
+    try (FileInputStream fis = fileIoProvider.getFileInputStream(
+        volume, metaFile)) {
+      dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
+    }
+
     int checksumsize = dcs.getChecksumSize();
     int bpc = dcs.getBytesPerChecksum();
     long n = (newlen - 1)/bpc + 1;
@@ -1102,16 +1124,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     int lastchunksize = (int)(newlen - lastchunkoffset);
     byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
 
-    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
-    try {
+
+    try (final RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
+        volume, blockFile, "rw")) {
       //truncate blockFile
       blockRAF.setLength(newlen);
 
       //read last chunk
       blockRAF.seek(lastchunkoffset);
       blockRAF.readFully(b, 0, lastchunksize);
-    } finally {
-      blockRAF.close();
     }
 
     //compute checksum
@@ -1119,13 +1140,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     dcs.writeValue(b, 0, false);
 
     //update metaFile
-    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
-    try {
+    try (final RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile(
+        volume, metaFile, "rw")) {
       metaRAF.setLength(newmetalen);
       metaRAF.seek(newmetalen - checksumsize);
       metaRAF.write(b, 0, checksumsize);
-    } finally {
-      metaRAF.close();
     }
   }
 
@@ -1220,7 +1239,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         LOG.debug("Renaming " + oldmeta + " to " + newmeta);
       }
       try {
-        NativeIO.renameTo(oldmeta, newmeta);
+        datanode.getFileIoProvider().renameTo(
+            replicaInfo.getVolume(), oldmeta, newmeta);
       } catch (IOException e) {
         throw new IOException("Block " + replicaInfo + " reopen failed. " +
             " Unable to move meta file  " + oldmeta +
@@ -1233,10 +1253,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             + ", file length=" + blkfile.length());
       }
       try {
-        NativeIO.renameTo(blkfile, newBlkFile);
+        datanode.getFileIoProvider().renameTo(
+            replicaInfo.getVolume(), blkfile, newBlkFile);
       } catch (IOException e) {
         try {
-          NativeIO.renameTo(newmeta, oldmeta);
+          datanode.getFileIoProvider().renameTo(
+              replicaInfo.getVolume(), newmeta, oldmeta);
         } catch (IOException ex) {
           LOG.warn("Cannot move meta file " + newmeta +
               "back to the finalized directory " + oldmeta, ex);
@@ -1389,7 +1411,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.debug("Renaming " + oldmeta + " to " + newmeta);
     }
     try {
-      NativeIO.renameTo(oldmeta, newmeta);
+      datanode.getFileIoProvider().renameTo(
+          replicaInfo.getVolume(), oldmeta, newmeta);
     } catch (IOException e) {
       replicaInfo.setGenerationStamp(oldGS); // restore old GS
       throw new IOException("Block " + replicaInfo + " reopen failed. " +
@@ -1520,7 +1543,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // any corrupt data written after the acked length can go unnoticed.
         if (numBytes > bytesAcked) {
           final File replicafile = rbw.getBlockFile();
-          truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+          truncateBlock(
+              rbw.getVolume(), replicafile, rbw.getMetaFile(),
+              numBytes, bytesAcked);
           rbw.setNumBytes(bytesAcked);
           rbw.setLastChecksumAndDataLen(bytesAcked, null);
         }
@@ -1585,8 +1610,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       // move block files to the rbw directory
       BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
-      final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
-          bpslice.getRbwDir());
+      final File dest = moveBlockFiles(
+          v, b.getLocalBlock(), temp.getBlockFile(), bpslice.getRbwDir());
       // create RBW
       final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
           blockId, numBytes, expectedGs,
@@ -2318,16 +2343,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         return;
       }
 
-      final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
+      final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
+      final boolean diskMetaFileExists = diskMetaFile != null &&
+          fileIoProvider.exists(vol, diskMetaFile);
+      final boolean diskFileExists = diskFile != null &&
+          fileIoProvider.exists(vol, diskFile);
+
+      final long diskGS = diskMetaFileExists ?
           Block.getGenerationStamp(diskMetaFile.getName()) :
-            HdfsConstants.GRANDFATHER_GENERATION_STAMP;
+          HdfsConstants.GRANDFATHER_GENERATION_STAMP;
 
-      if (diskFile == null || !diskFile.exists()) {
+      if (!diskFileExists) {
         if (memBlockInfo == null) {
           // Block file does not exist and block does not exist in memory
           // If metadata file exists then delete it
-          if (diskMetaFile != null && diskMetaFile.exists()
-              && diskMetaFile.delete()) {
+          if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
             LOG.warn("Deleted a metadata file without a block "
                 + diskMetaFile.getAbsolutePath());
           }
@@ -2343,8 +2373,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
           // Finally remove the metadata file
-          if (diskMetaFile != null && diskMetaFile.exists()
-              && diskMetaFile.delete()) {
+          if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
             LOG.warn("Deleted a metadata file for the deleted block "
                 + diskMetaFile.getAbsolutePath());
           }
@@ -2374,10 +2403,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
        */
       // Compare block files
       File memFile = memBlockInfo.getBlockFile();
-      if (memFile.exists()) {
+      final boolean memFileExists = memFile != null &&
+          fileIoProvider.exists(vol, memFile);
+      if (memFileExists) {
         if (memFile.compareTo(diskFile) != 0) {
           if (diskMetaFile.exists()) {
-            if (memBlockInfo.getMetaFile().exists()) {
+            if (fileIoProvider.exists(vol, memBlockInfo.getMetaFile())) {
               // We have two sets of block+meta files. Decide which one to
               // keep.
               ReplicaInfo diskBlockInfo = new FinalizedReplica(
@@ -2386,7 +2417,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                   memBlockInfo, diskBlockInfo, volumeMap);
             }
           } else {
-            if (!diskFile.delete()) {
+            if (!fileIoProvider.delete(vol, diskFile)) {
               LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
             }
           }
@@ -2410,9 +2441,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       // Compare generation stamp
       if (memBlockInfo.getGenerationStamp() != diskGS) {
-        File memMetaFile = FsDatasetUtil.getMetaFile(diskFile, 
+        File memMetaFile = FsDatasetUtil.getMetaFile(diskFile,
             memBlockInfo.getGenerationStamp());
-        if (memMetaFile.exists()) {
+        if (fileIoProvider.exists(vol, memMetaFile)) {
           if (memMetaFile.compareTo(diskMetaFile) != 0) {
             LOG.warn("Metadata file in memory "
                 + memMetaFile.getAbsolutePath()
@@ -2663,7 +2694,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
     if (rur.getNumBytes() > newlength) {
       rur.breakHardLinksIfNeeded();
-      truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
+      truncateBlock(
+          rur.getVolume(), blockFile, metaFile,
+          rur.getNumBytes(), newlength);
       if(!copyOnTruncate) {
         // update RUR with the new length
         rur.setNumBytes(newlength);
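
All of the FsDatasetImpl changes above follow one pattern: direct file operations (RandomAccessFile, FileInputStream, File#exists/delete, NativeIO.renameTo) are routed through the per-DataNode FileIoProvider so that instrumentation hooks can run around each disk access, and every call site now passes the FsVolumeSpi it touches. The sketch below is only a minimal illustration of that wrapping pattern, not the FileIoProvider added by this patch; the InstrumentedIo and Hooks names are hypothetical.

import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;

// Minimal sketch of the "wrap every disk call" pattern used in the
// FsDatasetImpl changes above. InstrumentedIo and Hooks are hypothetical
// names, not the FileIoProvider/FileIoEvents classes from this patch; the
// sketch only shows where the before/after/failure hooks would fire.
class InstrumentedIo {

  /** Callbacks invoked around each file operation. */
  interface Hooks {
    long beforeFileIo(String op);             // e.g. return System.nanoTime()
    void afterFileIo(String op, long begin);  // success path
    void onFailure(String op, long begin);    // failure path
  }

  private final Hooks hooks;

  InstrumentedIo(Hooks hooks) {
    this.hooks = hooks;
  }

  // Mirrors the shape of fileIoProvider.getRandomAccessFile(volume, file, mode).
  RandomAccessFile getRandomAccessFile(File f, String mode)
      throws FileNotFoundException {
    final long begin = hooks.beforeFileIo("OPEN");
    try {
      RandomAccessFile raf = new RandomAccessFile(f, mode);
      hooks.afterFileIo("OPEN", begin);
      return raf;
    } catch (FileNotFoundException e) {
      hooks.onFailure("OPEN", begin);
      throw e;
    }
  }

  // Mirrors the shape of fileIoProvider.delete(volume, file).
  boolean delete(File f) {
    final long begin = hooks.beforeFileIo("DELETE");
    final boolean deleted = f.delete();
    hooks.afterFileIo("DELETE", begin);
    return deleted;
  }
}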

+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java

@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Arrays;
 
 import com.google.common.base.Preconditions;
@@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.io.IOUtils;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -73,6 +76,21 @@ public class FsDatasetUtil {
     return matches[0];
   }
 
+  public static FileDescriptor openAndSeek(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = null;
+    try {
+      raf = new RandomAccessFile(file, "r");
+      if (offset > 0) {
+        raf.seek(offset);
+      }
+      return raf.getFD();
+    } catch(IOException ioe) {
+      IOUtils.cleanup(null, raf);
+      throw ioe;
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file
    * and then return the generation stamp from the name of the meta-file.
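
The new FsDatasetUtil.openAndSeek helper returns a raw FileDescriptor positioned at the requested offset rather than a FileInputStream, leaving stream construction to the caller. A hedged usage sketch, assuming only the signature shown above; the OpenAndSeekExample wrapper is illustrative.

import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;

// Illustrative caller of the new FsDatasetUtil.openAndSeek(File, long): the
// returned descriptor is already positioned at `offset`, so a stream wrapped
// around it continues reading from that offset. Closing the stream releases
// the descriptor.
class OpenAndSeekExample {
  static FileInputStream openBlockAt(File blockFile, long offset)
      throws IOException {
    FileDescriptor fd = FsDatasetUtil.openAndSeek(blockFile, offset);
    return new FileInputStream(fd);
  }
}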

+ 50 - 35
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -19,13 +19,12 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileOutputStream;
+import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.nio.channels.ClosedChannelException;
-import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Collections;
@@ -44,8 +43,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -60,7 +59,6 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.StringUtils;
@@ -111,6 +109,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   // limit the visible capacity for tests. If negative, then we just
   // query from the filesystem.
   protected volatile long configuredCapacity;
+  private final FileIoProvider fileIoProvider;
 
   /**
    * Per-volume worker pool that processes new blocks to cache.
@@ -134,6 +133,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.usage = new DF(parent, conf);
     this.storageType = storageType;
     this.configuredCapacity = -1;
+    // dataset.datanode may be null in some tests.
+    this.fileIoProvider = dataset.datanode != null ?
+        dataset.datanode.getFileIoProvider() : new FileIoProvider(conf);
     cacheExecutor = initializeCacheExecutor(parent);
   }
 
@@ -622,8 +624,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
      */
     private String getNextSubDir(String prev, File dir)
           throws IOException {
-      List<String> children =
-          IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
+      List<String> children = fileIoProvider.listDirectory(
+          FsVolumeImpl.this, dir, SubdirFilter.INSTANCE);
       cache = null;
       cacheMs = 0;
       if (children.size() == 0) {
@@ -676,8 +678,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
       }
       File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
                     state.curFinalizedDir, state.curFinalizedSubDir).toFile();
-      List<String> entries =
-          IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
+      List<String> entries = fileIoProvider.listDirectory(
+          FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE);
       if (entries.size() == 0) {
         entries = null;
       } else {
@@ -784,19 +786,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     public void save() throws IOException {
       state.lastSavedMs = Time.now();
       boolean success = false;
-      try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
-                new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
+      try (BufferedWriter writer = new BufferedWriter(
+          new OutputStreamWriter(fileIoProvider.getFileOutputStream(
+              FsVolumeImpl.this, getTempSaveFile()), "UTF-8"))) {
         WRITER.writeValue(writer, state);
         success = true;
       } finally {
         if (!success) {
-          if (getTempSaveFile().delete()) {
-            LOG.debug("save({}, {}): error deleting temporary file.",
-                storageID, bpid);
-          }
+          fileIoProvider.delete(FsVolumeImpl.this, getTempSaveFile());
         }
       }
-      Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
+      fileIoProvider.move(FsVolumeImpl.this,
+          getTempSaveFile().toPath(), getSaveFile().toPath(),
           StandardCopyOption.ATOMIC_MOVE);
       if (LOG.isTraceEnabled()) {
         LOG.trace("save({}, {}): saved {}", storageID, bpid,
@@ -982,11 +983,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
     File finalizedDir = new File(bpCurrentDir,
         DataStorage.STORAGE_DIR_FINALIZED);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
-    if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
-        finalizedDir)) {
+    if (fileIoProvider.exists(this, finalizedDir) &&
+        !DatanodeUtil.dirNoFilesRecursive(this, finalizedDir, fileIoProvider)) {
       return false;
     }
-    if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
+    if (fileIoProvider.exists(this, rbwDir) &&
+        fileIoProvider.list(this, rbwDir).length != 0) {
       return false;
     }
     return true;
@@ -1007,35 +1009,38 @@ public class FsVolumeImpl implements FsVolumeSpi {
         DataStorage.STORAGE_DIR_LAZY_PERSIST);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
     if (force) {
-      DataStorage.fullyDelete(bpDir);
+      fileIoProvider.fullyDelete(this, bpDir);
     } else {
-      if (!rbwDir.delete()) {
+      if (!fileIoProvider.delete(this, rbwDir)) {
         throw new IOException("Failed to delete " + rbwDir);
       }
-      if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
-          !FileUtil.fullyDelete(finalizedDir)) {
+      if (!DatanodeUtil.dirNoFilesRecursive(
+              this, finalizedDir, fileIoProvider) ||
+          !fileIoProvider.fullyDelete(
+              this, finalizedDir)) {
         throw new IOException("Failed to delete " + finalizedDir);
       }
       if (lazypersistDir.exists() &&
-        ((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
-          !FileUtil.fullyDelete(lazypersistDir)))) {
+          ((!DatanodeUtil.dirNoFilesRecursive(
+              this, lazypersistDir, fileIoProvider) ||
+              !fileIoProvider.fullyDelete(this, lazypersistDir)))) {
         throw new IOException("Failed to delete " + lazypersistDir);
       }
-      DataStorage.fullyDelete(tmpDir);
-      for (File f : FileUtil.listFiles(bpCurrentDir)) {
-        if (!f.delete()) {
+      fileIoProvider.fullyDelete(this, tmpDir);
+      for (File f : fileIoProvider.listFiles(this, bpCurrentDir)) {
+        if (!fileIoProvider.delete(this, f)) {
           throw new IOException("Failed to delete " + f);
         }
       }
-      if (!bpCurrentDir.delete()) {
+      if (!fileIoProvider.delete(this, bpCurrentDir)) {
         throw new IOException("Failed to delete " + bpCurrentDir);
       }
-      for (File f : FileUtil.listFiles(bpDir)) {
-        if (!f.delete()) {
+      for (File f : fileIoProvider.listFiles(this, bpDir)) {
+        if (!fileIoProvider.delete(this, f)) {
           throw new IOException("Failed to delete " + f);
         }
       }
-      if (!bpDir.delete()) {
+      if (!fileIoProvider.delete(this, bpDir)) {
         throw new IOException("Failed to delete " + bpDir);
       }
     }
@@ -1059,9 +1064,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
   @Override
   public byte[] loadLastPartialChunkChecksum(
       File blockFile, File metaFile) throws IOException {
-    // readHeader closes the temporary FileInputStream.
-    DataChecksum dcs = BlockMetadataHeader
-        .readHeader(metaFile).getChecksum();
+    DataChecksum dcs;
+
+    try (FileInputStream fis = fileIoProvider.getFileInputStream(
+        this, metaFile)) {
+      dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
+    }
+
     final int checksumSize = dcs.getChecksumSize();
     final long onDiskLen = blockFile.length();
     final int bytesPerChecksum = dcs.getBytesPerChecksum();
@@ -1075,7 +1084,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
     long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
         (onDiskLen / bytesPerChecksum) * checksumSize;
     byte[] lastChecksum = new byte[checksumSize];
-    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
+    try (RandomAccessFile raf = fileIoProvider.getRandomAccessFile(
+        this, metaFile, "r")) {
       raf.seek(offsetInChecksum);
       int readBytes = raf.read(lastChecksum, 0, checksumSize);
       if (readBytes == -1) {
@@ -1090,4 +1100,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
     return lastChecksum;
   }
+
+  @Override
+  public FileIoProvider getFileIoProvider() {
+    return fileIoProvider;
+  }
 }
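
FsVolumeImpl now builds a FileIoProvider of its own when the owning DataNode is unavailable (some tests construct volumes without one) and exposes it through getFileIoProvider(). A small, illustrative sketch of how code holding only an FsVolumeSpi reference might use that accessor; the VolumeReadExample class is hypothetical.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

// Illustrative only: with the new FsVolumeSpi#getFileIoProvider() accessor,
// code that holds a volume reference can route reads through the same
// instrumented provider instead of opening a FileInputStream directly.
class VolumeReadExample {
  static int readFirstByte(FsVolumeSpi volume, File metaFile) throws IOException {
    FileIoProvider provider = volume.getFileIoProvider();
    try (FileInputStream in = provider.getFileInputStream(volume, metaFile)) {
      return in.read(); // -1 if the file is empty
    }
  }
}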

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java

@@ -702,7 +702,7 @@ public class TestFileAppend{
       ReplicaBeingWritten rbw =
           (ReplicaBeingWritten)replicaHandler.getReplica();
       ReplicaOutputStreams
-          outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
+          outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
       OutputStream dataOutput = outputStreams.getDataOut();
 
       byte[] appendBytes = new byte[1];
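
The test change above reflects the ReplicaInPipelineInterface#createStreams signature change: the per-call slowLogThresholdMs argument is gone, since slow-IO logging now lives behind the FileIoProvider. A caller-side sketch under that assumption; CreateStreamsExample is illustrative, and stream closing and error handling are elided for brevity.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;

// Caller-side view of the signature change above: createStreams now takes only
// the create flag and the requested checksum.
class CreateStreamsExample {
  static void writeOneByte(ReplicaInPipelineInterface replica) throws IOException {
    DataChecksum checksum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512);
    ReplicaOutputStreams streams = replica.createStreams(false, checksum);
    OutputStream dataOut = streams.getDataOut();
    dataOut.write(1);
  }
}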

+ 17 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -116,7 +116,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       DatanodeStorage.State.NORMAL;
 
   private final AutoCloseableLock datasetLock;
-  
+  private final FileIoProvider fileIoProvider;
+
   static final byte[] nullCrcFileData;
   static {
     DataChecksum checksum = DataChecksum.newDataChecksum(
@@ -254,8 +255,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     @Override
-    synchronized public ReplicaOutputStreams createStreams(boolean isCreate, 
-        DataChecksum requestedChecksum, long slowLogThresholdMs)
+    synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
+        DataChecksum requestedChecksum)
         throws IOException {
       if (finalized) {
         throw new IOException("Trying to write to a finalized replica "
@@ -263,7 +264,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       } else {
         SimulatedOutputStream crcStream = new SimulatedOutputStream();
         return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
-            volume.isTransientStorage(), slowLogThresholdMs);
+            volume, fileIoProvider);
       }
     }
 
@@ -444,12 +445,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
           map.get(bpid).getUsed(), 0L);
     }
   }
-  
+
   static class SimulatedVolume implements FsVolumeSpi {
     private final SimulatedStorage storage;
+    private final FileIoProvider fileIoProvider;
 
-    SimulatedVolume(final SimulatedStorage storage) {
+    SimulatedVolume(final SimulatedStorage storage,
+                    final FileIoProvider fileIoProvider) {
       this.storage = storage;
+      this.fileIoProvider = fileIoProvider;
     }
 
     @Override
@@ -531,6 +535,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       return null;
     }
 
+    @Override
+    public FileIoProvider getFileIoProvider() {
+      return fileIoProvider;
+    }
+
     @Override
     public VolumeCheckResult check(VolumeCheckContext context)
         throws Exception {
@@ -562,10 +571,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     }
 
     registerMBean(datanodeUuid);
+    this.fileIoProvider = new FileIoProvider(conf);
     this.storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
-    this.volume = new SimulatedVolume(this.storage);
+    this.volume = new SimulatedVolume(this.storage, this.fileIoProvider);
     this.datasetLock = new AutoCloseableLock();
   }
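
On the construction side, ReplicaOutputStreams now takes the originating volume and its FileIoProvider where it previously took the transient-storage flag and the slow-log threshold. A minimal factory sketch for test doubles, assuming only the five-argument constructor visible above; TestStreamsFactory is hypothetical.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;

// Sketch for test doubles: pass the volume and its provider in place of the
// old isTransientStorage/slowLogThresholdMs pair, matching the
// SimulatedFSDataset change above.
class TestStreamsFactory {
  static ReplicaOutputStreams newStreams(OutputStream dataOut,
      OutputStream crcOut, DataChecksum checksum, FsVolumeSpi volume,
      FileIoProvider provider) throws IOException {
    return new ReplicaOutputStreams(dataOut, crcOut, checksum, volume, provider);
  }
}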
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -642,7 +642,7 @@ public class TestBlockRecovery {
     ReplicaOutputStreams streams = null;
     try {
       streams = replicaInfo.createStreams(true,
-          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
+          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
       streams.getChecksumOut().write('a');
       dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
       BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -886,6 +886,11 @@ public class TestDirectoryScanner {
       return null;
     }
 
+    @Override
+    public FileIoProvider getFileIoProvider() {
+      return null;
+    }
+
     @Override
     public VolumeCheckResult check(VolumeCheckContext context)
         throws Exception {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -83,7 +83,7 @@ public class TestSimulatedFSDataset {
       ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
           StorageType.DEFAULT, b, false).getReplica();
       ReplicaOutputStreams out = bInfo.createStreams(true,
-          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
+          DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
       try {
         OutputStream dataOut  = out.getDataOut();
         assertEquals(0, fsdataset.getLength(b));

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -133,7 +133,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   @Override
   public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
       long ckoff) throws IOException {
-    return new ReplicaInputStreams(null, null, null);
+    return new ReplicaInputStreams(null, null, null, null);
   }
 
   @Override

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java

@@ -57,10 +57,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
 
   @Override
   public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum, long slowLogThresholdMs)
+      DataChecksum requestedChecksum)
       throws IOException {
-    return new ReplicaOutputStreams(null, null, requestedChecksum, false,
-        slowLogThresholdMs);
+    return new ReplicaOutputStreams(null, null, requestedChecksum,
+        null, null);
   }
 
   @Override

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -109,6 +110,11 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
     return null;
   }
 
+  @Override
+  public FileIoProvider getFileIoProvider() {
+    return null;
+  }
+
   @Override
   public VolumeCheckResult check(VolumeCheckContext context)
       throws Exception {

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
@@ -164,6 +165,8 @@ public class TestFsDatasetImpl {
     this.conf = new Configuration();
     this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
 
+    final FileIoProvider fileIoProvider = new FileIoProvider(conf);
+    when(datanode.getFileIoProvider()).thenReturn(fileIoProvider);
     when(datanode.getConf()).thenReturn(conf);
     final DNConf dnConf = new DNConf(datanode);
     when(datanode.getDnConf()).thenReturn(dnConf);
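
The TestFsDatasetImpl change shows the new prerequisite for tests that mock DataNode: FsDatasetImpl and FsVolumeImpl now fetch the FileIoProvider from the DataNode, so the mock must stub getFileIoProvider(). A condensed sketch of that setup; MockedDataNodeSetup is illustrative.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;

// Any test that mocks DataNode and then exercises the dataset code must now
// stub getFileIoProvider(), or the dataset will see a null provider.
class MockedDataNodeSetup {
  static DataNode newMockedDataNode(Configuration conf) {
    DataNode datanode = mock(DataNode.class);
    when(datanode.getFileIoProvider()).thenReturn(new FileIoProvider(conf));
    when(datanode.getConf()).thenReturn(conf);
    return datanode;
  }
}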

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java

@@ -103,6 +103,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
         .add(DFSConfigKeys.DFS_DATANODE_STARTUP_KEY);
     configurationPropsToSkipCompare
         .add(DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY);
+    configurationPropsToSkipCompare
+        .add(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY);
 
     // Allocate
     xmlPropsToSkipCompare = new HashSet<String>();
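
DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY is skipped by the config-field check above because it has no hdfs-default.xml entry. Assuming the key takes the fully qualified name of a FileIoEvents implementation, such as the counting implementation added by this change, enabling it might look roughly like the following; the helper class is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Hedged sketch: set the new key to a FileIoEvents implementation class name
// so the DataNode's FileIoProvider reports per-operation counts.
class FileIoEventsConfigExample {
  static Configuration withCountingEvents() {
    Configuration conf = new Configuration();
    conf.set(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
        "org.apache.hadoop.hdfs.server.datanode.CountingFileIoEvents");
    return conf;
  }
}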