Browse Source

HDFS-909. Wait until edits syncing is finishes before purging edits. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@936131 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 15 years ago
parent
commit
3729725c41

+ 1 - 1
.eclipse.templates/.classpath

@@ -34,7 +34,7 @@
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.5.8.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/slf4j-log4j12-1.4.3.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/xmlenc-0.52.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/mockito-all-1.8.0.jar"/>	
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/mockito-all-1.8.2.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/aspectjrt-1.6.5.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.core.framework.uberjar.javaEE.14-1.8.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.ant-1.8.0.jar"/>

+ 3 - 0
CHANGES.txt

@@ -831,6 +831,9 @@ Release 0.20.3 - Unreleased
     HDFS-1041. DFSClient.getFileChecksum(..) should retry if connection to
     the first datanode fails.  (szetszwo)
 
+    HDFS-909. Wait until edits syncing is finishes before purging edits.
+    (Todd Lipcon via shv)
+
 Release 0.20.2 - Unreleased
 
   IMPROVEMENTS

+ 1 - 1
ivy/libraries.properties

@@ -77,4 +77,4 @@ xerces.version=1.4.4
 
 aspectj.version=1.6.5
 
-mockito-all.version=1.8.0
+mockito-all.version=1.8.2

+ 81 - 11
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -113,7 +113,8 @@ public class FSEditLog {
   private long lastPrintTime;
 
   // is a sync currently running?
-  private boolean isSyncRunning;
+  private volatile boolean isSyncRunning;
+
 
   // these are statistics counters.
   private long numTransactions;        // number of transactions
@@ -159,6 +160,14 @@ public class FSEditLog {
     return editStreams == null ? 0 : editStreams.size();
   }
 
+  /**
+   * Return the currently active edit streams.
+   * This should be used only by unit tests.
+   */
+  ArrayList<EditLogOutputStream> getEditStreams() {
+    return editStreams;
+  }
+
   boolean isOpen() {
     return getNumEditStreams() > 0;
   }
@@ -201,6 +210,8 @@ public class FSEditLog {
   }
 
   synchronized void createEditLogFile(File name) throws IOException {
+    waitForSyncToFinish();
+
     EditLogOutputStream eStream = new EditLogFileOutputStream(name,
         sizeOutputFlushBuffer);
     eStream.create();
@@ -211,12 +222,7 @@ public class FSEditLog {
    * Shutdown the file store.
    */
   synchronized void close() {
-    while (isSyncRunning) {
-      try {
-        wait(1000);
-      } catch (InterruptedException ie) { 
-      }
-    }
+    waitForSyncToFinish();
     if (editStreams == null || editStreams.isEmpty()) {
       return;
     }
@@ -883,9 +889,52 @@ public class FSEditLog {
       metrics.transactions.inc((end-start));
   }
 
-  //
-  // Sync all modifications done by this thread.
-  //
+  /**
+   * Blocks until all ongoing edits have been synced to disk.
+   * This differs from logSync in that it waits for edits that have been
+   * written by other threads, not just edits from the calling thread.
+   *
+   * NOTE: this should be done while holding the FSNamesystem lock, or
+   * else more operations can start writing while this is in progress.
+   */
+  void logSyncAll() throws IOException {
+    // Record the most recent transaction ID as our own id
+    synchronized (this) {
+      TransactionId id = myTransactionId.get();
+      id.txid = txid;
+    }
+    // Then make sure we're synced up to this point
+    logSync();
+  }
+  
+  /**
+   * Sync all modifications done by this thread.
+   *
+   * The internal concurrency design of this class is as follows:
+   *   - Log items are written synchronized into an in-memory buffer,
+   *     and each assigned a transaction ID.
+   *   - When a thread (client) would like to sync all of its edits, logSync()
+   *     uses a ThreadLocal transaction ID to determine what edit number must
+   *     be synced to.
+   *   - The isSyncRunning volatile boolean tracks whether a sync is currently
+   *     under progress.
+   *
+   * The data is double-buffered within each edit log implementation so that
+   * in-memory writing can occur in parallel with the on-disk writing.
+   *
+   * Each sync occurs in three steps:
+   *   1. synchronized, it swaps the double buffer and sets the isSyncRunning
+   *      flag.
+   *   2. unsynchronized, it flushes the data to storage
+   *   3. synchronized, it resets the flag and notifies anyone waiting on the
+   *      sync.
+   *
+   * The lack of synchronization on step 2 allows other threads to continue
+   * to write into the memory buffer while the sync is in progress.
+   * Because this step is unsynchronized, actions that need to avoid
+   * concurrency with sync() should be synchronized and also call
+   * waitForSyncToFinish() before assuming they are running alone.
+   */
   public void logSync() throws IOException {
     ArrayList<EditLogOutputStream> errorStreams = null;
     long syncStart = 0;
@@ -1219,6 +1268,7 @@ public class FSEditLog {
    * Closes the current edit log and opens edits.new. 
    */
   synchronized void rollEditLog() throws IOException {
+    waitForSyncToFinish();
     Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
     if(!it.hasNext()) 
       return;
@@ -1251,6 +1301,8 @@ public class FSEditLog {
    * @throws IOException
    */
   synchronized void divertFileStreams(String dest) throws IOException {
+    waitForSyncToFinish();
+
     assert getNumEditStreams() >= getNumEditsDirs() :
       "Inconsistent number of streams";
     ArrayList<EditLogOutputStream> errorStreams = null;
@@ -1287,10 +1339,25 @@ public class FSEditLog {
    * Reopens the edits file.
    */
   synchronized void purgeEditLog() throws IOException {
+    waitForSyncToFinish();
     revertFileStreams(
         Storage.STORAGE_DIR_CURRENT + "/" + NameNodeFile.EDITS_NEW.getName());
   }
 
+
+  /**
+   * The actual sync activity happens while not synchronized on this object.
+   * Thus, synchronized activities that require that they are not concurrent
+   * with file operations should wait for any running sync to finish.
+   */
+  synchronized void waitForSyncToFinish() {
+    while (isSyncRunning) {
+      try {
+        wait(1000);
+      } catch (InterruptedException ie) {}
+    }
+  }
+
   /**
    * Revert file streams from file edits.new back to file edits.<p>
    * Close file streams, which are currently writing into getRoot()/source.
@@ -1300,6 +1367,8 @@ public class FSEditLog {
    * @throws IOException
    */
   synchronized void revertFileStreams(String source) throws IOException {
+    waitForSyncToFinish();
+
     assert getNumEditStreams() >= getNumEditsDirs() :
       "Inconsistent number of streams";
     ArrayList<EditLogOutputStream> errorStreams = null;
@@ -1311,7 +1380,8 @@ public class FSEditLog {
       EditLogOutputStream eStream = itE.next();
       StorageDirectory sd = itD.next();
       if(!eStream.getName().startsWith(sd.getRoot().getPath()))
-        throw new IOException("Inconsistent order of edit streams: " + eStream);
+        throw new IOException("Inconsistent order of edit streams: " + eStream +
+                              " does not start with " + sd.getRoot().getPath());
       try {
         // close old stream
         closeStream(eStream);

+ 4 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3855,6 +3855,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * @throws IOException
    */
   synchronized void enterSafeMode() throws IOException {
+    // Ensure that any concurrent operations have been fully synced
+    // before entering safe mode. This ensures that the FSImage
+    // is entirely stable on disk as soon as we're in safe mode.
+    getEditLog().logSyncAll();
     if (!isInSafeMode()) {
       safeMode = new SafeModeInfo();
       return;

+ 487 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java

@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * This class tests various synchronization bugs in FSEditLog rolling
+ * and namespace saving.
+ */
+public class TestEditLogRace {
+  private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
+
+  private static final String NAME_DIR =
+    MiniDFSCluster.getBaseDirectory() + "name1";
+
+  // This test creates NUM_THREADS threads and each thread continuously writes
+  // transactions
+  static final int NUM_THREADS = 16;
+
+  /**
+   * The number of times to roll the edit log during the test. Since this
+   * tests for a race condition, higher numbers are more likely to find
+   * a bug if it exists, but the test will take longer.
+   */
+  static final int NUM_ROLLS = 30;
+
+  /**
+   * The number of times to save the fsimage and create an empty edit log.
+   */
+  static final int NUM_SAVE_IMAGE = 30;
+
+  private List<Transactions> workers = new ArrayList<Transactions>();
+
+  private static final int NUM_DATA_NODES = 1;
+
+  /**
+   * Several of the test cases work by introducing a sleep
+   * into an operation that is usually fast, and then verifying
+   * that another operation blocks for at least this amount of time.
+   * This value needs to be significantly longer than the average
+   * time for an fsync() or enterSafeMode().
+   */
+  private static final int BLOCK_TIME = 10;
+  
+  //
+  // an object that does a bunch of transactions
+  //
+  static class Transactions implements Runnable {
+    FSNamesystem namesystem;
+    short replication = 3;
+    long blockSize = 64;
+    volatile boolean stopped = false;
+    volatile Thread thr;
+    AtomicReference<Throwable> caught;
+
+    Transactions(FSNamesystem ns, AtomicReference<Throwable> caught) {
+      namesystem = ns;
+      this.caught = caught;
+    }
+
+    // add a bunch of transactions.
+    public void run() {
+      thr = Thread.currentThread();
+      PermissionStatus p = namesystem.createFsOwnerPermissions(
+                                          new FsPermission((short)0777));
+      int i = 0;
+      while (!stopped) {
+        try {
+          String dirname = "/thr-" + thr.getId() + "-dir-" + i; 
+          namesystem.mkdirs(dirname, p, true);
+          namesystem.delete(dirname, true);
+        } catch (SafeModeException sme) {
+          // This is OK - the tests will bring NN in and out of safemode
+        } catch (Throwable e) {
+          LOG.warn("Got error in transaction thread", e);
+          caught.compareAndSet(null, e);
+          break;
+        }
+        i++;
+      }
+    }
+
+    public void stop() {
+      stopped = true;
+    }
+
+    public Thread getThread() {
+      return thr;
+    }
+  }
+
+  private void startTransactionWorkers(FSNamesystem namesystem,
+                                       AtomicReference<Throwable> caughtErr) {
+    // Create threads and make them run transactions concurrently.
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Transactions trans = new Transactions(namesystem, caughtErr);
+      new Thread(trans, "TransactionThread-" + i).start();
+      workers.add(trans);
+    }
+  }
+
+  private void stopTransactionWorkers() {
+    // wait for all transactions to get over
+    for (Transactions worker : workers) {
+      worker.stop();
+    }
+
+    for (Transactions worker : workers) {
+      Thread thr = worker.getThread();
+      try {
+        if (thr != null) thr.join();
+      } catch (InterruptedException ie) {}
+    }
+  }
+
+  /**
+   * Tests rolling edit logs while transactions are ongoing.
+   */
+  @Test
+  public void testEditLogRolling() throws Exception {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+
+
+    AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
+    try {
+      cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null);
+      cluster.waitActive();
+      fileSys = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      FSImage fsimage = namesystem.getFSImage();
+      FSEditLog editLog = fsimage.getEditLog();
+
+      // set small size of flush buffer
+      editLog.setBufferCapacity(2048);
+      editLog.close();
+      editLog.open();
+
+      startTransactionWorkers(namesystem, caughtErr);
+
+      for (int i = 0; i < NUM_ROLLS && caughtErr.get() == null; i++) {
+        try {
+          Thread.sleep(20);
+        } catch (InterruptedException e) {}
+
+        LOG.info("Starting roll " + i + ".");
+        editLog.rollEditLog();
+        LOG.info("Roll complete " + i + ".");
+
+        verifyEditLogs(namesystem, fsimage);
+
+        LOG.info("Starting purge " + i + ".");
+        editLog.purgeEditLog();
+        LOG.info("Complete purge " + i + ".");
+      }
+    } finally {
+      stopTransactionWorkers();
+      if (caughtErr.get() != null) {
+        throw new RuntimeException(caughtErr.get());
+      }
+
+      if(fileSys != null) fileSys.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
+
+  private void verifyEditLogs(FSNamesystem namesystem, FSImage fsimage)
+    throws IOException {
+    // Verify that we can read in all the transactions that we have written.
+    // If there were any corruptions, it is likely that the reading in
+    // of these transactions will throw an exception.
+    for (Iterator<StorageDirectory> it = 
+           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
+      System.out.println("Verifying file: " + editFile);
+      int numEdits = namesystem.getEditLog().loadFSEdits(
+        new EditLogFileInputStream(editFile));
+      System.out.println("Number of edits: " + numEdits);
+    }
+  }
+
+  /**
+   * Tests saving fs image while transactions are ongoing.
+   */
+  @Test
+  public void testSaveNamespace() throws Exception {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+
+    AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
+    try {
+      cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null);
+      cluster.waitActive();
+      fileSys = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      FSImage fsimage = namesystem.getFSImage();
+      FSEditLog editLog = fsimage.getEditLog();
+
+      // set small size of flush buffer
+      editLog.setBufferCapacity(2048);
+      editLog.close();
+      editLog.open();
+
+      startTransactionWorkers(namesystem, caughtErr);
+
+      for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
+        try {
+          Thread.sleep(20);
+        } catch (InterruptedException e) {}
+
+
+        LOG.info("Save " + i + ": entering safe mode");
+        namesystem.enterSafeMode();
+
+        // Verify edit logs before the save
+        verifyEditLogs(namesystem, fsimage);
+
+        LOG.info("Save " + i + ": saving namespace");
+        namesystem.saveNamespace();
+        LOG.info("Save " + i + ": leaving safemode");
+
+        // Verify that edit logs post save are also not corrupt
+        verifyEditLogs(namesystem, fsimage);
+
+        namesystem.leaveSafeMode(false);
+        LOG.info("Save " + i + ": complete");
+
+      }
+    } finally {
+      stopTransactionWorkers();
+      if (caughtErr.get() != null) {
+        throw new RuntimeException(caughtErr.get());
+      }
+      if(fileSys != null) fileSys.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
+ 
+  private Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); 
+    return conf;
+  }
+
+
+  /**
+   * The logSync() method in FSEditLog is unsynchronized whiel syncing
+   * so that other threads can concurrently enqueue edits while the prior
+   * sync is ongoing. This test checks that the log is saved correctly
+   * if the saveImage occurs while the syncing thread is in the unsynchronized middle section.
+   * 
+   * This replicates the following manual test proposed by Konstantin:
+   *   I start the name-node in debugger.
+   *   I do -mkdir and stop the debugger in logSync() just before it does flush.
+   *   Then I enter safe mode with another client
+   *   I start saveNamepsace and stop the debugger in
+   *     FSImage.saveFSImage() -> FSEditLog.createEditLogFile()
+   *     -> EditLogFileOutputStream.create() ->
+   *     after truncating the file but before writing LAYOUT_VERSION into it.
+   *   Then I let logSync() run.
+   *   Then I terminate the name-node.
+   *   After that the name-node wont start, since the edits file is broken.
+   */
+  @Test
+  public void testSaveImageWhileSyncInProgress() throws Exception {
+    Configuration conf = getConf();
+    NameNode.initMetrics(conf, NamenodeRole.ACTIVE);
+    NameNode.format(conf);
+    final FSNamesystem namesystem = new FSNamesystem(conf);
+
+    try {
+      FSImage fsimage = namesystem.getFSImage();
+      FSEditLog editLog = fsimage.getEditLog();
+
+      ArrayList<EditLogOutputStream> streams = editLog.getEditStreams();
+      EditLogOutputStream spyElos = spy(streams.get(0));
+      streams.set(0, spyElos);
+
+      final AtomicReference<Throwable> deferredException =
+          new AtomicReference<Throwable>();
+      final CountDownLatch waitToEnterFlush = new CountDownLatch(1);
+      
+      final Thread doAnEditThread = new Thread() {
+        public void run() {
+          try {
+            LOG.info("Starting mkdirs");
+            namesystem.mkdirs("/test",
+                new PermissionStatus("test","test", new FsPermission((short)00755)),
+                true);
+            LOG.info("mkdirs complete");
+          } catch (Throwable ioe) {
+            deferredException.set(ioe);
+            waitToEnterFlush.countDown();
+          }
+        }
+      };
+      
+      Answer<Void> blockingFlush = new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          LOG.info("Flush called");
+          if (Thread.currentThread() == doAnEditThread) {
+            LOG.info("edit thread: Telling main thread we made it to flush section...");
+            // Signal to main thread that the edit thread is in the racy section
+            waitToEnterFlush.countDown();
+            LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
+            Thread.sleep(BLOCK_TIME*1000);
+            LOG.info("Going through to flush. This will allow the main thread to continue.");
+          }
+          invocation.callRealMethod();
+          LOG.info("Flush complete");
+          return null;
+        }
+      };
+      doAnswer(blockingFlush).when(spyElos).flush();
+      
+      doAnEditThread.start();
+      // Wait for the edit thread to get to the logsync unsynchronized section
+      LOG.info("Main thread: waiting to enter flush...");
+      waitToEnterFlush.await();
+      assertNull(deferredException.get());
+      LOG.info("Main thread: detected that logSync is in unsynchronized section.");
+      LOG.info("Trying to enter safe mode.");
+      LOG.info("This should block for " + BLOCK_TIME + "sec, since flush will sleep that long");
+      
+      long st = System.currentTimeMillis();
+      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      long et = System.currentTimeMillis();
+      LOG.info("Entered safe mode");
+      // Make sure we really waited for the flush to complete!
+      assertTrue(et - st > (BLOCK_TIME - 1)*1000);
+
+      // Once we're in safe mode, save namespace.
+      namesystem.saveNamespace();
+
+      LOG.info("Joining on edit thread...");
+      doAnEditThread.join();
+      assertNull(deferredException.get());
+
+      verifyEditLogs(namesystem, fsimage);
+    } finally {
+      LOG.info("Closing namesystem");
+      if(namesystem != null) namesystem.close();
+    }
+  }
+  
+  /**
+   * Most of the FSNamesystem methods have a synchronized section where they
+   * update the name system itself and write to the edit log, and then
+   * unsynchronized, they call logSync. This test verifies that, if an
+   * operation has written to the edit log but not yet synced it,
+   * we wait for that sync before entering safe mode.
+   */
+  @Test
+  public void testSaveRightBeforeSync() throws Exception {
+    Configuration conf = getConf();
+    NameNode.initMetrics(conf, NamenodeRole.ACTIVE);
+    NameNode.format(conf);
+    final FSNamesystem namesystem = new FSNamesystem(conf);
+
+    try {
+      FSImage fsimage = namesystem.getFSImage();
+      FSEditLog editLog = spy(fsimage.getEditLog());
+      fsimage.editLog = editLog;
+      
+      final AtomicReference<Throwable> deferredException =
+          new AtomicReference<Throwable>();
+      final CountDownLatch waitToEnterSync = new CountDownLatch(1);
+      
+      final Thread doAnEditThread = new Thread() {
+        public void run() {
+          try {
+            LOG.info("Starting mkdirs");
+            namesystem.mkdirs("/test",
+                new PermissionStatus("test","test", new FsPermission((short)00755)),
+                true);
+            LOG.info("mkdirs complete");
+          } catch (Throwable ioe) {
+            deferredException.set(ioe);
+            waitToEnterSync.countDown();
+          }
+        }
+      };
+      
+      Answer<Void> blockingSync = new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          LOG.info("logSync called");
+          if (Thread.currentThread() == doAnEditThread) {
+            LOG.info("edit thread: Telling main thread we made it just before logSync...");
+            waitToEnterSync.countDown();
+            LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
+            Thread.sleep(BLOCK_TIME*1000);
+            LOG.info("Going through to logSync. This will allow the main thread to continue.");
+          }
+          invocation.callRealMethod();
+          LOG.info("logSync complete");
+          return null;
+        }
+      };
+      doAnswer(blockingSync).when(editLog).logSync();
+      
+      doAnEditThread.start();
+      LOG.info("Main thread: waiting to just before logSync...");
+      waitToEnterSync.await();
+      assertNull(deferredException.get());
+      LOG.info("Main thread: detected that logSync about to be called.");
+      LOG.info("Trying to enter safe mode.");
+      LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");
+      
+      long st = System.currentTimeMillis();
+      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      long et = System.currentTimeMillis();
+      LOG.info("Entered safe mode");
+      // Make sure we really waited for the flush to complete!
+      assertTrue(et - st > (BLOCK_TIME - 1)*1000);
+
+      // Once we're in safe mode, save namespace.
+      namesystem.saveNamespace();
+
+      LOG.info("Joining on edit thread...");
+      doAnEditThread.join();
+      assertNull(deferredException.get());
+
+      verifyEditLogs(namesystem, fsimage);
+    } finally {
+      LOG.info("Closing namesystem");
+      if(namesystem != null) namesystem.close();
+    }
+  }  
+}