Browse Source

svn merge 1340749:1340750 HDFS-2717. BookKeeper Journal output stream doesn't check addComplete rc. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1340754 13f79535-47bb-0310-9956-ffa450edef68
Uma Maheswara Rao G 13 years ago
parent
commit
cb96cf17a9

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -84,6 +84,9 @@ Release 2.0.1-alpha - UNRELEASED
 
 
     HDFS-3444. hdfs groups command doesn't work with security enabled. (atm)
     HDFS-3444. hdfs groups command doesn't work with security enabled. (atm)
 
 
+    HDFS-2717. BookKeeper Journal output stream doesn't check addComplete rc.
+    (Ivan Kelly via umamahesh)
+
 Release 2.0.0-alpha - UNRELEASED
 Release 2.0.0-alpha - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 24 - 0
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java

@@ -33,6 +33,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import java.io.IOException;
 import java.io.IOException;
 
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
 /**
  * Output stream for BookKeeper Journal.
  * Output stream for BookKeeper Journal.
  * Multiple complete edit log entries are packed into a single bookkeeper
  * Multiple complete edit log entries are packed into a single bookkeeper
@@ -44,11 +47,15 @@ import java.io.IOException;
  */
  */
 class BookKeeperEditLogOutputStream
 class BookKeeperEditLogOutputStream
   extends EditLogOutputStream implements AddCallback {
   extends EditLogOutputStream implements AddCallback {
+  static final Log LOG = LogFactory.getLog(BookKeeperEditLogOutputStream.class);
+
   private final DataOutputBuffer bufCurrent;
   private final DataOutputBuffer bufCurrent;
   private final AtomicInteger outstandingRequests;
   private final AtomicInteger outstandingRequests;
   private final int transmissionThreshold;
   private final int transmissionThreshold;
   private final LedgerHandle lh;
   private final LedgerHandle lh;
   private CountDownLatch syncLatch;
   private CountDownLatch syncLatch;
+  private final AtomicInteger transmitResult
+    = new AtomicInteger(BKException.Code.OK);
   private final WriteLock wl;
   private final WriteLock wl;
   private final Writer writer;
   private final Writer writer;
 
 
@@ -141,6 +148,11 @@ class BookKeeperEditLogOutputStream
     } catch (InterruptedException ie) {
     } catch (InterruptedException ie) {
       throw new IOException("Interrupted waiting on latch", ie);
       throw new IOException("Interrupted waiting on latch", ie);
     }
     }
+    if (transmitResult.get() != BKException.Code.OK) {
+      throw new IOException("Failed to write to bookkeeper; Error is ("
+                            + transmitResult.get() + ") "
+                            + BKException.getMessage(transmitResult.get()));
+    }
 
 
     syncLatch = null;
     syncLatch = null;
     // wait for whatever we wait on
     // wait for whatever we wait on
@@ -154,6 +166,12 @@ class BookKeeperEditLogOutputStream
   private void transmit() throws IOException {
   private void transmit() throws IOException {
     wl.checkWriteLock();
     wl.checkWriteLock();
 
 
+    if (!transmitResult.compareAndSet(BKException.Code.OK,
+                                     BKException.Code.OK)) {
+      throw new IOException("Trying to write to an errored stream;"
+          + " Error code : (" + transmitResult.get()
+          + ") " + BKException.getMessage(transmitResult.get()));
+    }
     if (bufCurrent.getLength() > 0) {
     if (bufCurrent.getLength() > 0) {
       byte[] entry = Arrays.copyOf(bufCurrent.getData(),
       byte[] entry = Arrays.copyOf(bufCurrent.getData(),
                                    bufCurrent.getLength());
                                    bufCurrent.getLength());
@@ -168,6 +186,12 @@ class BookKeeperEditLogOutputStream
                           long entryId, Object ctx) {
                           long entryId, Object ctx) {
     synchronized(this) {
     synchronized(this) {
       outstandingRequests.decrementAndGet();
       outstandingRequests.decrementAndGet();
+      if (!transmitResult.compareAndSet(BKException.Code.OK, rc)) {
+        LOG.warn("Tried to set transmit result to (" + rc + ") \""
+            + BKException.getMessage(rc) + "\""
+            + " but is already (" + transmitResult.get() + ") \""
+            + BKException.getMessage(transmitResult.get()) + "\"");
+      }
       CountDownLatch l = syncLatch;
       CountDownLatch l = syncLatch;
       if (l != null) {
       if (l != null) {
         l.countDown();
         l.countDown();

+ 31 - 19
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java

@@ -61,7 +61,7 @@ import org.apache.commons.logging.LogFactory;
  * </property>
  * </property>
  *
  *
  * <property>
  * <property>
- *   <name>dfs.namenode.edits.journalPlugin.bookkeeper</name>
+ *   <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
  *   <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
  *   <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
  * </property>
  * </property>
  * }
  * }
@@ -212,15 +212,15 @@ public class BookKeeperJournalManager implements JournalManager {
       throw new IOException("We've already seen " + txId
       throw new IOException("We've already seen " + txId
           + ". A new stream cannot be created with it");
           + ". A new stream cannot be created with it");
     }
     }
-    if (currentLedger != null) {
-      throw new IOException("Already writing to a ledger, id="
-                            + currentLedger.getId());
-    }
     try {
     try {
+      if (currentLedger != null) {
+        // bookkeeper errored on last stream, clean up ledger
+        currentLedger.close();
+      }
       currentLedger = bkc.createLedger(ensembleSize, quorumSize,
       currentLedger = bkc.createLedger(ensembleSize, quorumSize,
                                        BookKeeper.DigestType.MAC,
                                        BookKeeper.DigestType.MAC,
                                        digestpw.getBytes());
                                        digestpw.getBytes());
-      String znodePath = inprogressZNode();
+      String znodePath = inprogressZNode(txId);
       EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
       EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
           HdfsConstants.LAYOUT_VERSION,  currentLedger.getId(), txId);
           HdfsConstants.LAYOUT_VERSION,  currentLedger.getId(), txId);
       /* Write the ledger metadata out to the inprogress ledger znode
       /* Write the ledger metadata out to the inprogress ledger znode
@@ -258,7 +258,7 @@ public class BookKeeperJournalManager implements JournalManager {
   @Override
   @Override
   public void finalizeLogSegment(long firstTxId, long lastTxId)
   public void finalizeLogSegment(long firstTxId, long lastTxId)
       throws IOException {
       throws IOException {
-    String inprogressPath = inprogressZNode();
+    String inprogressPath = inprogressZNode(firstTxId);
     try {
     try {
       Stat inprogressStat = zkc.exists(inprogressPath, false);
       Stat inprogressStat = zkc.exists(inprogressPath, false);
       if (inprogressStat == null) {
       if (inprogressStat == null) {
@@ -372,21 +372,33 @@ public class BookKeeperJournalManager implements JournalManager {
   @Override
   @Override
   public void recoverUnfinalizedSegments() throws IOException {
   public void recoverUnfinalizedSegments() throws IOException {
     wl.acquire();
     wl.acquire();
-
     synchronized (this) {
     synchronized (this) {
       try {
       try {
-        EditLogLedgerMetadata l
-          = EditLogLedgerMetadata.read(zkc, inprogressZNode());
-        long endTxId = recoverLastTxId(l);
-        if (endTxId == HdfsConstants.INVALID_TXID) {
-          LOG.error("Unrecoverable corruption has occurred in segment "
-                    + l.toString() + " at path " + inprogressZNode()
-                    + ". Unable to continue recovery.");
-          throw new IOException("Unrecoverable corruption, please check logs.");
+        List<String> children = zkc.getChildren(ledgerPath, false);
+        for (String child : children) {
+          if (!child.startsWith("inprogress_")) {
+            continue;
+          }
+          String znode = ledgerPath + "/" + child;
+          EditLogLedgerMetadata l
+            = EditLogLedgerMetadata.read(zkc, znode);
+          long endTxId = recoverLastTxId(l);
+          if (endTxId == HdfsConstants.INVALID_TXID) {
+            LOG.error("Unrecoverable corruption has occurred in segment "
+                      + l.toString() + " at path " + znode
+                      + ". Unable to continue recovery.");
+            throw new IOException("Unrecoverable corruption,"
+                                  + " please check logs.");
+          }
+          finalizeLogSegment(l.getFirstTxId(), endTxId);
         }
         }
-        finalizeLogSegment(l.getFirstTxId(), endTxId);
       } catch (KeeperException.NoNodeException nne) {
       } catch (KeeperException.NoNodeException nne) {
           // nothing to recover, ignore
           // nothing to recover, ignore
+      } catch (KeeperException ke) {
+        throw new IOException("Couldn't get list of inprogress segments", ke);
+      } catch (InterruptedException ie) {
+        throw new IOException("Interrupted getting list of inprogress segments",
+                              ie);
       } finally {
       } finally {
         if (wl.haveLock()) {
         if (wl.haveLock()) {
           wl.release();
           wl.release();
@@ -495,8 +507,8 @@ public class BookKeeperJournalManager implements JournalManager {
   /**
   /**
    * Get the znode path for the inprogressZNode
    * Get the znode path for the inprogressZNode
    */
    */
-  String inprogressZNode() {
-    return ledgerPath + "/inprogress";
+  String inprogressZNode(long startTxid) {
+    return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
   }
   }
 
 
   /**
   /**

+ 223 - 28
hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java

@@ -28,6 +28,8 @@ import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.LocalBookKeeper;
 import org.apache.bookkeeper.util.LocalBookKeeper;
 
 
 import java.io.RandomAccessFile;
 import java.io.RandomAccessFile;
@@ -74,11 +76,15 @@ public class TestBookKeeperJournalManager {
   
   
   private static final long DEFAULT_SEGMENT_SIZE = 1000;
   private static final long DEFAULT_SEGMENT_SIZE = 1000;
   private static final String zkEnsemble = "localhost:2181";
   private static final String zkEnsemble = "localhost:2181";
+  final static private int numBookies = 5;
 
 
   private static Thread bkthread;
   private static Thread bkthread;
   protected static Configuration conf = new Configuration();
   protected static Configuration conf = new Configuration();
   private ZooKeeper zkc;
   private ZooKeeper zkc;
 
 
+  
+  static int nextPort = 6000; // next port for additionally created bookies
+
   private static ZooKeeper connectZooKeeper(String ensemble) 
   private static ZooKeeper connectZooKeeper(String ensemble) 
       throws IOException, KeeperException, InterruptedException {
       throws IOException, KeeperException, InterruptedException {
     final CountDownLatch latch = new CountDownLatch(1);
     final CountDownLatch latch = new CountDownLatch(1);
@@ -96,9 +102,72 @@ public class TestBookKeeperJournalManager {
     return zkc;
     return zkc;
   }
   }
 
 
+  private static BookieServer newBookie() throws Exception {
+    int port = nextPort++;
+    ServerConfiguration bookieConf = new ServerConfiguration();
+    bookieConf.setBookiePort(port);
+    File tmpdir = File.createTempFile("bookie" + Integer.toString(port) + "_",
+                                      "test");
+    tmpdir.delete();
+    tmpdir.mkdir();
+
+    bookieConf.setZkServers(zkEnsemble);
+    bookieConf.setJournalDirName(tmpdir.getPath());
+    bookieConf.setLedgerDirNames(new String[] { tmpdir.getPath() });
+
+    BookieServer b = new BookieServer(bookieConf);
+    b.start();
+    for (int i = 0; i < 10 && !b.isRunning(); i++) {
+      Thread.sleep(10000);
+    }
+    if (!b.isRunning()) {
+      throw new IOException("Bookie would not start");
+    }
+    return b;
+  }
+
+  /**
+   * Check that a number of bookies are available
+   * @param count number of bookies required
+   * @param timeout number of seconds to wait for bookies to start
+   * @throws IOException if bookies are not started by the time the timeout hits
+   */
+  private static int checkBookiesUp(int count, int timeout) throws Exception {
+    ZooKeeper zkc = connectZooKeeper(zkEnsemble);
+    try {
+      boolean up = false;
+      int mostRecentSize = 0;
+      for (int i = 0; i < timeout; i++) {
+        try {
+          List<String> children = zkc.getChildren("/ledgers/available",
+                                                  false);
+          mostRecentSize = children.size();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found " + mostRecentSize + " bookies up, "
+                      + "waiting for " + count);
+            if (LOG.isTraceEnabled()) {
+              for (String child : children) {
+                LOG.trace(" server: " + child);
+              }
+            }
+          }
+          if (mostRecentSize == count) {
+            up = true;
+            break;
+          }
+        } catch (KeeperException e) {
+          // ignore
+        }
+        Thread.sleep(1000);
+      }
+      return mostRecentSize;
+    } finally {
+      zkc.close();
+    }
+  }
+
   @BeforeClass
   @BeforeClass
   public static void setupBookkeeper() throws Exception {
   public static void setupBookkeeper() throws Exception {
-    final int numBookies = 5;
     bkthread = new Thread() {
     bkthread = new Thread() {
         public void run() {
         public void run() {
           try {
           try {
@@ -118,29 +187,8 @@ public class TestBookKeeperJournalManager {
     if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
     if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
       throw new Exception("Error starting zookeeper/bookkeeper");
       throw new Exception("Error starting zookeeper/bookkeeper");
     }
     }
-
-    ZooKeeper zkc = connectZooKeeper(zkEnsemble);
-    try {
-      boolean up = false;
-      for (int i = 0; i < 10; i++) {
-        try {
-          List<String> children = zkc.getChildren("/ledgers/available", 
-                                                  false);
-          if (children.size() == numBookies) {
-            up = true;
-            break;
-          }
-        } catch (KeeperException e) {
-          // ignore
-        }
-        Thread.sleep(1000);
-      }
-      if (!up) {
-        throw new IOException("Not enough bookies started");
-      }
-    } finally {
-      zkc.close();
-    }
+    assertEquals("Not all bookies started", 
+                 numBookies, checkBookiesUp(numBookies, 10));
   }
   }
   
   
   @Before
   @Before
@@ -178,7 +226,7 @@ public class TestBookKeeperJournalManager {
     String zkpath = bkjm.finalizedLedgerZNode(1, 100);
     String zkpath = bkjm.finalizedLedgerZNode(1, 100);
     
     
     assertNotNull(zkc.exists(zkpath, false));
     assertNotNull(zkc.exists(zkpath, false));
-    assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+    assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
   }
   }
 
 
   @Test
   @Test
@@ -385,11 +433,158 @@ public class TestBookKeeperJournalManager {
 
 
 
 
     assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
     assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
-    assertNotNull(zkc.exists(bkjm.inprogressZNode(), false));
+    assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false));
 
 
     bkjm.recoverUnfinalizedSegments();
     bkjm.recoverUnfinalizedSegments();
 
 
     assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
     assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
-    assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+    assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
+  }
+
+  /**
+   * Test that if enough bookies fail to prevent an ensemble,
+   * writes the bookkeeper will fail. Test that when once again
+   * an ensemble is available, it can continue to write.
+   */
+  @Test
+  public void testAllBookieFailure() throws Exception {
+    BookieServer bookieToFail = newBookie();
+    BookieServer replacementBookie = null;
+
+    try {
+      int ensembleSize = numBookies + 1;
+      assertEquals("New bookie didn't start",
+                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+
+      // ensure that the journal manager has to use all bookies,
+      // so that a failure will fail the journal manager
+      Configuration conf = new Configuration();
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+                  ensembleSize);
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+                  ensembleSize);
+      long txid = 1;
+      BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+          URI.create("bookkeeper://" + zkEnsemble
+                     + "/hdfsjournal-allbookiefailure"));
+      EditLogOutputStream out = bkjm.startLogSegment(txid);
+
+      for (long i = 1 ; i <= 3; i++) {
+        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+        op.setTransactionId(txid++);
+        out.write(op);
+      }
+      out.setReadyToFlush();
+      out.flush();
+      bookieToFail.shutdown();
+      assertEquals("New bookie didn't die",
+                   numBookies, checkBookiesUp(numBookies, 10));
+
+      try {
+        for (long i = 1 ; i <= 3; i++) {
+          FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+          op.setTransactionId(txid++);
+          out.write(op);
+        }
+        out.setReadyToFlush();
+        out.flush();
+        fail("should not get to this stage");
+      } catch (IOException ioe) {
+        LOG.debug("Error writing to bookkeeper", ioe);
+        assertTrue("Invalid exception message",
+                   ioe.getMessage().contains("Failed to write to bookkeeper"));
+      }
+      replacementBookie = newBookie();
+
+      assertEquals("New bookie didn't start",
+                   numBookies+1, checkBookiesUp(numBookies+1, 10));
+      out = bkjm.startLogSegment(txid);
+      for (long i = 1 ; i <= 3; i++) {
+        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+        op.setTransactionId(txid++);
+        out.write(op);
+      }
+
+      out.setReadyToFlush();
+      out.flush();
+
+    } catch (Exception e) {
+      LOG.error("Exception in test", e);
+      throw e;
+    } finally {
+      if (replacementBookie != null) {
+        replacementBookie.shutdown();
+      }
+      bookieToFail.shutdown();
+
+      if (checkBookiesUp(numBookies, 30) != numBookies) {
+        LOG.warn("Not all bookies from this test shut down, expect errors");
+      }
+    }
   }
   }
-}
+
+  /**
+   * Test that a BookKeeper JM can continue to work across the
+   * failure of a bookie. This should be handled transparently
+   * by bookkeeper.
+   */
+  @Test
+  public void testOneBookieFailure() throws Exception {
+    BookieServer bookieToFail = newBookie();
+    BookieServer replacementBookie = null;
+
+    try {
+      int ensembleSize = numBookies + 1;
+      assertEquals("New bookie didn't start",
+                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+
+      // ensure that the journal manager has to use all bookies,
+      // so that a failure will fail the journal manager
+      Configuration conf = new Configuration();
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+                  ensembleSize);
+      conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+                  ensembleSize);
+      long txid = 1;
+      BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+          URI.create("bookkeeper://" + zkEnsemble
+                     + "/hdfsjournal-onebookiefailure"));
+      EditLogOutputStream out = bkjm.startLogSegment(txid);
+      for (long i = 1 ; i <= 3; i++) {
+        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+        op.setTransactionId(txid++);
+        out.write(op);
+      }
+      out.setReadyToFlush();
+      out.flush();
+
+      replacementBookie = newBookie();
+      assertEquals("replacement bookie didn't start",
+                   ensembleSize+1, checkBookiesUp(ensembleSize+1, 10));
+      bookieToFail.shutdown();
+      assertEquals("New bookie didn't die",
+                   ensembleSize, checkBookiesUp(ensembleSize, 10));
+
+      for (long i = 1 ; i <= 3; i++) {
+        FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+        op.setTransactionId(txid++);
+        out.write(op);
+      }
+      out.setReadyToFlush();
+      out.flush();
+    } catch (Exception e) {
+      LOG.error("Exception in test", e);
+      throw e;
+    } finally {
+      if (replacementBookie != null) {
+        replacementBookie.shutdown();
+      }
+      bookieToFail.shutdown();
+
+      if (checkBookiesUp(numBookies, 30) != numBookies) {
+        LOG.warn("Not all bookies from this test shut down, expect errors");
+      }
+    }
+  }
+
+}