瀏覽代碼

HDFS-4971. Merge change r1525688 from branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1525689 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 11 年之前
父節點
當前提交
f1f2fbe403

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java

@@ -97,7 +97,7 @@ public class AsyncDataService {
   void writeAsync(OpenFileCtx openFileCtx) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Scheduling write back task for fileId: "
-          + openFileCtx.copyLatestAttr().getFileId());
+          + openFileCtx.getLatestAttr().getFileId());
     }
     WriteBackTask wbTask = new WriteBackTask(openFileCtx);
     execute(wbTask);
@@ -125,7 +125,7 @@ public class AsyncDataService {
     public String toString() {
       // Called in AsyncDataService.execute for displaying error messages.
       return "write back data for fileId"
-          + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset "
+          + openFileCtx.getLatestAttr().getFileId() + " with nextOffset "
           + openFileCtx.getNextOffset();
     }
 

+ 24 - 23
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java

@@ -17,19 +17,34 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import java.util.Comparator;
+
+import com.google.common.base.Preconditions;
+
 /**
  * OffsetRange is the range of read/write request. A single point (e.g.,[5,5])
  * is not a valid range.
  */
-public class OffsetRange implements Comparable<OffsetRange> {
+public class OffsetRange {
+  
+  public static final Comparator<OffsetRange> ReverseComparatorOnMin = 
+      new Comparator<OffsetRange>() {
+    @Override
+    public int compare(OffsetRange o1, OffsetRange o2) {
+      if (o1.getMin() == o2.getMin()) {
+        return o1.getMax() < o2.getMax() ? 
+            1 : (o1.getMax() > o2.getMax() ? -1 : 0);
+      } else {
+        return o1.getMin() < o2.getMin() ? 1 : -1;
+      }
+    }
+  };
+  
   private final long min;
   private final long max;
 
   OffsetRange(long min, long max) {
-    if ((min >= max) || (min < 0) || (max < 0)) {
-      throw new IllegalArgumentException("Wrong offset range: (" + min + ","
-          + max + ")");
-    }
+    Preconditions.checkArgument(min >= 0 && max >= 0 && min < max);
     this.min = min;
     this.max = max;
   }
@@ -49,24 +64,10 @@ public class OffsetRange implements Comparable<OffsetRange> {
 
   @Override
   public boolean equals(Object o) {
-    assert (o instanceof OffsetRange);
-    OffsetRange range = (OffsetRange) o;
-    return (min == range.getMin()) && (max == range.getMax());
-  }
-
-  private static int compareTo(long left, long right) {
-    if (left < right) {
-      return -1;
-    } else if (left > right) {
-      return 1;
-    } else {
-      return 0;
+    if (o instanceof OffsetRange) {
+      OffsetRange range = (OffsetRange) o;
+      return (min == range.getMin()) && (max == range.getMax());
     }
-  }
-
-  @Override
-  public int compareTo(OffsetRange other) {
-    final int d = compareTo(min, other.getMin());
-    return d != 0 ? d : compareTo(max, other.getMax());
+    return false;
   }
 }

File diff suppressed because it is too large
+ 417 - 366
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java


+ 55 - 29
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.base.Preconditions;
+
 /**
  * WriteCtx saves the context of one write request, such as request, channel,
  * xid and reply status.
@@ -49,13 +51,21 @@ class WriteCtx {
   private final long offset;
   private final int count;
   private final WriteStableHow stableHow;
-  private byte[] data;
+  private volatile byte[] data;
   
   private final Channel channel;
   private final int xid;
   private boolean replied;
 
-  private DataState dataState;
+  /** 
+   * Data belonging to the same {@link OpenFileCtx} may be dumped to a file. 
+   * After being dumped to the file, the corresponding {@link WriteCtx} records 
+   * the dump file and the offset.  
+   */
+  private RandomAccessFile raf;
+  private long dumpFileOffset;
+  
+  private volatile DataState dataState;
 
   public DataState getDataState() {
     return dataState;
@@ -64,12 +74,13 @@ class WriteCtx {
   public void setDataState(DataState dataState) {
     this.dataState = dataState;
   }
-
-  private RandomAccessFile raf;
-  private long dumpFileOffset;
   
-  // Return the dumped data size
-  public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
+  /** 
+   * Writing the data into a local file. After the writing, if 
+   * {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set 
+   * {@link #dataState} to DUMPED.
+   */
+  long dumpData(FileOutputStream dumpOut, RandomAccessFile raf)
       throws IOException {
     if (dataState != DataState.ALLOW_DUMP) {
       if (LOG.isTraceEnabled()) {
@@ -84,48 +95,63 @@ class WriteCtx {
     if (LOG.isDebugEnabled()) {
       LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
     }
-    data = null;
-    dataState = DataState.DUMPED;
-    return count;
+    // it is possible that while we dump the data, the data is also being
+    // written back to HDFS. After dump, if the writing back has not finished
+    // yet, we change its flag to DUMPED and set the data to null. Otherwise
+    // this WriteCtx instance should have been removed from the buffer.
+    if (dataState == DataState.ALLOW_DUMP) {
+      synchronized (this) {
+        if (dataState == DataState.ALLOW_DUMP) {
+          data = null;
+          dataState = DataState.DUMPED;
+          return count;
+        }
+      }
+    }
+    return 0;
   }
 
-  public FileHandle getHandle() {
+  FileHandle getHandle() {
     return handle;
   }
   
-  public long getOffset() {
+  long getOffset() {
     return offset;
   }
 
-  public int getCount() {
+  int getCount() {
     return count;
   }
 
-  public WriteStableHow getStableHow() {
+  WriteStableHow getStableHow() {
     return stableHow;
   }
 
-  public byte[] getData() throws IOException {
+  byte[] getData() throws IOException {
     if (dataState != DataState.DUMPED) {
-      if (data == null) {
-        throw new IOException("Data is not dumpted but has null:" + this);
-      }
-    } else {
-      // read back
-      if (data != null) {
-        throw new IOException("Data is dumpted but not null");
-      }
-      data = new byte[count];
-      raf.seek(dumpFileOffset);
-      int size = raf.read(data, 0, count);
-      if (size != count) {
-        throw new IOException("Data count is " + count + ", but read back "
-            + size + "bytes");
+      synchronized (this) {
+        if (dataState != DataState.DUMPED) {
+          Preconditions.checkState(data != null);
+          return data;
+        }
       }
     }
+    // read back from dumped file
+    this.loadData();
     return data;
   }
 
+  private void loadData() throws IOException {
+    Preconditions.checkState(data == null);
+    data = new byte[count];
+    raf.seek(dumpFileOffset);
+    int size = raf.read(data, 0, count);
+    if (size != count) {
+      throw new IOException("Data count is " + count + ", but read back "
+          + size + "bytes");
+    }
+  }
+
   Channel getChannel() {
     return channel;
   }

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java

@@ -67,8 +67,8 @@ public class WriteManager {
    */
   private long streamTimeout;
   
-  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second
-  public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second
+  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
+  public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
   
   void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
     openFileMap.put(h, ctx);
@@ -215,6 +215,10 @@ public class WriteManager {
         LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
             + " commitOffset=" + commitOffset);
         return true;
+      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
+        LOG.info("Inactive stream with pending writes, fileId="
+            + fileHandle.getFileId() + " commitOffset=" + commitOffset);
+        return false;
       }
       assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
       if (ret == OpenFileCtx.COMMIT_ERROR) {

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 
@@ -51,8 +52,9 @@ public class TestOffsetRange {
     OffsetRange r3 = new OffsetRange(1, 3);
     OffsetRange r4 = new OffsetRange(3, 4);
 
-    assertTrue(r2.compareTo(r3) == 0);
-    assertTrue(r2.compareTo(r1) == 1);
-    assertTrue(r2.compareTo(r4) == -1);
+    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3));
+    assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2));
+    assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0);
+    assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0);
   }
 }

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -98,6 +98,9 @@ Release 2.1.1-beta - 2013-09-23
     HDFS-5212. Refactor RpcMessage and NFS3Response to support different 
     types of authentication information. (jing9)
 
+    HDFS-4971. Move IO operations out of locking in OpenFileCtx. (brandonli and
+    jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES

Some files were not shown because too many files changed in this diff