Просмотр исходного кода

ZOOKEEPER-1221. Provide accessors for Request.{hdr|txn} (Thomas Koch via phunt)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1186967 13f79535-47bb-0310-9956-ffa450edef68
Patrick D. Hunt 13 лет назад
Родитель
Сommit
e04e40167f

+ 2 - 0
CHANGES.txt

@@ -60,6 +60,8 @@ IMPROVEMENTS:
 
   ZOOKEEPER-1193. Remove upgrade code (Thomas Koch via phunt)
 
+  ZOOKEEPER-1221. Provide accessors for Request.{hdr|txn} (Thomas Koch via phunt)
+
 Release 3.4.0 - 
 
 Non-backward compatible changes:

+ 16 - 16
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -106,17 +106,17 @@ public class FinalRequestProcessor implements RequestProcessor {
                     zks.outstandingChangesForPath.remove(cr.path);
                 }
             }
-            if (request.hdr != null) {
-                rc = zks.getZKDatabase().processTxn(request.hdr, request.txn);
+            if (request.getHdr() != null) {
+                rc = zks.getZKDatabase().processTxn(request.getHdr(), request.getTxn());
                 if (request.type == OpCode.createSession) {
-                    if (request.txn instanceof CreateSessionTxn) {
-                        CreateSessionTxn cst = (CreateSessionTxn) request.txn;
+                    if (request.getTxn() instanceof CreateSessionTxn) {
+                        CreateSessionTxn cst = (CreateSessionTxn) request.getTxn();
                         zks.sessionTracker.addSession(request.sessionId, cst
                                 .getTimeOut());
                     } else {
                         LOG.warn("*****>>>>> Got "
-                                + request.txn.getClass() + " "
-                                + request.txn.toString());
+                                + request.getTxn().getClass() + " "
+                                + request.getTxn().toString());
                     }
                 } else if (request.type == OpCode.closeSession) {
                     zks.sessionTracker.removeSession(request.sessionId);
@@ -128,7 +128,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
         }
 
-        if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
+        if (request.getHdr() != null && request.getHdr().getType() == OpCode.closeSession) {
             ServerCnxnFactory scxn = zks.getServerCnxnFactory();
             // this might be possible since
             // we might just be playing diffs from the leader
@@ -153,9 +153,9 @@ public class FinalRequestProcessor implements RequestProcessor {
         Record rsp = null;
         boolean closeSession = false;
         try {
-            if (request.hdr != null && request.hdr.getType() == OpCode.error) {
+            if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
                 throw KeeperException.create(KeeperException.Code.get((
-                        (ErrorTxn) request.txn).getErr()));
+                        (ErrorTxn) request.getTxn()).getErr()));
             }
 
             KeeperException ke = request.getException();
@@ -308,8 +308,8 @@ public class FinalRequestProcessor implements RequestProcessor {
                 request.request.rewind();
                 ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                 long relativeZxid = setWatches.getRelativeZxid();
-                zks.getZKDatabase().setWatches(relativeZxid, 
-                        setWatches.getDataWatches(), 
+                zks.getZKDatabase().setWatches(relativeZxid,
+                        setWatches.getDataWatches(),
                         setWatches.getExistWatches(),
                         setWatches.getChildWatches(), cnxn);
                 break;
@@ -320,7 +320,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 ByteBufferInputStream.byteBuffer2Record(request.request,
                         getACLRequest);
                 Stat stat = new Stat();
-                List<ACL> acl = 
+                List<ACL> acl =
                     zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
                 rsp = new GetACLResponse(acl, stat);
                 break;
@@ -337,9 +337,9 @@ public class FinalRequestProcessor implements RequestProcessor {
                 Long aclG;
                 synchronized(n) {
                     aclG = n.acl;
-                    
+
                 }
-                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG), 
+                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
                         ZooDefs.Perms.READ,
                         request.authInfo);
                 List<String> children = zks.getZKDatabase().getChildren(
@@ -362,7 +362,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 synchronized(n) {
                     aclG = n.acl;
                 }
-                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG), 
+                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
                         ZooDefs.Perms.READ,
                         request.authInfo);
                 List<String> children = zks.getZKDatabase().getChildren(
@@ -377,7 +377,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             // down the connection otw ZOOKEEPER-710 might happen
             // ie client on slow follower starts to renew session, fails
             // before this completes, then tries the fast follower (leader)
-            // and is successful, however the initial renew is then 
+            // and is successful, however the initial renew is then
             // successfully fwd/processed by the leader and as a result
             // the client and leader disagree on where the client is most
             // recently attached (and therefore invalid SESSION MOVED generated)

+ 32 - 33
src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -290,7 +290,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
      * @param record
      */
     protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException {
-        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type);
+        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type));
 
         switch (type) {
             case OpCode.create:
@@ -335,17 +335,17 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                     throw new KeeperException.NoChildrenForEphemeralsException(path);
                 }
                 int newCversion = parentRecord.stat.getCversion()+1;
-                request.txn = new CreateTxn(path, createRequest.getData(), listACL,
-                        createMode.isEphemeral(), newCversion);
+                request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),
+                        newCversion));
                 StatPersisted s = new StatPersisted();
                 if (createMode.isEphemeral()) {
                     s.setEphemeralOwner(request.sessionId);
                 }
-                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
+                parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
                 parentRecord.childCount++;
                 parentRecord.stat.setCversion(newCversion);
                 addChangeRecord(parentRecord);
-                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL));
+                addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
                 break;
             case OpCode.delete:
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -364,11 +364,11 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 if (nodeRecord.childCount > 0) {
                     throw new KeeperException.NotEmptyException(path);
                 }
-                request.txn = new DeleteTxn(path);
-                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
+                request.setTxn(new DeleteTxn(path));
+                parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
                 parentRecord.childCount--;
                 addChangeRecord(parentRecord);
-                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, null, -1, null));
+                addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
                 break;
             case OpCode.setData:
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -377,8 +377,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 nodeRecord = getRecordForPath(path);
                 checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
                 int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
-                request.txn = new SetDataTxn(path, setDataRequest.getData(), newVersion);
-                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
+                request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
+                nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
                 nodeRecord.stat.setVersion(newVersion);
                 addChangeRecord(nodeRecord);
                 break;
@@ -393,15 +393,15 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 nodeRecord = getRecordForPath(path);
                 checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo);
                 newVersion = checkAndIncVersion(nodeRecord.stat.getAversion(), setAclRequest.getVersion(), path);
-                request.txn = new SetACLTxn(path, listACL, newVersion);
-                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
+                request.setTxn(new SetACLTxn(path, listACL, newVersion));
+                nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
                 nodeRecord.stat.setAversion(newVersion);
                 addChangeRecord(nodeRecord);
                 break;
             case OpCode.createSession:
                 request.request.rewind();
                 int to = request.request.getInt();
-                request.txn = new CreateSessionTxn(to);
+                request.setTxn(new CreateSessionTxn(to));
                 request.request.rewind();
                 zks.sessionTracker.addSession(request.sessionId, to);
                 zks.setOwner(request.sessionId, request.getOwner());
@@ -423,8 +423,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                         }
                     }
                     for (String path2Delete : es) {
-                        addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
-                                path2Delete, null, 0, null));
+                        addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null));
                     }
                 }
                 LOG.info("Processed session termination for sessionid: 0x"
@@ -436,8 +435,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 path = checkVersionRequest.getPath();
                 nodeRecord = getRecordForPath(path);
                 checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo);
-                request.txn = new CheckVersionTxn(path,
-                        checkAndIncVersion(nodeRecord.stat.getVersion(), checkVersionRequest.getVersion(), path));
+                request.setTxn(new CheckVersionTxn(path, checkAndIncVersion(nodeRecord.stat.getVersion(),
+                        checkVersionRequest.getVersion(), path)));
                 break;
         }
     }
@@ -459,8 +458,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
     protected void pRequest(Request request) {
         // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
         // request.type + " id = 0x" + Long.toHexString(request.sessionId));
-        request.hdr = null;
-        request.txn = null;
+        request.setHdr(null);
+        request.setTxn(null);
 
         try {
             switch (request.type) {
@@ -509,8 +508,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                      * would be confusing in the logfiles.
                      */
                     if (ke != null) {
-                        request.hdr.setType(OpCode.error);
-                        request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
+                        request.getHdr().setType(OpCode.error);
+                        request.setTxn(new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue()));
                     }
 
                     /* Prep the request and convert to a Txn */
@@ -521,8 +520,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                             if (ke == null) {
                                 ke = e;
                             }
-                            request.hdr.setType(OpCode.error);
-                            request.txn = new ErrorTxn(e.code().intValue());
+                            request.getHdr().setType(OpCode.error);
+                            request.setTxn(new ErrorTxn(e.code().intValue()));
                             LOG.error(">>>> Got user-level KeeperException when processing "
                                     + request.toString()
                                     + " Error Path:" + e.getPath()
@@ -540,14 +539,14 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                     //       not sure how else to get the txn stored into our list.
                     ByteArrayOutputStream baos = new ByteArrayOutputStream();
                     BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-                    request.txn.serialize(boa, "request") ;
+                    request.getTxn().serialize(boa, "request") ;
                     ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
 
-                    txns.add(new Txn(request.hdr.getType(), bb.array()));
+                    txns.add(new Txn(request.getHdr().getType(), bb.array()));
                 }
 
-                request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type);
-                request.txn = new MultiTxn(txns);
+                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type));
+                request.setTxn(new MultiTxn(txns));
 
                 break;
 
@@ -571,9 +570,9 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 break;
             }
         } catch (KeeperException e) {
-            if (request.hdr != null) {
-                request.hdr.setType(OpCode.error);
-                request.txn = new ErrorTxn(e.code().intValue());
+            if (request.getHdr() != null) {
+                request.getHdr().setType(OpCode.error);
+                request.setTxn(new ErrorTxn(e.code().intValue()));
             }
             LOG.info("Got user-level KeeperException when processing "
                     + request.toString()
@@ -597,9 +596,9 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
             }
 
             LOG.error("Dumping request buffer: 0x" + sb.toString());
-            if (request.hdr != null) {
-                request.hdr.setType(OpCode.error);
-                request.txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
+            if (request.getHdr() != null) {
+                request.getHdr().setType(OpCode.error);
+                request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
             }
         }
         request.zxid = zks.getZxid();

+ 38 - 21
src/java/main/org/apache/zookeeper/server/Request.java

@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.jute.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.Id;
@@ -35,18 +33,9 @@ import org.apache.zookeeper.txn.TxnHeader;
  * onto the request as it is processed.
  */
 public class Request {
-    public final static Request requestOfDeath = new Request(null, 0, 0, 0,
-            null, null);
+    public final static Request requestOfDeath = new Request(null, 0, 0, 0, null, null);
 
-    /**
-     * @param cnxn
-     * @param sessionId
-     * @param xid
-     * @param type
-     * @param bb
-     */
-    public Request(ServerCnxn cnxn, long sessionId, int xid, int type,
-            ByteBuffer bb, List<Id> authInfo) {
+    public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
         this.cnxn = cnxn;
         this.sessionId = sessionId;
         this.cxid = xid;
@@ -55,6 +44,18 @@ public class Request {
         this.authInfo = authInfo;
     }
 
+    public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, long zxid) {
+        this.sessionId = sessionId;
+        this.cxid = xid;
+        this.type = type;
+        this.hdr = hdr;
+        this.txn = txn;
+        this.zxid = zxid;
+        this.request = null;
+        this.cnxn = null;
+        this.authInfo = null;
+    }
+
     public final long sessionId;
 
     public final int cxid;
@@ -65,31 +66,47 @@ public class Request {
 
     public final ServerCnxn cnxn;
 
-    public TxnHeader hdr;
+    private TxnHeader hdr;
 
-    public Record txn;
+    private Record txn;
 
     public long zxid = -1;
 
     public final List<Id> authInfo;
 
     public final long createTime = System.currentTimeMillis();
-    
+
     private Object owner;
-    
+
     private KeeperException e;
 
     public Object getOwner() {
         return owner;
     }
-    
+
     public void setOwner(Object owner) {
         this.owner = owner;
     }
 
+    public TxnHeader getHdr() {
+        return hdr;
+    }
+
+    public void setHdr(TxnHeader hdr) {
+        this.hdr = hdr;
+    }
+
+    public Record getTxn() {
+        return txn;
+    }
+
+    public void setTxn(Record txn) {
+        this.txn = txn;
+    }
+
     /**
      * is the packet type a valid packet in zookeeper
-     * 
+     *
      * @param type
      *                the type of the packet
      * @return true if a valid packet, false if not
@@ -143,7 +160,7 @@ public class Request {
             return false;
         }
     }
-    
+
     static String op2String(int op) {
         switch (op) {
         case OpCode.notification:
@@ -232,7 +249,7 @@ public class Request {
     public void setException(KeeperException e) {
         this.e = e;
     }
-	
+
     public KeeperException getException() {
         return e;
     }

+ 4 - 8
src/java/main/org/apache/zookeeper/server/ZKDatabase.java

@@ -203,11 +203,7 @@ public class ZKDatabase {
     public long loadDataBase() throws IOException {
         PlayBackListener listener=new PlayBackListener(){
             public void onTxnLoaded(TxnHeader hdr,Record txn){
-                Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
-                        null, null);
-                r.txn = txn;
-                r.hdr = hdr;
-                r.zxid = hdr.getZxid();
+                Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, txn, hdr.getZxid());
                 addCommittedProposal(r);
             }
         };
@@ -239,9 +235,9 @@ public class ZKDatabase {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
             try {
-                request.hdr.serialize(boa, "hdr");
-                if (request.txn != null) {
-                    request.txn.serialize(boa, "txn");
+                request.getHdr().serialize(boa, "hdr");
+                if (request.getTxn() != null) {
+                    request.getTxn().serialize(boa, "txn");
                 }
                 baos.close();
             } catch (IOException e) {

+ 34 - 34
src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -40,38 +40,38 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This is a helper class 
- * above the implementations 
- * of txnlog and snapshot 
+ * This is a helper class
+ * above the implementations
+ * of txnlog and snapshot
  * classes
  */
 public class FileTxnSnapLog {
-    //the direcotry containing the 
+    //the direcotry containing the
     //the transaction logs
-    File dataDir; 
-    //the directory containing the 
+    File dataDir;
+    //the directory containing the
     //the snapshot directory
     File snapDir;
     TxnLog txnLog;
     SnapShot snapLog;
     public final static int VERSION = 2;
     public final static String version = "version-";
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLog.class);
-    
+
     /**
      * This listener helps
      * the external apis calling
      * restore to gather information
-     * while the data is being 
+     * while the data is being
      * restored.
      */
     public interface PlayBackListener {
         void onTxnLoaded(TxnHeader hdr, Record rec);
     }
-    
+
     /**
-     * the constructor which takes the datadir and 
+     * the constructor which takes the datadir and
      * snapdir.
      * @param dataDir the trasaction directory
      * @param snapDir the snapshot directory
@@ -94,7 +94,7 @@ public class FileTxnSnapLog {
         txnLog = new FileTxnLog(this.dataDir);
         snapLog = new FileSnap(this.snapDir);
     }
-    
+
     /**
      * get the datadir used by this filetxn
      * snap log
@@ -103,28 +103,28 @@ public class FileTxnSnapLog {
     public File getDataDir() {
         return this.dataDir;
     }
-    
+
     /**
-     * get the snap dir used by this 
+     * get the snap dir used by this
      * filetxn snap log
      * @return the snap dir
      */
     public File getSnapDir() {
         return this.snapDir;
     }
-    
+
     /**
-     * this function restores the server 
-     * database after reading from the 
+     * this function restores the server
+     * database after reading from the
      * snapshots and transaction logs
      * @param dt the datatree to be restored
      * @param sessions the sessions to be restored
-     * @param listener the playback listener to run on the 
+     * @param listener the playback listener to run on the
      * database restoration
      * @return the highest zxid restored
      * @throws IOException
      */
-    public long restore(DataTree dt, Map<Long, Integer> sessions, 
+    public long restore(DataTree dt, Map<Long, Integer> sessions,
             PlayBackListener listener) throws IOException {
         snapLog.deserialize(dt, sessions);
         FileTxnLog txnLog = new FileTxnLog(dataDir);
@@ -132,11 +132,11 @@ public class FileTxnSnapLog {
         long highestZxid = dt.lastProcessedZxid;
         TxnHeader hdr;
         while (true) {
-            // iterator points to 
+            // iterator points to
             // the first valid txn when initialized
             hdr = itr.getHeader();
             if (hdr == null) {
-                //empty logs 
+                //empty logs
                 return dt.lastProcessedZxid;
             }
             if (hdr.getZxid() < highestZxid && highestZxid != 0) {
@@ -153,12 +153,12 @@ public class FileTxnSnapLog {
                      hdr.getType() + " error: " + e.getMessage());
             }
             listener.onTxnLoaded(hdr, itr.getTxn());
-            if (!itr.next()) 
+            if (!itr.next())
                 break;
         }
         return highestZxid;
     }
-    
+
     /**
      * process the transaction on the datatree
      * @param hdr the hdr of the transaction
@@ -231,7 +231,7 @@ public class FileTxnSnapLog {
                   " : error: " + rc.err);
         }
     }
-    
+
     /**
      * the last logged zxid on the transaction logs
      * @return the last logged zxid
@@ -256,7 +256,7 @@ public class FileTxnSnapLog {
         File snapshot=new File(
                 snapDir, Util.makeSnapshotName(lastZxid));
         snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot);
-        
+
     }
 
     /**
@@ -270,11 +270,11 @@ public class FileTxnSnapLog {
         FileTxnLog txnLog = new FileTxnLog(dataDir);
         return txnLog.truncate(zxid);
     }
-    
+
     /**
      * the most recent snapshot in the snapshot
      * directory
-     * @return the file that contains the most 
+     * @return the file that contains the most
      * recent snapshot
      * @throws IOException
      */
@@ -282,7 +282,7 @@ public class FileTxnSnapLog {
         FileSnap snaplog = new FileSnap(snapDir);
         return snaplog.findMostRecentSnapshot();
     }
-    
+
     /**
      * the n most recent snapshots
      * @param n the number of recent snapshots
@@ -297,8 +297,8 @@ public class FileTxnSnapLog {
 
     /**
      * get the snapshot logs that are greater than
-     * the given zxid 
-     * @param zxid the zxid that contains logs greater than 
+     * the given zxid
+     * @param zxid the zxid that contains logs greater than
      * zxid
      * @return
      */
@@ -309,11 +309,11 @@ public class FileTxnSnapLog {
     /**
      * append the request to the transaction logs
      * @param si the request to be appended
-     * returns true iff something appended, otw false 
+     * returns true iff something appended, otw false
      * @throws IOException
      */
     public boolean append(Request si) throws IOException {
-        return txnLog.append(si.hdr, si.txn);
+        return txnLog.append(si.getHdr(), si.getTxn());
     }
 
     /**
@@ -326,12 +326,12 @@ public class FileTxnSnapLog {
 
     /**
      * roll the transaction logs
-     * @throws IOException 
+     * @throws IOException
      */
     public void rollLog() throws IOException {
         txnLog.rollLog();
     }
-    
+
     /**
      * close the transaction log files
      * @throws IOException

+ 11 - 19
src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -47,7 +47,6 @@ public class CommitProcessor extends Thread implements RequestProcessor {
     LinkedList<Request> committedRequests = new LinkedList<Request>();
 
     RequestProcessor nextProcessor;
-    ArrayList<Request> toProcess = new ArrayList<Request>();
 
     /**
      * This flag indicates whether we need to wait for a response to come back from the
@@ -66,24 +65,21 @@ public class CommitProcessor extends Thread implements RequestProcessor {
 
     @Override
     public void run() {
+        ArrayList<Request> toProcess = new ArrayList<Request>();
         try {
             Request nextPending = null;            
             while (!finished) {
-                int len = toProcess.size();
-                for (int i = 0; i < len; i++) {
-                    nextProcessor.processRequest(toProcess.get(i));
+                for (Request request : toProcess) {
+                    nextProcessor.processRequest(request);
                 }
                 toProcess.clear();
                 synchronized (this) {
-                    if ((queuedRequests.size() == 0 || nextPending != null)
-                            && committedRequests.size() == 0) {
+                    if ((queuedRequests.isEmpty() || nextPending != null) && committedRequests.isEmpty()) {
                         wait();
                         continue;
                     }
-                    // First check and see if the commit came in for the pending
-                    // request
-                    if ((queuedRequests.size() == 0 || nextPending != null)
-                            && committedRequests.size() > 0) {
+                    // First check and see if the commit came in for the pending request
+                    if ((queuedRequests.isEmpty() || nextPending != null) && !committedRequests.isEmpty()) {
                         Request r = committedRequests.remove();
                         /*
                          * We match with nextPending so that we can move to the
@@ -96,28 +92,26 @@ public class CommitProcessor extends Thread implements RequestProcessor {
                                 && nextPending.cxid == r.cxid) {
                             // we want to send our version of the request.
                             // the pointer to the connection in the request
-                            nextPending.hdr = r.hdr;
-                            nextPending.txn = r.txn;
+                            nextPending.setHdr(r.getHdr());
+                            nextPending.setTxn(r.getTxn());
                             nextPending.zxid = r.zxid;
                             toProcess.add(nextPending);
                             nextPending = null;
                         } else {
-                            // this request came from someone else so just
-                            // send the commit packet
+                            // this request came from someone else so just send the commit packet
                             toProcess.add(r);
                         }
                     }
                 }
 
-                // We haven't matched the pending requests, so go back to
-                // waiting
+                // We haven't matched the pending requests, so go back to waiting
                 if (nextPending != null) {
                     continue;
                 }
 
                 synchronized (this) {
                     // Process the next requests in the queuedRequests
-                    while (nextPending == null && queuedRequests.size() > 0) {
+                    while (nextPending == null && !queuedRequests.isEmpty()) {
                         Request request = queuedRequests.remove();
                         switch (request.type) {
                         case OpCode.create:
@@ -166,7 +160,6 @@ public class CommitProcessor extends Thread implements RequestProcessor {
     }
 
     synchronized public void processRequest(Request request) {
-        // request.addRQRec(">commit");
         if (LOG.isDebugEnabled()) {
             LOG.debug("Processing request:: " + request);
         }
@@ -188,5 +181,4 @@ public class CommitProcessor extends Thread implements RequestProcessor {
             nextProcessor.shutdown();
         }
     }
-
 }

+ 10 - 14
src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -37,7 +37,7 @@ import org.apache.zookeeper.txn.TxnHeader;
  * Just like the standard ZooKeeperServer. We just replace the request
  * processors: FollowerRequestProcessor -> CommitProcessor ->
  * FinalRequestProcessor
- * 
+ *
  * A SyncRequestProcessor is also spawned off to log proposals from the leader.
  */
 public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
@@ -52,7 +52,7 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
      * Pending sync requests
      */
     ConcurrentLinkedQueue<Request> pendingSyncs;
-    
+
     /**
      * @param port
      * @param dataDir
@@ -67,7 +67,7 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
 
     public Follower getFollower(){
         return self.follower;
-    }      
+    }
 
     @Override
     protected void setupRequestProcessors() {
@@ -85,11 +85,7 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
     LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
 
     public void logRequest(TxnHeader hdr, Record txn) {
-        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
-                hdr.getType(), null, null);
-        request.hdr = hdr;
-        request.txn = txn;
-        request.zxid = hdr.getZxid();
+        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
         if ((request.zxid & 0xffffffffL) != 0) {
             pendingTxns.add(request);
         }
@@ -97,7 +93,7 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
     }
 
     /**
-     * When a COMMIT message is received, eventually this method is called, 
+     * When a COMMIT message is received, eventually this method is called,
      * which matches up the zxid from the COMMIT with (hopefully) the head of
      * the pendingTxns queue and hands it to the commitProcessor to commit.
      * @param zxid - must correspond to the head of pendingTxns if it exists
@@ -118,22 +114,22 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
         Request request = pendingTxns.remove();
         commitProcessor.commit(request);
     }
-    
+
     synchronized public void sync(){
         if(pendingSyncs.size() ==0){
             LOG.warn("Not expecting a sync.");
             return;
         }
-                
+
         Request r = pendingSyncs.remove();
 		commitProcessor.commit(r);
     }
-             
+
     @Override
     public int getGlobalOutstandingLimit() {
         return super.getGlobalOutstandingLimit() / (self.getQuorumSize() - 1);
     }
-    
+
     @Override
     public void shutdown() {
         LOG.info("Shutting down");
@@ -151,7 +147,7 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
                     e);
         }
     }
-    
+
     @Override
     public String getState() {
         return "follower";

+ 90 - 93
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -51,7 +51,7 @@ import org.apache.zookeeper.server.util.ZxidUtils;
  */
 public class Leader {
     private static final Logger LOG = LoggerFactory.getLogger(Leader.class);
-    
+
     static final private boolean nodelay = System.getProperty("leader.nodelay", "true").equals("true");
     static {
         LOG.info("TCP NoDelay set to: " + nodelay);
@@ -76,27 +76,27 @@ public class Leader {
 
     // the follower acceptor thread
     LearnerCnxAcceptor cnxAcceptor;
-    
+
     // list of all the followers
     public final HashSet<LearnerHandler> learners =
         new HashSet<LearnerHandler>();
 
-    // list of followers that are ready to follow (i.e synced with the leader)    
+    // list of followers that are ready to follow (i.e synced with the leader)
     public final HashSet<LearnerHandler> forwardingFollowers =
         new HashSet<LearnerHandler>();
-    
+
     protected final HashSet<LearnerHandler> observingLearners =
         new HashSet<LearnerHandler>();
-        
+
     //Pending sync requests
     public final HashMap<Long,List<LearnerSyncRequest>> pendingSyncs =
         new HashMap<Long,List<LearnerSyncRequest>>();
-    
+
     //Follower counter
     final AtomicLong followerCounter = new AtomicLong(-1);
     /**
      * Adds peer to the leader.
-     * 
+     *
      * @param learner
      *                instance of learner handle
      */
@@ -108,13 +108,13 @@ public class Leader {
 
     /**
      * Remove the learner from the learner list
-     * 
+     *
      * @param peer
      */
     void removeLearnerHandler(LearnerHandler peer) {
         synchronized (forwardingFollowers) {
-            forwardingFollowers.remove(peer);            
-        }        
+            forwardingFollowers.remove(peer);
+        }
         synchronized (learners) {
             learners.remove(peer);
         }
@@ -123,9 +123,9 @@ public class Leader {
     boolean isLearnerSynced(LearnerHandler peer){
         synchronized (forwardingFollowers) {
             return forwardingFollowers.contains(peer);
-        }        
+        }
     }
-    
+
     ServerSocket ss;
 
     Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
@@ -144,22 +144,22 @@ public class Leader {
      * This message is for follower to expect diff
      */
     final static int DIFF = 13;
-    
+
     /**
-     * This is for follower to truncate its logs 
+     * This is for follower to truncate its logs
      */
     final static int TRUNC = 14;
-    
+
     /**
      * This is for follower to download the snapshots
      */
     final static int SNAP = 15;
-    
+
     /**
      * This tells the leader that the connecting peer is actually an observer
      */
     final static int OBSERVERINFO = 16;
-    
+
     /**
      * This message type is sent by the leader to indicate it's zxid and if
      * needed, its database.
@@ -188,7 +188,7 @@ public class Leader {
      * This message is used by the follow to ack a proposed epoch.
      */
     public static final int ACKEPOCH = 18;
-    
+
     /**
      * This message type is sent to a leader to request and mutation operation.
      * The payload will consist of a request header followed by a request.
@@ -227,27 +227,27 @@ public class Leader {
      * between the leader and the follower.
      */
     final static int SYNC = 7;
-        
+
     /**
      * This message type informs observers of a committed proposal.
      */
     final static int INFORM = 8;
 
-	ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
+    final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
+
+    private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
 
-    ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
+    private final Proposal newLeaderProposal = new Proposal();
 
-    Proposal newLeaderProposal = new Proposal();
-    
     class LearnerCnxAcceptor extends Thread{
         private volatile boolean stop = false;
-        
+
         @Override
         public void run() {
             try {
                 while (!stop) {
                     try{
-                        Socket s = ss.accept();                        
+                        Socket s = ss.accept();
                         s.setSoTimeout(self.tickTime * self.syncLimit);
                         s.setTcpNoDelay(nodelay);
                         LearnerHandler fh = new LearnerHandler(s, Leader.this);
@@ -270,21 +270,21 @@ public class Leader {
                 LOG.warn("Exception while accepting follower", e);
             }
         }
-        
+
         public void halt() {
             stop = true;
         }
     }
 
     StateSummary leaderStateSummary;
-    
+
     long epoch = -1;
     boolean waitingForNewEpoch = true;
     volatile boolean readyToStart = false;
-    
+
     /**
      * This method is main function that is called to lead
-     * 
+     *
      * @throws IOException
      * @throws InterruptedException
      */
@@ -300,26 +300,26 @@ public class Leader {
         try {
             self.tick = 0;
             zk.loadData();
-            
+
             leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
 
-            // Start thread that waits for connection requests from 
+            // Start thread that waits for connection requests from
             // new followers.
             cnxAcceptor = new LearnerCnxAcceptor();
             cnxAcceptor.start();
-            
+
             readyToStart = true;
             long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
             self.setAcceptedEpoch(epoch);
-            
+
             zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
-            
+
             /*
             synchronized(this){
                 lastProposed = zk.getZxid();
             }
             */
-            
+
             newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                     null, null);
 
@@ -330,7 +330,7 @@ public class Leader {
             }
             outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
             newLeaderProposal.ackSet.add(self.getId());
-            
+
             waitForEpochAck(self.getId(), leaderStateSummary);
             self.setCurrentEpoch(epoch);
 
@@ -345,12 +345,12 @@ public class Leader {
                     StringBuilder ackToString = new StringBuilder();
                     for(Long id : newLeaderProposal.ackSet)
                         ackToString.append(id + ": ");
-                    
+
                     shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
                     HashSet<Long> followerSet = new HashSet<Long>();
                     for(LearnerHandler f : learners)
                         followerSet.add(f.getSid());
-                    
+
                     if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                     //if (followers.size() >= self.quorumPeers.size() / 2) {
                         LOG.warn("Enough followers present. "+
@@ -361,7 +361,7 @@ public class Leader {
                 Thread.sleep(self.tickTime);
                 self.tick++;
             }
-            
+
             if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                 self.cnxnFactory.setZooKeeperServer(zk);
             }
@@ -375,7 +375,7 @@ public class Leader {
             // We ping twice a tick, so we only update the tick every other
             // iteration
             boolean tickSkip = true;
-    
+
             while (true) {
                 Thread.sleep(self.tickTime / 2);
                 if (!tickSkip) {
@@ -383,7 +383,7 @@ public class Leader {
                 }
                 int syncedCount = 0;
                 HashSet<Long> syncedSet = new HashSet<Long>();
-                
+
                 // lock on the followers when we use it.
                 syncedSet.add(self.getId());
                 synchronized (learners) {
@@ -404,7 +404,7 @@ public class Leader {
                     // make sure the order is the same!
                     // the leader goes to looking
                     return;
-              } 
+              }
               tickSkip = !tickSkip;
             }
         } finally {
@@ -423,14 +423,14 @@ public class Leader {
         if (isShutdown) {
             return;
         }
-        
+
         LOG.info("Shutdown called",
                 new Exception("shutdown Leader! reason: " + reason));
 
         if (cnxAcceptor != null) {
             cnxAcceptor.halt();
         }
-        
+
         // NIO should not accept conenctions
         self.cnxnFactory.setZooKeeperServer(null);
         try {
@@ -458,7 +458,7 @@ public class Leader {
     /**
      * Keep a count of acks that are received by the leader for a particular
      * proposal
-     * 
+     *
      * @param zxid
      *                the zxid of the proposal sent out
      * @param followerAddr
@@ -473,7 +473,7 @@ public class Leader {
             }
             LOG.trace("outstanding proposals all");
         }
-        
+
         if (outstandingProposals.size() == 0) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("outstanding is 0");
@@ -495,13 +495,13 @@ public class Leader {
                     + Long.toHexString(zxid) + " from " + followerAddr);
             return;
         }
-        
+
         p.ackSet.add(sid);
         if (LOG.isDebugEnabled()) {
             LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
                     + " is " + p.ackSet.size());
         }
-        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){             
+        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
             if (zxid != lastCommitted+1) {
                 LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
                         + " from " + followerAddr + " not first!");
@@ -538,21 +538,20 @@ public class Leader {
     }
 
     static class ToBeAppliedRequestProcessor implements RequestProcessor {
-        private RequestProcessor next;
+        private final RequestProcessor next;
 
-        private ConcurrentLinkedQueue<Proposal> toBeApplied;
+        private final Leader leader;
 
         /**
          * This request processor simply maintains the toBeApplied list. For
          * this to work next must be a FinalRequestProcessor and
          * FinalRequestProcessor.processRequest MUST process the request
          * synchronously!
-         * 
+         *
          * @param next
          *                a reference to the FinalRequestProcessor
          */
-        ToBeAppliedRequestProcessor(RequestProcessor next,
-                ConcurrentLinkedQueue<Proposal> toBeApplied) {
+        ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
             if (!(next instanceof FinalRequestProcessor)) {
                 throw new RuntimeException(ToBeAppliedRequestProcessor.class
                         .getName()
@@ -561,28 +560,26 @@ public class Leader {
                         + " not "
                         + next.getClass().getName());
             }
-            this.toBeApplied = toBeApplied;
+            this.leader = leader;
             this.next = next;
         }
 
         /*
          * (non-Javadoc)
-         * 
+         *
          * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
          */
         public void processRequest(Request request) {
-            // request.addRQRec(">tobe");
             next.processRequest(request);
-            Proposal p = toBeApplied.peek();
-            if (p != null && p.request != null
-                    && p.request.zxid == request.zxid) {
-                toBeApplied.remove();
+            Proposal p = leader.toBeApplied.peek();
+            if (p != null && p.request != null && p.request.zxid == request.zxid) {
+                leader.toBeApplied.remove();
             }
         }
 
         /*
          * (non-Javadoc)
-         * 
+         *
          * @see org.apache.zookeeper.server.RequestProcessor#shutdown()
          */
         public void shutdown() {
@@ -593,24 +590,24 @@ public class Leader {
 
     /**
      * send a packet to all the followers ready to follow
-     * 
+     *
      * @param qp
      *                the packet to be sent
      */
     void sendPacket(QuorumPacket qp) {
         synchronized (forwardingFollowers) {
-            for (LearnerHandler f : forwardingFollowers) {                
+            for (LearnerHandler f : forwardingFollowers) {
                 f.queuePacket(qp);
             }
         }
     }
-    
+
     /**
-     * send a packet to all observers     
+     * send a packet to all observers
      */
-    void sendObserverPacket(QuorumPacket qp) {        
+    void sendObserverPacket(QuorumPacket qp) {
         synchronized(observingLearners) {
-            for (LearnerHandler f : observingLearners) {                
+            for (LearnerHandler f : observingLearners) {
                 f.queuePacket(qp);
             }
         }
@@ -620,7 +617,7 @@ public class Leader {
 
     /**
      * Create a commit packet and send it to all the members of the quorum
-     * 
+     *
      * @param zxid
      */
     public void commit(long zxid) {
@@ -630,33 +627,33 @@ public class Leader {
         QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
         sendPacket(qp);
     }
-    
+
     /**
      * Create an inform packet and send it to all observers.
      * @param zxid
      * @param proposal
      */
-    public void inform(Proposal proposal) {   
-        QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, 
+    public void inform(Proposal proposal) {
+        QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
                                             proposal.packet.getData(), null);
         sendObserverPacket(qp);
     }
 
     long lastProposed;
 
-    
+
     /**
      * Returns the current epoch of the leader.
-     * 
+     *
      * @return
      */
     public long getEpoch(){
         return ZxidUtils.getEpochFromZxid(lastProposed);
     }
-    
+
     /**
      * create a proposal and send it out to all the members
-     * 
+     *
      * @param request
      * @return the proposal that is queued to send to all the members
      */
@@ -664,17 +661,17 @@ public class Leader {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
         try {
-            request.hdr.serialize(boa, "hdr");
-            if (request.txn != null) {
-                request.txn.serialize(boa, "txn");
+            request.getHdr().serialize(boa, "hdr");
+            if (request.getTxn() != null) {
+                request.getTxn().serialize(boa, "txn");
             }
             baos.close();
         } catch (IOException e) {
             LOG.warn("This really should be impossible", e);
         }
-        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, 
+        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                 baos.toByteArray(), null);
-        
+
         Proposal p = new Proposal();
         p.packet = pp;
         p.request = request;
@@ -689,13 +686,13 @@ public class Leader {
         }
         return p;
     }
-            
+
     /**
      * Process sync requests
-     * 
+     *
      * @param r the request
      */
-    
+
     synchronized public void processSync(LearnerSyncRequest r){
         if(outstandingProposals.isEmpty()){
             sendSync(r);
@@ -708,23 +705,23 @@ public class Leader {
             pendingSyncs.put(lastProposed, l);
         }
     }
-        
+
     /**
      * Sends a sync message to the appropriate server
-     * 
+     *
      * @param f
      * @param r
      */
-            
+
     public void sendSync(LearnerSyncRequest r){
         QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
         r.fh.queuePacket(qp);
     }
-                
+
     /**
      * lets the leader know that a follower is capable of following and is done
      * syncing
-     * 
+     *
      * @param handler handler of the follower
      * @return last proposed zxid
      */
@@ -762,11 +759,11 @@ public class Leader {
                 observingLearners.add(handler);
             }
         }
-                
+
         return lastProposed;
     }
 
-    private HashSet<Long> connectingFollowers = new HashSet<Long>();
+    private final HashSet<Long> connectingFollowers = new HashSet<Long>();
 	public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
 		synchronized(connectingFollowers) {
 			if (!waitingForNewEpoch) {
@@ -789,14 +786,14 @@ public class Leader {
                     cur = System.currentTimeMillis();
                 }
 				if (waitingForNewEpoch) {
-                    throw new InterruptedException("Timeout while waiting for epoch from quorum");        
+                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
 				}
 			}
 			return epoch;
 		}
 	}
 
-	private HashSet<Long> electingFollowers = new HashSet<Long>();
+	private final HashSet<Long> electingFollowers = new HashSet<Long>();
 	private boolean electionFinished = false;
 	public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
 		synchronized(electingFollowers) {
@@ -813,7 +810,7 @@ public class Leader {
 			if (readyToStart && verifier.containsQuorum(electingFollowers)) {
 				electionFinished = true;
 				electingFollowers.notifyAll();
-            } else {                
+            } else {
                 long start = System.currentTimeMillis();
                 long cur = start;
                 long end = start + self.getInitLimit()*self.getTickTime();

+ 8 - 9
src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

@@ -32,7 +32,7 @@ import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
 /**
- * 
+ *
  * Just like the standard ZooKeeperServer. We just replace the request
  * processors: PrepRequestProcessor -> ProposalRequestProcessor ->
  * CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->
@@ -55,12 +55,11 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
     public Leader getLeader(){
         return self.leader;
     }
-    
+
     @Override
     protected void setupRequestProcessors() {
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
-        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
-                finalProcessor, getLeader().toBeApplied);
+        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
         commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                 Long.toString(getServerId()), false);
         commitProcessor.start();
@@ -75,7 +74,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
     public int getGlobalOutstandingLimit() {
         return super.getGlobalOutstandingLimit() / (self.getQuorumSize() - 1);
     }
-    
+
     @Override
     protected void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
@@ -146,7 +145,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
         }
         jmxServerBean = null;
     }
-    
+
     @Override
     public String getState() {
         return "leader";
@@ -154,13 +153,13 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
 
     /**
      * Returns the id of the associated QuorumPeer, which will do for a unique
-     * id of this server. 
+     * id of this server.
      */
     @Override
     public long getServerId() {
         return self.getId();
-    }    
-    
+    }
+
     @Override
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
         int sessionTimeout) throws IOException {

+ 15 - 19
src/java/main/org/apache/zookeeper/server/quorum/Observer.java

@@ -32,11 +32,11 @@ import org.apache.zookeeper.txn.TxnHeader;
  * Instead, they are informed of successful proposals by the Leader. Observers
  * therefore naturally act as a relay point for publishing the proposal stream
  * and can relieve Followers of some of the connection load. Observers may
- * submit proposals, but do not vote in their acceptance. 
+ * submit proposals, but do not vote in their acceptance.
  *
- * See ZOOKEEPER-368 for a discussion of this feature. 
+ * See ZOOKEEPER-368 for a discussion of this feature.
  */
-public class Observer extends Learner{      
+public class Observer extends Learner{
 
     Observer(QuorumPeer self,ObserverZooKeeperServer observerZooKeeperServer) {
         this.self = self;
@@ -46,12 +46,12 @@ public class Observer extends Learner{
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("Observer ").append(sock);        
+        sb.append("Observer ").append(sock);
         sb.append(" pendingRevalidationCount:")
             .append(pendingRevalidations.size());
         return sb.toString();
     }
-    
+
     /**
      * the main method called by the observer to observe the leader
      *
@@ -66,12 +66,12 @@ public class Observer extends Learner{
             try {
                 connectToLeader(addr);
                 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
-                
+
                 syncWithLeader(newLeaderZxid);
                 QuorumPacket qp = new QuorumPacket();
                 while (self.isRunning()) {
                     readPacket(qp);
-                    processPacket(qp);                   
+                    processPacket(qp);
                 }
             } catch (IOException e) {
                 LOG.warn("Exception when observing the leader", e);
@@ -80,7 +80,7 @@ public class Observer extends Learner{
                 } catch (IOException e1) {
                     e1.printStackTrace();
                 }
-    
+
                 // clear pending revalidations
                 pendingRevalidations.clear();
             }
@@ -88,7 +88,7 @@ public class Observer extends Learner{
             zk.unregisterJMX(this);
         }
     }
-    
+
     /**
      * Controls the response of an observer to the receipt of a quorumpacket
      * @param qp
@@ -103,8 +103,8 @@ public class Observer extends Learner{
             LOG.warn("Ignoring proposal");
             break;
         case Leader.COMMIT:
-            LOG.warn("Ignoring commit");            
-            break;            
+            LOG.warn("Ignoring commit");
+            break;
         case Leader.UPTODATE:
             LOG.error("Received an UPTODATE message after Observer started");
             break;
@@ -114,16 +114,12 @@ public class Observer extends Learner{
         case Leader.SYNC:
             ((ObserverZooKeeperServer)zk).sync();
             break;
-        case Leader.INFORM:            
+        case Leader.INFORM:
             TxnHeader hdr = new TxnHeader();
             Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
-            Request request = new Request (null, hdr.getClientId(), 
-                                           hdr.getCxid(),
-                                           hdr.getType(), null, null);
-            request.txn = txn;
-            request.hdr = hdr;
+            Request request = new Request (hdr.getClientId(),  hdr.getCxid(), hdr.getType(), hdr, txn, 0);
             ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
-            obs.commitRequest(request);            
+            obs.commitRequest(request);
             break;
         }
     }
@@ -131,7 +127,7 @@ public class Observer extends Learner{
     /**
      * Shutdown the Observer.
      */
-    public void shutdown() {       
+    public void shutdown() {
         LOG.info("shutdown called", new Exception("shutdown Observer"));
         super.shutdown();
     }

+ 10 - 10
src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java

@@ -33,7 +33,7 @@ public class ProposalRequestProcessor implements RequestProcessor {
         LoggerFactory.getLogger(ProposalRequestProcessor.class);
 
     LeaderZooKeeperServer zks;
-    
+
     RequestProcessor nextProcessor;
 
     SyncRequestProcessor syncProcessor;
@@ -45,33 +45,33 @@ public class ProposalRequestProcessor implements RequestProcessor {
         AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
         syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
     }
-    
+
     /**
      * initialize this processor
      */
     public void initialize() {
         syncProcessor.start();
     }
-    
+
     public void processRequest(Request request) {
         // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
         // request.type + " id = " + request.sessionId);
         // request.addRQRec(">prop");
-                
-        
-        /* In the following IF-THEN-ELSE block, we process syncs on the leader. 
+
+
+        /* In the following IF-THEN-ELSE block, we process syncs on the leader.
          * If the sync is coming from a follower, then the follower
          * handler adds it to syncHandler. Otherwise, if it is a client of
-         * the leader that issued the sync command, then syncHandler won't 
-         * contain the handler. In this case, we add it to syncHandler, and 
+         * the leader that issued the sync command, then syncHandler won't
+         * contain the handler. In this case, we add it to syncHandler, and
          * call processRequest on the next processor.
          */
-        
+
         if(request instanceof LearnerSyncRequest){
             zks.getLeader().processSync((LearnerSyncRequest)request);
         } else {
                 nextProcessor.processRequest(request);
-            if (request.hdr != null) {
+            if (request.getHdr() != null) {
                 // We need to sync and get consensus on any transactions
                 zks.getLeader().propose(request);
                 syncProcessor.processRequest(request);

+ 3 - 3
src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java

@@ -30,7 +30,7 @@ import org.apache.zookeeper.server.RequestProcessor;
 
 public class SendAckRequestProcessor implements RequestProcessor, Flushable {
     private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
-    
+
     Learner learner;
 
     SendAckRequestProcessor(Learner peer) {
@@ -39,7 +39,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
 
     public void processRequest(Request si) {
         if(si.type != OpCode.sync){
-            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
+            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
                 null);
             try {
                 learner.writePacket(qp, false);
@@ -56,7 +56,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
             }
         }
     }
-    
+
     public void flush() throws IOException {
         try {
             learner.writePacket(null, true);

+ 1 - 1
src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java

@@ -168,7 +168,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         // just make sure that we actually did get it in process at the
         // leader
         Assert.assertTrue(outstanding.size() == 1);
-        Assert.assertTrue(((Proposal) outstanding.values().iterator().next()).request.hdr.getType() == OpCode.create);
+        Assert.assertTrue(((Proposal) outstanding.values().iterator().next()).request.getHdr().getType() == OpCode.create);
         // make sure it has a chance to write it to disk
         Thread.sleep(1000);
         mt[leader].shutdown();