瀏覽代碼

ZOOKEEPER-1322. Cleanup/fix logging in Quorum code. (phunt via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1240921 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 13 年之前
父節點
當前提交
5ddc24bf36

+ 3 - 0
CHANGES.txt

@@ -196,6 +196,9 @@ IMPROVEMENTS:
   ZOOKEEPER-1293. Remove unused readyToStart from Leader.java
   (Alex Shraer via phunt)
 
+  ZOOKEEPER-1322. Cleanup/fix logging in Quorum code. 
+  (phunt via mahadev)
+
 Release 3.4.0 - 
 
 Non-backward compatible changes:

+ 6 - 5
src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java

@@ -40,10 +40,10 @@ import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class implements the TxnLog interface. It provides api's
@@ -601,8 +601,9 @@ public class FileTxnLog implements TxnLog {
                 long crcValue = ia.readLong("crcvalue");
                 byte[] bytes = Util.readTxnBytes(ia);
                 // Since we preallocate, we define EOF to be an
-                if (bytes == null || bytes.length==0)
-                   throw new EOFException("Failed to read");
+                if (bytes == null || bytes.length==0) {
+                    throw new EOFException("Failed to read " + logFile);
+                }
                 // EOF or corrupted record
                 // validate CRC
                 Checksum crc = makeChecksumAlgorithm();
@@ -620,7 +621,7 @@ public class FileTxnLog implements TxnLog {
                 ia = null;
                 hdr = null;
                 // this means that the file has ended
-                // we shoud go to the next file
+                // we should go to the next file
                 if (!goToNextLog()) {
                     return false;
                 }

+ 7 - 7
src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -175,7 +175,7 @@ public class FileTxnSnapLog {
                     ((CreateSessionTxn) txn).getTimeOut());
             if (LOG.isTraceEnabled()) {
                 ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
-                        "playLog --- create session in log: "
+                        "playLog --- create session in log: 0x"
                                 + Long.toHexString(hdr.getClientId())
                                 + " with timeout: "
                                 + ((CreateSessionTxn) txn).getTimeOut());
@@ -187,7 +187,7 @@ public class FileTxnSnapLog {
             sessions.remove(hdr.getClientId());
             if (LOG.isTraceEnabled()) {
                 ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
-                        "playLog --- close session in log: "
+                        "playLog --- close session in log: 0x"
                                 + Long.toHexString(hdr.getClientId()));
             }
             rc = dt.processTxn(hdr, txn);
@@ -234,11 +234,11 @@ public class FileTxnSnapLog {
             ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
         throws IOException {
         long lastZxid = dataTree.lastProcessedZxid;
-        LOG.info("Snapshotting: " + Long.toHexString(lastZxid));
-        File snapshot=new File(
-                snapDir, Util.makeSnapshotName(lastZxid));
-        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot);
-
+        File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
+        LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
+                snapshotFile);
+        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
+        
     }
 
     /**

+ 33 - 30
src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java

@@ -27,14 +27,14 @@ import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -270,7 +270,7 @@ public class FastLeaderElection implements Election {
                                 n.peerEpoch = response.buffer.getLong();
                             } else {
                                 if(LOG.isInfoEnabled()){
-                                    LOG.info("Backward compatibility mode, server id: " + n.sid);
+                                    LOG.info("Backward compatibility mode, server id=" + n.sid);
                                 }
                                 n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                             }
@@ -315,10 +315,10 @@ public class FastLeaderElection implements Election {
                                 if(ackstate == QuorumPeer.ServerState.LOOKING){
                                     if(LOG.isDebugEnabled()){
                                         LOG.debug("Sending new notification. My id =  " +
-                                                self.getId() + ", Recipient = " +
-                                                response.sid + " zxid =" +
-                                                current.getZxid() + " leader=" +
-                                                current.getId());
+                                                self.getId() + " recipient=" +
+                                                response.sid + " zxid=0x" +
+                                                Long.toHexString(current.getZxid()) +
+                                                " leader=" + current.getId());
                                     }
                                     ToSend notmsg = new ToSend(
                                             ToSend.mType.notification,
@@ -482,10 +482,10 @@ public class FastLeaderElection implements Election {
 
     private void leaveInstance(Vote v) {
         if(LOG.isDebugEnabled()){
-            LOG.debug("About to leave FLE instance: Leader= "
-                + v.getId() + ", Zxid = " +
-                v.getZxid() + ", My id = " + self.getId()
-                + ", My state = " + self.getPeerState());
+            LOG.debug("About to leave FLE instance: leader="
+                + v.getId() + ", zxid=0x" +
+                Long.toHexString(v.getZxid()) + ", my id=" + self.getId()
+                + ", my state=" + self.getPeerState());
         }
         recvqueue.clear();
     }
@@ -520,10 +520,10 @@ public class FastLeaderElection implements Election {
                     sid,
                     proposedEpoch);
             if(LOG.isDebugEnabled()){
-                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), "  +
-                      proposedZxid + " (n.zxid), " + logicalclock  +
+                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
+                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock)  +
                       " (n.round), " + sid + " (recipient), " + self.getId() +
-                      " (myid), " + proposedEpoch + " (n.peerEpoch)");
+                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
             }
             sendqueue.offer(notmsg);
         }
@@ -531,10 +531,12 @@ public class FastLeaderElection implements Election {
 
 
     private void printNotification(Notification n){
-        LOG.info("Notification: " + n.leader + " (n.leader), " + n.zxid +
-                " (n.zxid), " + n.electionEpoch + " (n.round), " + n.state +
-                " (n.state), " + n.sid + " (n.sid), " + n.peerEpoch + " (n.peerEPoch), " +
-                self.getPeerState() + " (my state)");
+        LOG.info("Notification: " + n.leader + " (n.leader), 0x"
+                + Long.toHexString(n.zxid) + " (n.zxid), 0x"
+                + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
+                + " (n.state), " + n.sid + " (n.sid), 0x"
+                + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
+                + self.getPeerState() + " (my state)");
     }
 
     /**
@@ -545,8 +547,8 @@ public class FastLeaderElection implements Election {
      * @param zxid  Last zxid observed by the issuer of this vote
      */
     private boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
-        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: " + 
-                newZxid + ", proposed zxid: " + curZxid);
+        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
+                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
         if(self.getQuorumVerifier().getWeight(newId) == 0){
             return false;
         }
@@ -618,9 +620,9 @@ public class FastLeaderElection implements Election {
 
     synchronized void updateProposal(long leader, long zxid, long epoch){
         if(LOG.isDebugEnabled()){
-            LOG.debug("Updating proposal: " + leader + " (newleader), " + zxid +
-                  " (newzxid), " + proposedLeader + " (oldleader), " +
-                  proposedZxid + " (oldzxid)");
+            LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
+                    + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
+                    + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
         }
         proposedLeader = leader;
         proposedZxid = zxid;
@@ -718,7 +720,7 @@ public class FastLeaderElection implements Election {
             }
 
             LOG.info("New election. My id =  " + self.getId() +
-                    ", Proposed zxid = " + proposedZxid);
+                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
             sendNotifications();
 
             /*
@@ -775,8 +777,9 @@ public class FastLeaderElection implements Election {
                             sendNotifications();
                         } else if (n.electionEpoch < logicalclock) {
                             if(LOG.isDebugEnabled()){
-                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = " + n.electionEpoch
-                                        + ", Logical clock" + logicalclock);
+                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+                                        + Long.toHexString(n.electionEpoch)
+                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
                             }
                             break;
                         } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
@@ -786,10 +789,10 @@ public class FastLeaderElection implements Election {
                         }
 
                         if(LOG.isDebugEnabled()){
-                            LOG.debug("Adding vote: From = " + n.sid +
-                                    ", Proposed leader = " + n.leader +
-                                    ", Proposed zxid = " + n.zxid +
-                                    ", Proposed election epoch = " + n.electionEpoch);
+                            LOG.debug("Adding vote: from=" + n.sid +
+                                    ", proposed leader=" + n.leader +
+                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
+                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                         }
 
                         recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

+ 18 - 21
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -32,20 +32,20 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jute.BinaryOutputArchive;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class has the control logic for the Leader.
@@ -464,11 +464,11 @@ public class Leader {
      */
     synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Ack zxid: 0x" + Long.toHexString(zxid));
+            LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
             for (Proposal p : outstandingProposals.values()) {
                 long packetZxid = p.packet.getZxid();
-                LOG.trace("outstanding proposal: 0x"
-                        + Long.toHexString(packetZxid));
+                LOG.trace("outstanding proposal: 0x{}",
+                        Long.toHexString(packetZxid));
             }
             LOG.trace("outstanding proposals all");
         }
@@ -481,31 +481,29 @@ public class Leader {
         }
         if (lastCommitted >= zxid) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("proposal has already been committed, pzxid:"
-                        + lastCommitted
-                        + " zxid: 0x" + Long.toHexString(zxid));
+                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
+                        Long.toHexString(lastCommitted), Long.toHexString(zxid));
             }
             // The proposal has already been committed
             return;
         }
         Proposal p = outstandingProposals.get(zxid);
         if (p == null) {
-            LOG.warn("Trying to commit future proposal: zxid 0x"
-                    + Long.toHexString(zxid) + " from " + followerAddr);
+            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
+                    Long.toHexString(zxid), followerAddr);
             return;
         }
 
         p.ackSet.add(sid);
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
-                    + " is " + p.ackSet.size());
+            LOG.debug("Count for zxid: 0x{} is {}",
+                    Long.toHexString(zxid), p.ackSet.size());
         }
         if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
             if (zxid != lastCommitted+1) {
-                LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
-                        + " from " + followerAddr + " not first!");
-                LOG.warn("First is "
-                        + (lastCommitted+1));
+                LOG.warn("Commiting zxid 0x{} from {} not first!",
+                        Long.toHexString(zxid), followerAddr);
+                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
             }
             outstandingProposals.remove(zxid);
             if (p.request != null) {
@@ -514,7 +512,7 @@ public class Leader {
             // We don't commit the new leader proposal
             if ((zxid & 0xffffffffL) != 0) {
                 if (p.request == null) {
-                    LOG.warn("Going to commmit null: " + p);
+                    LOG.warn("Going to commmit null request for proposal: {}", p);
                 }
                 commit(zxid);
                 inform(p);
@@ -527,9 +525,8 @@ public class Leader {
                 return;
             } else {
                 lastCommitted = zxid;
-                if(LOG.isInfoEnabled()){
-                    LOG.info("Have quorum of supporters; starting up and setting last processed zxid: " + zk.getZxid());
-                }
+                LOG.info("Have quorum of supporters; starting up and setting last processed zxid: 0x{}",
+                        Long.toHexString(zk.getZxid()));
                 zk.startup();
                 zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
             }

+ 21 - 10
src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -35,8 +35,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.ByteBufferInputStream;
@@ -47,6 +45,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * There will be an instance of this class created by the Leader for each
@@ -261,9 +261,9 @@ public class LearnerHandler extends Thread {
             	this.sid = leader.followerCounter.getAndDecrement();
             }
 
-            LOG.info("Follower sid: " + this.sid + " : info : "
-                    + leader.self.quorumPeers.get(this.sid));
-
+            LOG.info("Follower sid: " + sid + " : info : "
+                    + leader.self.quorumPeers.get(sid));
+                        
             if (qp.getType() == Leader.OBSERVERINFO) {
                   learnerType = LearnerType.OBSERVER;
             }
@@ -316,16 +316,18 @@ public class LearnerHandler extends Thread {
                 rl.lock();
                 final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                 final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
-                LOG.info("Synchronizing with Follower sid: " + this.sid
-                        +" maxCommittedLog ="+Long.toHexString(maxCommittedLog)
-                        +" minCommittedLog = "+Long.toHexString(minCommittedLog)
-                        +" peerLastZxid = "+Long.toHexString(peerLastZxid));
+                LOG.info("Synchronizing with Follower sid: " + sid
+                        +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
+                        +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
+                        +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
 
                 List<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
 
                 if (proposals.size() != 0) {
+                    LOG.debug("proposal size is {}", proposals.size());
                     if ((maxCommittedLog >= peerLastZxid)
                             && (minCommittedLog <= peerLastZxid)) {
+                        LOG.debug("Sending proposals to follower");
 
                         // as we look through proposals, this variable keeps track of previous
                         // proposal Id.
@@ -369,16 +371,25 @@ public class LearnerHandler extends Thread {
                             }
                         }
                     } else if (peerLastZxid > maxCommittedLog) {
+                        LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
+                                Long.toHexString(maxCommittedLog),
+                                Long.toHexString(updates));
+
                         packetToSend = Leader.TRUNC;
                         zxidToSend = maxCommittedLog;
                         updates = zxidToSend;
+                    } else {
+                        LOG.warn("Unhandled proposal scenario");
                     }
                 } else {
                     // just let the state transfer happen
-                }
+                    LOG.debug("proposals is empty");
+                }               
 
                 leaderLastZxid = leader.startForwarding(this, updates);
                 if (peerLastZxid == leaderLastZxid) {
+                    LOG.debug("Leader and follower are in sync, sending empty diff. zxid=0x{}",
+                            Long.toHexString(leaderLastZxid));
                     // We are in sync so we'll do an empty diff
                     packetToSend = Leader.DIFF;
                     zxidToSend = leaderLastZxid;

+ 18 - 14
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -36,8 +36,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
 import org.apache.zookeeper.server.ServerCnxnFactory;
@@ -47,6 +45,8 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class manages the quorum protocol. There are three states this server
@@ -422,12 +422,14 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
             try {
                 currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
             } catch(FileNotFoundException e) {
-                // pick a reasonable epoch number
-                // this should only happen once when moving to a
-                // new code version
-                LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation");
-                currentEpoch = epochOfZxid;
-                writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
+            	// pick a reasonable epoch number
+            	// this should only happen once when moving to a
+            	// new code version
+            	currentEpoch = epochOfZxid;
+            	LOG.info(CURRENT_EPOCH_FILENAME
+            	        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
+            	        currentEpoch);
+            	writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
             }
             if (epochOfZxid > currentEpoch) {
                 throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
@@ -435,12 +437,14 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
             try {
                 acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
             } catch(FileNotFoundException e) {
-                // pick a reasonable epoch number
-                // this should only happen once when moving to a
-                // new code version
-                LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default. This should only happen when you are upgrading your installation");
-                acceptedEpoch = epochOfZxid;
-                writeLongToFile(CURRENT_EPOCH_FILENAME, acceptedEpoch);
+            	// pick a reasonable epoch number
+            	// this should only happen once when moving to a
+            	// new code version
+            	acceptedEpoch = epochOfZxid;
+            	LOG.info(ACCEPTED_EPOCH_FILENAME
+            	        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
+            	        acceptedEpoch);
+            	writeLongToFile(CURRENT_EPOCH_FILENAME, acceptedEpoch);
             }
             if (acceptedEpoch < currentEpoch) {
                 throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + " is less than the accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch));