Browse Source

ZOOKEEPER-4794: Reduce the ZKDatabase#committedLog memory usage.

Reduce the committed log memory usage.
Fix ci.
Reviewers: eolivelli, hangc0276, anmolnar
Author: horizonzy
Closes #2115 from horizonzy/reduce-committed-log-memory-usage
Yan Zhao 1 year ago
parent
commit
18c78cd10b

+ 5 - 14
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -19,7 +19,6 @@
 package org.apache.zookeeper.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -169,24 +168,16 @@ public class Request {
                 && this.type != OpCode.createSession;
     }
 
-    private transient byte[] serializeData;
-
-    @SuppressFBWarnings(value = "EI_EXPOSE_REP")
     public byte[] getSerializeData() {
         if (this.hdr == null) {
             return null;
         }
-
-        if (this.serializeData == null) {
-            try {
-                this.serializeData = Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
-            } catch (IOException e) {
-                LOG.error("This really should be impossible.", e);
-                this.serializeData = new byte[32];
-            }
+        try {
+            return Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
+        } catch (IOException e) {
+            LOG.error("This really should be impossible.", e);
+            return new byte[32];
         }
-
-        return this.serializeData;
     }
 
     /**

+ 3 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java

@@ -58,20 +58,19 @@ public class TxnLogProposalIterator implements Iterator<Proposal> {
     @Override
     public Proposal next() {
 
-        Proposal p = new Proposal();
+        Proposal p;
         try {
             byte[] serializedData = Util.marshallTxnEntry(itr.getHeader(), itr.getTxn(), itr.getDigest());
 
             QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader().getZxid(), serializedData, null);
-            p.packet = pp;
-            p.request = null;
-
+            p = new Proposal(pp);
             // This is the only place that can throw IO exception
             hasNext = itr.next();
 
         } catch (IOException e) {
             LOG.error("Unable to read txnlog from disk", e);
             hasNext = false;
+            p = new Proposal();
         }
 
         return p;

+ 4 - 9
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java

@@ -54,9 +54,8 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
 import org.apache.zookeeper.server.persistence.SnapStream;
 import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
-import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnDigest;
@@ -323,19 +322,15 @@ public class ZKDatabase {
             wl.lock();
             if (committedLog.size() > commitLogCount) {
                 committedLog.remove();
-                minCommittedLog = committedLog.peek().packet.getZxid();
+                minCommittedLog = committedLog.peek().getZxid();
             }
             if (committedLog.isEmpty()) {
                 minCommittedLog = request.zxid;
                 maxCommittedLog = request.zxid;
             }
-            byte[] data = request.getSerializeData();
-            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
-            Proposal p = new Proposal();
-            p.packet = pp;
-            p.request = request;
+            PureRequestProposal p = new PureRequestProposal(request);
             committedLog.add(p);
-            maxCommittedLog = p.packet.getZxid();
+            maxCommittedLog = p.getZxid();
         } finally {
             wl.unlock();
         }

+ 49 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

@@ -88,14 +88,60 @@ public class Leader extends LearnerMaster {
 
     public static class Proposal extends SyncedLearnerTracker {
 
-        public QuorumPacket packet;
-        public Request request;
+        private QuorumPacket packet;
+        protected Request request;
+
+        public Proposal() {
+        }
+
+        public Proposal(QuorumPacket packet) {
+            this.packet = packet;
+        }
+
+        public Proposal(Request request, QuorumPacket packet) {
+            this.request = request;
+            this.packet = packet;
+        }
+
+        public QuorumPacket getQuorumPacket() {
+            return packet;
+        }
+
+        public Request getRequest() {
+            return request;
+        }
+
+        public long getZxid() {
+            return packet.getZxid();
+        }
 
         @Override
         public String toString() {
             return packet.getType() + ", " + packet.getZxid() + ", " + request;
         }
+    }
 
+    public static class PureRequestProposal extends Proposal {
+
+        public PureRequestProposal(Request request) {
+            this.request = request;
+        }
+
+        @Override
+        public QuorumPacket getQuorumPacket() {
+            byte[] data = request.getSerializeData();
+            return new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
+        }
+
+        @Override
+        public long getZxid() {
+            return request.zxid;
+        }
+
+        @Override
+        public String toString() {
+            return request.toString();
+        }
     }
 
     // log ack latency if zxid is a multiple of ackLoggingFrequency. If <=0, disable logging.
@@ -1258,9 +1304,7 @@ public class Leader extends LearnerMaster {
         proposalStats.setLastBufferSize(data.length);
         QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
 
-        Proposal p = new Proposal();
-        p.packet = pp;
-        p.request = request;
+        Proposal p = new Proposal(request, pp);
 
         synchronized (this) {
             p.addQuorumVerifier(self.getQuorumVerifier());

+ 2 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -957,7 +957,7 @@ public class LearnerHandler extends ZooKeeperThread {
         while (itr.hasNext()) {
             Proposal propose = itr.next();
 
-            long packetZxid = propose.packet.getZxid();
+            long packetZxid = propose.getZxid();
             // abort if we hit the limit
             if ((maxZxid != null) && (packetZxid > maxZxid)) {
                 break;
@@ -1020,7 +1020,7 @@ public class LearnerHandler extends ZooKeeperThread {
 
             // Since this is already a committed proposal, we need to follow
             // it by a commit packet
-            queuePacket(propose.packet);
+            queuePacket(propose.getQuorumPacket());
             queueOpPacket(Leader.COMMIT, packetZxid);
             queuedZxid = packetZxid;
 

+ 5 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java

@@ -123,18 +123,18 @@ public class QuorumOracleMaj extends QuorumMaj {
         LOG.debug("Start Revalidation outstandingProposals");
         try {
             while (outstandingProposal.size() >= 1) {
-                outstandingProposal.sort((o1, o2) -> (int) (o1.packet.getZxid() - o2.packet.getZxid()));
+                outstandingProposal.sort((o1, o2) -> (int) (o1.getZxid() - o2.getZxid()));
 
                 Leader.Proposal p;
                 int i = 0;
                 while (i < outstandingProposal.size()) {
                     p = outstandingProposal.get(i);
-                    if (p.request.zxid > lastCommitted) {
-                        LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.request.zxid), outstandingProposal.size(), Long.toHexString(lastCommitted));
-                        if (!self.tryToCommit(p, p.request.zxid, null)) {
+                    if (p.getZxid() > lastCommitted) {
+                        LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.getZxid()), outstandingProposal.size(), Long.toHexString(lastCommitted));
+                        if (!self.tryToCommit(p, p.getZxid(), null)) {
                             break;
                         } else {
-                            lastCommitted = p.request.zxid;
+                            lastCommitted = p.getZxid();
                             outstandingProposal.remove(p);
                         }
                     }

+ 4 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java

@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.File;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Set;
@@ -143,7 +144,9 @@ public class LeaderWithObserverTest {
         long zxid = leader.zk.getZxid();
 
         // things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here)
-        leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null, null);
+        Field field = Leader.Proposal.class.getDeclaredField("packet");
+        field.setAccessible(true);
+        field.set(leader.newLeaderProposal, new QuorumPacket(0, zxid, null, null));
         leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());
 
         Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();

+ 7 - 7
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java

@@ -86,14 +86,14 @@ public class LearnerHandlerTest extends ZKTestCase {
 
         public long getmaxCommittedLog() {
             if (!committedLog.isEmpty()) {
-                return committedLog.getLast().packet.getZxid();
+                return committedLog.getLast().getZxid();
             }
             return 0;
         }
 
         public long getminCommittedLog() {
             if (!committedLog.isEmpty()) {
-                return committedLog.getFirst().packet.getZxid();
+                return committedLog.getFirst().getZxid();
             }
             return 0;
         }
@@ -107,7 +107,7 @@ public class LearnerHandlerTest extends ZKTestCase {
         }
 
         public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long limit) {
-            if (peerZxid >= txnLog.peekFirst().packet.getZxid()) {
+            if (peerZxid >= txnLog.peekFirst().getZxid()) {
                 return txnLog.iterator();
             } else {
                 return Collections.emptyIterator();
@@ -150,10 +150,10 @@ public class LearnerHandlerTest extends ZKTestCase {
     }
 
     Proposal createProposal(long zxid) {
-        Proposal p = new Proposal();
-        p.packet = new QuorumPacket();
-        p.packet.setZxid(zxid);
-        p.packet.setType(Leader.PROPOSAL);
+        QuorumPacket packet = new QuorumPacket();
+        packet.setZxid(zxid);
+        packet.setType(Leader.PROPOSAL);
+        Proposal p = new Proposal(packet);
         return p;
     }
 

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java

@@ -107,7 +107,7 @@ public class GetProposalFromTxnTest extends ZKTestCase {
         while (itr.hasNext()) {
             Proposal proposal = itr.next();
             TxnLogEntry logEntry = SerializeUtils.deserializeTxn(
-                    proposal.packet.getData());
+                    proposal.getQuorumPacket().getData());
             TxnHeader hdr = logEntry.getHeader();
             Record rec = logEntry.getTxn();
             if (hdr.getType() == OpCode.create) {

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java

@@ -82,8 +82,8 @@ public class LocalSessionRequestTest extends ZKTestCase {
         QuorumPeer peer = qb.getPeerList().get(peerId);
         ZKDatabase db = peer.getActiveServer().getZKDatabase();
         for (Proposal p : db.getCommittedLog()) {
-            assertFalse(p.request.sessionId == sessionId,
-                    "Should not see " + Request.op2String(p.request.type)
+            assertFalse(p.getRequest().sessionId == sessionId,
+                    "Should not see " + Request.op2String(p.getRequest().type)
                             + " request from local session 0x" + session + " on the " + peerType);
         }
     }