فهرست منبع

ZOOKEEPER-3020: Review of SyncRequestProcessor

1. Use ArrayDeque instead of LinkedList
2. Use ThreadLocalRandom instead of Random
3. Remove the 'running' flag - use the Thread#join facility to detect if the thread has stopped running. Using a flag can cause race condition issues and is superfluous.
4. Make static final variable names in all caps
5. General cleanup

> This class is likely to be faster than Stack when used as a stack, and faster than LinkedList when used as a queue.

https://docs.oracle.com/javase/7/docs/api/java/util/ArrayDeque.html

Author: Beluga Behr <dam6923@gmail.com>

Reviewers: andor@apache.org

Closes #876 from BELUGABEHR/ZOOKEEPER-3020-2 and squashes the following commits:

8f1df370a [Beluga Behr] General cleanup
83cab7382 [Beluga Behr] Simplify and streamline access pattern of queuedRequests
5e0b801c8 [Beluga Behr] Use a semaphore to track status of snapshot thread
d119ef740 [Beluga Behr] No need to keep a flag which states if the thread is alive
e2059cf6e [Beluga Behr] Replace LinkedList with ArrayDeque
57a044902 [Beluga Behr] Replace Random with ThreadLocalRandom
17bd1fa18 [Beluga Behr] Require that the request not be 'null'
b28ff9479 [Beluga Behr] Refactor flush method
Beluga Behr 6 سال پیش
والد
کامیت
876aaf42ea
1فایلهای تغییر یافته به همراه92 افزوده شده و 93 حذف شده
  1. 92 93
      zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java

+ 92 - 93
zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java

@@ -20,9 +20,13 @@ package org.apache.zookeeper.server;
 
 import java.io.Flushable;
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Random;
+import java.util.ArrayDeque;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,28 +50,31 @@ import org.slf4j.LoggerFactory;
  */
 public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
         RequestProcessor {
+
     private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
-    private final ZooKeeperServer zks;
-    private final LinkedBlockingQueue<Request> queuedRequests =
+
+    private static final int FLUSH_SIZE = 1000;
+
+    private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
+
+    /** The number of log entries to log before starting a snapshot */
+    private static int snapCount = ZooKeeperServer.getSnapCount();
+
+    private final BlockingQueue<Request> queuedRequests =
         new LinkedBlockingQueue<Request>();
-    private final RequestProcessor nextProcessor;
 
-    private Thread snapInProcess = null;
-    volatile private boolean running;
+    private final Semaphore snapThreadMutex = new Semaphore(1);
+
+    private final ZooKeeperServer zks;
+
+    private final RequestProcessor nextProcessor;
 
     /**
      * Transactions that have been written and are waiting to be flushed to
      * disk. Basically this is the list of SyncItems whose callbacks will be
      * invoked after flush returns successfully.
      */
-    private final LinkedList<Request> toFlush = new LinkedList<Request>();
-    private final Random r = new Random();
-    /**
-     * The number of log entries to log before starting a snapshot
-     */
-    private static int snapCount = ZooKeeperServer.getSnapCount();
-
-    private final Request requestOfDeath = Request.requestOfDeath;
+    private final Queue<Request> toFlush = new ArrayDeque<>(FLUSH_SIZE);
 
     public SyncRequestProcessor(ZooKeeperServer zks,
             RequestProcessor nextProcessor) {
@@ -75,7 +82,6 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
                 .getZooKeeperServerListener());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
-        running = true;
     }
 
     /**
@@ -102,103 +108,96 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
 
             // we do this in an attempt to ensure that not all of the servers
             // in the ensemble take a snapshot at the same time
-            int randRoll = r.nextInt(snapCount/2);
+            int randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2, snapCount);
             while (true) {
-                Request si = null;
-                if (toFlush.isEmpty()) {
+                Request si = queuedRequests.poll();
+                if (si == null) {
+                    flush();
                     si = queuedRequests.take();
-                } else {
-                    si = queuedRequests.poll();
-                    if (si == null) {
-                        flush(toFlush);
-                        continue;
-                    }
                 }
-                if (si == requestOfDeath) {
+  
+                if (si == REQUEST_OF_DEATH) {
                     break;
                 }
-                if (si != null) {
-                    // track the number of records written to the log
-                    if (zks.getZKDatabase().append(si)) {
-                        logCount++;
-                        if (logCount > (snapCount / 2 + randRoll)) {
-                            randRoll = r.nextInt(snapCount/2);
-                            // roll the log
-                            zks.getZKDatabase().rollLog();
-                            // take a snapshot
-                            if (snapInProcess != null && snapInProcess.isAlive()) {
-                                LOG.warn("Too busy to snap, skipping");
-                            } else {
-                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
-                                        public void run() {
-                                            try {
-                                                zks.takeSnapshot();
-                                            } catch(Exception e) {
-                                                LOG.warn("Unexpected exception", e);
-                                            }
-                                        }
-                                    };
-                                snapInProcess.start();
-                            }
-                            logCount = 0;
-                        }
-                    } else if (toFlush.isEmpty()) {
-                        // optimization for read heavy workloads
-                        // iff this is a read, and there are no pending
-                        // flushes (writes), then just pass this to the next
-                        // processor
-                        if (nextProcessor != null) {
-                            nextProcessor.processRequest(si);
-                            if (nextProcessor instanceof Flushable) {
-                                ((Flushable)nextProcessor).flush();
-                            }
+
+                // track the number of records written to the log
+                if (zks.getZKDatabase().append(si)) {
+                    logCount++;
+                    if (logCount > randRoll) {
+                        randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2, snapCount);
+                        // roll the log
+                        zks.getZKDatabase().rollLog();
+                        // take a snapshot
+                        if (!snapThreadMutex.tryAcquire()) {
+                            LOG.warn("Too busy to snap, skipping");
+                        } else {
+                            new ZooKeeperThread("Snapshot Thread") {
+                                public void run() {
+                                    try {
+                                        zks.takeSnapshot();
+                                    } catch (Exception e) {
+                                        LOG.warn("Unexpected exception", e);
+                                    } finally {
+                                      snapThreadMutex.release();
+                                    }
+                                }
+                            }.start();
                         }
-                        continue;
+                        logCount = 0;
                     }
-                    toFlush.add(si);
-                    if (toFlush.size() > 1000) {
-                        flush(toFlush);
+                } else if (toFlush.isEmpty()) {
+                    // optimization for read heavy workloads
+                    // iff this is a read, and there are no pending
+                    // flushes (writes), then just pass this to the next
+                    // processor
+                    if (nextProcessor != null) {
+                        nextProcessor.processRequest(si);
+                        if (nextProcessor instanceof Flushable) {
+                            ((Flushable)nextProcessor).flush();
+                        }
                     }
+                    continue;
+                }
+                toFlush.add(si);
+                if (toFlush.size() == FLUSH_SIZE) {
+                    flush();
                 }
             }
         } catch (Throwable t) {
             handleException(this.getName(), t);
-        } finally{
-            running = false;
         }
         LOG.info("SyncRequestProcessor exited!");
     }
 
-    private void flush(LinkedList<Request> toFlush)
-        throws IOException, RequestProcessorException
-    {
-        if (toFlush.isEmpty())
-            return;
-
-        zks.getZKDatabase().commit();
-        while (!toFlush.isEmpty()) {
-            Request i = toFlush.remove();
-            if (nextProcessor != null) {
-                nextProcessor.processRequest(i);
-            }
-        }
-        if (nextProcessor != null && nextProcessor instanceof Flushable) {
-            ((Flushable)nextProcessor).flush();
-        }
+    private void flush() throws IOException, RequestProcessorException {
+      if (this.toFlush.isEmpty()) {
+          return;
+      }
+
+      zks.getZKDatabase().commit();
+
+      if (this.nextProcessor == null) {
+        this.toFlush.clear();
+      } else {
+          while (!this.toFlush.isEmpty()) {
+              final Request i = this.toFlush.remove();
+              this.nextProcessor.processRequest(i);
+          }
+          if (this.nextProcessor instanceof Flushable) {
+              ((Flushable)this.nextProcessor).flush();
+          } 
+      }
     }
 
     public void shutdown() {
         LOG.info("Shutting down");
-        queuedRequests.add(requestOfDeath);
+        queuedRequests.add(REQUEST_OF_DEATH);
         try {
-            if(running){
-                this.join();
-            }
-            if (!toFlush.isEmpty()) {
-                flush(toFlush);
-            }
-        } catch(InterruptedException e) {
+            this.join();
+            this.flush();
+        } catch (InterruptedException e) {
             LOG.warn("Interrupted while wating for " + this + " to finish");
+            Thread.currentThread().interrupt();
         } catch (IOException e) {
             LOG.warn("Got IO exception during shutdown");
         } catch (RequestProcessorException e) {
@@ -209,8 +208,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
         }
     }
 
-    public void processRequest(Request request) {
-        // request.addRQRec(">sync");
+    public void processRequest(final Request request) {
+        Objects.requireNonNull(request, "Request cannot be null");
         queuedRequests.add(request);
     }