소스 검색

ZOOKEEPER-549. Refactor Followers and related classes into a Peer->Follower hierarchy in preparation for Observers (henry robinson via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@831371 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 15 년 전
부모
커밋
0957b8404e

+ 3 - 0
CHANGES.txt

@@ -115,6 +115,9 @@ IMPROVEMENTS:
   ZOOKEEPER-530. Memory corruption: Zookeeper c client IPv6 implementation
   does not honor struct sockaddr_in6 size (isabel drost via mahadev)
 
+  ZOOKEEPER-549. Refactor Followers and related classes into a Peer->Follower
+  hierarchy in preparation for Observers (henry robinson via mahadev)
+
 NEW FEATURES:
   ZOOKEEPER-539. generate eclipse project via ant target. (phunt via mahadev)
 

+ 63 - 331
src/java/main/org/apache/zookeeper/server/quorum/Follower.java

@@ -18,52 +18,28 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.SerializeUtils;
-import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * This class has the control logic for the Follower.
  */
-public class Follower {
-    private static final Logger LOG = Logger.getLogger(Follower.class);
-
-    static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
-    static {
-        LOG.info("TCP NoDelay set to: " + nodelay);
-    }
-
-    QuorumPeer self;
-
-    FollowerZooKeeperServer zk;
+public class Follower extends Learner{
 
+    private long lastQueued;
+    // This is the same object as this.zk, but we cache the downcast op
+    FollowerZooKeeperServer fzk = null;
+    
     Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
         this.self = self;
         this.zk=zk;
+        this.fzk = zk;
     }
 
     @Override
@@ -76,235 +52,24 @@ public class Follower {
         return sb.toString();
     }
 
-    private InputArchive leaderIs;
-
-    private OutputArchive leaderOs;
-
-    private BufferedOutputStream bufferedOutput;
-
-    public Socket sock;
-
-    /**
-     * write a packet to the leader
-     *
-     * @param pp
-     *                the proposal packet to be sent to the leader
-     * @throws IOException
-     */
-    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
-        synchronized (leaderOs) {
-            if (pp != null) {
-                leaderOs.writeRecord(pp, "packet");
-            }
-            if (flush) {
-                bufferedOutput.flush();
-            }
-        }
-    }
-
-    /**
-     * read a packet from the leader
-     *
-     * @param pp
-     *                the packet to be instantiated
-     * @throws IOException
-     */
-    void readPacket(QuorumPacket pp) throws IOException {
-        synchronized (leaderIs) {
-            leaderIs.readRecord(pp, "packet");
-        }
-        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
-        if (pp.getType() == Leader.PING) {
-            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
-        }
-        if (LOG.isTraceEnabled()) {
-            ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
-        }
-    }
-
     /**
      * the main method called by the follower to follow the leader
      *
      * @throws InterruptedException
      */
     void followLeader() throws InterruptedException {
-        zk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
-
-        try {
-            InetSocketAddress addr = null;
-            // Find the leader by id
-            Vote current = self.getCurrentVote();
-            for (QuorumServer s : self.quorumPeers.values()) {
-                if (s.id == current.id) {
-                    addr = s.addr;
-                    break;
-                }
-            }
-            if (addr == null) {
-                LOG.warn("Couldn't find the leader with id = "
-                        + current.id);
-            }
-            LOG.info("Following " + addr);
-            sock = new Socket();
+        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
+        try {            
+            InetSocketAddress addr = findLeader();            
             try {
-                QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
-                sock.setSoTimeout(self.tickTime * self.initLimit);
-                for (int tries = 0; tries < 5; tries++) {
-                    try {
-                        //sock = new Socket();
-                        //sock.setSoTimeout(self.tickTime * self.initLimit);
-                        sock.connect(addr, self.tickTime * self.syncLimit);
-                        sock.setTcpNoDelay(nodelay);
-                        break;
-                    } catch (IOException e) {
-                        if (tries == 4) {
-                            LOG.error("Unexpected exception",e);
-                            throw e;
-                        } else {
-                            LOG.warn("Unexpected exception, tries="+tries,e);
-                            sock = new Socket();
-                            sock.setSoTimeout(self.tickTime * self.initLimit);
-                        }
-                    }
-                    Thread.sleep(1000);
-                }
-                leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
-                        sock.getInputStream()));
-                bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
-                leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
-                
-                /*
-                 * Send follower info, including last zxid and sid
-                 */
+                connectToLeader(addr);
+                long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO);
+                syncWithLeader(newLeaderZxid);                
                 QuorumPacket qp = new QuorumPacket();
-                qp.setType(Leader.FOLLOWERINFO);
-                long sentLastZxid = self.getLastLoggedZxid();
-                qp.setZxid(sentLastZxid);
-                
-                /*
-                 * Add sid to payload
-                 */
-                ByteArrayOutputStream bsid = new ByteArrayOutputStream();
-                DataOutputStream dsid = new DataOutputStream(bsid);
-                dsid.writeLong(self.getId());
-                qp.setData(bsid.toByteArray());
-                
-                writePacket(qp, true);
-                readPacket(qp);
-                long newLeaderZxid = qp.getZxid();
-    
-                if (qp.getType() != Leader.NEWLEADER) {
-                    LOG.error("First packet should have been NEWLEADER");
-                    throw new IOException("First packet should have been NEWLEADER");
-                }
-                readPacket(qp);
-                synchronized (zk) {
-                    if (qp.getType() == Leader.DIFF) {
-                        LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
-                        zk.loadData();
-                    }
-                    else if (qp.getType() == Leader.SNAP) {
-                        LOG.info("Getting a snapshot from leader");
-                        // The leader is going to dump the database
-                        zk.deserializeSnapshot(leaderIs);
-                        String signature = leaderIs.readString("signature");
-                        if (!signature.equals("BenWasHere")) {
-                            LOG.error("Missing signature. Got " + signature);
-                            throw new IOException("Missing signature");
-                        }
-                    } else if (qp.getType() == Leader.TRUNC) {
-                        //we need to truncate the log to the lastzxid of the leader
-                        LOG.warn("Truncating log to get in sync with the leader 0x"
-                                + Long.toHexString(qp.getZxid()));
-                        boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
-                        if (!truncated) {
-                            // not able to truncate the log
-                            LOG.fatal("Not able to truncate the log "
-                                    + Long.toHexString(qp.getZxid()));
-                            System.exit(13);
-                        }
-    
-                        zk.loadData();
-                    }
-                    else {
-                        LOG.fatal("Got unexpected packet from leader "
-                                + qp.getType() + " exiting ... " );
-                        System.exit(13);
-
-                    }
-                    zk.dataTree.lastProcessedZxid = newLeaderZxid;
-                }
-                ack.setZxid(newLeaderZxid & ~0xffffffffL);
-                writePacket(ack, true);
-                sock.setSoTimeout(self.tickTime * self.syncLimit);
-                zk.startup();
-                
                 while (self.running) {
                     readPacket(qp);
-                    switch (qp.getType()) {
-                    case Leader.PING:
-                        // Send back the ping with our session data
-                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                        DataOutputStream dos = new DataOutputStream(bos);
-                        HashMap<Long, Integer> touchTable = zk
-                                .getTouchSnapshot();
-                        for (Entry<Long, Integer> entry : touchTable.entrySet()) {
-                            dos.writeLong(entry.getKey());
-                            dos.writeInt(entry.getValue());
-                        }
-                        qp.setData(bos.toByteArray());
-                        writePacket(qp, true);
-                        break;
-                    case Leader.PROPOSAL:
-                        TxnHeader hdr = new TxnHeader();
-                        BinaryInputArchive ia = BinaryInputArchive
-                                .getArchive(new ByteArrayInputStream(qp.getData()));
-                        Record txn = SerializeUtils.deserializeTxn(ia, hdr);
-                        if (hdr.getZxid() != lastQueued + 1) {
-                            LOG.warn("Got zxid 0x"
-                                    + Long.toHexString(hdr.getZxid())
-                                    + " expected 0x"
-                                    + Long.toHexString(lastQueued + 1));
-                        }
-                        lastQueued = hdr.getZxid();
-                        zk.logRequest(hdr, txn);
-                        break;
-                    case Leader.COMMIT:
-                        zk.commit(qp.getZxid());
-                        break;
-                    case Leader.UPTODATE:
-                        zk.takeSnapshot();
-                        self.cnxnFactory.setZooKeeperServer(zk);
-                        break;
-                    case Leader.REVALIDATE:
-                        ByteArrayInputStream bis = new ByteArrayInputStream(qp
-                                .getData());
-                        DataInputStream dis = new DataInputStream(bis);
-                        long sessionId = dis.readLong();
-                        boolean valid = dis.readBoolean();
-                        synchronized (pendingRevalidations) {
-                            ServerCnxn cnxn = pendingRevalidations
-                                    .remove(sessionId);
-                            if (cnxn == null) {
-                                LOG.warn("Missing session 0x"
-                                        + Long.toHexString(sessionId)
-                                        + " for validation");
-                            } else {
-                                cnxn.finishSessionInit(valid);
-                            }
-                        }
-                        if (LOG.isTraceEnabled()) {
-                            ZooTrace.logTraceMessage(LOG,
-                                    ZooTrace.SESSION_TRACE_MASK,
-                                    "Session 0x" + Long.toHexString(sessionId)
-                                    + " is valid: " + valid);
-                        }
-                        break;
-                    case Leader.SYNC:
-                        zk.sync();
-                        break;
-                    }
-                }
+                    processPacket(qp);                   
+                }                              
             } catch (IOException e) {
                 LOG.warn("Exception when following the leader", e);
                 try {
@@ -314,95 +79,65 @@ public class Follower {
                 }
     
                 synchronized (pendingRevalidations) {
-                    // clear pending revalitions
+                    // clear pending revalidations
                     pendingRevalidations.clear();
                     pendingRevalidations.notifyAll();
                 }
             }
         } finally {
-            zk.unregisterJMX(this);
+            zk.unregisterJMX((Learner)this);
         }
     }
 
-    private long lastQueued;
-
-    final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations =
-        new ConcurrentHashMap<Long, ServerCnxn>();
-    
-    public int getPendingRevalidationsCount() {
-        return pendingRevalidations.size();
-    }
-
     /**
-     * validate a seesion for a client
-     *
-     * @param clientId
-     *                the client to be revailidated
-     * @param timeout
-     *                the timeout for which the session is valid
-     * @return
+     * Examine the packet received in qp and dispatch based on its contents.
+     * @param qp
      * @throws IOException
-     * @throws InterruptedException
      */
-    void validateSession(ServerCnxn cnxn, long clientId, int timeout)
-            throws IOException, InterruptedException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        dos.writeLong(clientId);
-        dos.writeInt(timeout);
-        dos.close();
-        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
-                .toByteArray(), null);
-        pendingRevalidations.put(clientId, cnxn);
-        if (LOG.isTraceEnabled()) {
-            ZooTrace.logTraceMessage(LOG,
-                                     ZooTrace.SESSION_TRACE_MASK,
-                                     "To validate session 0x"
-                                     + Long.toHexString(clientId));
+    protected void processPacket(QuorumPacket qp) throws IOException{
+        switch (qp.getType()) {
+        case Leader.PING:            
+            ping(qp);            
+            break;
+        case Leader.PROPOSAL:            
+            TxnHeader hdr = new TxnHeader();
+            BinaryInputArchive ia = BinaryInputArchive
+            .getArchive(new ByteArrayInputStream(qp.getData()));
+            Record txn = SerializeUtils.deserializeTxn(ia, hdr);
+            if (hdr.getZxid() != lastQueued + 1) {
+                LOG.warn("Got zxid 0x"
+                        + Long.toHexString(hdr.getZxid())
+                        + " expected 0x"
+                        + Long.toHexString(lastQueued + 1));
+            }
+            lastQueued = hdr.getZxid();
+            fzk.logRequest(hdr, txn);
+            break;
+        case Leader.COMMIT:
+            fzk.commit(qp.getZxid());
+            break;
+        case Leader.UPTODATE:
+            fzk.takeSnapshot();
+            self.cnxnFactory.setZooKeeperServer(fzk);
+            break;
+        case Leader.REVALIDATE:
+            revalidate(qp);
+            break;
+        case Leader.SYNC:
+            fzk.sync();
+            break;
         }
-        writePacket(qp, true);
     }
 
+
     /**
-     * send a request packet to the leader
-     *
-     * @param request
-     *                the request from the client
-     * @throws IOException
+     * The zxid of the last operation seen
+     * @return zxid
      */
-    void request(Request request) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream oa = new DataOutputStream(baos);
-        oa.writeLong(request.sessionId);
-        oa.writeInt(request.cxid);
-        oa.writeInt(request.type);
-        if (request.request != null) {
-            request.request.rewind();
-            int len = request.request.remaining();
-            byte b[] = new byte[len];
-            request.request.get(b);
-            request.request.rewind();
-            oa.write(b);
-        }
-        oa.close();
-        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
-                .toByteArray(), request.authInfo);
-//        QuorumPacket qp;
-//        if(request.type == OpCode.sync){
-//            qp = new QuorumPacket(Leader.SYNC, -1, baos
-//                    .toByteArray(), request.authInfo);
-//        }
-//        else{
-//        qp = new QuorumPacket(Leader.REQUEST, -1, baos
-//                .toByteArray(), request.authInfo);
-//        }
-        writePacket(qp, true);
-    }
-
     public long getZxid() {
         try {
-            synchronized (zk) {
-                return zk.getZxid();
+            synchronized (fzk) {
+                return fzk.getZxid();
             }
         } catch (NullPointerException e) {
             LOG.warn("error getting zxid", e);
@@ -410,20 +145,17 @@ public class Follower {
         return -1;
     }
     
-    public long getLastQueued() {
+    /**
+     * The zxid of the last operation queued
+     * @return zxid
+     */
+    protected long getLastQueued() {
         return lastQueued;
     }
 
-    public void shutdown() {
-        // set the zookeeper server to null
-        self.cnxnFactory.setZooKeeperServer(null);
-        // clear all the connections
-        self.cnxnFactory.clear();
-        // shutdown previous zookeeper
-        if (zk != null) {
-            zk.shutdown();
-
-        }
+    @Override
+    public void shutdown() {    
         LOG.info("shutdown called", new Exception("shutdown Follower"));
+        super.shutdown();
     }
 }

+ 17 - 105
src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -19,20 +19,15 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.Record;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.SyncRequestProcessor;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.txn.TxnHeader;
 
@@ -41,13 +36,11 @@ import org.apache.zookeeper.txn.TxnHeader;
  * processors: FollowerRequestProcessor -> CommitProcessor ->
  * FinalRequestProcessor
  * 
- * A SyncRequestProcessor is also spawn off to log proposals from the leader.
+ * A SyncRequestProcessor is also spawned off to log proposals from the leader.
  */
-public class FollowerZooKeeperServer extends ZooKeeperServer {
+public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
     private static final Logger LOG = Logger.getLogger(FollowerZooKeeperServer.class);
 
-    private QuorumPeer self;
-
     CommitProcessor commitProcessor;
 
     SyncRequestProcessor syncProcessor;
@@ -65,19 +58,13 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
     FollowerZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
             DataTreeBuilder treeBuilder) throws IOException {
         super(logFactory, self.tickTime,treeBuilder);
-        this.self = self;
+        this.self = self;        
         this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
     }
 
     public Follower getFollower(){
         return self.follower;
-    }
-    
-    @Override
-    protected void createSessionTracker() {
-        sessionTracker = new FollowerSessionTracker(this, sessionsWithTimeouts,
-                self.getId());
-    }
+    }      
 
     @Override
     protected void setupRequestProcessors() {
@@ -88,28 +75,10 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
         firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
         ((FollowerRequestProcessor) firstProcessor).start();
         syncProcessor = new SyncRequestProcessor(this,
-                new SendAckRequestProcessor(getFollower()));
+                new SendAckRequestProcessor((Learner)getFollower()));
         syncProcessor.start();
     }
-
-    @Override
-    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
-            int sessionTimeout) throws IOException, InterruptedException {
-        getFollower().validateSession(cnxn, sessionId, sessionTimeout);
-    }
-
-    public HashMap<Long, Integer> getTouchSnapshot() {
-        if (sessionTracker != null) {
-            return ((FollowerSessionTracker) sessionTracker).snapshot();
-        }
-        return new HashMap<Long, Integer>();
-    }
-
-    @Override
-    public long getServerId() {
-        return self.getId();
-    }
-
+    
     LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
 
     public void logRequest(TxnHeader hdr, Record txn) {
@@ -124,6 +93,12 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
         syncProcessor.processRequest(request);
     }
 
+    /**
+     * When a COMMIT message is received, eventually this method is called, 
+     * which matches up the zxid from the COMMIT with (hopefully) the head of
+     * the pendingTxns queue and hands it to the commitProcessor to commit.
+     * @param zxid - must correspond to the head of pendingTxns if it exists
+     */
     public void commit(long zxid) {
         if (pendingTxns.size() == 0) {
             LOG.warn("Committing " + Long.toHexString(zxid)
@@ -155,14 +130,6 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
     public int getGlobalOutstandingLimit() {
         return super.getGlobalOutstandingLimit() / (self.getQuorumSize() - 1);
     }
-
-    /**
-     * Do not do anything in the follower.
-     */
-    @Override
-    public void addCommittedProposal(Request r) {
-        //do nothing
-    }
     
     @Override
     public void shutdown() {
@@ -180,69 +147,14 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
                     e);
         }
     }
-
-
-    @Override
-    protected void registerJMX() {
-        // register with JMX
-        try {
-            jmxDataTreeBean = new DataTreeBean(dataTree);
-            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
-        } catch (Exception e) {
-            LOG.warn("Failed to register with JMX", e);
-            jmxDataTreeBean = null;
-        }
-    }
-
-    public void registerJMX(FollowerBean followerBean,
-            LocalPeerBean localPeerBean)
-    {
-        // register with JMX
-        if (self.jmxLeaderElectionBean != null) {
-            try {
-                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
-            } catch (Exception e) {
-                LOG.warn("Failed to register with JMX", e);
-            }
-            self.jmxLeaderElectionBean = null;
-        }
-
-        try {
-            jmxServerBean = followerBean;
-            MBeanRegistry.getInstance().register(followerBean, localPeerBean);
-        } catch (Exception e) {
-            LOG.warn("Failed to register with JMX", e);
-            jmxServerBean = null;
-        }
-    }
-
-    @Override
-    protected void unregisterJMX() {
-        // unregister from JMX
-        try {
-            if (jmxDataTreeBean != null) {
-                MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to unregister with JMX", e);
-        }
-        jmxDataTreeBean = null;
-    }
-
-    protected void unregisterJMX(Follower follower) {
-        // unregister from JMX
-        try {
-            if (jmxServerBean != null) {
-                MBeanRegistry.getInstance().unregister(jmxServerBean);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to unregister with JMX", e);
-        }
-        jmxServerBean = null;
-    }
     
     @Override
     public String getState() {
         return "follower";
     }
+
+    @Override
+    public Learner getLearner() {
+        return getFollower();
+    }
 }

+ 42 - 41
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -72,48 +72,48 @@ public class Leader {
     QuorumPeer self;
 
     // the follower acceptor thread
-    FollowerCnxAcceptor cnxAcceptor;
+    LearnerCnxAcceptor cnxAcceptor;
     
     // list of all the followers
-    public HashSet<FollowerHandler> followers = new HashSet<FollowerHandler>();
+    public HashSet<LearnerHandler> learners = new HashSet<LearnerHandler>();
 
-    // list of followers that are ready to follow (i.e synced with the leader)
-    public HashSet<FollowerHandler> forwardingFollowers = new HashSet<FollowerHandler>();
-    
+    // list of followers that are ready to follow (i.e synced with the leader)    
+    public HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();
+        
     //Pending sync requests
-    public HashMap<Long,List<FollowerSyncRequest>> pendingSyncs = new HashMap<Long,List<FollowerSyncRequest>>();
+    public HashMap<Long,List<LearnerSyncRequest>> pendingSyncs = new HashMap<Long,List<LearnerSyncRequest>>();
     
     //Follower counter
     AtomicLong followerCounter = new AtomicLong(-1);
     /**
-     * Adds follower to the leader.
+     * Adds peer to the leader.
      * 
-     * @param follower
-     *                instance of follower handle
+     * @param learner
+     *                instance of learner handle
      */
-    void addFollowerHandler(FollowerHandler follower) {
-        synchronized (followers) {
-            followers.add(follower);
+    void addLearnerHandler(LearnerHandler learner) {
+        synchronized (learners) {
+            learners.add(learner);
         }
     }
 
     /**
-     * Remove the follower from the followers list
+     * Remove the learner from the learner list
      * 
-     * @param follower
+     * @param peer
      */
-    void removeFollowerHandler(FollowerHandler follower) {
+    void removeLearnerHandler(LearnerHandler peer) {
         synchronized (forwardingFollowers) {
-            forwardingFollowers.remove(follower);
-        }
-        synchronized (followers) {
-            followers.remove(follower);
+            forwardingFollowers.remove(peer);            
+        }        
+        synchronized (learners) {
+            learners.remove(peer);
         }
     }
 
-    boolean isFollowerSynced(FollowerHandler follower){
+    boolean isLearnerSynced(LearnerHandler peer){
         synchronized (forwardingFollowers) {
-            return forwardingFollowers.contains(follower);
+            return forwardingFollowers.contains(peer);
         }        
     }
     
@@ -209,7 +209,7 @@ public class Leader {
 
     Proposal newLeaderProposal = new Proposal();
     
-    class FollowerCnxAcceptor extends Thread{
+    class LearnerCnxAcceptor extends Thread{
         private volatile boolean stop = false;
         
         @Override
@@ -217,10 +217,10 @@ public class Leader {
             try {
                 while (!stop) {
                     try{
-                        Socket s = ss.accept();
+                        Socket s = ss.accept();                        
                         s.setSoTimeout(self.tickTime * self.syncLimit);
                         s.setTcpNoDelay(nodelay);
-                        FollowerHandler fh = new FollowerHandler(s, Leader.this);
+                        LearnerHandler fh = new LearnerHandler(s, Leader.this);
                         fh.start();
                     } catch (SocketException e) {
                         if (stop) {
@@ -267,9 +267,10 @@ public class Leader {
             synchronized(this){
                 lastProposed = zk.getZxid();
             }
-            
+                      
             newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
-                    null, null);
+                    null, null);            
+
             if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                 LOG.info("NEWLEADER proposal has Zxid of "
                         + Long.toHexString(newLeaderProposal.packet.getZxid()));
@@ -278,7 +279,7 @@ public class Leader {
             
             // Start thread that waits for connection requests from 
             // new followers.
-            cnxAcceptor = new FollowerCnxAcceptor();
+            cnxAcceptor = new LearnerCnxAcceptor();
             cnxAcceptor.start();
             
             // We have to get at least a majority of servers in sync with
@@ -296,7 +297,7 @@ public class Leader {
                     
                     shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
                     HashSet<Long> followerSet = new HashSet<Long>();
-                    for(FollowerHandler f : followers)
+                    for(LearnerHandler f : learners)
                         followerSet.add(f.getSid());
                     
                     if (self.getQuorumVerifier().containsQuorum(followerSet)) {
@@ -333,8 +334,8 @@ public class Leader {
                 
                 // lock on the followers when we use it.
                 syncedSet.add(self.getId());
-                synchronized (followers) {
-                    for (FollowerHandler f : followers) {
+                synchronized (learners) {
+                    for (LearnerHandler f : learners) {
                         if (f.synced()) {
                             syncedCount++;
                             syncedSet.add(f.getSid());
@@ -388,10 +389,10 @@ public class Leader {
         } catch (IOException e) {
             LOG.warn("Ignoring unexpected exception during close",e);
         }
-        synchronized (followers) {
-            for (Iterator<FollowerHandler> it = followers.iterator(); it
+        synchronized (learners) {
+            for (Iterator<LearnerHandler> it = learners.iterator(); it
                     .hasNext();) {
-                FollowerHandler f = it.next();
+                LearnerHandler f = it.next();
                 it.remove();
                 f.shutdown();
             }
@@ -466,7 +467,7 @@ public class Leader {
                 commit(zxid);
                 zk.commitProcessor.commit(p.request);
                 if(pendingSyncs.containsKey(zxid)){
-                    for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) {
+                    for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                         sendSync(r);
                     }
                 }
@@ -538,12 +539,12 @@ public class Leader {
      */
     void sendPacket(QuorumPacket qp) {
         synchronized (forwardingFollowers) {
-            for (FollowerHandler f : forwardingFollowers) {
+            for (LearnerHandler f : forwardingFollowers) {                
                 f.queuePacket(qp);
             }
         }
     }
-
+    
     long lastCommitted = -1;
 
     /**
@@ -604,13 +605,13 @@ public class Leader {
      * @param r the request
      */
     
-    synchronized public void processSync(FollowerSyncRequest r){
+    synchronized public void processSync(LearnerSyncRequest r){
         if(outstandingProposals.isEmpty()){
             sendSync(r);
         } else {
-            List<FollowerSyncRequest> l = pendingSyncs.get(lastProposed);
+            List<LearnerSyncRequest> l = pendingSyncs.get(lastProposed);
             if (l == null) {
-                l = new ArrayList<FollowerSyncRequest>();
+                l = new ArrayList<LearnerSyncRequest>();
             }
             l.add(r);
             pendingSyncs.put(lastProposed, l);
@@ -624,7 +625,7 @@ public class Leader {
      * @param r
      */
             
-    public void sendSync(FollowerSyncRequest r){
+    public void sendSync(LearnerSyncRequest r){
         QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
         r.fh.queuePacket(qp);
     }
@@ -636,7 +637,7 @@ public class Leader {
      * @param handler handler of the follower
      * @return last proposed zxid
      */
-    synchronized public long startForwarding(FollowerHandler handler,
+    synchronized public long startForwarding(LearnerHandler handler,
             long lastSeenZxid) {
         // Queue up any outstanding requests enabling the receipt of
         // new requests

+ 2 - 2
src/java/main/org/apache/zookeeper/server/quorum/LeaderBean.java

@@ -20,7 +20,7 @@ package org.apache.zookeeper.server.quorum;
 
 import org.apache.zookeeper.server.ZooKeeperServerBean;
 import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.quorum.FollowerHandler;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
 import org.apache.zookeeper.server.quorum.Leader;
 
 /**
@@ -44,7 +44,7 @@ public class LeaderBean extends ZooKeeperServerBean implements LeaderMXBean {
     
     public String followerInfo() {
         StringBuffer sb = new StringBuffer();
-        for (FollowerHandler handler : leader.followers) {
+        for (LearnerHandler handler : leader.learners) {
             sb.append(handler.toString()).append("\n");
         }
         return sb.toString();

+ 375 - 0
src/java/main/org/apache/zookeeper/server/quorum/Learner.java

@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+
+/**
+ * This class is the superclass of two of the three main actors in a ZK
+ * ensemble: Followers and Observers. Both Followers and Observers share 
+ * a good deal of code which is moved into Peer to avoid duplication. 
+ */
+public class Learner {       
+    QuorumPeer self;
+    LearnerZooKeeperServer zk;
+    
+    protected BufferedOutputStream bufferedOutput;
+    
+    protected Socket sock;
+    
+    /**
+     * Socket getter
+     * @return 
+     */
+    public Socket getSocket() {
+        return sock;
+    }
+    
+    protected InputArchive leaderIs;
+    protected OutputArchive leaderOs;    
+    
+    protected static final Logger LOG = Logger.getLogger(Learner.class);
+
+    static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
+    static {
+        LOG.info("TCP NoDelay set to: " + nodelay);
+    }   
+    
+    final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations =
+        new ConcurrentHashMap<Long, ServerCnxn>();
+    
+    public int getPendingRevalidationsCount() {
+        return pendingRevalidations.size();
+    }
+    
+    /**
+     * validate a session for a client
+     *
+     * @param clientId
+     *                the client to be revalidated
+     * @param timeout
+     *                the timeout for which the session is valid
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    void validateSession(ServerCnxn cnxn, long clientId, int timeout)
+            throws IOException, InterruptedException {
+        LOG.info("Revalidating client: " + clientId);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        dos.writeLong(clientId);
+        dos.writeInt(timeout);
+        dos.close();
+        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
+                .toByteArray(), null);
+        pendingRevalidations.put(clientId, cnxn);
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logTraceMessage(LOG,
+                                     ZooTrace.SESSION_TRACE_MASK,
+                                     "To validate session 0x"
+                                     + Long.toHexString(clientId));
+        }
+        writePacket(qp, true);
+    }     
+    
+    /**
+     * write a packet to the leader
+     *
+     * @param pp
+     *                the proposal packet to be sent to the leader
+     * @throws IOException
+     */
+    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+        synchronized (leaderOs) {
+            if (pp != null) {
+                leaderOs.writeRecord(pp, "packet");
+            }
+            if (flush) {
+                bufferedOutput.flush();
+            }
+        }
+    }
+
+    /**
+     * read a packet from the leader
+     *
+     * @param pp
+     *                the packet to be instantiated
+     * @throws IOException
+     */
+    void readPacket(QuorumPacket pp) throws IOException {
+        synchronized (leaderIs) {
+            leaderIs.readRecord(pp, "packet");
+        }
+        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+        if (pp.getType() == Leader.PING) {
+            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+        }
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
+        }
+    }
+    
+    /**
+     * send a request packet to the leader
+     *
+     * @param request
+     *                the request from the client
+     * @throws IOException
+     */
+    void request(Request request) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream oa = new DataOutputStream(baos);
+        oa.writeLong(request.sessionId);
+        oa.writeInt(request.cxid);
+        oa.writeInt(request.type);
+        if (request.request != null) {
+            request.request.rewind();
+            int len = request.request.remaining();
+            byte b[] = new byte[len];
+            request.request.get(b);
+            request.request.rewind();
+            oa.write(b);
+        }
+        oa.close();
+        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
+                .toByteArray(), request.authInfo);
+        writePacket(qp, true);
+    }
+    
+    /**
+     * Returns the address of the node we think is the leader.
+     */
+    protected InetSocketAddress findLeader() {
+        InetSocketAddress addr = null;
+        // Find the leader by id
+        Vote current = self.getCurrentVote();
+        for (QuorumServer s : self.getView().values()) {
+            if (s.id == current.id) {
+                addr = s.addr;
+                break;
+            }
+        }
+        if (addr == null) {
+            LOG.warn("Couldn't find the leader with id = "
+                    + current.id);
+        }
+        return addr;
+    }
+    
+    /**
+     * Establish a connection with the Leader found by findLeader. Retries
+     * 5 times before giving up. 
+     * @param addr - the address of the Leader to connect to.
+     * @throws IOException - if the socket connection fails on the 5th attempt
+     * @throws ConnectException
+     * @throws InterruptedException
+     */
+    protected void connectToLeader(InetSocketAddress addr) 
+    throws IOException, ConnectException, InterruptedException {
+        sock = new Socket();        
+        sock.setSoTimeout(self.tickTime * self.initLimit);
+        for (int tries = 0; tries < 5; tries++) {
+            try {
+                sock.connect(addr, self.tickTime * self.syncLimit);
+                sock.setTcpNoDelay(nodelay);
+                break;
+            } catch (IOException e) {
+                if (tries == 4) {
+                    LOG.error("Unexpected exception",e);
+                    throw e;
+                } else {
+                    LOG.warn("Unexpected exception, tries="+tries+
+                            ", connecting to " + addr,e);
+                    sock = new Socket();
+                    sock.setSoTimeout(self.tickTime * self.initLimit);
+                }
+            }
+            Thread.sleep(1000);
+        }
+        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
+                sock.getInputStream()));
+        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
+        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+    }   
+    
+    /**
+     * Once connected to the leader, perform the handshake protocol to
+     * establish a following / observing connection. 
+     * @param pktType
+     * @return the zxid the Leader sends for synchronization purposes.
+     * @throws IOException
+     */
+    protected long registerWithLeader(int pktType) throws IOException{
+        /*
+         * Send follower info, including last zxid and sid
+         */
+        QuorumPacket qp = new QuorumPacket();                
+        qp.setType(pktType);
+        long sentLastZxid = self.getLastLoggedZxid();
+        qp.setZxid(sentLastZxid);
+        
+        /*
+         * Add sid to payload
+         */
+        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
+        DataOutputStream dsid = new DataOutputStream(bsid);
+        dsid.writeLong(self.getId());
+        qp.setData(bsid.toByteArray());
+        
+        writePacket(qp, true);
+        readPacket(qp);        
+
+        if (qp.getType() != Leader.NEWLEADER) {
+            LOG.error("First packet should have been NEWLEADER");
+            throw new IOException("First packet should have been NEWLEADER");
+        }
+        
+        return qp.getZxid();
+    } 
+    
+    /**
+     * Finally, synchronize our history with the Leader. 
+     * @param newLeaderZxid
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
+        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
+        QuorumPacket qp = new QuorumPacket();
+        
+        readPacket(qp);        
+        synchronized (zk) {
+            if (qp.getType() == Leader.DIFF) {
+                LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));                
+                zk.loadData();
+            }
+            else if (qp.getType() == Leader.SNAP) {
+                LOG.info("Getting a snapshot from leader");
+                // The leader is going to dump the database
+                zk.deserializeSnapshot(leaderIs);
+                String signature = leaderIs.readString("signature");
+                if (!signature.equals("BenWasHere")) {
+                    LOG.error("Missing signature. Got " + signature);
+                    throw new IOException("Missing signature");
+                }
+            } else if (qp.getType() == Leader.TRUNC) {
+                //we need to truncate the log to the lastzxid of the leader
+                LOG.warn("Truncating log to get in sync with the leader 0x"
+                        + Long.toHexString(qp.getZxid()));
+                boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
+                if (!truncated) {
+                    // not able to truncate the log
+                    LOG.fatal("Not able to truncate the log "
+                            + Long.toHexString(qp.getZxid()));
+                    System.exit(13);
+                }
+
+                zk.loadData();
+            }
+            else {
+                LOG.fatal("Got unexpected packet from leader "
+                        + qp.getType() + " exiting ... " );
+                System.exit(13);
+
+            }
+            zk.dataTree.lastProcessedZxid = newLeaderZxid;
+        }
+        ack.setZxid(newLeaderZxid & ~0xffffffffL);
+        writePacket(ack, true);
+        sock.setSoTimeout(self.tickTime * self.syncLimit);
+        zk.startup();
+    }
+    
+    protected void revalidate(QuorumPacket qp) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(qp
+                .getData());
+        DataInputStream dis = new DataInputStream(bis);
+        long sessionId = dis.readLong();
+        boolean valid = dis.readBoolean();
+        synchronized (pendingRevalidations) {
+            ServerCnxn cnxn = pendingRevalidations
+                    .remove(sessionId);
+            if (cnxn == null) {
+                LOG.warn("Missing session 0x"
+                        + Long.toHexString(sessionId)
+                        + " for validation");
+            } else {
+                cnxn.finishSessionInit(valid);
+            }
+        }
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logTraceMessage(LOG,
+                    ZooTrace.SESSION_TRACE_MASK,
+                    "Session 0x" + Long.toHexString(sessionId)
+                    + " is valid: " + valid);
+        }
+    }
+        
+    protected void ping(QuorumPacket qp) throws IOException {
+        // Send back the ping with our session data
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        HashMap<Long, Integer> touchTable = zk
+                .getTouchSnapshot();
+        for (Entry<Long, Integer> entry : touchTable.entrySet()) {
+            dos.writeLong(entry.getKey());
+            dos.writeInt(entry.getValue());
+        }
+        qp.setData(bos.toByteArray());
+        writePacket(qp, true);
+    }
+    
+    
+    /**
+     * Shutdown the Peer
+     */
+    public void shutdown() {
+        // set the zookeeper server to null
+        self.cnxnFactory.setZooKeeperServer(null);
+        // clear all the connections
+        self.cnxnFactory.clear();
+        // shutdown previous zookeeper
+        if (zk != null) {
+            zk.shutdown();
+        }
+    }
+}

+ 30 - 24
src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java → src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -43,20 +43,24 @@ import org.apache.zookeeper.txn.TxnHeader;
 
 /**
  * There will be an instance of this class created by the Leader for each
- * follower.All communication for a given Follower will be handled by this
+ * learner. All communication with a learner is handled by this
  * class.
  */
-public class FollowerHandler extends Thread {
-    private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
+public class LearnerHandler extends Thread {
+    private static final Logger LOG = Logger.getLogger(LearnerHandler.class);
 
-    public final Socket sock;
+    protected final Socket sock;    
+
+    public Socket getSocket() {
+        return sock;
+    }
 
     final Leader leader;
 
     long tickOfLastAck;
     
     /**
-     * ZooKeeper server identifier of this follower
+     * ZooKeeper server identifier of this learner
      */
     protected long sid = 0;
     
@@ -65,7 +69,7 @@ public class FollowerHandler extends Thread {
     }                    
 
     /**
-     * The packets to be sent to the follower
+     * The packets to be sent to the learner
      */
     final LinkedBlockingQueue<QuorumPacket> queuedPackets =
         new LinkedBlockingQueue<QuorumPacket>();
@@ -76,17 +80,17 @@ public class FollowerHandler extends Thread {
 
     private BufferedOutputStream bufferedOutput;
 
-    FollowerHandler(Socket sock, Leader leader) throws IOException {
-        super("FollowerHandler-" + sock.getRemoteSocketAddress());
+    LearnerHandler(Socket sock, Leader leader) throws IOException {
+        super("LeanerHandler-" + sock.getRemoteSocketAddress());
         this.sock = sock;
         this.leader = leader;
-        leader.addFollowerHandler(this);
+        leader.addLearnerHandler(this);
     }
     
     @Override
     public String toString() {
         StringBuffer sb = new StringBuffer();
-        sb.append("FollowerHandler ").append(sock);
+        sb.append("LearnerHandler ").append(sock);
         sb.append(" tickOfLastAck:").append(tickOfLastAck());
         sb.append(" synced?:").append(synced());
         sb.append(" queuedPacketLength:").append(queuedPackets.size());
@@ -97,7 +101,7 @@ public class FollowerHandler extends Thread {
      * If this packet is queued, the sender thread will exit
      */
     final QuorumPacket proposalOfDeath = new QuorumPacket();
-
+   
     /**
      * This method will use the thread to send packets added to the
      * queuedPackets list
@@ -151,7 +155,7 @@ public class FollowerHandler extends Thread {
             break;
         case Leader.FOLLOWERINFO:
             type = "FOLLOWERINFO";
-            break;
+            break;    
         case Leader.NEWLEADER:
             type = "NEWLEADER";
             break;
@@ -199,13 +203,13 @@ public class FollowerHandler extends Thread {
     }
 
     /**
-     * This thread will receive packets from the follower and process them and
-     * also listen to new connections from new followers.
+     * This thread will receive packets from the peer and process them and
+     * also listen to new connections from new peers.
      */
     @Override
     public void run() {
         try {
-
+            
             ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
                     .getInputStream()));
             bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
@@ -213,7 +217,7 @@ public class FollowerHandler extends Thread {
 
             QuorumPacket qp = new QuorumPacket();
             ia.readRecord(qp, "packet");
-            if(qp.getType() != leader.FOLLOWERINFO){
+            if(qp.getType() != Leader.FOLLOWERINFO) {
             	LOG.error("First packet " + qp.toString()
                         + " is not FOLLOWERINFO!");
                 return;
@@ -224,6 +228,7 @@ public class FollowerHandler extends Thread {
             } else {
             	this.sid = leader.followerCounter.getAndDecrement();
             }
+
             LOG.info("Follower sid: " + this.sid + " : info : "
                     + leader.self.quorumPeers.get(this.sid));
             
@@ -261,7 +266,8 @@ public class FollowerHandler extends Thread {
                 else {
                     logTxns = false;
                 }            
-            }
+						}
+            
             //check if we decided to send a diff or we need to send a truncate
             // we avoid using epochs for truncating because epochs make things
             // complicated. Two epochs might have the last 32 bits as same.
@@ -270,7 +276,7 @@ public class FollowerHandler extends Thread {
             // things simple we just send sanpshot.
             if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
                 // this is the only case that we are sure that
-                // we can ask the follower to truncate the log
+                // we can ask the peer to truncate the log
                 packetToSend = Leader.TRUNC;
                 zxidToSend = leader.zk.maxCommittedLog;
                 updates = zxidToSend;
@@ -303,14 +309,14 @@ public class FollowerHandler extends Thread {
                         + Long.toHexString(peerLastZxid) + " " 
                         + " zxid of leader is 0x"
                         + Long.toHexString(leaderLastZxid));
-                // Dump data to follower
+                // Dump data to peer
                 leader.zk.serializeSnapshot(oa);
                 oa.writeString("BenWasHere", "signature");
             }
             bufferedOutput.flush();
             
             // Mutation packets will be queued during the serialize,
-            // so we need to mark when the follower can actually start
+            // so we need to mark when the peer can actually start
             // using the data
             //
             queuedPackets
@@ -392,14 +398,14 @@ public class FollowerHandler extends Thread {
                     qp.setData(bos.toByteArray());
                     queuedPackets.add(qp);
                     break;
-                case Leader.REQUEST:
+                case Leader.REQUEST:                    
                     bb = ByteBuffer.wrap(qp.getData());
                     sessionId = bb.getLong();
                     cxid = bb.getInt();
                     type = bb.getInt();
                     bb = bb.slice();
                     if(type == OpCode.sync){
-                     	leader.zk.submitRequest(new FollowerSyncRequest(this, sessionId, cxid, type, bb,
+                     	leader.zk.submitRequest(new LearnerSyncRequest(this, sessionId, cxid, type, bb,
                                 qp.getAuthinfo()));
                     } else {
                         Request si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
@@ -439,7 +445,7 @@ public class FollowerHandler extends Thread {
         } catch (IOException e) {
             LOG.warn("Ignoring unexpected exception during socket close", e);
         }
-        leader.removeFollowerHandler(this);
+        leader.removeLearnerHandler(this);
     }
 
     public long tickOfLastAck() {
@@ -447,7 +453,7 @@ public class FollowerHandler extends Thread {
     }
 
     /**
-     * ping calls from the leader to the followers
+     * ping calls from the leader to the peers
      */
     public void ping() {
         long id;

+ 2 - 2
src/java/main/org/apache/zookeeper/server/quorum/FollowerSessionTracker.java → src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java

@@ -28,7 +28,7 @@ import org.apache.zookeeper.server.SessionTrackerImpl;
  * This is really just a shell of a SessionTracker that tracks session activity
  * to be forwarded to the Leader using a PING.
  */
-public class FollowerSessionTracker implements SessionTracker {
+public class LearnerSessionTracker implements SessionTracker {
     SessionExpirer expirer;
 
     HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
@@ -41,7 +41,7 @@ public class FollowerSessionTracker implements SessionTracker {
     /**
      * 
      */
-    public FollowerSessionTracker(SessionExpirer expirer,
+    public LearnerSessionTracker(SessionExpirer expirer,
             ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, long id) {
         this.expirer = expirer;
         this.sessionsWithTimeouts = sessionsWithTimeouts;

+ 3 - 3
src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java → src/java/main/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java

@@ -24,9 +24,9 @@ import java.util.List;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.Request;
 
-public class FollowerSyncRequest extends Request {
-	FollowerHandler fh;
-	public FollowerSyncRequest(FollowerHandler fh, long sessionId, int xid, int type,
+public class LearnerSyncRequest extends Request {
+	LearnerHandler fh;
+	public LearnerSyncRequest(LearnerHandler fh, long sessionId, int xid, int type,
 			ByteBuffer bb, List<Id> authInfo) {
 		super(null, sessionId, xid, type, bb, authInfo);
 		this.fh = fh;

+ 149 - 0
src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * Parent class for all ZooKeeperServers for Learners 
+ */
+public abstract class LearnerZooKeeperServer extends ZooKeeperServer {    
+    
+    protected QuorumPeer self;
+    
+    public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
+            DataTreeBuilder treeBuilder) throws IOException {
+        super(logFactory,tickTime,treeBuilder);
+    }
+
+    /**
+     * Abstract method to return the learner associated with this server.
+     * Since the Learner may change under our feet (when QuorumPeer reassigns
+     * it) we can't simply take a reference here. Instead, we need the 
+     * subclasses to implement this.     
+     */
+    abstract public Learner getLearner();        
+    
+    /**
+     * Returns the current state of the session tracker. This is only currently
+     * used by a Learner to build a ping response packet.
+     * 
+     */
+    protected HashMap<Long, Integer> getTouchSnapshot() {
+        if (sessionTracker != null) {
+            return ((LearnerSessionTracker) sessionTracker).snapshot();
+        }
+        return new HashMap<Long, Integer>();
+    }
+    
+    /**
+     * Returns the id of the associated QuorumPeer, which will do for a unique
+     * id of this server. 
+     */
+    public long getServerId() {
+        return self.getId();
+    }    
+    
+    /**
+     * Learners don't make use of this method, only Leaders.
+     */
+    @Override
+    public void addCommittedProposal(Request request) {
+        // Don't do anything!
+    }
+    
+    @Override
+    protected void createSessionTracker() {
+        sessionTracker = new LearnerSessionTracker(this, sessionsWithTimeouts,
+                self.getId());
+    }
+    
+    @Override
+    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
+            int sessionTimeout) throws IOException, InterruptedException {
+        getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+    }
+    
+    @Override
+    protected void registerJMX() {
+        // register with JMX
+        try {
+            jmxDataTreeBean = new DataTreeBean(dataTree);
+            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            jmxDataTreeBean = null;
+        }
+    }
+
+    public void registerJMX(ZooKeeperServerBean serverBean,
+            LocalPeerBean localPeerBean)
+    {
+        // register with JMX
+        if (self.jmxLeaderElectionBean != null) {
+            try {
+                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
+            } catch (Exception e) {
+                LOG.warn("Failed to register with JMX", e);
+            }
+            self.jmxLeaderElectionBean = null;
+        }
+
+        try {
+            jmxServerBean = serverBean;
+            MBeanRegistry.getInstance().register(serverBean, localPeerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            jmxServerBean = null;
+        }
+    }
+
+    @Override
+    protected void unregisterJMX() {
+        // unregister from JMX
+        try {
+            if (jmxDataTreeBean != null) {
+                MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to unregister with JMX", e);
+        }
+        jmxDataTreeBean = null;
+    }
+
+    protected void unregisterJMX(Learner peer) {
+        // unregister from JMX
+        try {
+            if (jmxServerBean != null) {
+                MBeanRegistry.getInstance().unregister(jmxServerBean);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to unregister with JMX", e);
+        }
+        jmxServerBean = null;
+    }
+}

+ 2 - 2
src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java

@@ -62,8 +62,8 @@ public class ProposalRequestProcessor implements RequestProcessor {
          * call processRequest on the next processor.
          */
         
-        if(request instanceof FollowerSyncRequest){
-            zks.getLeader().processSync((FollowerSyncRequest)request);
+        if(request instanceof LearnerSyncRequest){
+            zks.getLeader().processSync((LearnerSyncRequest)request);
         } else {
                 nextProcessor.processRequest(request);
             if (request.hdr != null) {

+ 23 - 6
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -452,7 +452,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         try {
             jmxQuorumBean = new QuorumBean(this);
             MBeanRegistry.getInstance().register(jmxQuorumBean, null);
-            for(QuorumServer s: quorumPeers.values()){
+            for(QuorumServer s: getView().values()){
                 ZKMBeanInfo p;
                 if (getId() == s.id) {
                     p = jmxLocalPeerBean = new LocalPeerBean(this);
@@ -548,16 +548,33 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         }
     }
 
+    /**
+     * A 'view' is a node's current opinion of the membership of the
+     * ensemble. 
+     */
+    public Map<Long,QuorumPeer.QuorumServer> getView() {
+        return this.quorumPeers;
+    }
+    
+    /**
+     * Check if a node is in the current view. With static membership, the
+     * result of this check will never change; only when dynamic membership
+     * is introduced will this be more useful.
+     */
+    public boolean viewContains(Long sid) {
+        return this.quorumPeers.containsKey(sid);
+    }
+    
     public String[] getQuorumPeers() {
         List<String> l = new ArrayList<String>();
         synchronized (this) {
             if (leader != null) {
-                synchronized (leader.followers) {
-                    for (FollowerHandler fh : leader.followers) {
-                        if (fh.sock == null)
+                synchronized (leader.learners) {
+                    for (LearnerHandler fh : leader.learners) {
+                        if (fh.getSocket() == null)
                             continue;
-                        String s = fh.sock.getRemoteSocketAddress().toString();
-                        if (leader.isFollowerSynced(fh))
+                        String s = fh.getSocket().getRemoteSocketAddress().toString();
+                        if (leader.isLearnerSynced(fh))
                             s += "*";
                         l.add(s);
                     }

+ 9 - 9
src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java

@@ -30,10 +30,10 @@ import org.apache.zookeeper.server.RequestProcessor;
 public class SendAckRequestProcessor implements RequestProcessor, Flushable {
     private static final Logger LOG = Logger.getLogger(SendAckRequestProcessor.class);
     
-    Follower follower;
+    Learner learner;
 
-    SendAckRequestProcessor(Follower follower) {
-        this.follower = follower;
+    SendAckRequestProcessor(Learner peer) {
+        this.learner = peer;
     }
 
     public void processRequest(Request si) {
@@ -41,12 +41,12 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
             QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                 null);
             try {
-                follower.writePacket(qp, false);
+                learner.writePacket(qp, false);
             } catch (IOException e) {
                 LOG.warn("Closing connection to leader, exception during packet send", e);
                 try {
-                    if (!follower.sock.isClosed()) {
-                        follower.sock.close();
+                    if (!learner.sock.isClosed()) {
+                        learner.sock.close();
                     }
                 } catch (IOException e1) {
                     // Nothing to do, we are shutting things down, so an exception here is irrelevant
@@ -58,12 +58,12 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
     
     public void flush() throws IOException {
         try {
-            follower.writePacket(null, true);
+            learner.writePacket(null, true);
         } catch(IOException e) {
             LOG.warn("Closing connection to leader, exception during packet send", e);
             try {
-                if (!follower.sock.isClosed()) {
-                    follower.sock.close();
+                if (!learner.sock.isClosed()) {
+                    learner.sock.close();
                 }
             } catch (IOException e1) {
                     // Nothing to do, we are shutting things down, so an exception here is irrelevant

+ 3 - 3
src/java/test/config/findbugsExcludeFile.xml

@@ -51,8 +51,8 @@
 
    <!-- Two unrecoverable errors while following the leader  -->
    <Match>
-     <Class name="org.apache.zookeeper.server.quorum.Follower" />
-       <Method name="followLeader" />
+     <Class name="org.apache.zookeeper.server.quorum.Learner" />
+       <Method name="syncWithLeader" />
        <Bug pattern="DM_EXIT" />
    </Match>
 
@@ -91,7 +91,7 @@
      <Bug code="IS"/>
   </Match>
   <Match>
-     <Class name="org.apache.zookeeper.server.quorum.FollowerSessionTracker"/>
+     <Class name="org.apache.zookeeper.server.quorum.LearnerSessionTracker"/>
        <Bug code="UrF"/>
   </Match>
   <Match>

+ 25 - 4
src/java/test/org/apache/zookeeper/test/QuorumTest.java

@@ -30,7 +30,7 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.quorum.FollowerHandler;
+import org.apache.zookeeper.server.quorum.LearnerHandler;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.junit.Before;
 import org.junit.Test;
@@ -91,6 +91,27 @@ public class QuorumTest extends QuorumBase {
         ct.testClientWithWatcherObj();
     }
     
+    @Test
+    public void testGetView() {                
+        ct.assertEquals(5,qb.s1.getView().size());        
+        ct.assertEquals(5,qb.s2.getView().size());        
+        ct.assertEquals(5,qb.s3.getView().size());        
+        ct.assertEquals(5,qb.s4.getView().size());
+        ct.assertEquals(5,qb.s5.getView().size());
+    }
+    
+    @Test
+    public void testViewContains() {
+        // Test view contains self
+        ct.assertTrue(qb.s1.viewContains(qb.s1.getId()));
+        
+        // Test view contains other servers
+        ct.assertTrue(qb.s1.viewContains(qb.s2.getId()));
+        
+        // Test view does not contain non-existant servers
+        ct.assertFalse(qb.s1.viewContains(-1L));
+    }
+    
     volatile int counter = 0;
     volatile int errors = 0;
     @Test
@@ -117,9 +138,9 @@ public class QuorumTest extends QuorumBase {
                 }
             }, null);
         }
-        ArrayList<FollowerHandler> fhs = new ArrayList<FollowerHandler>(leader.forwardingFollowers);
-        for(FollowerHandler f: fhs) {
-            f.sock.shutdownInput();
+        ArrayList<LearnerHandler> fhs = new ArrayList<LearnerHandler>(leader.forwardingFollowers);
+        for(LearnerHandler f: fhs) {
+            f.getSocket().shutdownInput();
         }
         for(int i = 0; i < 5000; i++) {
             zk.setData("/blah/blah", new byte[0], -1, new AsyncCallback.StatCallback() {