瀏覽代碼

HDFS-2169. Clean up TestCheckpoint and remove TODOs. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1073@1148587 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 14 年之前
父節點
當前提交
2c44868e87

+ 1 - 0
hdfs/CHANGES.HDFS-1073.txt

@@ -77,3 +77,4 @@ HDFS-2104. Add a flag to the 2NN to format its checkpoint dirs on startup.
 HDFS-2135. Fix regression of HDFS-1955 in HDFS-1073 branch. (todd)
 HDFS-2160. Fix CreateEditsLog test tool in HDFS-1073 branch. (todd)
 HDFS-2168. Reenable TestEditLog.testFailedOpen and fix exposed bug. (todd)
+HDFS-2169. Clean up TestCheckpoint and remove TODOs (todd)

+ 40 - 37
hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -640,13 +640,17 @@ public abstract class Storage extends StorageInfo {
         LOG.info("Locking is disabled");
         return;
       }
-      this.lock = tryLock();
-      if (lock == null) {
+      FileLock newLock = tryLock();
+      if (newLock == null) {
         String msg = "Cannot lock storage " + this.root 
           + ". The directory is already locked.";
         LOG.info(msg);
         throw new IOException(msg);
       }
+      // Don't overwrite lock until success - this way if we accidentally
+      // call lock twice, the internal state won't be cleared by the second
+      // (failed) lock attempt
+      lock = newLock;
     }
 
     /**
@@ -691,6 +695,40 @@ public abstract class Storage extends StorageInfo {
     public String toString() {
       return "Storage Directory " + this.root;
     }
+
+    /**
+     * Check whether underlying file system supports file locking.
+     * 
+     * @return <code>true</code> if exclusive locks are supported or
+     *         <code>false</code> otherwise.
+     * @throws IOException
+     * @see StorageDirectory#lock()
+     */
+    public boolean isLockSupported() throws IOException {
+      FileLock firstLock = null;
+      FileLock secondLock = null;
+      try {
+        firstLock = lock;
+        if(firstLock == null) {
+          firstLock = tryLock();
+          if(firstLock == null)
+            return true;
+        }
+        secondLock = tryLock();
+        if(secondLock == null)
+          return true;
+      } finally {
+        if(firstLock != null && firstLock != lock) {
+          firstLock.release();
+          firstLock.channel().close();
+        }
+        if(secondLock != null) {
+          secondLock.release();
+          secondLock.channel().close();
+        }
+      }
+      return false;
+    }
   }
 
   /**
@@ -837,41 +875,6 @@ public abstract class Storage extends StorageInfo {
     }
   }
 
-  /**
-   * Check whether underlying file system supports file locking.
-   * 
-   * @return <code>true</code> if exclusive locks are supported or
-   *         <code>false</code> otherwise.
-   * @throws IOException
-   * @see StorageDirectory#lock()
-   */
-  public boolean isLockSupported(int idx) throws IOException {
-    StorageDirectory sd = storageDirs.get(idx);
-    FileLock firstLock = null;
-    FileLock secondLock = null;
-    try {
-      firstLock = sd.lock;
-      if(firstLock == null) {
-        firstLock = sd.tryLock();
-        if(firstLock == null)
-          return true;
-      }
-      secondLock = sd.tryLock();
-      if(secondLock == null)
-        return true;
-    } finally {
-      if(firstLock != null && firstLock != sd.lock) {
-        firstLock.release();
-        firstLock.channel().close();
-      }
-      if(secondLock != null) {
-        secondLock.release();
-        secondLock.channel().close();
-      }
-    }
-    return false;
-  }
-
   public static String getBuildVersion() {
     return VersionInfo.getRevision();
   }

+ 299 - 237
hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

@@ -68,6 +68,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
@@ -88,10 +89,11 @@ public class TestCheckpoint extends TestCase {
   static final int fileSize = 8192;
   static final int numDatanodes = 3;
   short replication = 3;
-  
+    
   @Override
   public void setUp() throws IOException {
     FileUtil.fullyDeleteContents(new File(MiniDFSCluster.getBaseDirectory()));
+    ErrorSimulator.initializeErrorSimulationEvent(5);
   }
 
   static void writeFile(FileSystem fileSys, Path name, int repl)
@@ -122,67 +124,52 @@ public class TestCheckpoint extends TestCase {
     assertTrue(!fileSys.exists(name));
   }
 
-  /**
-   * put back the old namedir
-   */
-  private void resurrectNameDir(File namedir) 
-    throws IOException {
-    String parentdir = namedir.getParent();
-    String name = namedir.getName();
-    File oldname =  new File(parentdir, name + ".old");
-    if (!oldname.renameTo(namedir)) {
-      assertTrue(false);
-    }
-  }
-
-  /**
-   * remove one namedir
-   */
-  private void removeOneNameDir(File namedir) 
-    throws IOException {
-    String parentdir = namedir.getParent();
-    String name = namedir.getName();
-    File newname =  new File(parentdir, name + ".old");
-    if (!namedir.renameTo(newname)) {
-      assertTrue(false);
-    }
-  }
-
   /*
    * Verify that namenode does not startup if one namedir is bad.
    */
-  private void testNamedirError(Configuration conf, Collection<URI> namedirs) 
-    throws IOException {
-    System.out.println("Starting testNamedirError");
-    MiniDFSCluster cluster = null;
-
-    if (namedirs.size() <= 1) {
-      return;
-    }
+  public void testNameDirError() throws IOException {
+    LOG.info("Starting testNameDirError");
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .build();
     
-    //
-    // Remove one namedir & Restart cluster. This should fail.
-    //
-    File first = new File(namedirs.iterator().next().getPath());
-    removeOneNameDir(first);
-    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
-      cluster.shutdown();
-      assertTrue(false);
-    } catch (Throwable t) {
-      // no nothing
+    Collection<URI> nameDirs = cluster.getNameDirs(0);
+    cluster.shutdown();
+    cluster = null;
+    
+    for (URI nameDirUri : nameDirs) {
+      File dir = new File(nameDirUri.getPath());
+      
+      try {
+        // Simulate the mount going read-only
+        dir.setWritable(false);
+        cluster = new MiniDFSCluster.Builder(conf)
+          .numDataNodes(0)
+          .format(false)
+          .build();
+        fail("NN should have failed to start with " + dir + " set unreadable");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains(
+            "storage directory does not exist or is not accessible",
+            ioe);
+      } finally {
+        if (cluster != null) {
+          cluster.shutdown();
+          cluster = null;
+        }
+        dir.setWritable(true);
+      }
     }
-    resurrectNameDir(first); // put back namedir
   }
 
-
   /**
    * Checks that an IOException in NNStorage.writeTransactionIdFile is handled
    * correctly (by removing the storage directory)
    * See https://issues.apache.org/jira/browse/HDFS-2011
    */
   public void testWriteTransactionIdHandlesIOE() throws Exception {
-    System.out.println("Check IOException handled correctly by writeTransactionIdFile");
+    LOG.info("Check IOException handled correctly by writeTransactionIdFile");
     ArrayList<URI> fsImageDirs = new ArrayList<URI>();
     ArrayList<URI> editsDirs = new ArrayList<URI>();
     File filePath =
@@ -210,21 +197,20 @@ public class TestCheckpoint extends TestCase {
     assertTrue("Removed directory wasn't what was expected",
                listRsd.size() > 0 && listRsd.get(listRsd.size() - 1).getRoot().
                toString().indexOf("storageDirToCheck") != -1);
-    System.out.println("Successfully checked IOException is handled correctly "
-                       + "by writeTransactionIdFile");
   }
 
   /*
    * Simulate namenode crashing after rolling edit log.
    */
   @SuppressWarnings("deprecation")
-  private void testSecondaryNamenodeError1(Configuration conf)
+  public void testSecondaryNamenodeError1()
     throws IOException {
-    System.out.println("Starting testSecondaryNamenodeError 1");
+    LOG.info("Starting testSecondaryNamenodeError1");
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpointxx.dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -258,7 +244,6 @@ public class TestCheckpoint extends TestCase {
     // Then take another checkpoint to verify that the 
     // namenode restart accounted for the rolled edit logs.
     //
-    System.out.println("Starting testSecondaryNamenodeError 2");
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
                                               .format(false).build();
     cluster.waitActive();
@@ -280,13 +265,13 @@ public class TestCheckpoint extends TestCase {
    * Simulate a namenode crash after uploading new image
    */
   @SuppressWarnings("deprecation")
-  private void testSecondaryNamenodeError2(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testSecondaryNamenodeError 21");
+  public void testSecondaryNamenodeError2() throws IOException {
+    LOG.info("Starting testSecondaryNamenodeError2");
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpointyy.dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -320,7 +305,6 @@ public class TestCheckpoint extends TestCase {
     // Then take another checkpoint to verify that the 
     // namenode restart accounted for the rolled edit logs.
     //
-    System.out.println("Starting testSecondaryNamenodeError 22");
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
@@ -340,13 +324,13 @@ public class TestCheckpoint extends TestCase {
    * Simulate a secondary namenode crash after rolling the edit log.
    */
   @SuppressWarnings("deprecation")
-  private void testSecondaryNamenodeError3(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testSecondaryNamenodeError 31");
+  public void testSecondaryNamenodeError3() throws IOException {
+    LOG.info("Starting testSecondaryNamenodeError3");
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpointzz.dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
 
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
@@ -389,7 +373,6 @@ public class TestCheckpoint extends TestCase {
     // Then take another checkpoint to verify that the 
     // namenode restart accounted for the twice-rolled edit logs.
     //
-    System.out.println("Starting testSecondaryNamenodeError 32");
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
@@ -411,13 +394,13 @@ public class TestCheckpoint extends TestCase {
    * Used to truncate primary fsimage file.
    */
   @SuppressWarnings("deprecation")
-  void testSecondaryFailsToReturnImage(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testSecondaryFailsToReturnImage");
+  public void testSecondaryFailsToReturnImage() throws IOException {
+    LOG.info("Starting testSecondaryFailsToReturnImage");
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpointRI.dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     FSImage image = cluster.getNameNode().getFSImage();
@@ -437,8 +420,9 @@ public class TestCheckpoint extends TestCase {
         secondary.doCheckpoint();  // this should fail
         fail("Checkpoint succeeded even though we injected an error!");
       } catch (IOException e) {
-        System.out.println("testSecondaryFailsToReturnImage: doCheckpoint() " +
-            "failed predictably - " + e);
+        // check that it's the injected exception
+        GenericTestUtils.assertExceptionContains(
+            "If this exception is not caught", e);
       }
       ErrorSimulator.clearErrorSimulation(2);
 
@@ -460,17 +444,26 @@ public class TestCheckpoint extends TestCase {
 
   /**
    * Simulate 2NN failing to send the whole file (error type 3)
-   * and secondary sending a corrupt file (error type 4). The
-   * length and digest headers in the HTTP transfer should prevent
-   * these from corrupting the NN.
+   * The length header in the HTTP transfer should prevent
+   * this from corrupting the NN.
    */
-  void testNameNodeImageSendFail(Configuration conf)
-    throws IOException {
-    System.out.println("Starting testNameNodeImageSendFail");
-    doSendFailTest(conf, 3, "is not of the advertised size");
-    doSendFailTest(conf, 4, "does not match advertised digest");
+  public void testNameNodeImageSendFailWrongSize()
+      throws IOException {
+    LOG.info("Starting testNameNodeImageSendFailWrongSize");
+    doSendFailTest(3, "is not of the advertised size");
   }
-  
+
+  /**
+   * Simulate 2NN sending a corrupt image (error type 4)
+   * The digest header in the HTTP transfer should prevent
+   * this from corrupting the NN.
+   */
+  public void testNameNodeImageSendFailWrongDigest()
+      throws IOException {
+    LOG.info("Starting testNameNodeImageSendFailWrongDigest");
+    doSendFailTest(4, "does not match advertised digest");
+  }
+
   /**
    * Run a test where the 2NN runs into some kind of error when
    * sending the checkpoint back to the NN.
@@ -478,14 +471,13 @@ public class TestCheckpoint extends TestCase {
    * @param exceptionSubstring an expected substring of the triggered exception
    */
   @SuppressWarnings("deprecation")
-  private void doSendFailTest(Configuration conf,
-      int errorType, String exceptionSubstring)
+  private void doSendFailTest(int errorType, String exceptionSubstring)
       throws IOException {
-    
+    Configuration conf = new HdfsConfiguration();
     Path file1 = new Path("checkpoint-doSendFailTest-" + errorType + ".dat");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)
-                                               .format(false).build();
+                                               .build();
     cluster.waitActive();
     FileSystem fileSys = cluster.getFileSystem();
     try {
@@ -501,14 +493,14 @@ public class TestCheckpoint extends TestCase {
         fail("Did not get expected exception");
       } catch (IOException e) {
         // We only sent part of the image. Have to trigger this exception
-        assertTrue(e.getMessage().contains(exceptionSubstring));
+        GenericTestUtils.assertExceptionContains(exceptionSubstring, e);
       }
       ErrorSimulator.clearErrorSimulation(errorType);
       secondary.shutdown(); // secondary namenode crash!
 
       // start new instance of secondary and verify that 
-      // a new rollEditLog suceedes inspite of the fact that 
-      // edits.new already exists. TODO update this comment!
+      // a new rollEditLog succedes in spite of the fact that we had
+      // a partially failed checkpoint previously.
       //
       secondary = startSecondaryNameNode(conf);
       secondary.doCheckpoint();  // this should work correctly
@@ -526,159 +518,247 @@ public class TestCheckpoint extends TestCase {
   }
   
   /**
-   * Test different startup scenarios.
-   * <p><ol>
-   * <li> Start of primary name-node in secondary directory must succeed. 
-   * <li> Start of secondary node when the primary is already running in 
-   *      this directory must fail.
-   * <li> Start of primary name-node if secondary node is already running in 
-   *      this directory must fail.
-   * <li> Start of two secondary nodes in the same directory must fail.
-   * <li> Import of a checkpoint must fail if primary 
-   * directory contains a valid image.
-   * <li> Import of the secondary image directory must succeed if primary 
-   * directory does not exist.
-   * <li> Recover failed checkpoint for secondary node.
-   * <li> Complete failed checkpoint for secondary node.
-   * </ol>
+   * Test that the NN locks its storage and edits directories, and won't start up
+   * if the directories are already locked
+   **/
+  public void testNameDirLocking() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .build();
+    
+    // Start a NN, and verify that lock() fails in all of the configured
+    // directories
+    StorageDirectory savedSd = null;
+    try {
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
+      for (StorageDirectory sd : storage.dirIterable(null)) {
+        assertLockFails(sd);
+        savedSd = sd;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+    assertNotNull(savedSd);
+    
+    // Lock one of the saved directories, then start the NN, and make sure it
+    // fails to start
+    assertClusterStartFailsWhenDirLocked(conf, savedSd);
+  }
+
+  /**
+   * Test that, if the edits dir is separate from the name dir, it is
+   * properly locked.
+   **/
+  public void testSeparateEditsDirLocking() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
+        "/testSeparateEditsDirLocking");
+    
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        editsDir.getAbsolutePath());
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .manageNameDfsDirs(false)
+      .numDataNodes(0)
+      .build();
+    
+    // Start a NN, and verify that lock() fails in all of the configured
+    // directories
+    StorageDirectory savedSd = null;
+    try {
+      NNStorage storage = cluster.getNameNode().getFSImage().getStorage();
+      for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
+        assertEquals(editsDir.getAbsoluteFile(), sd.getRoot());
+        assertLockFails(sd);
+        savedSd = sd;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+    assertNotNull(savedSd);
+    
+    // Lock one of the saved directories, then start the NN, and make sure it
+    // fails to start
+    assertClusterStartFailsWhenDirLocked(conf, savedSd);
+  }
+  
+  /**
+   * Test that the SecondaryNameNode properly locks its storage directories.
    */
   @SuppressWarnings("deprecation")
-  void testStartup(Configuration conf) throws IOException {
-    System.out.println("Startup of the name-node in the checkpoint directory.");
-    String primaryDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
-    String primaryEditsDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
-    String checkpointDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY);
-    String checkpointEditsDirs = conf.get(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
-    NameNode nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
-                                 StartupOption.REGULAR);
-
-    // Starting secondary node in the same directory as the primary
-    System.out.println("Startup of secondary in the same dir as the primary.");
+  public void testSecondaryNameNodeLocking() throws Exception {
+    // Start a primary NN so that the secondary will start successfully
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .build();
+    
     SecondaryNameNode secondary = null;
     try {
+      StorageDirectory savedSd = null;
+      // Start a secondary NN, then make sure that all of its storage
+      // dirs got locked.
       secondary = startSecondaryNameNode(conf);
-      assertFalse(secondary.getFSImage().getStorage().isLockSupported(0));
+      
+      NNStorage storage = secondary.getFSImage().getStorage();
+      for (StorageDirectory sd : storage.dirIterable(null)) {
+        assertLockFails(sd);
+        savedSd = sd;
+      }
+      LOG.info("===> Shutting down first 2NN");
       secondary.shutdown();
-    } catch (IOException e) { // expected to fail
-      assertTrue(secondary == null);
-    }
-    nn.stop(); nn = null;
-
-    // Starting primary node in the same directory as the secondary
-    System.out.println("Startup of primary in the same dir as the secondary.");
-    // secondary won't start without primary
-    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                        StartupOption.REGULAR);
-    boolean succeed = false;
-    do {
+      secondary = null;
+
+      LOG.info("===> Locking a dir, starting second 2NN");
+      // Lock one of its dirs, make sure it fails to start
+      LOG.info("Trying to lock" + savedSd);
+      savedSd.lock();
       try {
         secondary = startSecondaryNameNode(conf);
-        succeed = true;
-      } catch(IOException ie) { // keep trying
-        System.out.println("Try again: " + ie.getLocalizedMessage());
+        assertFalse("Should fail to start 2NN when " + savedSd + " is locked",
+            savedSd.isLockSupported());
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains("already locked", ioe);
+      } finally {
+        savedSd.unlock();
       }
-    } while(!succeed);
-    nn.stop(); nn = null;
-    try {
-      nn = startNameNode(conf, checkpointDirs, checkpointEditsDirs,
-                          StartupOption.REGULAR);
-      assertFalse(nn.getFSImage().getStorage().isLockSupported(0));
-      nn.stop(); nn = null;
-    } catch (IOException e) { // expected to fail
-      assertTrue(nn == null);
+      
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      cluster.shutdown();
     }
+  }
+  
 
-    // Try another secondary in the same directory
-    System.out.println("Startup of two secondaries in the same dir.");
-    // secondary won't start without primary
-    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                        StartupOption.REGULAR);
-    SecondaryNameNode secondary2 = null;
+  /**
+   * Assert that the given storage directory can't be locked, because
+   * it's already locked.
+   */
+  private static void assertLockFails(StorageDirectory sd) {
     try {
-      secondary2 = startSecondaryNameNode(conf);
-      assertFalse(secondary2.getFSImage().getStorage().isLockSupported(0));
-      secondary2.shutdown();
-    } catch (IOException e) { // expected to fail
-      assertTrue(secondary2 == null);
+      sd.lock();
+      // If the above line didn't throw an exception, then
+      // locking must not be supported
+      assertFalse(sd.isLockSupported());
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("already locked", ioe);
     }
-    nn.stop(); nn = null;
-    secondary.shutdown();
+  }
+  
+  /**
+   * Assert that, if sdToLock is locked, the cluster is not allowed to start up.
+   * @param conf cluster conf to use
+   * @param sdToLock the storage directory to lock
+   */
+  private static void assertClusterStartFailsWhenDirLocked(
+      Configuration conf, StorageDirectory sdToLock) throws IOException {
+    // Lock the edits dir, then start the NN, and make sure it fails to start
+    sdToLock.lock();
+    try {      
+      MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .manageNameDfsDirs(false)
+        .numDataNodes(0)
+        .build();
+      assertFalse("cluster should fail to start after locking " +
+          sdToLock, sdToLock.isLockSupported());
+      cluster.shutdown();
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("already locked", ioe);
+    } finally {
+      sdToLock.unlock();
+    }
+  }
 
-    // Import a checkpoint with existing primary image.
-    System.out.println("Import a checkpoint with existing primary image.");
+  /**
+   * Test the importCheckpoint startup option. Verifies:
+   * 1. if the NN already contains an image, it will not be allowed
+   *   to import a checkpoint.
+   * 2. if the NN does not contain an image, importing a checkpoint
+   *    succeeds and re-saves the image
+   */
+  @SuppressWarnings("deprecation")
+  public void testImportCheckpoint() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    Path testPath = new Path("/testfile");
+    SecondaryNameNode snn = null;
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .build();
+    Collection<URI> nameDirs = cluster.getNameDirs(0);
     try {
-      nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                          StartupOption.IMPORT);
-      fail("Importing a checkpoint with existing primary image did not fail");
-    } catch (IOException e) { // expected to fail
-      assertNull(nn);
+      // Make an entry in the namespace, used for verifying checkpoint
+      // later.
+      cluster.getFileSystem().mkdirs(testPath);
+      
+      // Take a checkpoint
+      snn = startSecondaryNameNode(conf);
+      snn.doCheckpoint();
+    } finally {
+      if (snn != null) {
+        snn.shutdown();
+      }
+      cluster.shutdown();
+      cluster = null;
     }
     
-    // Remove current image and import a checkpoint.
-    System.out.println("Import a checkpoint with existing primary image.");
-    List<URI> nameDirs = (List<URI>)FSNamesystem.getNamespaceDirs(conf);
-    List<URI> nameEditsDirs = (List<URI>)FSNamesystem.
-                                  getNamespaceEditsDirs(conf);
-    File newestImageFile = FSImageTestUtil.findNewestImageFile(
-        nameDirs.get(0).getPath() + "/current");
-    long fsimageLength = newestImageFile.length();
-    assertTrue(fsimageLength > 0);
+    LOG.info("Trying to import checkpoint when the NameNode already " +
+    		"contains an image. This should fail.");
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .format(false)
+      .startupOption(StartupOption.IMPORT)
+      .build();
+      fail("NameNode did not fail to start when it already contained " +
+      		"an image");
+    } catch (IOException ioe) {
+      // Expected
+      GenericTestUtils.assertExceptionContains(
+          "NameNode already contains an image", ioe);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+        cluster = null;
+      }
+    }
     
+    LOG.info("Removing NN storage contents");
     for(URI uri : nameDirs) {
       File dir = new File(uri.getPath());
-      if(dir.exists())
-        if(!(FileUtil.fullyDelete(dir)))
-          throw new IOException("Cannot remove directory: " + dir);
-      if (!dir.mkdirs())
-        throw new IOException("Cannot create directory " + dir);
-    }
-
-    for(URI uri : nameEditsDirs) {
-      File dir = new File(uri.getPath());
-      if(dir.exists())
-        if(!(FileUtil.fullyDelete(dir)))
-          throw new IOException("Cannot remove directory: " + dir);
-      if (!dir.mkdirs())
-        throw new IOException("Cannot create directory " + dir);
+      LOG.info("Cleaning " + dir);
+      removeAndRecreateDir(dir);
     }
     
-    nn = startNameNode(conf, primaryDirs, primaryEditsDirs,
-                        StartupOption.IMPORT);
-    // Verify that image file sizes did not change.
-    FSImage image = nn.getFSImage();
-    for (StorageDirectory sd : image.getStorage().dirIterable(NameNodeDirType.IMAGE)) {
-      // TODO it would make more sense if the newest had the same txid, but
-      // on import, it actually re-saves with a 1-higher txid
-      File recreatedImageFile = FSImageTestUtil.findNewestImageFile(
-          sd.getCurrentDir().toString());
-      assertEquals(fsimageLength, recreatedImageFile.length());
-    }
-    nn.stop();
+    LOG.info("Trying to import checkpoint");
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+        .format(false)
+        .numDataNodes(0)
+        .startupOption(StartupOption.IMPORT)
+        .build();
+      
+      assertTrue("Path from checkpoint should exist after import",
+          cluster.getFileSystem().exists(testPath));
 
-    // TODO: need tests that make sure that partially completed checkpoints
-    // don't leave an fsimage_ckpt file around, and that the 2NN cleans up
-    // any such files at startup
-    
-    // Check that everything starts ok now.
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
-    cluster.waitActive();
-    cluster.shutdown();
+      // Make sure that the image got saved on import
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, Ints.asList(3));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
   }
-
-  NameNode startNameNode( Configuration conf,
-                          String imageDirs,
-                          String editsDirs,
-                          StartupOption start) throws IOException {
-    conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://localhost:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");  
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, imageDirs);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, editsDirs);
-    String[] args = new String[]{start.getName()};
-    NameNode nn = NameNode.createNameNode(args, conf);
-    assertTrue(nn.isInSafeMode());
-    return nn;
+  
+  private static void removeAndRecreateDir(File dir) throws IOException {
+    if(dir.exists())
+      if(!(FileUtil.fullyDelete(dir)))
+        throw new IOException("Cannot remove directory: " + dir);
+    if (!dir.mkdirs())
+      throw new IOException("Cannot create directory " + dir);
   }
-
+  
   // This deprecation suppress warning does not work due to known Java bug:
   // http://bugs.sun.com/view_bug.do?bug_id=6460147
   @SuppressWarnings("deprecation")
@@ -705,8 +785,6 @@ public class TestCheckpoint extends TestCase {
   public void testCheckpoint() throws IOException {
     Path file1 = new Path("checkpoint.dat");
     Path file2 = new Path("checkpoint2.dat");
-    Collection<URI> namedirs = null;
-
     Configuration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
@@ -721,8 +799,7 @@ public class TestCheckpoint extends TestCase {
       //
       assertTrue(!fileSys.exists(file1));
       assertTrue(!fileSys.exists(file2));
-      namedirs = cluster.getNameDirs(0);
-
+      
       //
       // Create file1
       //
@@ -733,7 +810,6 @@ public class TestCheckpoint extends TestCase {
       // Take a checkpoint
       //
       SecondaryNameNode secondary = startSecondaryNameNode(conf);
-      ErrorSimulator.initializeErrorSimulationEvent(5);
       secondary.doCheckpoint();
       secondary.shutdown();
     } finally {
@@ -791,16 +867,6 @@ public class TestCheckpoint extends TestCase {
       fileSys.close();
       cluster.shutdown();
     }
-
-    // file2 is left behind.
-
-    testNameNodeImageSendFail(conf);
-    testSecondaryNamenodeError1(conf);
-    testSecondaryNamenodeError2(conf);
-    testSecondaryNamenodeError3(conf);
-    testNamedirError(conf, namedirs);
-    testSecondaryFailsToReturnImage(conf);
-    testStartup(conf);
   }
 
   /**
@@ -862,7 +928,7 @@ public class TestCheckpoint extends TestCase {
       for(URI uri : editsDirs) {
         File ed = new File(uri.getPath());
         File curDir = new File(ed, "current");
-        System.err.println("Files in " + curDir + ":\n  " +
+        LOG.info("Files in " + curDir + ":\n  " +
             Joiner.on("\n  ").join(curDir.list()));
         // Verify that the first edits file got finalized
         File originalEdits = new File(curDir,
@@ -948,8 +1014,6 @@ public class TestCheckpoint extends TestCase {
     
     Configuration conf = new HdfsConfiguration();
 
-    ErrorSimulator.initializeErrorSimulationEvent(5);
-
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
           .format(true).build();
@@ -1047,10 +1111,9 @@ public class TestCheckpoint extends TestCase {
    * if it hasn't changed.
    */
   @SuppressWarnings("deprecation")
-  public void testSecondaryImageDownload()
-    throws IOException {
+  public void testSecondaryImageDownload() throws IOException {
+    LOG.info("Starting testSecondaryImageDownload");
     Configuration conf = new HdfsConfiguration();
-    System.out.println("Starting testSecondaryImageDownload");
     conf.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     Path dir = new Path("/checkpoint");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
@@ -1373,7 +1436,6 @@ public class TestCheckpoint extends TestCase {
    * Test that the primary NN will not serve any files to a 2NN who doesn't
    * share its namespace ID, and also will not accept any files from one.
    */
-  @SuppressWarnings("deprecation")
   public void testNamespaceVerifiedOnFileTransfer() throws IOException {
     MiniDFSCluster cluster = null;