Преглед изворни кода

ZOOKEEPER-3335: Improve the usage of Collections

Author: Beluga Behr <dam6923@gmail.com>

Reviewers: andor@apache.org

Closes #870 from BELUGABEHR/ZOOKEEPER-3335 and squashes the following commits:

2a1d80347 [Beluga Behr] Reverted variable name change to lessen diff
384d46755 [Beluga Behr] ZOOKEEPER-3335: Improve the usage of Collections
Beluga Behr пре 6 година
родитељ
комит
57e8318bc9

+ 3 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -30,14 +30,15 @@ import java.net.SocketAddress;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -132,7 +133,7 @@ public class ClientCnxn {
     /**
      * These are the packets that have been sent and are waiting for a response.
      */
-    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
+    private final Queue<Packet> pendingQueue = new ArrayDeque<>();
 
     /**
      * These are the packets that need to be sent.

+ 2 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java

@@ -23,7 +23,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.text.MessageFormat;
-import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -208,7 +208,7 @@ abstract class ClientCnxnSocket {
      * @throws IOException
      * @throws InterruptedException
      */
-    abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
+    abstract void doTransport(int waitTimeOut, Queue<Packet> pendingQueue,
             ClientCnxn cnxn)
             throws IOException, InterruptedException;
 

+ 3 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java

@@ -27,7 +27,7 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingDeque;
 
@@ -65,7 +65,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
      * @throws InterruptedException
      * @throws IOException
      */
-    void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
+    void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn)
       throws InterruptedException, IOException {
         SocketChannel sock = (SocketChannel) sockKey.channel();
         if (sock == null) {
@@ -340,7 +340,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
     }
     
     @Override
-    void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
+    void doTransport(int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn)
             throws IOException, InterruptedException {
         selector.select(waitTimeOut);
         Set<SelectionKey> selected;

+ 3 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java

@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -262,7 +262,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 
     @Override
     void doTransport(int waitTimeOut,
-                     List<Packet> pendingQueue,
+                     Queue<Packet> pendingQueue,
                      ClientCnxn cnxn)
             throws IOException, InterruptedException {
         try {
@@ -352,7 +352,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
     /**
      * doWrite handles writing the packets from outgoingQueue via network to server.
      */
-    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
+    private void doWrite(Queue<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
         updateNow();
         boolean anyPacketsSent = false;
         while (true) {

+ 5 - 8
zookeeper-server/src/main/java/org/apache/zookeeper/ZKUtil.java

@@ -18,11 +18,11 @@
 package org.apache.zookeeper;
 
 import java.io.File;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Deque;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -124,15 +124,12 @@ public class ZKUtil {
      */
     public static List<String> listSubTreeBFS(ZooKeeper zk, final String pathRoot) throws
         KeeperException, InterruptedException {
-        Deque<String> queue = new LinkedList<String>();
+        Queue<String> queue = new ArrayDeque<>();
         List<String> tree = new ArrayList<String>();
         queue.add(pathRoot);
         tree.add(pathRoot);
-        while (true) {
-            String node = queue.pollFirst();
-            if (node == null) {
-                break;
-            }
+        while (!queue.isEmpty()) {
+            String node = queue.poll();
             List<String> children = zk.getChildren(node, false);
             for (final String child : children) {
                 final String childPath = node + "/" + child;

+ 12 - 9
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -71,9 +71,9 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -905,14 +905,17 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
         nextProcessor.processRequest(request);
     }
 
-    private List<ACL> removeDuplicates(List<ACL> acl) {
+    private List<ACL> removeDuplicates(final List<ACL> acls) {
+        if (acls == null || acls.isEmpty()) {
+          return Collections.emptyList();
+        }
 
-        List<ACL> retval = new LinkedList<ACL>();
-        if (acl != null) {
-            for (ACL a : acl) {
-                if (!retval.contains(a)) {
-                    retval.add(a);
-                }
+        // This would be done better with a Set but ACL hashcode/equals do not
+        // allow for null values
+        final ArrayList<ACL> retval = new ArrayList<>(acls.size());
+        for (final ACL acl : acls) {
+            if (!retval.contains(acl)) {
+              retval.add(acl);
             }
         }
         return retval;
@@ -960,7 +963,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
         if (uniqacls == null || uniqacls.size() == 0) {
             throw new KeeperException.InvalidACLException(path);
         }
-        List<ACL> rv = new LinkedList<ACL>();
+        List<ACL> rv = new ArrayList<>();
         for (ACL a: uniqacls) {
             LOG.debug("Processing ACL: {}", a);
             if (a == null) {

+ 16 - 10
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java

@@ -21,11 +21,14 @@ package org.apache.zookeeper.server;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -84,7 +87,7 @@ public class ZKDatabase {
 
     public static final int commitLogCount = 500;
     protected static int commitLogBuffer = 700;
-    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
+    protected Queue<Proposal> committedLog = new ArrayDeque<>();
     protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
     volatile private boolean initialized = false;
 
@@ -187,18 +190,21 @@ public class ZKDatabase {
     }
 
 
-    public synchronized List<Proposal> getCommittedLog() {
+    public synchronized Collection<Proposal> getCommittedLog() {
+        final Collection<Proposal> result;
         ReadLock rl = logLock.readLock();
-        // only make a copy if this thread isn't already holding a lock
-        if(logLock.getReadHoldCount() <=0) {
+        // make a copy if this thread is not already holding a lock
+        if (logLock.getReadHoldCount() > 0) {
+          result = this.committedLog;
+        } else {
+            rl.lock();
             try {
-                rl.lock();
-                return new LinkedList<Proposal>(this.committedLog);
+                result = new ArrayList<>(this.committedLog);
             } finally {
                 rl.unlock();
             }
         }
-        return this.committedLog;
+        return Collections.unmodifiableCollection(result);
     }
 
     /**
@@ -281,8 +287,8 @@ public class ZKDatabase {
         try {
             wl.lock();
             if (committedLog.size() > commitLogCount) {
-                committedLog.removeFirst();
-                minCommittedLog = committedLog.getFirst().packet.getZxid();
+                committedLog.remove();
+                minCommittedLog = committedLog.peek().packet.getZxid();
             }
             if (committedLog.isEmpty()) {
                 minCommittedLog = request.zxid;

+ 1 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -30,7 +30,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -348,7 +347,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
 
         // Clean up dead sessions
-        List<Long> deadSessions = new LinkedList<Long>();
+        List<Long> deadSessions = new ArrayList<>();
         for (Long session : zkDb.getSessions()) {
             if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                 deadSessions.add(session);

+ 4 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java

@@ -29,9 +29,10 @@ import java.io.InputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
@@ -148,8 +149,7 @@ public class FileTxnLog implements TxnLog {
     File logDir;
     private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
     long dbId;
-    private LinkedList<FileOutputStream> streamsToFlush =
-        new LinkedList<FileOutputStream>();
+    private final Queue<FileOutputStream> streamsToFlush = new ArrayDeque<>();
     File logFileWrite = null;
     private FilePadding filePadding = new FilePadding();
 
@@ -392,7 +392,7 @@ public class FileTxnLog implements TxnLog {
             }
         }
         while (streamsToFlush.size() > 1) {
-            streamsToFlush.removeFirst().close();
+            streamsToFlush.poll().close();
         }
 
         // Roll the log file if we exceed the size limit

+ 9 - 11
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -20,9 +20,11 @@ package org.apache.zookeeper.server.quorum;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -96,8 +98,8 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
      * Requests that we are holding until commit comes in. Keys represent
      * session ids, each value is a linked list of the session's requests.
      */
-    protected final Map<Long, LinkedList<Request>> pendingRequests =
-            new HashMap<Long, LinkedList<Request>>(10000);
+    protected final Map<Long, Deque<Request>> pendingRequests =
+            new HashMap<>(10000);
 
     /** The number of requests currently being processed */
     protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0);
@@ -200,13 +202,9 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                     if (needCommit(request)
                             || pendingRequests.containsKey(request.sessionId)) {
                         // Add request to pending
-                        LinkedList<Request> requests = pendingRequests
-                                .get(request.sessionId);
-                        if (requests == null) {
-                            requests = new LinkedList<Request>();
-                            pendingRequests.put(request.sessionId, requests);
-                        }
-                        requests.addLast(request);
+                        pendingRequests
+                            .computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>())
+                            .add(request);
                     }
                     else {
                         sendToNextProcessor(request);
@@ -248,7 +246,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                     /*
                      * Check if request is pending, if so, update it with the committed info
                      */
-                    LinkedList<Request> sessionQueue = pendingRequests
+                    Deque<Request> sessionQueue = pendingRequests
                             .get(request.sessionId);
                     if (sessionQueue != null) {
                         // If session queue != null, then it is also not empty.

+ 4 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -28,7 +28,8 @@ import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -396,8 +397,8 @@ public class Learner {
         boolean snapshotNeeded = true;
         boolean syncSnapshot = false;
         readPacket(qp);
-        LinkedList<Long> packetsCommitted = new LinkedList<Long>();
-        LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
+        Deque<Long> packetsCommitted = new ArrayDeque<>();
+        Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
                 LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));

+ 2 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/test/RestoreCommittedLogTest.java

@@ -19,6 +19,7 @@
 package org.apache.zookeeper.test;
 
 import java.io.File;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.zookeeper.CreateMode;
@@ -74,7 +75,7 @@ public class RestoreCommittedLogTest extends ZKTestCase{
         // start server again
         zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
         zks.startdata();
-        List<Proposal> committedLog = zks.getZKDatabase().getCommittedLog();
+        Collection<Proposal> committedLog = zks.getZKDatabase().getCommittedLog();
         int logsize = committedLog.size();
         LOG.info("committedLog size = {}", logsize);
         Assert.assertTrue("log size != 0", (logsize != 0));