瀏覽代碼

ZOOKEEPER-2872: Interrupted snapshot sync causes data loss

This requires the fix in ZOOKEEPER-2870: Improve the efficiency of AtomicFileOutputStream

Author: Brian Nixon <nixon@fb.com>

Reviewers: Michael Han <hanm@apache.org>

Closes #333 from enixon/snap-sync
Brian Nixon 7 年之前
父節點
當前提交
0706b40afa

+ 6 - 2
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -300,9 +300,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         takeSnapshot();
         takeSnapshot();
     }
     }
 
 
-    public void takeSnapshot(){
+    public void takeSnapshot() {
+        takeSnapshot(false);
+    }
+
+    public void takeSnapshot(boolean syncSnap){
         try {
         try {
-            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
         } catch (IOException e) {
         } catch (IOException e) {
             LOG.error("Severe unrecoverable error, exiting", e);
             LOG.error("Severe unrecoverable error, exiting", e);
             // This is a severe error that we cannot recover from,
             // This is a severe error that we cannot recover from,

+ 8 - 4
src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java

@@ -37,6 +37,7 @@ import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.DataTree;
@@ -213,12 +214,15 @@ public class FileSnap implements SnapShot {
      * @param dt the datatree to be serialized
      * @param dt the datatree to be serialized
      * @param sessions the sessions to be serialized
      * @param sessions the sessions to be serialized
      * @param snapShot the file to store snapshot into
      * @param snapShot the file to store snapshot into
+     * @param fsync sync the file immediately after write
      */
      */
-    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
+    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)
             throws IOException {
             throws IOException {
         if (!close) {
         if (!close) {
-            try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
-                 CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
+            try (CheckedOutputStream crcOut =
+                         new CheckedOutputStream(new BufferedOutputStream(fsync ? new AtomicFileOutputStream(snapShot) :
+                                                                                  new FileOutputStream(snapShot)),
+                                                 new Adler32())) {
                 //CheckedOutputStream cout = new CheckedOutputStream()
                 //CheckedOutputStream cout = new CheckedOutputStream()
                 OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                 OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                 FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                 FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
@@ -226,7 +230,7 @@ public class FileSnap implements SnapShot {
                 long val = crcOut.getChecksum().getValue();
                 long val = crcOut.getChecksum().getValue();
                 oa.writeLong(val, "val");
                 oa.writeLong(val, "val");
                 oa.writeString("/", "path");
                 oa.writeString("/", "path");
-                sessOS.flush();
+                crcOut.flush();
             }
             }
         }
         }
     }
     }

+ 5 - 3
src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -196,7 +196,7 @@ public class FileTxnSnapLog {
             if (trustEmptyDB) {
             if (trustEmptyDB) {
                 /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
                 /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
                  *       or use Map on save() */
                  *       or use Map on save() */
-                save(dt, (ConcurrentHashMap<Long, Integer>)sessions);
+                save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
 
 
                 /* return a zxid of 0, since we know the database is empty */
                 /* return a zxid of 0, since we know the database is empty */
                 return 0L;
                 return 0L;
@@ -335,16 +335,18 @@ public class FileTxnSnapLog {
      * @param dataTree the datatree to be serialized onto disk
      * @param dataTree the datatree to be serialized onto disk
      * @param sessionsWithTimeouts the session timeouts to be
      * @param sessionsWithTimeouts the session timeouts to be
      * serialized onto disk
      * serialized onto disk
+     * @param syncSnap sync the snapshot immediately after write
      * @throws IOException
      * @throws IOException
      */
      */
     public void save(DataTree dataTree,
     public void save(DataTree dataTree,
-            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
+                     ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
+                     boolean syncSnap)
         throws IOException {
         throws IOException {
         long lastZxid = dataTree.lastProcessedZxid;
         long lastZxid = dataTree.lastProcessedZxid;
         File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
         File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
         LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
         LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
                 snapshotFile);
                 snapshotFile);
-        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
+        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
 
 
     }
     }
 
 

+ 5 - 3
src/java/main/org/apache/zookeeper/server/persistence/SnapShot.java

@@ -44,11 +44,13 @@ public interface SnapShot {
     /**
     /**
      * persist the datatree and the sessions into a persistence storage
      * persist the datatree and the sessions into a persistence storage
      * @param dt the datatree to be serialized
      * @param dt the datatree to be serialized
-     * @param sessions 
+     * @param sessions the session timeouts to be serialized
+     * @param name the object name to store snapshot into
+     * @param fsync sync the snapshot immediately after write
      * @throws IOException
      * @throws IOException
      */
      */
-    void serialize(DataTree dt, Map<Long, Integer> sessions, 
-            File name) 
+    void serialize(DataTree dt, Map<Long, Integer> sessions,
+                   File name, boolean fsync)
         throws IOException;
         throws IOException;
     
     
     /**
     /**

+ 6 - 2
src/java/main/org/apache/zookeeper/server/quorum/Learner.java

@@ -361,6 +361,7 @@ public class Learner {
         // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
         // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
         // For SNAP and TRUNC the snapshot is needed to save that history
         // For SNAP and TRUNC the snapshot is needed to save that history
         boolean snapshotNeeded = true;
         boolean snapshotNeeded = true;
+        boolean syncSnapshot = false;
         readPacket(qp);
         readPacket(qp);
         LinkedList<Long> packetsCommitted = new LinkedList<Long>();
         LinkedList<Long> packetsCommitted = new LinkedList<Long>();
         LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
         LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
@@ -387,6 +388,9 @@ public class Learner {
                     throw new IOException("Missing signature");                   
                     throw new IOException("Missing signature");                   
                 }
                 }
                 zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                 zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+                // immediately persist the latest snapshot when there is txn log gap
+                syncSnapshot = true;
             } else if (qp.getType() == Leader.TRUNC) {
             } else if (qp.getType() == Leader.TRUNC) {
                 //we need to truncate the log to the lastzxid of the leader
                 //we need to truncate the log to the lastzxid of the leader
                 LOG.warn("Truncating log to get in sync with the leader 0x"
                 LOG.warn("Truncating log to get in sync with the leader 0x"
@@ -513,7 +517,7 @@ public class Learner {
                        }
                        }
                     }
                     }
                     if (isPreZAB1_0) {
                     if (isPreZAB1_0) {
-                        zk.takeSnapshot();
+                        zk.takeSnapshot(syncSnapshot);
                         self.setCurrentEpoch(newEpoch);
                         self.setCurrentEpoch(newEpoch);
                     }
                     }
                     self.setZooKeeperServer(zk);
                     self.setZooKeeperServer(zk);
@@ -533,7 +537,7 @@ public class Learner {
                    }
                    }
 
 
                    if (snapshotNeeded) {
                    if (snapshotNeeded) {
-                       zk.takeSnapshot();
+                       zk.takeSnapshot(syncSnapshot);
                    }
                    }
                    
                    
                     self.setCurrentEpoch(newEpoch);
                     self.setCurrentEpoch(newEpoch);

+ 3 - 3
src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

@@ -435,7 +435,7 @@ public class Zab1_0Test extends ZKTestCase {
             Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0));
             Assert.assertTrue(zxid > ZxidUtils.makeZxid(1, 0));
             
             
             // Generate snapshot and close files.
             // Generate snapshot and close files.
-            snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
+            snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false);
             snapLog.close();
             snapLog.close();
             
             
             QuorumPeer peer = createQuorumPeer(tmpDir);
             QuorumPeer peer = createQuorumPeer(tmpDir);
@@ -714,7 +714,7 @@ public class Zab1_0Test extends ZKTestCase {
                     Assert.assertEquals(1, f.self.getAcceptedEpoch());
                     Assert.assertEquals(1, f.self.getAcceptedEpoch());
                     Assert.assertEquals(1, f.self.getCurrentEpoch());
                     Assert.assertEquals(1, f.self.getCurrentEpoch());
                     //Make sure that we did take the snapshot now
                     //Make sure that we did take the snapshot now
-                    verify(f.zk).takeSnapshot();
+                    verify(f.zk).takeSnapshot(true);
                     Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
                     Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
                     
                     
                     // Make sure the data was recorded in the filesystem ok
                     // Make sure the data was recorded in the filesystem ok
@@ -1367,7 +1367,7 @@ public class Zab1_0Test extends ZKTestCase {
             FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
             FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
             File version2 = new File(tmpDir, "version-2");
             File version2 = new File(tmpDir, "version-2");
             version2.mkdir();
             version2.mkdir();
-            logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>());
+            logFactory.save(new DataTree(), new ConcurrentHashMap<Long, Integer>(), false);
             long zxid = ZxidUtils.makeZxid(3, 3);
             long zxid = ZxidUtils.makeZxid(3, 3);
             logFactory.append(new Request(1, 1, ZooDefs.OpCode.error,
             logFactory.append(new Request(1, 1, ZooDefs.OpCode.error,
                     new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error),
                     new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.error),

+ 1 - 1
src/java/test/org/apache/zookeeper/test/TruncateTest.java

@@ -75,7 +75,7 @@ public class TruncateTest extends ZKTestCase {
         ZKDatabase zkdb = new ZKDatabase(snaplog);
         ZKDatabase zkdb = new ZKDatabase(snaplog);
         // make sure to snapshot, so that we have something there when
         // make sure to snapshot, so that we have something there when
         // truncateLog reloads the db
         // truncateLog reloads the db
-        snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts());
+        snaplog.save(zkdb.getDataTree(), zkdb.getSessionWithTimeOuts(), false);
 
 
         for (int i = 1; i <= 100; i++) {
         for (int i = 1; i <= 100; i++) {
             append(zkdb, i);
             append(zkdb, i);