浏览代码

ZOOKEEPER-568. SyncRequestProcessor snapping too frequently - counts non-log events as log events


git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@833639 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 15 年之前
父节点
当前提交
2a23a2599b

+ 44 - 27
src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java

@@ -34,42 +34,44 @@ import org.apache.log4j.Logger;
  */
 public class SyncRequestProcessor extends Thread implements RequestProcessor {
     private static final Logger LOG = Logger.getLogger(SyncRequestProcessor.class);
-    private ZooKeeperServer zks;
-    private LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
-    private RequestProcessor nextProcessor;
-    Thread snapInProcess = null;
-    
+    private final ZooKeeperServer zks;
+    private final LinkedBlockingQueue<Request> queuedRequests =
+        new LinkedBlockingQueue<Request>();
+    private final RequestProcessor nextProcessor;
+
+    private Thread snapInProcess = null;
+
     /**
      * Transactions that have been written and are waiting to be flushed to
      * disk. Basically this is the list of SyncItems whose callbacks will be
      * invoked after flush returns successfully.
      */
-    private LinkedList<Request> toFlush = new LinkedList<Request>();
-    private Random r = new Random(System.nanoTime());
-    private int logCount = 0;
+    private final LinkedList<Request> toFlush = new LinkedList<Request>();
+    private final Random r = new Random(System.nanoTime());
     /**
      * The number of log entries to log before starting a snapshot
      */
     private static int snapCount = ZooKeeperServer.getSnapCount();
 
-    private Request requestOfDeath = Request.requestOfDeath;
+    private final Request requestOfDeath = Request.requestOfDeath;
 
     public SyncRequestProcessor(ZooKeeperServer zks,
-            RequestProcessor nextProcessor) {
+            RequestProcessor nextProcessor)
+    {
         super("SyncThread:" + zks.getServerId());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
     }
-    
+
     /**
-     * used by tests to check for changing 
+     * used by tests to check for changing
      * snapcounts
      * @param count
      */
     public static void setSnapCount(int count) {
         snapCount = count;
     }
-    
+
     /**
      * used by tests to get the snapcount
      * @return the snapcount
@@ -77,10 +79,14 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
     public static int getSnapCount() {
         return snapCount;
     }
-    
+
     @Override
     public void run() {
         try {
+            int logCount = 0;
+
+            // we do this in an attempt to ensure that not all of the servers
+            // in the ensemble take a snapshot at the same time
             int randRoll = r.nextInt(snapCount/2);
             while (true) {
                 Request si = null;
@@ -97,7 +103,8 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
                     break;
                 }
                 if (si != null) {
-                    zks.getLogWriter().append(si);
+                    // track the number of records written to the log
+                    if (zks.getLogWriter().append(si)) {
                         logCount++;
                         if (logCount > (snapCount / 2 + randRoll)) {
                             randRoll = r.nextInt(snapCount/2);
@@ -106,21 +113,31 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
                             // take a snapshot
                             if (snapInProcess != null && snapInProcess.isAlive()) {
                                 LOG.warn("Too busy to snap, skipping");
-                            }
-                            else {
+                            } else {
                                 snapInProcess = new Thread("Snapshot Thread") {
-                                    public void run() {
-                                     try {
-                                         zks.takeSnapshot();
-                                     } catch(Exception e) {
-                                         LOG.warn("Unexpected exception", e);
-                                     }
-                                    }
-                                };
+                                        public void run() {
+                                            try {
+                                                zks.takeSnapshot();
+                                            } catch(Exception e) {
+                                                LOG.warn("Unexpected exception", e);
+                                            }
+                                        }
+                                    };
                                 snapInProcess.start();
                             }
                             logCount = 0;
                         }
+                    } else if (toFlush.isEmpty()) {
+                        // optimization for read heavy workloads
+                        // iff this is a read, and there are no pending
+                        // flushes (writes), then just pass this to the next
+                        // processor
+                        nextProcessor.processRequest(si);
+                        if (nextProcessor instanceof Flushable) {
+                            ((Flushable)nextProcessor).flush();
+                        }
+                        continue;
+                    }
                     toFlush.add(si);
                     if (toFlush.size() > 1000) {
                         flush(toFlush);
@@ -135,11 +152,11 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
     }
 
     private void flush(LinkedList<Request> toFlush) throws IOException {
-        if (toFlush.size() == 0)
+        if (toFlush.isEmpty())
             return;
 
         zks.getLogWriter().commit();
-        while (toFlush.size() > 0) {
+        while (!toFlush.isEmpty()) {
             Request i = toFlush.remove();
             nextProcessor.processRequest(i);
         }

+ 7 - 2
src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java

@@ -131,9 +131,11 @@ public class FileTxnLog implements TxnLog {
      * append an entry to the transaction log
      * @param hdr the header of the transaction
      * @param txn the transaction part of the entry
+     * returns true iff something appended, otw false 
      */
-    public synchronized void append(TxnHeader hdr, Record txn)
-        throws IOException {
+    public synchronized boolean append(TxnHeader hdr, Record txn)
+        throws IOException
+    {
         if (hdr != null) {
             if (hdr.getZxid() <= lastZxidSeen) {
                 LOG.warn("Current zxid " + hdr.getZxid()
@@ -161,7 +163,10 @@ public class FileTxnLog implements TxnLog {
             crc.update(buf, 0, buf.length);
             oa.writeLong(crc.getValue(), "txnEntryCRC");
             Util.writeTxnBytes(oa, buf);
+            
+            return true;
         }
+        return false;
     }
 
     /**

+ 3 - 2
src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -262,10 +262,11 @@ public class FileTxnSnapLog {
     /**
      * append the request to the transaction logs
      * @param si the request to be appended
+     * returns true iff something appended, otw false 
      * @throws IOException
      */
-    public void append(Request si) throws IOException {
-        txnLog.append(si.hdr, si.txn);
+    public boolean append(Request si) throws IOException {
+        return txnLog.append(si.hdr, si.txn);
     }
 
     /**

+ 2 - 1
src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java

@@ -39,9 +39,10 @@ public interface TxnLog {
      * Append a request to the transaction log
      * @param hdr the transaction header
      * @param r the transaction itself
+     * returns true iff something appended, otw false 
      * @throws IOException
      */
-    void append(TxnHeader hdr, Record r) throws IOException;
+    boolean append(TxnHeader hdr, Record r) throws IOException;
 
     /**
      * Start reading the transaction logs