
Merging -r1035145:r1035410 from trunk to federation branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1078146 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
parent
commit
ecf19bbd72

+ 13 - 0
CHANGES.txt

@@ -292,6 +292,12 @@ Release 0.22.0 - Unreleased
     HDFS-811. Add metrics, failure reporting and additional tests for HDFS-457.
     (eli)
 
+    HDFS-895. Allow hflush/sync to occur in parallel with new writes
+    to the file. (Todd Lipcon via hairong)
+
+    HDFS-1500. TestOfflineImageViewer failing on trunk. (Todd Lipcon
+    via hairong)
+
   IMPROVEMENTS
 
     HDFS-1304. Add a new unit test for HftpFileSystem.open(..).  (szetszwo)
@@ -425,6 +431,8 @@ Release 0.22.0 - Unreleased
 
     HDFS-556. Provide info on failed volumes in the web ui. (eli)
 
+    HDFS-697. Enable asserts for tests by default. (eli)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
@@ -576,9 +584,14 @@ Release 0.22.0 - Unreleased
     other's service policies.  (Aaron T. Myers via tomwhite)
 
     HDFS-1440. Fix TestComputeInvalidateWork failure. (suresh)
+ 
+    HDFS-1498. FSDirectory#unprotectedConcat calls setModificationTime 
+    on a file. (eli)
 
     HDFS-1625. Ignore disk space values in TestDataNodeMXBean.  (szetszwo)
 
+    HDFS-1466. TestFcHdfsSymlink relies on /tmp/test not existing. (eli)
+
 Release 0.21.1 - Unreleased
 
     HDFS-1411. Correct backup node startup command in hdfs user guide.

+ 2 - 0
build.xml

@@ -104,6 +104,7 @@
 
   <property name="test.hdfs.rpc.engine" value=""/>
   <property name="test.libhdfs.dir" value="${test.build.dir}/libhdfs"/>
+  <property name="test.junit.jvmargs" value="-ea" />
 
   <property name="web.src.dir" value="${basedir}/src/web"/>
   <property name="src.webapps" value="${basedir}/src/webapps"/>
@@ -592,6 +593,7 @@
         maxmemory="${test.junit.maxmemory}"
         dir="${basedir}" timeout="${test.timeout}"
         errorProperty="tests.failed" failureProperty="tests.failed">
+        <jvmarg value="${test.junit.jvmargs}" />
         <sysproperty key="test.build.data" value="@{test.dir}/data"/>
         <sysproperty key="test.cache.data" value="${test.cache.data}"/>     
         <sysproperty key="test.debug.data" value="${test.debug.data}"/>

+ 123 - 61
src/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -114,12 +114,14 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   private Packet currentPacket = null;
   private DataStreamer streamer;
   private long currentSeqno = 0;
+  private long lastQueuedSeqno = -1;
+  private long lastAckedSeqno = -1;
   private long bytesCurBlock = 0; // bytes writen in current block
   private int packetSize = 0; // write packet size, including the header.
   private int chunksPerPacket = 0;
   private volatile IOException lastException = null;
   private long artificialSlowdown = 0;
-  private long lastFlushOffset = -1; // offset when flush was invoked
+  private long lastFlushOffset = 0; // offset when flush was invoked
   //persist blocks on namenode
   private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
   private volatile boolean appendChunk = false;   // appending to existing partial block
@@ -433,6 +435,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
               one = dataQueue.getFirst(); // regular data packet
             }
           }
+          assert one != null;
 
           // get new block from namenode.
           if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
@@ -669,6 +672,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
             block.setNumBytes(one.getLastByteOffsetBlock());
 
             synchronized (dataQueue) {
+              lastAckedSeqno = seqno;
               ackQueue.removeFirst();
               dataQueue.notifyAll();
             }
@@ -719,8 +723,21 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       
       if (!streamerClosed && dfsClient.clientRunning) {
         if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+
+          // If we had an error while closing the pipeline, we go through a fast-path
+          // where the BlockReceiver does not run. Instead, the DataNode just finalizes
+          // the block immediately during the 'connect ack' process. So, we want to pull
+          // the end-of-block packet from the dataQueue, since we don't actually have
+          // a true pipeline to send it over.
+          //
+          // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
+          // a client waiting on close() will be aware that the flush finished.
           synchronized (dataQueue) {
-            dataQueue.remove();  // remove the end of block packet
+            assert dataQueue.size() == 1;
+            Packet endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
+            assert endOfBlockPacket.lastPacketInBlock;
+            assert lastAckedSeqno == endOfBlockPacket.seqno - 1;
+            lastAckedSeqno = endOfBlockPacket.seqno;
             dataQueue.notifyAll();
           }
           endBlock();
@@ -1130,14 +1147,20 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
     }
   }
 
-  private void queuePacket(Packet packet) {
+  private void queueCurrentPacket() {
     synchronized (dataQueue) {
-      dataQueue.addLast(packet);
+      if (currentPacket == null) return;
+      dataQueue.addLast(currentPacket);
+      lastQueuedSeqno = currentPacket.seqno;
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
+      }
+      currentPacket = null;
       dataQueue.notifyAll();
     }
   }
 
-  private void waitAndQueuePacket(Packet packet) throws IOException {
+  private void waitAndQueueCurrentPacket() throws IOException {
     synchronized (dataQueue) {
       // If queue is full, then wait till we have enough space
       while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) {
@@ -1147,7 +1170,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
         }
       }
       isClosed();
-      queuePacket(packet);
+      queueCurrentPacket();
     }
   }
 
@@ -1201,8 +1224,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
             ", blockSize=" + blockSize +
             ", appendChunk=" + appendChunk);
       }
-      waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      waitAndQueueCurrentPacket();
 
       // If the reopened file did not end at chunk boundary and the above
       // write filled up its partial chunk. Tell the summer to generate full 
@@ -1224,10 +1246,9 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
         currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
             bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
-        waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        waitAndQueueCurrentPacket();
         bytesCurBlock = 0;
-        lastFlushOffset = -1;
+        lastFlushOffset = 0;
       }
     }
   }
@@ -1244,60 +1265,88 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
    * but not neccessary on the DN's OS buffers. 
    *
    * It is a synchronous operation. When it returns,
-   * it gurantees that flushed data become visible to new readers. 
+   * it guarantees that flushed data become visible to new readers. 
    * It is not guaranteed that data has been flushed to 
    * persistent store on the datanode. 
    * Block allocations are persisted on namenode.
    */
   @Override
-  public synchronized void hflush() throws IOException {
+  public void hflush() throws IOException {
     dfsClient.checkOpen();
     isClosed();
     try {
-      /* Record current blockOffset. This might be changed inside
-       * flushBuffer() where a partial checksum chunk might be flushed.
-       * After the flush, reset the bytesCurBlock back to its previous value,
-       * any partial checksum chunk will be sent now and in next packet.
-       */
-      long saveOffset = bytesCurBlock;
-
-      // flush checksum buffer, but keep checksum buffer intact
-      flushBuffer(true);
-
-      if(DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient flush() : saveOffset " + saveOffset +  
+      long toWaitFor;
+      synchronized (this) {
+        /* Record current blockOffset. This might be changed inside
+         * flushBuffer() where a partial checksum chunk might be flushed.
+         * After the flush, reset the bytesCurBlock back to its previous value,
+         * any partial checksum chunk will be sent now and in next packet.
+         */
+        long saveOffset = bytesCurBlock;
+        Packet oldCurrentPacket = currentPacket;
+        // flush checksum buffer, but keep checksum buffer intact
+        flushBuffer(true);
+        // bytesCurBlock potentially incremented if there was buffered data
+
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug(
+            "DFSClient flush() : saveOffset " + saveOffset +  
             " bytesCurBlock " + bytesCurBlock +
             " lastFlushOffset " + lastFlushOffset);
-      }
-      
-      // Flush only if we haven't already flushed till this offset.
-      if (lastFlushOffset != bytesCurBlock) {
+        }
+        // Flush only if we haven't already flushed till this offset.
+        if (lastFlushOffset != bytesCurBlock) {
+          assert bytesCurBlock > lastFlushOffset;
+          // record the valid offset of this flush
+          lastFlushOffset = bytesCurBlock;
+          waitAndQueueCurrentPacket();
+        } else {
+          // We already flushed up to this offset.
+          // This means that we haven't written anything since the last flush
+          // (or the beginning of the file). Hence, we should not have any
+          // packet queued prior to this call, since the last flush set
+          // currentPacket = null.
+          assert oldCurrentPacket == null :
+            "Empty flush should not occur with a currentPacket";
+
+          // just discard the current packet since it is already been sent.
+          currentPacket = null;
+        }
+        // Restore state of stream. Record the last flush offset 
+        // of the last full chunk that was flushed.
+        //
+        bytesCurBlock = saveOffset;
+        toWaitFor = lastQueuedSeqno;
+      } // end synchronized
 
-        // record the valid offset of this flush
-        lastFlushOffset = bytesCurBlock;
-
-        // wait for all packets to be sent and acknowledged
-        flushInternal();
-      } else {
-        // just discard the current packet since it is already been sent.
-        currentPacket = null;
-      }
-      
-      // Restore state of stream. Record the last flush offset 
-      // of the last full chunk that was flushed.
-      //
-      bytesCurBlock = saveOffset;
+      waitForAckedSeqno(toWaitFor);
 
       // If any new blocks were allocated since the last flush, 
       // then persist block locations on namenode. 
       //
       if (persistBlocks.getAndSet(false)) {
-        dfsClient.namenode.fsync(src, dfsClient.clientName);
+        try {
+          dfsClient.namenode.fsync(src, dfsClient.clientName);
+        } catch (IOException ioe) {
+          DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+          // If we got an error here, it might be because some other thread called
+          // close before our hflush completed. In that case, we should throw an
+          // exception that the stream is closed.
+          isClosed();
+          // If we aren't closed but failed to sync, we should expose that to the
+          // caller.
+          throw ioe;
+        }
       }
     } catch (IOException e) {
-        lastException = new IOException("IOException flush:" + e);
-        closeThreads(true);
-        throw e;
+      DFSClient.LOG.warn("Error while syncing", e);
+      synchronized (this) {
+        if (!closed) {
+          lastException = new IOException("IOException flush:" + e);
+          closeThreads(true);
+        }
+      }
+      throw e;
     }
   }
 
@@ -1338,26 +1387,39 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
    * Waits till all existing data is flushed and confirmations 
    * received from datanodes. 
    */
-  private synchronized void flushInternal() throws IOException {
-    dfsClient.checkOpen();
-    isClosed();
-    //
-    // If there is data in the current buffer, send it across
-    //
-    if (currentPacket != null) {
-      queuePacket(currentPacket);
-      currentPacket = null;
+  private void flushInternal() throws IOException {
+    long toWaitFor;
+    synchronized (this) {
+      dfsClient.checkOpen();
+      isClosed();
+      //
+      // If there is data in the current buffer, send it across
+      //
+      queueCurrentPacket();
+      toWaitFor = lastQueuedSeqno;
     }
 
+    waitForAckedSeqno(toWaitFor);
+  }
+
+  private void waitForAckedSeqno(long seqno) throws IOException {
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Waiting for ack for: " + seqno);
+    }
     synchronized (dataQueue) {
-      while (!closed && dataQueue.size() + ackQueue.size() > 0) {
+      while (!closed) {
+        isClosed();
+        if (lastAckedSeqno >= seqno) {
+          break;
+        }
         try {
-          dataQueue.wait();
-        } catch (InterruptedException  e) {
+          dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
         }
       }
-      isClosed();
     }
+    isClosed();
   }
 
   /**
@@ -1409,7 +1471,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) { 
-        waitAndQueuePacket(currentPacket);
+        waitAndQueueCurrentPacket();
       }
 
       if (bytesCurBlock != 0) {
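
The DFSOutputStream changes above are the core of this merge (HDFS-895). Instead of a fully synchronized hflush() that blocks new writes until both dataQueue and ackQueue drain, the stream now tracks the sequence number of the last queued packet (lastQueuedSeqno) and the last acknowledged packet (lastAckedSeqno). hflush() snapshots lastQueuedSeqno while holding the stream lock, releases the lock, and then waits only until that particular packet is acked, so other threads can keep writing and queuing packets in parallel. A simplified, self-contained sketch of that wait-for-seqno pattern (illustrative only, with invented names; not the actual DFSOutputStream code):

    // SeqnoWaiter.java -- hypothetical sketch of the pattern introduced by the patch.
    class SeqnoWaiter {
      private final Object queueLock = new Object();  // stands in for dataQueue
      private long lastQueuedSeqno = -1;
      private long lastAckedSeqno = -1;

      /** Writer thread: enqueue a packet and return its sequence number. */
      long queuePacket() {
        synchronized (queueLock) {
          lastQueuedSeqno++;
          queueLock.notifyAll();
          return lastQueuedSeqno;
        }
      }

      /** Response-processor thread: mark the oldest outstanding packet as acked. */
      void ackPacket() {
        synchronized (queueLock) {
          lastAckedSeqno++;
          queueLock.notifyAll();
        }
      }

      /** Flushing thread: block until the packet with the given seqno is acked. */
      void waitForAck(long seqno) throws InterruptedException {
        synchronized (queueLock) {
          while (lastAckedSeqno < seqno) {
            queueLock.wait(1000);   // re-checked on every ack notification
          }
        }
      }
    }

The real implementation additionally re-checks the closed flag inside the wait loop and restores the thread's interrupt status, so a close() racing with an hflush() surfaces as an "already closed" IOException instead of a hang, which is exactly what the new TestMultiThreadedHflush#testHflushWhileClosing exercises.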

+ 4 - 2
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -1650,8 +1650,10 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     FileChannel channel = file.getChannel();
     long oldPos = channel.position();
     long newPos = oldPos - checksumSize;
-    DataNode.LOG.info("Changing meta file offset of block " + b + " from " + 
-        oldPos + " to " + newPos);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Changing meta file offset of block " + b + " from " +
+          oldPos + " to " + newPos);
+    }
     channel.position(newPos);
   }
 

+ 1 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -990,7 +990,7 @@ class FSDirectory implements Closeable {
     }
     
     long now = now();
-    trgInode.setModificationTime(now);
+    trgInode.setModificationTimeForce(now);
     trgParent.setModificationTime(now);
     // update quota on the parent directory ('count' files removed, 0 space)
     unprotectedUpdateCount(trgINodes, trgINodes.length-1, - count, 0);

+ 1 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -335,6 +335,7 @@ public class SecondaryNameNode implements Runnable {
           public Void run() throws Exception {
             checkpointImage.cTime = sig.cTime;
             checkpointImage.checkpointTime = sig.checkpointTime;
+            checkpointImage.imageDigest = sig.imageDigest;
         
             // get fsimage
             String fileid = "getimage=1";

+ 1 - 1
src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java

@@ -120,7 +120,7 @@ class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int [] versions = 
-           {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27};
+    {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27};
   private int imageVersion = 0;
 
   /* (non-Javadoc)

+ 5 - 5
src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj

@@ -86,13 +86,13 @@ privileged public aspect DFSClientAspects {
     LOG.info("FI: before pipelineClose:");
   }
 
-  pointcut checkAckQueue(DFSOutputStream.Packet cp):
-    call (void DFSOutputStream.waitAndQueuePacket(
-            DFSOutputStream.Packet))
+  pointcut checkAckQueue(DFSOutputStream stream):
+    call (void DFSOutputStream.waitAndQueueCurrentPacket())
     && withincode (void DFSOutputStream.writeChunk(..))
-    && args(cp);
+    && this(stream);
 
-  after(DFSOutputStream.Packet cp) : checkAckQueue (cp) {
+  after(DFSOutputStream stream) : checkAckQueue (stream) {
+    DFSOutputStream.Packet cp = stream.currentPacket;
     PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
     if (pTest != null && pTest instanceof PipelinesTest) {
       LOG.debug("FI: Recording packet # " + cp.seqno

+ 6 - 5
src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import static org.apache.hadoop.fs.FileContextTestHelper.*;
 import org.apache.hadoop.ipc.RemoteException;
 import static org.junit.Assert.*;
 import org.junit.Test;
@@ -46,11 +47,11 @@ public class TestFcHdfsSymlink extends FileContextSymlinkBaseTest {
     return "hdfs";
   }
 
-  protected String testBaseDir1() {
+  protected String testBaseDir1() throws IOException {
     return "/test1";
   }
   
-  protected String testBaseDir2() {
+  protected String testBaseDir2() throws IOException {
     return "/test2";
   }
 
@@ -83,11 +84,11 @@ public class TestFcHdfsSymlink extends FileContextSymlinkBaseTest {
   @Test
   /** Link from Hdfs to LocalFs */
   public void testLinkAcrossFileSystems() throws IOException {
-    Path localDir  = new Path("file:///tmp/test");
-    Path localFile = new Path("file:///tmp/test/file");
+    Path localDir  = new Path("file://"+getAbsoluteTestRootDir(fc)+"/test");
+    Path localFile = new Path("file://"+getAbsoluteTestRootDir(fc)+"/test/file");
     Path link      = new Path(testBaseDir1(), "linkToFile");
     FileContext localFc = FileContext.getLocalFSFileContext();
-    localFc.delete(new Path("file:///tmp/test"), true);
+    localFc.delete(localDir, true);
     localFc.mkdir(localDir, FileContext.DEFAULT_PERM, true);
     localFc.setWorkingDirectory(localDir);
     assertEquals(localDir, localFc.getWorkingDirectory());

+ 256 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java

@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.log4j.Level;
+
+/**
+ * This class tests hflushing concurrently from many threads.
+ */
+public class TestMultiThreadedHflush {
+  static final int blockSize = 1024*1024;
+  static final int numBlocks = 10;
+  static final int fileSize = numBlocks * blockSize + 1;
+
+  private static final int NUM_THREADS = 10;
+  private static final int WRITE_SIZE = 517;
+  private static final int NUM_WRITES_PER_THREAD = 1000;
+  
+  private byte[] toWrite = null;
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /*
+   * creates a file but does not close it
+   */ 
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+  
+  private void initBuffer(int size) {
+    long seed = AppendTestUtil.nextLong();
+    toWrite = AppendTestUtil.randomBytes(seed, size);
+  }
+
+  private class WriterThread extends Thread {
+    private final FSDataOutputStream stm;
+    private final AtomicReference<Throwable> thrown;
+    private final int numWrites;
+    private final CountDownLatch countdown;
+
+    public WriterThread(FSDataOutputStream stm,
+      AtomicReference<Throwable> thrown,
+      CountDownLatch countdown, int numWrites) {
+      this.stm = stm;
+      this.thrown = thrown;
+      this.numWrites = numWrites;
+      this.countdown = countdown;
+    }
+
+    public void run() {
+      try {
+        countdown.await();
+        for (int i = 0; i < numWrites && thrown.get() == null; i++) {
+          doAWrite();
+        }
+      } catch (Throwable t) {
+        thrown.compareAndSet(null, t);
+      }
+    }
+
+    private void doAWrite() throws IOException {
+      stm.write(toWrite);
+      stm.hflush();
+    }
+  }
+
+
+  /**
+   * Test case where a bunch of threads are both appending and flushing.
+   * They all finish before the file is closed.
+   */
+  @Test
+  public void testMultipleHflushers() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/multiple-hflushers.dat");
+    try {
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test case where a bunch of threads are continuously calling hflush() while another
+   * thread appends some data and then closes the file.
+   *
+   * The hflushing threads should eventually catch an IOException stating that the stream
+   * was closed -- and not an NPE or anything like that.
+   */
+  @Test
+  public void testHflushWhileClosing() throws Throwable {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/hflush-and-close.dat");
+
+    final FSDataOutputStream stm = createFile(fs, p, 1);
+
+
+    ArrayList<Thread> flushers = new ArrayList<Thread>();
+    final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    try {
+      for (int i = 0; i < 10; i++) {
+        Thread flusher = new Thread() {
+            public void run() {
+              try {
+                while (true) {
+                  try {
+                    stm.hflush();
+                  } catch (IOException ioe) {
+                    if (!ioe.toString().contains("DFSOutputStream is closed")) {
+                      throw ioe;
+                    } else {
+                      return;
+                    }
+                  }
+                }
+              } catch (Throwable t) {
+                thrown.set(t);
+              }
+            }
+          };
+        flusher.start();
+        flushers.add(flusher);
+      }
+
+      // Write some data
+      for (int i = 0; i < 10000; i++) {
+        stm.write(1);
+      }
+
+      // Close it while the flushing threads are still flushing
+      stm.close();
+
+      // Wait for the flushers to all die.
+      for (Thread t : flushers) {
+        t.join();
+      }
+
+      // They should have all gotten the expected exception, not anything
+      // else.
+      if (thrown.get() != null) {
+        throw thrown.get();
+      }
+
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  public void doMultithreadedWrites(
+    Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
+    throws Exception {
+    initBuffer(bufferSize);
+
+    // create a new file.
+    FileSystem fs = p.getFileSystem(conf);
+    FSDataOutputStream stm = createFile(fs, p, 1);
+    System.out.println("Created file simpleFlush.dat");
+
+    // There have been a couple issues with flushing empty buffers, so do
+    // some empty flushes first.
+    stm.hflush();
+    stm.hflush();
+    stm.write(1);
+    stm.hflush();
+    stm.hflush();
+
+    CountDownLatch countdown = new CountDownLatch(1);
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    for (int i = 0; i < numThreads; i++) {
+      Thread t = new WriterThread(stm, thrown, countdown, numWrites);
+      threads.add(t);
+      t.start();
+    }
+
+    // Start all the threads at the same time for maximum raciness!
+    countdown.countDown();
+
+    for (Thread t : threads) {
+      t.join();
+    }
+    if (thrown.get() != null) {
+      throw new RuntimeException("Deferred", thrown.get());
+    }
+    stm.close();
+    System.out.println("Closed file.");
+  }
+
+  public static void main(String args[]) throws Exception {
+    if (args.length != 1) {
+      System.err.println(
+        "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+        " <path to test file> ");
+      System.exit(1);
+    }
+    TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+    Configuration conf = new Configuration();
+    Path p = new Path(args[0]);
+    long st = System.nanoTime();
+    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
+    long et = System.nanoTime();
+
+    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
+  }
+
+}

+ 9 - 19
src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java

@@ -76,7 +76,7 @@ public class TestOfflineImageViewer extends TestCase {
   
   // Main entry point into testing.  Necessary since we only want to generate
   // the fsimage file once and use it for multiple tests. 
-  public void testOIV() {
+  public void testOIV() throws Exception {
     File originalFsimage = null;
     try {
     originalFsimage = initFsimage();
@@ -98,7 +98,7 @@ public class TestOfflineImageViewer extends TestCase {
 
   // Create a populated namespace for later testing.  Save its contents to a
   // data structure and store its fsimage location.
-  private File initFsimage() {
+  private File initFsimage() throws IOException {
     MiniDFSCluster cluster = null;
     File orig = null;
     try {
@@ -131,11 +131,9 @@ public class TestOfflineImageViewer extends TestCase {
       URI [] files = cluster.getNameDirs(0).toArray(new URI[0]);
       orig =  new File(files[0].getPath(), "current/fsimage");
       
-      if(!orig.exists())
+      if (!orig.exists()) {
         fail("Didn't generate or can't find fsimage.");
-
-    } catch (IOException e) {
-      fail("Failed trying to generate fsimage file: " + e.getMessage());
+      }
     } finally {
       if(cluster != null)
         cluster.shutdown();
@@ -152,7 +150,7 @@ public class TestOfflineImageViewer extends TestCase {
 
   // Verify that we can correctly generate an ls-style output for a valid 
   // fsimage
-  private void outputOfLSVisitor(File originalFsimage) {
+  private void outputOfLSVisitor(File originalFsimage) throws IOException {
     File testFile = new File(ROOT, "/basicCheck");
     File outputFile = new File(ROOT, "/basicCheckOutput");
     
@@ -167,8 +165,6 @@ public class TestOfflineImageViewer extends TestCase {
       HashMap<String, LsElements> fileOutput = readLsfile(outputFile);
       
       compareNamespaces(writtenFiles, fileOutput);
-    } catch (IOException e) {
-      fail("Failed reading valid file: " + e.getMessage());
     } finally {
       if(testFile.exists()) testFile.delete();
       if(outputFile.exists()) outputFile.delete();
@@ -178,7 +174,7 @@ public class TestOfflineImageViewer extends TestCase {
   
   // Confirm that attempting to read an fsimage file with an unsupported
   // layout results in an error
-  public void unsupportedFSLayoutVersion(File originalFsimage) {
+  public void unsupportedFSLayoutVersion(File originalFsimage) throws IOException {
     File testFile = new File(ROOT, "/invalidLayoutVersion");
     File outputFile = new File(ROOT, "invalidLayoutVersionOutput");
     
@@ -196,8 +192,6 @@ public class TestOfflineImageViewer extends TestCase {
           throw e; // wasn't error we were expecting
         System.out.println("Correctly failed at reading bad image version.");
       }
-    } catch (IOException e) {
-      fail("Problem testing unsupported layout version: " + e.getMessage());
     } finally {
       if(testFile.exists()) testFile.delete();
       if(outputFile.exists()) outputFile.delete();
@@ -205,7 +199,7 @@ public class TestOfflineImageViewer extends TestCase {
   }
   
   // Verify that image viewer will bail on a file that ends unexpectedly
-  private void truncatedFSImage(File originalFsimage) {
+  private void truncatedFSImage(File originalFsimage) throws IOException {
     File testFile = new File(ROOT, "/truncatedFSImage");
     File outputFile = new File(ROOT, "/trucnatedFSImageOutput");
     try {
@@ -221,9 +215,7 @@ public class TestOfflineImageViewer extends TestCase {
       } catch (EOFException e) {
         System.out.println("Correctly handled EOF");
       }
-      
-    } catch (IOException e) {
-      fail("Failed testing truncatedFSImage: " + e.getMessage());
+
     } finally {
       if(testFile.exists()) testFile.delete();
       if(outputFile.exists()) outputFile.delete();
@@ -373,7 +365,7 @@ public class TestOfflineImageViewer extends TestCase {
     }
   }
 
-  private void outputOfFileDistributionVisitor(File originalFsimage) {
+  private void outputOfFileDistributionVisitor(File originalFsimage) throws IOException {
     File testFile = new File(ROOT, "/basicCheck");
     File outputFile = new File(ROOT, "/fileDistributionCheckOutput");
 
@@ -394,8 +386,6 @@ public class TestOfflineImageViewer extends TestCase {
         assertEquals(row.length, 2);
         totalFiles += Integer.parseInt(row[1]);
       }
-    } catch (IOException e) {
-      fail("Failed reading valid file: " + e.getMessage());
     } finally {
       if(testFile.exists()) testFile.delete();
       if(outputFile.exists()) outputFile.delete();