Selaa lähdekoodia

ZOOKEEPER-212. fix the snapshot to be asynchronous. (mahadev and ben)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@718433 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 16 vuotta sitten
vanhempi
commit
f7673ae440

+ 2 - 1
CHANGES.txt

@@ -7,7 +7,8 @@ Backward compatibile changes:
 BUGFIXES: 
    ZOOKEEPER-223. change default level in root logger to INFO. (pat via
 mahadev) 
-
+   
+   ZOOKEEPER-212. fix the snapshot to be asynchronous. (mahadev and ben)
 
 Release 3.0.0 - 2008-10-21
 

+ 17 - 5
src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java

@@ -37,6 +37,8 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
     private LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
     private RequestProcessor nextProcessor;
     boolean timeToDie = false;
+    Thread snapInProcess = null;
+    
     /**
      * Transactions that have been written and are waiting to be flushed to
      * disk. Basically this is the list of SyncItems whose callbacks will be
@@ -60,10 +62,6 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
         start();
     }
 
-    private void startSnapshot() throws IOException {
-        zks.takeSnapshot();
-    }
-
     @Override
     public void run() {
         try {
@@ -89,7 +87,21 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
                             // roll the log
                             zks.getLogWriter().rollLog();
                             // take a snapshot
-                            startSnapshot();
+                            if (snapInProcess != null && snapInProcess.isAlive()) {
+                                LOG.warn("Too busy to snap, skipping");
+                            }
+                            else {
+                                snapInProcess = new Thread("Snapshot Thread") {
+                                    public void run() {
+                                     try {
+                                         zks.takeSnapshot();
+                                     } catch(Exception e) {
+                                         LOG.warn("Unexpected exception", e);
+                                     }
+                                    }
+                                };
+                                snapInProcess.start();
+                            }
                             logCount = 0;
                         }
                     toFlush.add(si);

+ 66 - 0
src/java/test/org/apache/zookeeper/test/DBSizeTest.java

@@ -0,0 +1,66 @@
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Test;
+
+public class DBSizeTest extends ClientBase {
+    String snapCount;
+    @Override
+    protected void setUp() throws Exception {
+        // Change the snapcount to happen more often
+        snapCount = System.getProperty("zookeeper.snapCount", "1024");
+        System.setProperty("zookeeper.snapCount", "10");
+        super.setUp();
+    }
+    
+
+    @Override
+    protected void tearDown() throws Exception {
+        System.setProperty("zookeeper.snapCount", snapCount);
+        super.tearDown();
+    }
+
+
+    // Test that the latency of requests doesn't increase with
+    // the size of the database
+    @Test
+    public void testDBScale()
+        throws IOException, InterruptedException, KeeperException
+    {
+        String path = "/SIZE";
+        byte data[] = new byte[1024];
+        ZooKeeper zk = null;
+        try {
+            zk = createClient();
+            long startTime = System.currentTimeMillis();
+            zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            long baseLatency = System.currentTimeMillis() - startTime;
+            
+            for(int i = 0; i < 16; i++) {
+                startTime = System.currentTimeMillis();
+                zk.create(path + '/' + i, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                long latency = System.currentTimeMillis() - startTime;
+                System.out.println("Latency = " + latency);
+                //assertTrue(latency < baseLatency + 10);
+                for(int j = 0; j < 1024; j++) {
+                    zk.create(path + '/' + i + '/' + j, data, Ids.OPEN_ACL_UNSAFE, 
+                            CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
+                        public void processResult(int rc, String path,
+                                Object ctx, String name) {
+                        }}, null);
+                }
+            }
+        } finally {
+            if(zk != null)
+                zk.close();
+        }
+    }
+
+
+}