瀏覽代碼

merge -c 1442312 from trunk to branch-2 to fix HADOOP-9276. Allow BoundedByteArrayOutputStream to be resettable. Contributed by Arun Murthy

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1442316 13f79535-47bb-0310-9956-ffa450edef68
Hitesh Shah 12 年之前
父節點
當前提交
ca765ecb05

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -135,6 +135,9 @@ Release 2.0.3-alpha - Unreleased
     HADOOP-9247. Parametrize Clover "generateXxx" properties to make them
     re-definable via -D in mvn calls. (Ivan A. Veselovsky via suresh)
 
+    HADOOP-9276. Allow BoundedByteArrayOutputStream to be resettable.
+    (Arun Murthy via hitesh)
+
   OPTIMIZATIONS
 
     HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang

+ 31 - 16
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java

@@ -32,9 +32,10 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Unstable
 public class BoundedByteArrayOutputStream extends OutputStream {
-  private final byte[] buffer;
+  private byte[] buffer;
+  private int startOffset;
   private int limit;
-  private int count;
+  private int currentPointer;
 
   /**
    * Create a BoundedByteArrayOutputStream with the specified
@@ -52,20 +53,30 @@ public class BoundedByteArrayOutputStream extends OutputStream {
    * @param limit The maximum limit upto which data can be written
    */
   public BoundedByteArrayOutputStream(int capacity, int limit) {
+    this(new byte[capacity], 0, limit);
+  }
+
+  protected BoundedByteArrayOutputStream(byte[] buf, int offset, int limit) {
+    resetBuffer(buf, offset, limit);
+  }
+  
+  protected void resetBuffer(byte[] buf, int offset, int limit) {
+    int capacity = buf.length - offset;
     if ((capacity < limit) || (capacity | limit) < 0) {
       throw new IllegalArgumentException("Invalid capacity/limit");
     }
-    this.buffer = new byte[capacity];
-    this.limit = limit;
-    this.count = 0;
+    this.buffer = buf;
+    this.startOffset = offset;
+    this.currentPointer = offset;
+    this.limit = offset + limit;
   }
-
+  
   @Override
   public void write(int b) throws IOException {
-    if (count >= limit) {
+    if (currentPointer >= limit) {
       throw new EOFException("Reaching the limit of the buffer.");
     }
-    buffer[count++] = (byte) b;
+    buffer[currentPointer++] = (byte) b;
   }
 
   @Override
@@ -77,12 +88,12 @@ public class BoundedByteArrayOutputStream extends OutputStream {
       return;
     }
 
-    if (count + len > limit) {
+    if (currentPointer + len > limit) {
       throw new EOFException("Reach the limit of the buffer");
     }
 
-    System.arraycopy(b, off, buffer, count, len);
-    count += len;
+    System.arraycopy(b, off, buffer, currentPointer, len);
+    currentPointer += len;
   }
 
   /**
@@ -90,17 +101,17 @@ public class BoundedByteArrayOutputStream extends OutputStream {
    * @param newlim New Limit
    */
   public void reset(int newlim) {
-    if (newlim > buffer.length) {
+    if (newlim > (buffer.length - startOffset)) {
       throw new IndexOutOfBoundsException("Limit exceeds buffer size");
     }
     this.limit = newlim;
-    this.count = 0;
+    this.currentPointer = startOffset;
   }
 
   /** Reset the buffer */
   public void reset() {
-    this.limit = buffer.length;
-    this.count = 0;
+    this.limit = buffer.length - startOffset;
+    this.currentPointer = startOffset;
   }
 
   /** Return the current limit */
@@ -119,6 +130,10 @@ public class BoundedByteArrayOutputStream extends OutputStream {
    * currently in the buffer.
    */
   public int size() {
-    return count;
+    return currentPointer - startOffset;
+  }
+  
+  public int available() {
+    return limit - currentPointer;
   }
 }

+ 57 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestBoundedByteArrayOutputStream.java

@@ -88,4 +88,61 @@ public class TestBoundedByteArrayOutputStream extends TestCase {
     assertTrue("Writing beyond limit did not throw an exception",
         caughtException);
   }
+  
+  
+  static class ResettableBoundedByteArrayOutputStream 
+  extends BoundedByteArrayOutputStream {
+
+    public ResettableBoundedByteArrayOutputStream(int capacity) {
+      super(capacity);
+    }
+
+    public void resetBuffer(byte[] buf, int offset, int length) {
+      super.resetBuffer(buf, offset, length);
+    }
+    
+  }
+  
+  public void testResetBuffer() throws IOException {
+    
+    ResettableBoundedByteArrayOutputStream stream = 
+      new ResettableBoundedByteArrayOutputStream(SIZE);
+
+    // Write to the stream, get the data back and check for contents
+    stream.write(INPUT, 0, SIZE);
+    assertTrue("Array Contents Mismatch",
+        Arrays.equals(INPUT, stream.getBuffer()));
+    
+    // Try writing beyond end of buffer. Should throw an exception
+    boolean caughtException = false;
+    
+    try {
+      stream.write(INPUT[0]);
+    } catch (Exception e) {
+      caughtException = true;
+    }
+    
+    assertTrue("Writing beyond limit did not throw an exception",
+        caughtException);
+    
+    //Reset the stream and try, should succeed
+    byte[] newBuf = new byte[SIZE];
+    stream.resetBuffer(newBuf, 0, newBuf.length);
+    assertTrue("Limit did not get reset correctly", 
+        (stream.getLimit() == SIZE));
+    stream.write(INPUT, 0, SIZE);
+    assertTrue("Array Contents Mismatch",
+        Arrays.equals(INPUT, stream.getBuffer()));
+  
+    // Try writing one more byte, should fail
+    caughtException = false;
+    try {
+      stream.write(INPUT[0]);
+    } catch (Exception e) {
+      caughtException = true;
+    }
+    assertTrue("Writing beyond limit did not throw an exception",
+        caughtException);
+  }
+
 }