Переглянути джерело

HDFS-4297. Fix issues related to datanode concurrent reading and writing on Windows. Contributed by Arpit Agarwal and Chuan Liu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-trunk-win@1428606 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 12 роки тому
батько
коміт
e620c86858

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.branch-trunk-win.txt

@@ -8,3 +8,6 @@ branch-trunk-win changes - unreleased
 
   HDFS-4316. branch-trunk-win contains test code accidentally added during 
   work on fixing tests on Windows. (Chris Nauroth via suresh)
+
+  HDFS-4297. Fix issues related to datanode concurrent reading and writing on
+  Windows. (Arpit Agarwal, Chuan Liu via suresh)

+ 30 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -41,6 +41,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -90,6 +92,15 @@ import org.apache.hadoop.util.Time;
 @InterfaceAudience.Private
 class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
+  private final static boolean isNativeIOAvailable;
+  static {
+    isNativeIOAvailable = NativeIO.isAvailable();
+    if (Path.WINDOWS && !isNativeIOAvailable) {
+      LOG.warn("Data node cannot fully support concurrent reading"
+          + " and writing without native code extensions on Windows.");
+    }
+  }
+
 
   @Override // FsDatasetSpi
   public List<FsVolumeImpl> getVolumes() {
@@ -147,6 +158,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (meta == null || !meta.exists()) {
       return null;
     }
+    if (isNativeIOAvailable) {
+      return new LengthInputStream(
+          NativeIO.getShareDeleteFileInputStream(meta),
+          meta.length());
+    }
     return new LengthInputStream(new FileInputStream(meta), meta.length());
   }
     
@@ -322,18 +338,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
     File blockFile = getBlockFileNoExistsCheck(b);
-    RandomAccessFile blockInFile;
-    try {
-      blockInFile = new RandomAccessFile(blockFile, "r");
-    } catch (FileNotFoundException fnfe) {
-      throw new IOException("Block " + b + " is not valid. " +
-          "Expected block file at " + blockFile + " does not exist.");
-    }
+    if (isNativeIOAvailable) {
+      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+    } else {
+      RandomAccessFile blockInFile;
+      try {
+        blockInFile = new RandomAccessFile(blockFile, "r");
+      } catch (FileNotFoundException fnfe) {
+        throw new IOException("Block " + b + " is not valid. " +
+            "Expected block file at " + blockFile + " does not exist.");
+      }
 
-    if (seekOffset > 0) {
-      blockInFile.seek(seekOffset);
+      if (seekOffset > 0) {
+        blockInFile.seek(seekOffset);
+      }
+      return new FileInputStream(blockInFile.getFD());
     }
-    return new FileInputStream(blockInFile.getFD());
   }
 
   /**

+ 14 - 15
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileConcurrentReader.java

@@ -338,33 +338,33 @@ public class TestFileConcurrentReader {
     final AtomicBoolean writerDone = new AtomicBoolean(false);
     final AtomicBoolean writerStarted = new AtomicBoolean(false);
     final AtomicBoolean error = new AtomicBoolean(false);
-    final FSDataOutputStream initialOutputStream = fileSystem.create(file);
-    final Thread writer = new Thread(new Runnable() {
-      private FSDataOutputStream outputStream = initialOutputStream;
 
+    final Thread writer = new Thread(new Runnable() {
       @Override
       public void run() {
         try {
-          for (int i = 0; !error.get() && i < numWrites; i++) {
-            try {
+          FSDataOutputStream outputStream = fileSystem.create(file);
+          if (syncType == SyncType.APPEND) {
+            outputStream.close();
+            outputStream = fileSystem.append(file);
+          }
+          try {
+            for (int i = 0; !error.get() && i < numWrites; i++) {
               final byte[] writeBuf =
-                DFSTestUtil.generateSequentialBytes(i * writeSize, writeSize);
+                  DFSTestUtil.generateSequentialBytes(i * writeSize, writeSize);
               outputStream.write(writeBuf);
               if (syncType == SyncType.SYNC) {
                 outputStream.hflush();
-              } else { // append
-                outputStream.close();
-                outputStream = fileSystem.append(file);
               }
               writerStarted.set(true);
-            } catch (IOException e) {
-              error.set(true);
-              LOG.error("error writing to file", e);
             }
+          } catch (IOException e) {
+            error.set(true);
+            LOG.error("error writing to file", e);
+          } finally {
+            outputStream.close();
           }
-
           writerDone.set(true);
-          outputStream.close();
         } catch (Exception e) {
           LOG.error("error in writer", e);
 
@@ -415,7 +415,6 @@ public class TestFileConcurrentReader {
 
       Thread.currentThread().interrupt();
     }
-    initialOutputStream.close();
   }
 
   private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {