Selaa lähdekoodia

ZOOKEEPER-197. create checksums for snapshots

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@706510 13f79535-47bb-0310-9956-ffa450edef68
Patrick D. Hunt 16 vuotta sitten
vanhempi
commit
04ac2dc3e5

+ 2 - 0
CHANGES.txt

@@ -35,6 +35,8 @@ Backward compatibile changes:
 
   BUGFIXES: 
 
+  ZOOKEEPER-197. create checksums for snapshots (mahadev via phunt)
+
   ZOOKEEPER-198. apache license header missing from FollowerSyncRequest.java
   (phunt)
 

+ 21 - 2
src/java/main/org/apache/zookeeper/server/persistence/FileSnap.java

@@ -28,11 +28,15 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.util.SerializeUtils;
 
@@ -46,6 +50,7 @@ public class FileSnap implements SnapShot {
     File snapDir;
     private static final int VERSION=2;
     private static final long dbId=-1;
+    private static final Logger LOG = Logger.getLogger(FileSnap.class);
     public final static int MAGIC = ByteBuffer.wrap("AK47".getBytes()).getInt();
     public FileSnap(File snapDir) {
         this.snapDir = snapDir;
@@ -61,10 +66,18 @@ public class FileSnap implements SnapShot {
         if (snap == null) {
             return -1L;
         }
+        LOG.info("Reading snapshot " + snap);
         InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
-        InputArchive ia=BinaryInputArchive.getArchive(snapIS);
+        CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32());
+        InputArchive ia=BinaryInputArchive.getArchive(crcIn);
         deserialize(dt,sessions, ia);
+        long checkSum = crcIn.getChecksum().getValue();
+        long val = ia.readLong("val");
+        if (val != checkSum) {
+            throw new IOException("CRC corruption in snapshot :  " + snap);
+        }
         snapIS.close();
+        crcIn.close();
         dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), "snapshot");
         return dt.lastProcessedZxid;
     }
@@ -124,10 +137,16 @@ public class FileSnap implements SnapShot {
     public void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
             throws IOException {
         OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
-        OutputArchive oa = BinaryOutputArchive.getArchive(sessOS);
+        CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
+        // bytes written through crcOut also update its Adler32 checksum
+        OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
         FileHeader header = new FileHeader(MAGIC, VERSION, dbId);
         serialize(dt,sessions,oa, header);
+        long val = crcOut.getChecksum().getValue(); // checksum of everything written so far
+        oa.writeLong(val, "val"); // stored right after the serialized data
+        oa.writeString("/", "path"); // NOTE(review): written after the checksum was taken; purpose not visible here
         sessOS.flush();
+        crcOut.close(); // NOTE(review): not reached if an earlier write throws (no try/finally)
         sessOS.close();
     }
    

+ 155 - 0
src/java/test/org/apache/zookeeper/server/CRCTest.java

@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.persistence.FileSnap;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
+import org.apache.zookeeper.test.ClientBase;
+
+public class CRCTest extends TestCase implements Watcher{
+    
+    private static final Logger LOG = Logger.getLogger(CRCTest.class);
+    private static String HOSTPORT = "127.0.0.1:2357";
+    ZooKeeperServer zks;
+    private CountDownLatch startSignal;
+    
+    /**
+     * Logs the start of the test and calls
+     * ServerStats.registerAsConcrete() before each test method.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        LOG.info("STARTING " + getName());
+        ServerStats.registerAsConcrete();
+    }
+    /**
+     * Calls ServerStats.unregister() and logs the end of the test
+     * after each test method.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        ServerStats.unregister();
+        LOG.info("FINISHED " + getName());
+    }
+    
+    /**
+     * corrupt a file by writing m at 500 b
+     * offset
+     * @param file the file to be corrupted
+     * @throws IOException
+     */
+    private void corruptFile(File file) throws IOException {
+        // corrupt the logfile
+        RandomAccessFile raf  = new RandomAccessFile(file, "rw");
+        byte[] b = "mahadev".getBytes();
+        long writeLen = 500L;
+        raf.seek(writeLen);
+        //corruptting the data
+        raf.write(b);
+        raf.close();
+    }
+
+    /** test checksums for the logs and snapshots.
+     * the reader should fail on reading 
+     * a corrupt snapshot and a corrupt log
+     * file
+     * @throws Exception
+     */
+   public void testChecksums() throws Exception {
+        File tmpDir = ClientBase.createTmpDir();
+        ClientBase.setupTestEnv();
+        zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
+        SyncRequestProcessor.snapCount = 150;
+        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
+        NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(PORT);
+        f.startup(zks);
+        LOG.info("starting up the zookeeper server .. waiting");
+        assertTrue("waiting for server being up", 
+                ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
+        ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+        for (int i =0; i < 2000; i++) {
+            zk.create("/crctest- " + i , ("/crctest- " + i).getBytes(), 
+                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        f.shutdown();
+        assertTrue("waiting for server down",
+                   ClientBase.waitForServerDown(HOSTPORT,
+                           ClientBase.CONNECTION_TIMEOUT));
+
+        File versionDir = new File(tmpDir, "version-2");
+        File[] list = versionDir.listFiles();
+        //there should be only two files 
+        // one the snapshot and the other logFile
+        File snapFile = null;
+        File logFile = null;
+        for (File file: list) {
+            LOG.info("file is " + file);
+            if (file.getName().startsWith("log")) {
+                logFile = file;
+                corruptFile(logFile);
+            }
+        }
+        FileTxnLog flog = new FileTxnLog(versionDir);
+        TxnIterator itr = flog.read(1);
+        //we will get a checksum failure
+        try {
+            while (itr.next()) {
+            }
+            assertTrue(false);
+        } catch(IOException ie) {
+            LOG.info("crc corruption", ie);
+        }
+        itr.close();
+        // find the last snapshot
+        FileSnap snap = new FileSnap(versionDir);
+        snapFile = snap.findMostRecentSnapshot();
+        // corrupt this file
+        corruptFile(snapFile);
+        DataTree dt = new DataTree();
+        Map<Long, Integer> sessions = 
+            new ConcurrentHashMap<Long, Integer>();
+        try {   
+            snap.deserialize(dt, sessions);
+            assertTrue(false);
+        } catch(IOException ie) {
+            LOG.info("checksu failure in snapshot", ie);    
+        }
+         
+   }
+    
+    /**
+     * Watcher callback. Logs the event's state, type and path, then counts
+     * down startSignal when the state is SyncConnected and a latch is set
+     * whose count is still above zero.
+     */
+    public void process(WatchedEvent event) {
+        LOG.info("Event:" + event.getState() + " " + event.getType() + " " + event.getPath());
+        if (event.getState() != KeeperState.SyncConnected) {
+            return;
+        }
+        if (startSignal == null || startSignal.getCount() <= 0) {
+            return;
+        }
+        startSignal.countDown();
+    }
+}

+ 3 - 3
src/java/test/org/apache/zookeeper/test/ClientBase.java

@@ -44,7 +44,7 @@ import org.apache.zookeeper.server.persistence.FileTxnLog;
 public abstract class ClientBase extends TestCase {
     protected static final Logger LOG = Logger.getLogger(ClientBase.class);
 
-    static final int CONNECTION_TIMEOUT = 30000;
+    public static final int CONNECTION_TIMEOUT = 30000;
     static final File BASETEST =
         new File(System.getProperty("build.test.dir", "build"));
 
@@ -191,7 +191,7 @@ public abstract class ClientBase extends TestCase {
     }
 
 
-    static File createTmpDir() throws IOException {
+    public static File createTmpDir() throws IOException {
         return createTmpDir(BASETEST);
     }
     static File createTmpDir(File parentDir) throws IOException {
@@ -240,7 +240,7 @@ public abstract class ClientBase extends TestCase {
     /**
      * Test specific setup
      */
-    static void setupTestEnv() {
+    public static void setupTestEnv() {
         // during the tests we run with 100K prealloc in the logs.
         // on windows systems prealloc of 64M was seen to take ~15seconds
         // resulting in test failure (client timeout on first session).