Explorar o código

HDFS-2212. Refactor double-buffering code out of EditLogOutputStreams. Contributed by Todd Lipcon

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1151736 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins %!s(int64=13) %!d(string=hai) anos
pai
achega
44320eed17

+ 3 - 0
hdfs/CHANGES.txt

@@ -616,6 +616,9 @@ Trunk (unreleased changes)
     HDFS-2195. Refactor StorageDirectory to not be an non-static inner class.
     HDFS-2195. Refactor StorageDirectory to not be an non-static inner class.
     (todd via eli)
     (todd via eli)
 
 
+    HDFS-2212. Refactor double-buffering code out of EditLogOutputStreams.
+    (todd via eli)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

+ 15 - 55
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java

@@ -17,18 +17,14 @@
  */
  */
 package org.apache.hadoop.hdfs.server.namenode;
 package org.apache.hadoop.hdfs.server.namenode;
 
 
-import java.io.DataOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 
 
@@ -46,21 +42,9 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   private NamenodeProtocol backupNode;          // RPC proxy to backup node
   private NamenodeProtocol backupNode;          // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
   private NamenodeRegistration nnRegistration;  // active node registration
-  private ArrayList<BufferedOp> bufCurrent;  // current buffer for writing
-  private ArrayList<BufferedOp> bufReady;    // buffer ready for flushing
+  private EditsDoubleBuffer doubleBuf;
   private DataOutputBuffer out;     // serialized output sent to backup node
   private DataOutputBuffer out;     // serialized output sent to backup node
 
 
-  
-  private static class BufferedOp { 
-    public final FSEditLogOpCodes opCode;
-    public final byte[] bytes;
-
-    public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) {
-      this.opCode = opCode;
-      this.bytes = bytes;
-    }
-  }
-
   EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
   EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
                             NamenodeRegistration nnReg) // active name-node
                             NamenodeRegistration nnReg) // active name-node
   throws IOException {
   throws IOException {
@@ -78,8 +62,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
       throw e;
     }
     }
-    this.bufCurrent = new ArrayList<BufferedOp>();
-    this.bufReady = new ArrayList<BufferedOp>();
+    this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
   }
 
 
@@ -95,13 +78,8 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
 
 
   @Override // EditLogOutputStream
   @Override // EditLogOutputStream
   void write(FSEditLogOp op) throws IOException {
   void write(FSEditLogOp op) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream s = new DataOutputStream(baos);
-    FSEditLogOp.Writer w = new FSEditLogOp.Writer(s);
-    w.writeOp(op);
-
-    bufCurrent.add(new BufferedOp(op.opCode, baos.toByteArray()));
-  }
+    doubleBuf.writeOp(op);
+ }
 
 
   @Override
   @Override
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
@@ -113,55 +91,37 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
    */
    */
   @Override // EditLogOutputStream
   @Override // EditLogOutputStream
   void create() throws IOException {
   void create() throws IOException {
-    bufCurrent.clear();
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
+    assert doubleBuf.isFlushed() : "previous data is not flushed yet";
+    this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
   }
   }
 
 
   @Override // EditLogOutputStream
   @Override // EditLogOutputStream
   public void close() throws IOException {
   public void close() throws IOException {
     // close should have been called after all pending transactions 
     // close should have been called after all pending transactions 
     // have been flushed & synced.
     // have been flushed & synced.
-    int size = bufCurrent.size();
+    int size = doubleBuf.countBufferedBytes();
     if (size != 0) {
     if (size != 0) {
       throw new IOException("BackupEditStream has " + size +
       throw new IOException("BackupEditStream has " + size +
                           " records still to be flushed and cannot be closed.");
                           " records still to be flushed and cannot be closed.");
     } 
     } 
     RPC.stopProxy(backupNode); // stop the RPC threads
     RPC.stopProxy(backupNode); // stop the RPC threads
-    bufCurrent = bufReady = null;
+    doubleBuf.close();
+    doubleBuf = null;
   }
   }
 
 
   @Override // EditLogOutputStream
   @Override // EditLogOutputStream
   void setReadyToFlush() throws IOException {
   void setReadyToFlush() throws IOException {
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
-    ArrayList<BufferedOp>  tmp = bufReady;
-    bufReady = bufCurrent;
-    bufCurrent = tmp;
+    doubleBuf.setReadyToFlush();
   }
   }
 
 
   @Override // EditLogOutputStream
   @Override // EditLogOutputStream
   protected void flushAndSync() throws IOException {
   protected void flushAndSync() throws IOException {
-    assert out.size() == 0 : "Output buffer is not empty";
-    int bufReadySize = bufReady.size();
-    for(int idx = 0; idx < bufReadySize; idx++) {
-      BufferedOp jRec = null;
-      for(; idx < bufReadySize; idx++) {
-        jRec = bufReady.get(idx);
-        if(jRec.opCode.getOpCode() 
-           >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
-          break;  // special operation should be sent in a separate call to BN
-        out.write(jRec.bytes, 0, jRec.bytes.length);
-      }
-      if(out.size() > 0)
-        send(NamenodeProtocol.JA_JOURNAL);
-      if(idx == bufReadySize)
-        break;
-      // operation like start journal spool or increment checkpoint time
-      // is a separate call to BN
-      out.write(jRec.bytes, 0, jRec.bytes.length);
-      send(jRec.opCode.getOpCode());
+    // XXX: this code won't work in trunk, but it's redone
+    // in HDFS-1073 where it's simpler.
+    doubleBuf.flushTo(out);
+    if (out.size() > 0) {
+      send(NamenodeProtocol.JA_JOURNAL);
     }
     }
-    bufReady.clear();         // erase all data in the buffer
-    out.reset();              // reset buffer to the start position
   }
   }
 
 
   /**
   /**

+ 17 - 54
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java

@@ -24,12 +24,9 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel;
-import java.util.zip.Checksum;
 
 
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 
 
@@ -43,10 +40,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   private File file;
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
   private FileOutputStream fp; // file stream for storing edit logs
   private FileChannel fc; // channel of the file stream for sync
   private FileChannel fc; // channel of the file stream for sync
-  private DataOutputBuffer bufCurrent; // current buffer for writing
-  private DataOutputBuffer bufReady; // buffer ready for flushing
-  private FSEditLogOp.Writer writer;
-  final private int initBufferSize; // inital buffer size
+  private EditsDoubleBuffer doubleBuf;
   static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
   static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
 
 
   static {
   static {
@@ -68,10 +62,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   EditLogFileOutputStream(File name, int size) throws IOException {
   EditLogFileOutputStream(File name, int size) throws IOException {
     super();
     super();
     file = name;
     file = name;
-    initBufferSize = size;
-    bufCurrent = new DataOutputBuffer(size);
-    bufReady = new DataOutputBuffer(size);
-    writer = new FSEditLogOp.Writer(bufCurrent);
+    doubleBuf = new EditsDoubleBuffer(size);
     RandomAccessFile rp = new RandomAccessFile(name, "rw");
     RandomAccessFile rp = new RandomAccessFile(name, "rw");
     fp = new FileOutputStream(rp.getFD()); // open for append
     fp = new FileOutputStream(rp.getFD()); // open for append
     fc = rp.getChannel();
     fc = rp.getChannel();
@@ -91,23 +82,13 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   /** {@inheritDoc} */
   /** {@inheritDoc} */
   @Override
   @Override
   void write(FSEditLogOp op) throws IOException {
   void write(FSEditLogOp op) throws IOException {
-    int start = bufCurrent.getLength();
-    
-    writer.writeOp(op);
-
-    // write transaction checksum
-    int end = bufCurrent.getLength();
-    Checksum checksum = FSEditLog.getChecksum();
-    checksum.reset();
-    checksum.update(bufCurrent.getData(), start, end-start);
-    int sum = (int)checksum.getValue();
-    bufCurrent.writeInt(sum);
+    doubleBuf.writeOp(op);
   }
   }
 
 
   /** {@inheritDoc} */
   /** {@inheritDoc} */
   @Override
   @Override
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
   void writeRaw(byte[] bytes, int offset, int length) throws IOException {
-    bufCurrent.write(bytes, offset, length);
+    doubleBuf.writeRaw(bytes, offset, length);
   }
   }
 
 
   /**
   /**
@@ -117,7 +98,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   void create() throws IOException {
   void create() throws IOException {
     fc.truncate(0);
     fc.truncate(0);
     fc.position(0);
     fc.position(0);
-    bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
+    doubleBuf.getCurrentBuf().writeInt(FSConstants.LAYOUT_VERSION);
     setReadyToFlush();
     setReadyToFlush();
     flush();
     flush();
   }
   }
@@ -128,23 +109,11 @@ class EditLogFileOutputStream extends EditLogOutputStream {
       // close should have been called after all pending transactions
       // close should have been called after all pending transactions
       // have been flushed & synced.
       // have been flushed & synced.
       // if already closed, just skip
       // if already closed, just skip
-      if(bufCurrent != null)
-      {
-        int bufSize = bufCurrent.size();
-        if (bufSize != 0) {
-          throw new IOException("FSEditStream has " + bufSize
-              + " bytes still to be flushed and cannot " + "be closed.");
-        }
-        bufCurrent.close();
-        bufCurrent = null;
-        writer = null;
-      }
-  
-      if(bufReady != null) {
-        bufReady.close();
-        bufReady = null;
+      if (doubleBuf != null) {
+        doubleBuf.close();
+        doubleBuf = null;
       }
       }
-  
+      
       // remove the last INVALID marker from transaction log.
       // remove the last INVALID marker from transaction log.
       if (fc != null && fc.isOpen()) {
       if (fc != null && fc.isOpen()) {
         fc.truncate(fc.position());
         fc.truncate(fc.position());
@@ -156,9 +125,8 @@ class EditLogFileOutputStream extends EditLogOutputStream {
         fp = null;
         fp = null;
       }
       }
     } finally {
     } finally {
-      IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp);
-      bufCurrent = bufReady = null;
-      writer = null;
+      IOUtils.cleanup(FSNamesystem.LOG, fc, fp);
+      doubleBuf = null;
       fc = null;
       fc = null;
       fp = null;
       fp = null;
     }
     }
@@ -170,12 +138,8 @@ class EditLogFileOutputStream extends EditLogOutputStream {
    */
    */
   @Override
   @Override
   void setReadyToFlush() throws IOException {
   void setReadyToFlush() throws IOException {
-    assert bufReady.size() == 0 : "previous data is not flushed yet";
-    bufCurrent.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
-    DataOutputBuffer tmp = bufReady;
-    bufReady = bufCurrent;
-    bufCurrent = tmp;
-    writer = new FSEditLogOp.Writer(bufCurrent);
+    doubleBuf.getCurrentBuf().write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker
+    doubleBuf.setReadyToFlush();
   }
   }
 
 
   /**
   /**
@@ -185,8 +149,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   @Override
   @Override
   protected void flushAndSync() throws IOException {
   protected void flushAndSync() throws IOException {
     preallocate(); // preallocate file if necessary
     preallocate(); // preallocate file if necessary
-    bufReady.writeTo(fp); // write data to file
-    bufReady.reset(); // erase all data in the buffer
+    doubleBuf.flushTo(fp);
     fc.force(false); // metadata updates not needed because of preallocation
     fc.force(false); // metadata updates not needed because of preallocation
     fc.position(fc.position() - 1); // skip back the end-of-file marker
     fc.position(fc.position() - 1); // skip back the end-of-file marker
   }
   }
@@ -196,7 +159,7 @@ class EditLogFileOutputStream extends EditLogOutputStream {
    */
    */
   @Override
   @Override
   public boolean shouldForceSync() {
   public boolean shouldForceSync() {
-    return bufReady.size() >= initBufferSize;
+    return doubleBuf.shouldForceSync();
   }
   }
   
   
   /**
   /**
@@ -205,8 +168,8 @@ class EditLogFileOutputStream extends EditLogOutputStream {
   @Override
   @Override
   long length() throws IOException {
   long length() throws IOException {
     // file size - header size + size of both buffers
     // file size - header size + size of both buffers
-    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + bufReady.size()
-        + bufCurrent.size();
+    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + 
+      doubleBuf.countBufferedBytes();
   }
   }
 
 
   // allocate a big chunk of data
   // allocate a big chunk of data

+ 105 - 0
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A double-buffer for edits. New edits are written into the first buffer
+ * while the second is available to be flushed. Each time the double-buffer
+ * is flushed, the two internal buffers are swapped. This allows edits
+ * to progress concurrently to flushes without allocating new buffers each
+ * time.
+ */
+class EditsDoubleBuffer {
+
+  private DataOutputBuffer bufCurrent; // current buffer for writing
+  private DataOutputBuffer bufReady; // buffer ready for flushing
+  private final int initBufferSize;
+  private Writer writer;
+
+  public EditsDoubleBuffer(int defaultBufferSize) {
+    initBufferSize = defaultBufferSize;
+    bufCurrent = new DataOutputBuffer(initBufferSize);
+    bufReady = new DataOutputBuffer(initBufferSize);
+    writer = new FSEditLogOp.Writer(bufCurrent);
+  }
+    
+  public void writeOp(FSEditLogOp op) throws IOException {
+    writer.writeOp(op);
+  }
+
+  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    bufCurrent.write(bytes, offset, length);
+  }
+  
+  void close() throws IOException {
+    Preconditions.checkNotNull(bufCurrent);
+    Preconditions.checkNotNull(bufReady);
+
+    int bufSize = bufCurrent.size();
+    if (bufSize != 0) {
+      throw new IOException("FSEditStream has " + bufSize
+          + " bytes still to be flushed and cannot be closed.");
+    }
+
+    IOUtils.cleanup(null, bufCurrent, bufReady);
+    bufCurrent = bufReady = null;
+  }
+  
+  void setReadyToFlush() {
+    assert isFlushed() : "previous data not flushed yet";
+    DataOutputBuffer tmp = bufReady;
+    bufReady = bufCurrent;
+    bufCurrent = tmp;
+    writer = new FSEditLogOp.Writer(bufCurrent);
+  }
+  
+  /**
+   * Writes the content of the "ready" buffer to the given output stream,
+   * and resets it. Does not swap any buffers.
+   */
+  void flushTo(OutputStream out) throws IOException {
+    bufReady.writeTo(out); // write data to file
+    bufReady.reset(); // erase all data in the buffer
+  }
+  
+  boolean shouldForceSync() {
+    return bufReady.size() >= initBufferSize;
+  }
+
+  DataOutputBuffer getCurrentBuf() {
+    return bufCurrent;
+  }
+
+  public boolean isFlushed() {
+    return bufReady.size() == 0;
+  }
+
+  public int countBufferedBytes() {
+    return bufReady.size() + bufCurrent.size();
+  }
+
+}

+ 13 - 6
hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
@@ -1341,10 +1342,10 @@ public abstract class FSEditLogOp {
    * Class for writing editlog ops
    * Class for writing editlog ops
    */
    */
   public static class Writer {
   public static class Writer {
-    private final DataOutputStream out;
+    private final DataOutputBuffer buf;
 
 
-    public Writer(DataOutputStream out) {
-      this.out = out;
+    public Writer(DataOutputBuffer out) {
+      this.buf = out;
     }
     }
 
 
     /**
     /**
@@ -1354,9 +1355,15 @@ public abstract class FSEditLogOp {
      * @throws IOException if an error occurs during writing.
      * @throws IOException if an error occurs during writing.
      */
      */
     public void writeOp(FSEditLogOp op) throws IOException {
     public void writeOp(FSEditLogOp op) throws IOException {
-      out.writeByte(op.opCode.getOpCode());
-      
-      op.writeFields(out);
+      int start = buf.getLength();
+      buf.writeByte(op.opCode.getOpCode());
+      op.writeFields(buf);
+      int end = buf.getLength();
+      Checksum checksum = FSEditLog.getChecksum();
+      checksum.reset();
+      checksum.update(buf.getData(), start, end-start);
+      int sum = (int)checksum.getValue();
+      buf.writeInt(sum);
     }
     }
   }
   }
 
 

+ 81 - 0
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditsDoubleBuffer.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Test;
+
+public class TestEditsDoubleBuffer {
+  @Test
+  public void testDoubleBuffer() throws IOException {
+    EditsDoubleBuffer buf = new EditsDoubleBuffer(1024);
+    
+    assertTrue(buf.isFlushed());
+    byte[] data = new byte[100];
+    buf.writeRaw(data, 0, data.length);
+    assertEquals("Should count new data correctly",
+        data.length, buf.countBufferedBytes());
+
+    assertTrue("Writing to current buffer should not affect flush state",
+        buf.isFlushed());
+
+    // Swap the buffers
+    buf.setReadyToFlush();
+    assertEquals("Swapping buffers should still count buffered bytes",
+        data.length, buf.countBufferedBytes());
+    assertFalse(buf.isFlushed());
+ 
+    // Flush to a stream
+    DataOutputBuffer outBuf = new DataOutputBuffer();
+    buf.flushTo(outBuf);
+    assertEquals(data.length, outBuf.getLength());
+    assertTrue(buf.isFlushed());
+    assertEquals(0, buf.countBufferedBytes());
+    
+    // Write some more
+    buf.writeRaw(data, 0, data.length);
+    assertEquals("Should count new data correctly",
+        data.length, buf.countBufferedBytes());
+    buf.setReadyToFlush();
+    buf.flushTo(outBuf);
+    
+    assertEquals(data.length * 2, outBuf.getLength());
+    
+    assertEquals(0, buf.countBufferedBytes());
+
+    outBuf.close();
+  }
+  
+  @Test
+  public void shouldFailToCloseWhenUnflushed() throws IOException {
+    EditsDoubleBuffer buf = new EditsDoubleBuffer(1024);
+    buf.writeRaw(new byte[1], 0, 1);
+    try {
+      buf.close();
+      fail("Did not fail to close with unflushed data");
+    } catch (IOException ioe) {
+      if (!ioe.toString().contains("still to be flushed")) {
+        throw ioe;
+      }
+    }
+  }
+}