瀏覽代碼

ZOOKEEPER-1147. Add support for local sessions (Jay Shrauner, thawan via thawan)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1530781 13f79535-47bb-0310-9956-ffa450edef68
Thawan Kooburat 11 年之前
父節點
當前提交
b71191f514
共有 41 個文件被更改,包括 2373 次插入和 222 次删除
  1. 2 0
      CHANGES.txt
  2. 2 1
      src/c/include/zookeeper.h
  3. 35 4
      src/java/main/org/apache/zookeeper/KeeperException.java
  4. 11 6
      src/java/main/org/apache/zookeeper/cli/CreateCommand.java
  5. 21 6
      src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
  6. 31 7
      src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
  7. 28 14
      src/java/main/org/apache/zookeeper/server/Request.java
  8. 50 3
      src/java/main/org/apache/zookeeper/server/SessionTracker.java
  9. 59 28
      src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
  10. 1 1
      src/java/main/org/apache/zookeeper/server/TraceFormatter.java
  11. 52 18
      src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
  12. 4 3
      src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
  13. 32 6
      src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
  14. 81 0
      src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
  15. 203 0
      src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
  16. 40 11
      src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
  17. 3 4
      src/java/main/org/apache/zookeeper/server/quorum/Learner.java
  18. 2 2
      src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
  19. 160 40
      src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
  20. 29 22
      src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
  21. 45 0
      src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
  22. 26 2
      src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
  23. 2 2
      src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
  24. 35 1
      src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
  25. 13 3
      src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
  26. 7 4
      src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
  27. 99 0
      src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
  28. 27 3
      src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
  29. 89 0
      src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
  30. 25 7
      src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
  31. 15 7
      src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
  32. 113 0
      src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
  33. 176 0
      src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
  34. 148 0
      src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java
  35. 137 0
      src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java
  36. 48 5
      src/java/test/org/apache/zookeeper/test/QuorumBase.java
  37. 7 9
      src/java/test/org/apache/zookeeper/test/QuorumTest.java
  38. 43 2
      src/java/test/org/apache/zookeeper/test/QuorumUtil.java
  39. 22 1
      src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
  40. 218 0
      src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
  41. 232 0
      src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java

+ 2 - 0
CHANGES.txt

@@ -16,6 +16,8 @@ NEW FEATURES:
   ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer
   (Marshall McMullen via michim)
 
+  ZOOKEEPER-1147. Add support for local sessions (Jay Shrauner, thawan via thawan)
+
 BUGFIXES:
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString

+ 2 - 1
src/c/include/zookeeper.h

@@ -116,8 +116,9 @@ enum ZOO_ERRORS {
   ZSESSIONMOVED = -118, /*!<session moved to another server, so operation is ignored */
   ZNEWCONFIGNOQUORUM = -120,  /*!< No quorum of new config is connected and up-to-date with the leader of last committed config - try
                                  invoking reconfiguration after new servers are connected and synced */
-  ZRECONFIGINPROGRESS = -121  /*!< Reconfiguration requested while another reconfiguration is currently in progress. This is currently
+  ZRECONFIGINPROGRESS = -121,  /*!< Reconfiguration requested while another reconfiguration is currently in progress. This is currently
                                        not supported. Please retry. */
+  ZEPHEMERALONLOCALSESSION = -122 /*!< Attempt to create ephemeral node on a local session */
 };
 
 #ifdef __cplusplus

+ 35 - 4
src/java/main/org/apache/zookeeper/KeeperException.java

@@ -135,7 +135,8 @@ public abstract class KeeperException extends Exception {
                 return new SessionMovedException();
             case NOTREADONLY:
                 return new NotReadOnlyException();
-            	
+            case EPHEMERALONLOCALSESSION:
+                return new EphemeralOnLocalSessionException();
             case OK:
             default:
                 throw new IllegalArgumentException("Invalid exception code");
@@ -222,6 +223,9 @@ public abstract class KeeperException extends Exception {
         @Deprecated
         public static final int BadArguments = -8;
 
+        @Deprecated
+        public static final int UnknownSession= -12;
+
         /**
          * @deprecated deprecated in 3.1.0, use {@link Code#APIERROR} instead
          */
@@ -291,6 +295,9 @@ public abstract class KeeperException extends Exception {
         @Deprecated
         public static final int ReconfigInProgress= -121;
         
+        @Deprecated
+        public static final int EphemeralOnLocalSession = -122;
+
     }
 
     /** Codes which represent the various KeeperException
@@ -328,6 +335,8 @@ public abstract class KeeperException extends Exception {
         NEWCONFIGNOQUORUM (NewConfigNoQuorum),
         /** Another reconfiguration is in progress -- concurrent reconfigs not supported (yet) */
         RECONFIGINPROGRESS (ReconfigInProgress),
+        /** Unknown session (internal server use only) */
+        UNKNOWNSESSION (UnknownSession),
         
         /** API errors.
          * This is never thrown by the server, it shouldn't be used other than
@@ -361,7 +370,9 @@ public abstract class KeeperException extends Exception {
         /** Session moved to another server, so operation is ignored */
         SESSIONMOVED (-118),
         /** State-changing request is passed to read-only server */
-        NOTREADONLY (-119);
+        NOTREADONLY (-119),
+        /** Attempt to create ephemeral node on a local session */
+        EPHEMERALONLOCALSESSION (EphemeralOnLocalSession);
 
         private static final Map<Integer,Code> lookup
             = new HashMap<Integer,Code>();
@@ -442,6 +453,8 @@ public abstract class KeeperException extends Exception {
                 return "Session moved";
             case NOTREADONLY:
                 return "Not a read-only call";
+            case EPHEMERALONLOCALSESSION:
+                return "Ephemeral node on local session";
             default:
                 return "Unknown error " + code;
         }
@@ -502,7 +515,7 @@ public abstract class KeeperException extends Exception {
      * If this exception was thrown by a multi-request then the (partial) results
      * and error codes can be retrieved using this getter.
      * @return A copy of the list of results from the operations in the multi-request.
-     * 
+     *
      * @since 3.4.0
      *
      */
@@ -701,7 +714,16 @@ public abstract class KeeperException extends Exception {
             super(Code.SESSIONEXPIRED);
         }
     }
-    
+
+    /**
+     * @see Code#UNKNOWNSESSION
+     */
+    public static class UnknownSessionException extends KeeperException {
+        public UnknownSessionException() {
+            super(Code.UNKNOWNSESSION);
+        }
+    }
+
     /**
      * @see Code#SESSIONMOVED
      */
@@ -720,6 +742,15 @@ public abstract class KeeperException extends Exception {
         }
     }
 
+    /**
+     * @see Code#EPHEMERALONLOCALSESSION
+     */
+    public static class EphemeralOnLocalSessionException extends KeeperException {
+        public EphemeralOnLocalSessionException() {
+            super(Code.EPHEMERALONLOCALSESSION);
+        }
+    }
+
     /**
      * @see Code#SYSTEMERROR
      */

+ 11 - 6
src/java/main/org/apache/zookeeper/cli/CreateCommand.java

@@ -32,16 +32,16 @@ public class CreateCommand extends CliCommand {
     private static Options options = new Options();
     private String[] args;
     private CommandLine cl;
-    
+
     {
         options.addOption(new Option("e", false, "ephemeral"));
         options.addOption(new Option("s", false, "sequential"));
     }
-    
+
     public CreateCommand() {
         super("create", "[-s] [-e] path [data] [acl]");
     }
-    
+
 
     @Override
     public CliCommand parse(String[] cmdArgs) throws ParseException {
@@ -54,7 +54,7 @@ public class CreateCommand extends CliCommand {
         return this;
     }
 
-    
+
     @Override
     public boolean exec() throws KeeperException, InterruptedException {
         CreateMode flags = CreateMode.PERSISTENT;
@@ -74,8 +74,13 @@ public class CreateCommand extends CliCommand {
         if (args.length > 3) {
             acl = AclParser.parse(args[3]);
         }
-        String newPath = zk.create(path, data, acl, flags);
-        err.println("Created " + newPath);
+        try {
+            String newPath = zk.create(path, data, acl, flags);
+            err.println("Created " + newPath);
+        } catch(KeeperException.EphemeralOnLocalSessionException e) {
+            err.println("Unable to create ephemeral node on a local session");
+            return false;
+        }
         return true;
     }
 }

+ 21 - 6
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -96,6 +96,11 @@ public class FinalRequestProcessor implements RequestProcessor {
         }
         ProcessTxnResult rc = null;
         synchronized (zks.outstandingChanges) {
+            // Need to process local session requests
+            rc = zks.processTxn(request);
+
+            // request.hdr is set for write requests, which are the only ones
+            // that add to outstandingChanges.
             if (request.getHdr() != null) {
                 TxnHeader hdr = request.getHdr();
                 Record txn = request.getTxn();
@@ -111,16 +116,15 @@ public class FinalRequestProcessor implements RequestProcessor {
                         zks.outstandingChangesForPath.remove(cr.path);
                     }
                 }
-
-                rc = zks.processTxn(hdr, txn);
             }
+
             // do not add non quorum packets to the queue.
-            if (Request.isQuorum(request.type)) {
+            if (request.isQuorum()) {
                 zks.getZKDatabase().addCommittedProposal(request);
             }
         }
 
-        if (request.getHdr() != null && request.getHdr().getType() == OpCode.closeSession) {
+        if (request.type == OpCode.closeSession) {
             ServerCnxnFactory scxn = zks.getServerCnxnFactory();
             // this might be possible since
             // we might just be playing diffs from the leader
@@ -145,8 +149,19 @@ public class FinalRequestProcessor implements RequestProcessor {
         Record rsp = null;
         try {
             if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
-                throw KeeperException.create(KeeperException.Code.get((
-                        (ErrorTxn) request.getTxn()).getErr()));
+                /*
+                 * When local session upgrading is disabled, the leader
+                 * rejects the ephemeral node creation with a session-expired
+                 * error. However, if this is the follower that issued the
+                 * request, it already holds the correct error code, so we
+                 * use that and report it to the user.
+                 */
+                if (request.getException() != null) {
+                    throw request.getException();
+                } else {
+                    throw KeeperException.create(KeeperException.Code
+                            .get(((ErrorTxn) request.getTxn()).getErr()));
+                }
             }
 
             KeeperException ke = request.getException();

+ 31 - 7
src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -349,11 +349,12 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
 
         switch (type) {
             case OpCode.create: {
-                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                 CreateRequest createRequest = (CreateRequest)record;
                 if (deserialize) {
                     ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                 }
+                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+                validateCreateRequest(createMode, request);
                 String path = createRequest.getPath();
                 String parentPath = validatePathForCreate(path, request.sessionId);
 
@@ -365,7 +366,6 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
 
                 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
                 int parentCVersion = parentRecord.stat.getCversion();
-                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
                 if (createMode.isSequential()) {
                     path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                 }
@@ -402,11 +402,12 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 break;
             }
             case OpCode.create2: {
-                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                 Create2Request createRequest = (Create2Request)record;
                 if (deserialize) {
                     ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                 }
+                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+                validateCreateRequest(createMode, request);
                 String path = createRequest.getPath();
                 String parentPath = validatePathForCreate(path, request.sessionId);
 
@@ -418,7 +419,6 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
 
                 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
                 int parentCVersion = parentRecord.stat.getCversion();
-                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
                 if (createMode.isSequential()) {
                     path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                 }
@@ -624,7 +624,13 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 int to = request.request.getInt();
                 request.setTxn(new CreateSessionTxn(to));
                 request.request.rewind();
-                zks.sessionTracker.addSession(request.sessionId, to);
+                if (request.isLocalSession()) {
+                    // This will add to local session tracker if it is enabled
+                    zks.sessionTracker.addSession(request.sessionId, to);
+                } else {
+                    // Explicitly add to global session if the flag is not set
+                    zks.sessionTracker.addGlobalSession(request.sessionId, to);
+                }
                 zks.setOwner(request.sessionId, request.getOwner());
                 break;
             case OpCode.closeSession:
@@ -791,7 +797,10 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
             //create/close session don't require request record
             case OpCode.createSession:
             case OpCode.closeSession:
-                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
+                if (!request.isLocalSession()) {
+                    pRequest2Txn(request.type, zks.getNextZxid(), request,
+                                 null, true);
+                }
                 break;
 
             //All the rest don't need to create a Txn - just verify session
@@ -855,7 +864,22 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
         }
         return retval;
     }
-
+    
+    private void validateCreateRequest(CreateMode createMode, Request request)
+            throws KeeperException {
+        if (createMode.isEphemeral()) {
+            // Exception is set when local session failed to upgrade
+            // so we just need to report the error
+            if (request.getException() != null) {
+                throw request.getException();
+            }
+            zks.sessionTracker.checkGlobalSession(request.sessionId,
+                    request.getOwner());
+        } else {
+            zks.sessionTracker.checkSession(request.sessionId,
+                    request.getOwner());
+        }
+    }
 
     /**
      * This method checks out the acl making sure it isn't null or empty,

+ 28 - 14
src/java/main/org/apache/zookeeper/server/Request.java

@@ -83,6 +83,19 @@ public class Request {
 
     public QuorumVerifier qv = null;
     
+    /**
+     * If this is a create or close request for a local-only session.
+     */
+    private boolean isLocalSession = false;
+
+    public boolean isLocalSession() {
+        return isLocalSession;
+    }
+
+    public void setLocalSession(boolean isLocalSession) {
+        this.isLocalSession = isLocalSession;
+    }
+
     public Object getOwner() {
         return owner;
     }
@@ -119,43 +132,41 @@ public class Request {
         switch (type) {
         case OpCode.notification:
             return false;
+        case OpCode.check:
+        case OpCode.closeSession:
         case OpCode.create:
         case OpCode.create2:
-        case OpCode.delete:
         case OpCode.createSession:
+        case OpCode.delete:
         case OpCode.exists:
-        case OpCode.getData:
-        case OpCode.check:
-        case OpCode.multi:
-        case OpCode.setData:
-        case OpCode.sync:
         case OpCode.getACL:
-        case OpCode.setACL:
         case OpCode.getChildren:
         case OpCode.getChildren2:
+        case OpCode.getData:
+        case OpCode.multi:
         case OpCode.ping:
-        case OpCode.closeSession:
-        case OpCode.setWatches:
         case OpCode.reconfig:
+        case OpCode.setACL:
+        case OpCode.setData:
+        case OpCode.setWatches:
+        case OpCode.sync:
             return true;
         default:
             return false;
         }
     }
 
-    static boolean isQuorum(int type) {
-        switch (type) {
+    public boolean isQuorum() {
+        switch (this.type) {
         case OpCode.exists:
         case OpCode.getACL:
         case OpCode.getChildren:
         case OpCode.getChildren2:
         case OpCode.getData:
             return false;
-        case OpCode.error:
-        case OpCode.closeSession:
         case OpCode.create:
         case OpCode.create2:
-        case OpCode.createSession:
+        case OpCode.error:
         case OpCode.delete:
         case OpCode.setACL:
         case OpCode.setData:
@@ -163,6 +174,9 @@ public class Request {
         case OpCode.multi:
         case OpCode.reconfig:
             return true;
+        case OpCode.closeSession:
+        case OpCode.createSession:
+            return !this.isLocalSession;
         default:
             return false;
         }

+ 50 - 3
src/java/main/org/apache/zookeeper/server/SessionTracker.java

@@ -44,7 +44,22 @@ public interface SessionTracker {
 
     long createSession(int sessionTimeout);
 
-    void addSession(long id, int to);
+    /**
+     * Add a global session to those being tracked.
+     * @param id sessionId
+     * @param to sessionTimeout
+     * @return whether the session was newly added (if false, already existed)
+     */
+    boolean addGlobalSession(long id, int to);
+
+    /**
+     * Add a session to those being tracked. The session is added as a local
+     * session if they are enabled, otherwise as global.
+     * @param id sessionId
+     * @param to sessionTimeout
+     * @return whether the session was newly added (if false, already existed)
+     */
+    boolean addSession(long id, int to);
 
     /**
      * @param sessionId
@@ -60,7 +75,7 @@ public interface SessionTracker {
     void setSessionClosing(long sessionId);
 
     /**
-     * 
+     *
      */
     void shutdown();
 
@@ -69,7 +84,39 @@ public interface SessionTracker {
      */
     void removeSession(long sessionId);
 
-    void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, SessionMovedException;
+    /**
+     * @param sessionId
+     * @return whether or not the SessionTracker is aware of this session
+     */
+    boolean isTrackingSession(long sessionId);
+
+    /**
+     * Checks whether the SessionTracker is aware of this session, the session
+     * is still active, and the owner matches. If the owner wasn't previously
+     * set, this sets the owner of the session.
+     *
+     * UnknownSessionException should never be thrown to the client. It is
+     * only used internally to deal with possible local sessions from other
+     * machines.
+     *
+     * @param sessionId
+     * @param owner
+     */
+    public void checkSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException,
+            KeeperException.UnknownSessionException;
+
+    /**
+     * Strictly check that a given session is a global session.
+     * @param sessionId
+     * @param owner
+     * @throws KeeperException.SessionExpiredException
+     * @throws KeeperException.SessionMovedException
+     */
+    public void checkGlobalSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException;
 
     void setOwner(long id, Object owner) throws SessionExpiredException;
 

+ 59 - 28
src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java

@@ -20,18 +20,15 @@ package org.apache.zookeeper.server;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a full featured SessionTracker. It tracks session in grouped by tick
@@ -47,8 +44,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
 
     private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
 
-    private final ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
-    private final long serverId;
+    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
     private final AtomicLong nextSessionId = new AtomicLong();
 
     public static class SessionImpl implements Session {
@@ -73,6 +69,10 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
         }
     }
 
+    /**
+     * Generates an initial sessionId. High order byte is serverId, next
+     * 5 bytes are from timestamp, and low order 2 bytes are 0s.
+     */
     public static long initializeNextSession(long id) {
         long nextSid = 0;
         nextSid = (System.currentTimeMillis() << 24) >> 8;
@@ -83,15 +83,14 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
     private final SessionExpirer expirer;
 
     public SessionTrackerImpl(SessionExpirer expirer,
-            ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
-            long sid)
+            ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime,
+            long serverId)
     {
         super("SessionTracker");
         this.expirer = expirer;
         this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
         this.sessionsWithTimeout = sessionsWithTimeout;
-        this.serverId = sid;
-        this.nextSessionId.set(initializeNextSession(sid));
+        this.nextSessionId.set(initializeNextSession(serverId));
         for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
             addSession(e.getKey(), e.getValue());
         }
@@ -103,7 +102,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
         pwriter.print("Session ");
         sessionExpiryQueue.dump(pwriter);
     }
-
+    
     @Override
     public String toString() {
         StringWriter sw = new StringWriter();
@@ -151,6 +150,10 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
         return true;
     }
 
+    public int getSessionTimeout(long sessionId) {
+        return sessionsWithTimeout.get(sessionId);
+    }
+
     synchronized public void setSessionClosing(long sessionId) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Session closing: 0x" + Long.toHexString(sessionId));
@@ -192,31 +195,49 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
         return sessionId;
     }
 
-    synchronized public void addSession(long id, int sessionTimeout) {
+    public boolean addGlobalSession(long id, int sessionTimeout) {
+        return addSession(id, sessionTimeout);
+    }
+
+    public synchronized boolean addSession(long id, int sessionTimeout) {
+        boolean added = false;
+
         sessionsWithTimeout.put(id, sessionTimeout);
         if (sessionsById.get(id) == null) {
             SessionImpl s = new SessionImpl(id, sessionTimeout);
             sessionsById.put(id, s);
-            if (LOG.isTraceEnabled()) {
-                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                        "SessionTrackerImpl --- Adding session 0x"
-                        + Long.toHexString(id) + " " + sessionTimeout);
-            }
-        } else {
-            if (LOG.isTraceEnabled()) {
-                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                        "SessionTrackerImpl --- Existing session 0x"
-                        + Long.toHexString(id) + " " + sessionTimeout);
-            }
+            added = true;
+            LOG.debug("Adding session 0x" + Long.toHexString(id));
+        }
+        if (LOG.isTraceEnabled()) {
+            String actionStr = added ? "Adding" : "Existing";
+            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
+                    "SessionTrackerImpl --- " + actionStr + " session 0x"
+                    + Long.toHexString(id) + " " + sessionTimeout);
         }
         touchSession(id, sessionTimeout);
+        return added;
+    }
+
+    public boolean isTrackingSession(long sessionId) {
+        return sessionsById.containsKey(sessionId);
     }
 
-    synchronized public void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
+    public synchronized void checkSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException,
+            KeeperException.UnknownSessionException {
+        LOG.debug("Checking session 0x" + Long.toHexString(sessionId));
         SessionImpl session = sessionsById.get(sessionId);
-        if (session == null || session.isClosing()) {
+
+        if (session == null) {
+            throw new KeeperException.UnknownSessionException();
+        }
+
+        if (session.isClosing()) {
             throw new KeeperException.SessionExpiredException();
         }
+
         if (session.owner == null) {
             session.owner = owner;
         } else if (session.owner != owner) {
@@ -231,4 +252,14 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
         }
         session.owner = owner;
     }
+
+    public void checkGlobalSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException {
+        try {
+            checkSession(sessionId, owner);
+        } catch (KeeperException.UnknownSessionException e) {
+            throw new KeeperException.SessionExpiredException();
+        }
+    }
 }

+ 1 - 1
src/java/main/org/apache/zookeeper/server/TraceFormatter.java

@@ -29,7 +29,7 @@ import org.apache.zookeeper.ZooDefs.OpCode;
 
 public class TraceFormatter {
 
-    static String op2String(int op) {
+    public static String op2String(int op) {
         switch (op) {
         case OpCode.notification:
             return "notification";

+ 52 - 18
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -287,6 +287,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return hzxid.get();
     }
 
+    public SessionTracker getSessionTracker() {
+        return sessionTracker;
+    }
+    
     long getNextZxid() {
         return hzxid.incrementAndGet();
     }
@@ -300,7 +304,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     private void close(long sessionId) {
-        submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
+        Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
+        setLocalSessionFlag(si);
+        submitRequest(si);
     }
 
     public void closeSession(long sessionId) {
@@ -409,7 +415,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                 tickTime, 1);
     }
-    
+
     protected void startSessionTracker() {
         ((SessionTrackerImpl)sessionTracker).start();
     }
@@ -518,13 +524,19 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
+        if (passwd == null) {
+            // Possible since it's just deserialized from a packet on the wire.
+            passwd = new byte[0];
+        }
         long sessionId = sessionTracker.createSession(timeout);
         Random r = new Random(sessionId ^ superSecret);
         r.nextBytes(passwd);
         ByteBuffer to = ByteBuffer.allocate(4);
         to.putInt(timeout);
         cnxn.setSessionId(sessionId);
-        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
+        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
+        setLocalSessionFlag(si);
+        submitRequest(si);
         return sessionId;
     }
 
@@ -554,6 +566,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         if (checkPasswd(sessionId, passwd)) {
             revalidateSession(cnxn, sessionId, sessionTimeout);
         } else {
+            LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
+                    + " for session 0x" + Long.toHexString(sessionId));
             finishSessionInit(cnxn, false);
         }
     }
@@ -618,15 +632,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     /**
-     * @param cnxn
-     * @param sessionId
-     * @param xid
-     * @param bb
+     * If the underlying ZooKeeper server supports local sessions, this
+     * method sets isLocalSession to true when the request is associated
+     * with a local session.
+     *
+     * @param si
      */
-    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
-            int xid, ByteBuffer bb, List<Id> authInfo) {
-        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
-        submitRequest(si);
+    protected void setLocalSessionFlag(Request si) {
     }
 
     public void submitRequest(Request si) {
@@ -919,6 +931,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                   h.getType(), incomingBuffer, cnxn.getAuthInfo());
                 si.setOwner(ServerCnxn.me);
+                // Always treat a packet from the client as a possible
+                // local request.
+                setLocalSessionFlag(si);
                 submitRequest(si);
             }
         }
@@ -966,17 +981,36 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         // wrap SASL response token to client inside a Response object.
         return new SetSASLResponse(responseToken);
     }
-    
+
+    // entry point for quorum/Learner.java
     public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+        return processTxn(null, hdr, txn);
+    }
+
+    // entry point for FinalRequestProcessor.java
+    public ProcessTxnResult processTxn(Request request) {
+        return processTxn(request, request.getHdr(), request.getTxn());
+    }
+
+    private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
+                                        Record txn) {
         ProcessTxnResult rc;
-        int opCode = hdr.getType();
-        long sessionId = hdr.getClientId();
-        rc = getZKDatabase().processTxn(hdr, txn);
+        int opCode = request != null ? request.type : hdr.getType();
+        long sessionId = request != null ? request.sessionId : hdr.getClientId();
+        if (hdr != null) {
+            rc = getZKDatabase().processTxn(hdr, txn);
+        } else {
+            rc = new ProcessTxnResult();
+        }
         if (opCode == OpCode.createSession) {
-            if (txn instanceof CreateSessionTxn) {
+            if (hdr != null && txn instanceof CreateSessionTxn) {
                 CreateSessionTxn cst = (CreateSessionTxn) txn;
-                sessionTracker.addSession(sessionId, cst
-                        .getTimeOut());
+                sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
+            } else if (request != null && request.isLocalSession()) {
+                request.request.rewind();
+                int timeout = request.request.getInt();
+                request.request.rewind();
+                sessionTracker.addSession(request.sessionId, timeout);
             } else {
                 LOG.warn("*****>>>>> Got "
                         + txn.getClass() + " "

+ 4 - 3
src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -134,11 +134,12 @@ public class CommitProcessor extends Thread implements RequestProcessor {
             case OpCode.reconfig:
             case OpCode.multi:
             case OpCode.setACL:
-            case OpCode.createSession:
-            case OpCode.closeSession:
                 return true;
             case OpCode.sync:
-                return matchSyncs;
+                return matchSyncs;    
+            case OpCode.createSession:
+            case OpCode.closeSession:
+                return !request.isLocalSession();
             default:
                 return false;
         }

+ 32 - 6
src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java

@@ -18,15 +18,17 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This RequestProcessor forwards any requests that modify the state of the
@@ -83,12 +85,17 @@ public class FollowerRequestProcessor extends Thread implements
                 case OpCode.setData:
                 case OpCode.reconfig:
                 case OpCode.setACL:
-                case OpCode.createSession:
-                case OpCode.closeSession:
                 case OpCode.multi:
                 case OpCode.check:
                     zks.getFollower().request(request);
                     break;
+                case OpCode.createSession:
+                case OpCode.closeSession:
+                    // Don't forward local sessions to the leader.
+                    if (!request.isLocalSession()) {
+                        zks.getFollower().request(request);
+                    }
+                    break;
                 }
             }
         } catch (Exception e) {
@@ -99,6 +106,25 @@ public class FollowerRequestProcessor extends Thread implements
 
     public void processRequest(Request request) {
         if (!finished) {
+            // Before sending the request, check if the request requires a
+            // global session and what we have is a local session. If so do
+            // an upgrade.
+            Request upgradeRequest = null;
+            try {
+                upgradeRequest = zks.checkUpgradeSession(request);
+            } catch (KeeperException ke) {
+                if (request.getHdr() != null) {
+                    request.getHdr().setType(OpCode.error);
+                    request.setTxn(new ErrorTxn(ke.code().intValue()));
+                }
+                request.setException(ke);
+                LOG.info("Error creating upgrade request",  ke);
+            } catch (IOException ie) {
+                LOG.error("Unexpected error in upgrade", ie);
+            }
+            if (upgradeRequest != null) {
+                queuedRequests.add(upgradeRequest);
+            }
             queuedRequests.add(request);
         }
     }

+ 81 - 0
src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for performing local session upgrades. Only requests submitted
+ * directly to the leader should go through this processor.
+ */
+public class LeaderRequestProcessor implements RequestProcessor {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(LeaderRequestProcessor.class);
+
+    private final LeaderZooKeeperServer lzks;
+
+    private final RequestProcessor nextProcessor;
+
+    public LeaderRequestProcessor(LeaderZooKeeperServer zks,
+            RequestProcessor nextProcessor) {
+        this.lzks = zks;
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public void processRequest(Request request)
+            throws RequestProcessorException {
+        // Check if this is a local session and we are trying to create
+        // an ephemeral node, in which case we upgrade the session
+        Request upgradeRequest = null;
+        try {
+            upgradeRequest = lzks.checkUpgradeSession(request);
+        } catch (KeeperException ke) {
+            if (request.getHdr() != null) {
+                LOG.debug("Updating header");
+                request.getHdr().setType(OpCode.error);
+                request.setTxn(new ErrorTxn(ke.code().intValue()));
+            }
+            request.setException(ke);
+            LOG.info("Error creating upgrade request " + ke.getMessage());
+        } catch (IOException ie) {
+            LOG.error("Unexpected error in upgrade", ie);
+        }
+        if (upgradeRequest != null) {
+            nextProcessor.processRequest(upgradeRequest);
+        }
+
+        nextProcessor.processRequest(request);
+    }
+
+    @Override
+    public void shutdown() {
+        LOG.info("Shutting down");
+        nextProcessor.shutdown();
+    }
+
+}

+ 203 - 0
src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java

@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.KeeperException.SessionMovedException;
+import org.apache.zookeeper.KeeperException.UnknownSessionException;
+import org.apache.zookeeper.server.SessionTrackerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The leader session tracker tracks local and global sessions on the leader.
+ */
+public class LeaderSessionTracker extends UpgradeableSessionTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class);
+
+    private final boolean localSessionsEnabled;
+    private final SessionTrackerImpl globalSessionTracker;
+
+    /**
+     * Server id of the leader
+     */
+    private final long serverId;
+
+    public LeaderSessionTracker(SessionExpirer expirer,
+            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+            int tickTime, long id, boolean localSessionsEnabled) {
+
+        this.globalSessionTracker = new SessionTrackerImpl(
+            expirer, sessionsWithTimeouts, tickTime, id);
+
+        this.localSessionsEnabled = localSessionsEnabled;
+        if (this.localSessionsEnabled) {
+            createLocalSessionTracker(expirer, tickTime, id);
+        }
+        serverId = id;
+    }
+
+    public void removeSession(long sessionId) {
+        if (localSessionTracker != null) {
+            localSessionTracker.removeSession(sessionId);
+        }
+        globalSessionTracker.removeSession(sessionId);
+    }
+
+    public void start() {
+        globalSessionTracker.start();
+        if (localSessionTracker != null) {
+            localSessionTracker.start();
+        }
+    }
+
+    public void shutdown() {
+        if (localSessionTracker != null) {
+            localSessionTracker.shutdown();
+        }
+        globalSessionTracker.shutdown();
+    }
+
+    public boolean isGlobalSession(long sessionId) {
+        return globalSessionTracker.isTrackingSession(sessionId);
+    }
+
+    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+        boolean added =
+            globalSessionTracker.addSession(sessionId, sessionTimeout);
+        if (localSessionsEnabled && added) {
+            // Only do extra logging so we know what kind of session this is
+            // if we're supporting both kinds of sessions
+            LOG.info("Adding global session 0x" + Long.toHexString(sessionId));
+        }
+        return added;
+    }
+
+    public boolean addSession(long sessionId, int sessionTimeout) {
+        boolean added;
+        if (localSessionsEnabled && !isGlobalSession(sessionId)) {
+            added = localSessionTracker.addSession(sessionId, sessionTimeout);
+            // Check for race condition with session upgrading
+            if (isGlobalSession(sessionId)) {
+                added = false;
+                localSessionTracker.removeSession(sessionId);
+            } else if (added) {
+              LOG.info("Adding local session 0x" + Long.toHexString(sessionId));
+            }
+        } else {
+            added = addGlobalSession(sessionId, sessionTimeout);
+        }
+        return added;
+    }
+
+    public boolean touchSession(long sessionId, int sessionTimeout) {
+        if (localSessionTracker != null &&
+            localSessionTracker.touchSession(sessionId, sessionTimeout)) {
+            return true;
+        }
+        return globalSessionTracker.touchSession(sessionId, sessionTimeout);
+    }
+
+    public long createSession(int sessionTimeout) {
+        if (localSessionsEnabled) {
+            return localSessionTracker.createSession(sessionTimeout);
+        }
+        return globalSessionTracker.createSession(sessionTimeout);
+    }
+
+    // Returns the serverId stored in the high-order byte of the sessionId.
+    public static long getServerIdFromSessionId(long sessionId) {
+        return sessionId >>> 56; // unsigned shift: ids >= 128 must not sign-extend
+    }
+
+    public void checkSession(long sessionId, Object owner)
+            throws SessionExpiredException, SessionMovedException,
+            UnknownSessionException {
+        if (localSessionTracker != null) {
+            try {
+                localSessionTracker.checkSession(sessionId, owner);
+                // A session can be both a local and a global session
+                // during an upgrade
+                if (!isGlobalSession(sessionId)) {
+                    return;
+                }
+            } catch(UnknownSessionException e) {
+                // Ignore. We'll check instead whether it's a global session
+            }
+        }
+        try {
+            globalSessionTracker.checkSession(sessionId, owner);
+            // if we can get here, it is a valid global session
+            return;
+        } catch (UnknownSessionException e) {
+            // Ignore. This may be local session from other servers.
+        }
+
+        /*
+         * If local sessions are disabled, or the id was issued by this server
+         * (so it used to be our local session), the session has expired.
+         */
+        if (!localSessionsEnabled
+                || (getServerIdFromSessionId(sessionId) == serverId)) {
+            throw new SessionExpiredException();
+        }
+    }
+
+    public void checkGlobalSession(long sessionId, Object owner)
+            throws SessionExpiredException, SessionMovedException {
+        try {
+            globalSessionTracker.checkSession(sessionId, owner);
+        } catch (UnknownSessionException e) {
+            // For global session, if we don't know it, it is already expired
+            throw new SessionExpiredException();
+        }
+    }
+
+    public void setOwner(long sessionId, Object owner)
+            throws SessionExpiredException {
+        if (localSessionTracker != null) {
+            try {
+                localSessionTracker.setOwner(sessionId, owner);
+                return;
+            } catch(SessionExpiredException e) {
+                // Ignore. We'll check instead whether it's a global session
+            }
+        }
+        globalSessionTracker.setOwner(sessionId, owner);
+    }
+
+    public void dumpSessions(PrintWriter pwriter) {
+      if (localSessionTracker != null) {
+          pwriter.print("Local ");
+          localSessionTracker.dumpSessions(pwriter);
+          pwriter.print("Global ");
+      }
+      globalSessionTracker.dumpSessions(pwriter);
+    }
+
+    public void setSessionClosing(long sessionId) {
+        // call is no-op if session isn't tracked so safe to call both
+        if (localSessionTracker != null) {
+            localSessionTracker.setSessionClosing(sessionId);
+        }
+        globalSessionTracker.setSessionClosing(sessionId);
+    }
+}

+ 40 - 11
src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

@@ -25,9 +25,9 @@ import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.SessionTrackerImpl;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
@@ -39,8 +39,11 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
  * FinalRequestProcessor
  */
 public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
+
     CommitProcessor commitProcessor;
 
+    PrepRequestProcessor prepRequestProcessor;
+
     /**
      * @param port
      * @param dataDir
@@ -64,8 +67,9 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
         ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                 commitProcessor);
         proposalProcessor.initialize();
-        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
-        ((PrepRequestProcessor)firstProcessor).start();
+        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
+        prepRequestProcessor.start();
+        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
     }
 
     @Override
@@ -75,20 +79,45 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
 
     @Override
     public void createSessionTracker() {
-        sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
-                tickTime, self.getId());
-    }
-    
-    @Override
-    protected void startSessionTracker() {
-        ((SessionTrackerImpl)sessionTracker).start();
+        sessionTracker = new LeaderSessionTracker(
+                this, getZKDatabase().getSessionWithTimeOuts(),
+                tickTime, self.getId(), self.areLocalSessionsEnabled());
     }
 
-
     public boolean touch(long sess, int to) {
         return sessionTracker.touchSession(sess, to);
     }
 
+    public boolean checkIfValidGlobalSession(long sess, int to) {
+        if (self.areLocalSessionsEnabled() &&
+            !upgradeableSessionTracker.isGlobalSession(sess)) {
+            return false;
+        }
+        return sessionTracker.touchSession(sess, to);
+    }
+
+    /**
+     * Requests coming from the learner should go directly to
+     * PrepRequestProcessor
+     *
+     * @param request
+     */
+    public void submitLearnerRequest(Request request) {
+        /*
+         * Requests coming from the learner should have gone through
+         * submitRequest() on each server, which already performs some request
+         * validation, so we don't need to do it again.
+         *
+         * Additionally, LearnerHandler should start submitting requests into
+         * the leader's pipeline only when the leader's server is started, so we
+         * can submit the request directly into PrepRequestProcessor.
+         *
+         * This is done so that requests from learners won't go through
+         * LeaderRequestProcessor, which performs local session upgrades.
+         */
+        prepRequestProcessor.processRequest(request);
+    }
+
     @Override
     protected void registerJMX() {
         // register with JMX

+ 3 - 4
src/java/main/org/apache/zookeeper/server/quorum/Learner.java

@@ -30,6 +30,7 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -536,8 +537,7 @@ public class Learner {
         DataInputStream dis = new DataInputStream(bis);
         long sessionId = dis.readLong();
         boolean valid = dis.readBoolean();
-        ServerCnxn cnxn = pendingRevalidations
-        .remove(sessionId);
+        ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
         if (cnxn == null) {
             LOG.warn("Missing session 0x"
                     + Long.toHexString(sessionId)
@@ -557,8 +557,7 @@ public class Learner {
         // Send back the ping with our session data
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        HashMap<Long, Integer> touchTable = zk
-                .getTouchSnapshot();
+        Map<Long, Integer> touchTable = zk.getTouchSnapshot();
         for (Entry<Long, Integer> entry : touchTable.entrySet()) {
             dos.writeLong(entry.getKey());
             dos.writeInt(entry.getValue());

+ 2 - 2
src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -525,7 +525,7 @@ public class LearnerHandler extends Thread {
                     ByteArrayOutputStream bos = new ByteArrayOutputStream();
                     DataOutputStream dos = new DataOutputStream(bos);
                     dos.writeLong(id);
-                    boolean valid = leader.zk.touch(id, to);
+                    boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
                     if (valid) {
                         try {
                             //set the session owner
@@ -559,7 +559,7 @@ public class LearnerHandler extends Thread {
                         si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                     }
                     si.setOwner(this);
-                    leader.zk.submitRequest(si);
+                    leader.zk.submitLearnerRequest(si);
                     break;
                 default:
                 }

+ 160 - 40
src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java

@@ -15,82 +15,202 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.zookeeper.server.quorum;
 
 import java.io.PrintWriter;
-import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.zookeeper.server.SessionTracker;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.KeeperException.SessionMovedException;
+import org.apache.zookeeper.KeeperException.UnknownSessionException;
 import org.apache.zookeeper.server.SessionTrackerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is really just a shell of a SessionTracker that tracks session activity
- * to be forwarded to the Leader using a PING.
+ * The learner session tracker is used by learners (followers and observers) to
+ * track zookeeper sessions which may or may not be echoed to the leader.  When
+ * a new session is created it is saved locally in a wrapped
+ * LocalSessionTracker.  It can subsequently be upgraded to a global session
+ * as required.  If an upgrade is requested the session is removed from local
+ * collections while keeping the same session ID.  It is up to the caller to
+ * queue a session creation request for the leader.
+ * A secondary function of the learner session tracker is to remember sessions
+ * which have been touched in this service.  This information is passed along
+ * to the leader with a ping.
  */
-public class LearnerSessionTracker implements SessionTracker {
-    SessionExpirer expirer;
+public class LearnerSessionTracker extends UpgradeableSessionTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerSessionTracker.class);
+
+    private final SessionExpirer expirer;
+    // Touch table for the global sessions
+    private final AtomicReference<Map<Long, Integer>> touchTable =
+        new AtomicReference<Map<Long, Integer>>();
+    private final long serverId;
+    private final AtomicLong nextSessionId = new AtomicLong();
 
-    HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
-    long serverId = 1;
-    long nextSessionId=0;
-    
-    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+    private final boolean localSessionsEnabled;
+    private final ConcurrentMap<Long, Integer> globalSessionsWithTimeouts;
 
     public LearnerSessionTracker(SessionExpirer expirer,
-            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, long id) {
+            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+            int tickTime, long id, boolean localSessionsEnabled) {
         this.expirer = expirer;
-        this.sessionsWithTimeouts = sessionsWithTimeouts;
+        this.touchTable.set(new ConcurrentHashMap<Long, Integer>());
+        this.globalSessionsWithTimeouts = sessionsWithTimeouts;
         this.serverId = id;
-        nextSessionId = SessionTrackerImpl.initializeNextSession(this.serverId);
-        
+        nextSessionId.set(SessionTrackerImpl.initializeNextSession(serverId));
+
+        this.localSessionsEnabled = localSessionsEnabled;
+        if (this.localSessionsEnabled) {
+            createLocalSessionTracker(expirer, tickTime, id);
+        }
     }
 
-    synchronized public void removeSession(long sessionId) {
-        sessionsWithTimeouts.remove(sessionId);
-        touchTable.remove(sessionId);
+    public void removeSession(long sessionId) {
+        if (localSessionTracker != null) {
+            localSessionTracker.removeSession(sessionId);
+        }
+        globalSessionsWithTimeouts.remove(sessionId);
+        touchTable.get().remove(sessionId);
+    }
+
+    public void start() {
+        if (localSessionTracker != null) {
+            localSessionTracker.start();
+        }
     }
 
     public void shutdown() {
+        if (localSessionTracker != null) {
+            localSessionTracker.shutdown();
+        }
     }
 
-    synchronized public void addSession(long sessionId, int sessionTimeout) {
-        sessionsWithTimeouts.put(sessionId, sessionTimeout);
-        touchTable.put(sessionId, sessionTimeout);
+    public boolean isGlobalSession(long sessionId) {
+        return globalSessionsWithTimeouts.containsKey(sessionId);
     }
 
-    synchronized public boolean touchSession(long sessionId, int sessionTimeout) {
-        touchTable.put(sessionId, sessionTimeout);
-        return true;
+    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+        boolean added =
+            globalSessionsWithTimeouts.put(sessionId, sessionTimeout) == null;
+        if (localSessionsEnabled && added) {
+            // Only do extra logging so we know what kind of session this is
+            // if we're supporting both kinds of sessions
+            LOG.info("Adding global session 0x" + Long.toHexString(sessionId));
+        }
+        touchTable.get().put(sessionId, sessionTimeout);
+        return added;
     }
 
-    synchronized HashMap<Long, Integer> snapshot() {
-        HashMap<Long, Integer> oldTouchTable = touchTable;
-        touchTable = new HashMap<Long, Integer>();
-        return oldTouchTable;
+    public boolean addSession(long sessionId, int sessionTimeout) {
+        boolean added;
+        if (localSessionsEnabled && !isGlobalSession(sessionId)) {
+            added = localSessionTracker.addSession(sessionId, sessionTimeout);
+            // Check for race condition with session upgrading
+            if (isGlobalSession(sessionId)) {
+                added = false;
+                localSessionTracker.removeSession(sessionId);
+            } else if (added) {
+                LOG.info("Adding local session 0x"
+                         + Long.toHexString(sessionId));
+            }
+        } else {
+            added = addGlobalSession(sessionId, sessionTimeout);
+        }
+        return added;
     }
 
+    public boolean touchSession(long sessionId, int sessionTimeout) {
+        if (localSessionsEnabled) {
+            if (localSessionTracker.touchSession(sessionId, sessionTimeout)) {
+                return true;
+            }
+            if (!isGlobalSession(sessionId)) {
+                return false;
+            }
+        }
+        touchTable.get().put(sessionId, sessionTimeout);
+        return true;
+    }
 
-    synchronized public long createSession(int sessionTimeout) {
-        return (nextSessionId++);
+    public Map<Long, Integer> snapshot() {
+        return touchTable.getAndSet(new ConcurrentHashMap<Long, Integer>());
     }
 
-    public void checkSession(long sessionId, Object owner)  {
-        // Nothing to do here. Sessions are checked at the Leader
+    public long createSession(int sessionTimeout) {
+        if (localSessionsEnabled) {
+            return localSessionTracker.createSession(sessionTimeout);
+        }
+        return nextSessionId.getAndIncrement();
     }
-    
-    public void setOwner(long sessionId, Object owner) {
-        // Nothing to do here. Sessions are checked at the Leader
+
+    public void checkSession(long sessionId, Object owner)
+            throws SessionExpiredException, SessionMovedException  {
+        if (localSessionTracker != null) {
+            try {
+                localSessionTracker.checkSession(sessionId, owner);
+                return;
+            } catch (UnknownSessionException e) {
+                // Check whether it's a global session. We can ignore those
+                // because they are handled at the leader; otherwise report
+                // the session as expired. We check local session status
+                // first to avoid a race condition with session upgrading.
+                if (!isGlobalSession(sessionId)) {
+                    throw new SessionExpiredException();
+                }
+            }
+        }
+    }
+
+    public void setOwner(long sessionId, Object owner)
+            throws SessionExpiredException {
+        if (localSessionTracker != null) {
+            try {
+                localSessionTracker.setOwner(sessionId, owner);
+                return;
+            } catch (SessionExpiredException e) {
+                // Check whether it's a global session. We can ignore those
+                // because they are handled at the leader, but if not, rethrow.
+                // We check local session status first to avoid race condition
+                // with session upgrading.
+                if (!isGlobalSession(sessionId)) {
+                    throw e;
+                }
+            }
+        }
     }
 
     public void dumpSessions(PrintWriter pwriter) {
-    	// the original class didn't have tostring impl, so just
-    	// dup what we had before
-    	pwriter.println(toString());
+        if (localSessionTracker != null) {
+            pwriter.print("Local ");
+            localSessionTracker.dumpSessions(pwriter);
+        }
+        pwriter.print("Global Sessions(");
+        pwriter.print(globalSessionsWithTimeouts.size());
+        pwriter.println("):");
+        SortedSet<Long> sessionIds = new TreeSet<Long>(
+                globalSessionsWithTimeouts.keySet());
+        for (long sessionId : sessionIds) {
+            pwriter.print("0x");
+            pwriter.print(Long.toHexString(sessionId));
+            pwriter.print("\t");
+            pwriter.print(globalSessionsWithTimeouts.get(sessionId));
+            pwriter.println("ms");
+        }
     }
 
     public void setSessionClosing(long sessionId) {
-        // Nothing to do here.
+        // Global sessions handled on the leader; this call is a no-op if
+        // not tracked as a local session so safe to call in both cases.
+        if (localSessionTracker != null) {
+            localSessionTracker.setSessionClosing(sessionId);
+        }
     }
 }

+ 29 - 22
src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java

@@ -18,19 +18,23 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Map;
 
 import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.quorum.LearnerSessionTracker;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServerBean;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
 /**
- * Parent class for all ZooKeeperServers for Learners 
+ * Parent class for all ZooKeeperServers for Learners
  */
-public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {    
+public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
+
     public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
             int minSessionTimeout, int maxSessionTimeout,
             ZKDatabase zkDb, QuorumPeer self)
@@ -42,47 +46,50 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
     /**
      * Abstract method to return the learner associated with this server.
      * Since the Learner may change under our feet (when QuorumPeer reassigns
-     * it) we can't simply take a reference here. Instead, we need the 
-     * subclasses to implement this.     
+     * it) we can't simply take a reference here. Instead, we need the
+     * subclasses to implement this.
      */
-    abstract public Learner getLearner();        
-    
+    abstract public Learner getLearner();
+
     /**
      * Returns the current state of the session tracker. This is only currently
      * used by a Learner to build a ping response packet.
-     * 
+     *
      */
-    protected HashMap<Long, Integer> getTouchSnapshot() {
+    protected Map<Long, Integer> getTouchSnapshot() {
         if (sessionTracker != null) {
             return ((LearnerSessionTracker) sessionTracker).snapshot();
         }
-        return new HashMap<Long, Integer>();
+        Map<Long, Integer> map = Collections.emptyMap();
+        return map;
     }
-    
+
     /**
      * Returns the id of the associated QuorumPeer, which will do for a unique
-     * id of this server. 
+     * id of this server.
      */
     @Override
     public long getServerId() {
         return self.getId();
-    }    
-    
+    }
+
     @Override
     public void createSessionTracker() {
-        sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
-                self.getId());
+        sessionTracker = new LearnerSessionTracker(
+                this, getZKDatabase().getSessionWithTimeOuts(),
+                this.tickTime, self.getId(), self.areLocalSessionsEnabled());
     }
-    
-    @Override
-    protected void startSessionTracker() {}
-    
+
     @Override
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
             int sessionTimeout) throws IOException {
-        getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+        if (upgradeableSessionTracker.isLocalSession(sessionId)) {
+            super.revalidateSession(cnxn, sessionId, sessionTimeout);
+        } else {
+            getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+        }
     }
-    
+
     @Override
     protected void registerJMX() {
         // register with JMX

+ 45 - 0
src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.server.SessionTrackerImpl;
+
+/**
+ * Local session tracker.
+ */
+public class LocalSessionTracker extends SessionTrackerImpl {
+    public LocalSessionTracker(SessionExpirer expirer,
+            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+            int tickTime, long id) {
+        super(expirer, sessionsWithTimeouts, tickTime, id);
+    }
+
+    public boolean isLocalSession(long sessionId) {
+        return isTrackingSession(sessionId);
+    }
+
+    public boolean isGlobalSession(long sessionId) {
+        return false;
+    }
+
+    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+        throw new UnsupportedOperationException();
+    }
+}

+ 26 - 2
src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java

@@ -18,15 +18,18 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.ErrorTxn;
 
 /**
  * This RequestProcessor forwards any requests that modify the state of the
@@ -91,12 +94,17 @@ public class ObserverRequestProcessor extends Thread implements
                 case OpCode.setData:
                 case OpCode.reconfig:
                 case OpCode.setACL:
-                case OpCode.createSession:
-                case OpCode.closeSession:
                 case OpCode.multi:
                 case OpCode.check:
                     zks.getObserver().request(request);
                     break;
+                case OpCode.createSession:
+                case OpCode.closeSession:
+                    // Don't forward local sessions to the leader.
+                    if (!request.isLocalSession()) {
+                        zks.getObserver().request(request);
+                    }
+                    break;
                 }
             }
         } catch (Exception e) {
@@ -110,6 +118,22 @@ public class ObserverRequestProcessor extends Thread implements
      */
     public void processRequest(Request request) {
         if (!finished) {
+            Request upgradeRequest = null;
+            try {
+                upgradeRequest = zks.checkUpgradeSession(request);
+            } catch (KeeperException ke) {
+                if (request.getHdr() != null) {
+                    request.getHdr().setType(OpCode.error);
+                    request.setTxn(new ErrorTxn(ke.code().intValue()));
+                }
+                request.setException(ke);
+                LOG.info("Error creating upgrade request",  ke);
+            } catch (IOException ie) {
+                LOG.error("Unexpected error in upgrade", ie);
+            }
+            if (upgradeRequest != null) {
+                queuedRequests.add(upgradeRequest);
+            }
             queuedRequests.add(request);
         }
     }

+ 2 - 2
src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java

@@ -68,10 +68,10 @@ public class ProposalRequestProcessor implements RequestProcessor {
          * call processRequest on the next processor.
          */
 
-        if(request instanceof LearnerSyncRequest){
+        if (request instanceof LearnerSyncRequest){
             zks.getLeader().processSync((LearnerSyncRequest)request);
         } else {
-                nextProcessor.processRequest(request);
+            nextProcessor.processRequest(request);
             if (request.getHdr() != null) {
                 // We need to sync and get consensus on any transactions
                 try {

+ 35 - 1
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -380,6 +380,18 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
      */
     protected int tickTime;
 
+    /**
+     * Whether learners in this quorum should create new sessions as local.
+     * False by default to preserve existing behavior.
+     */
+    protected boolean localSessionsEnabled = false;
+
+    /**
+     * Whether learners in this quorum should upgrade local sessions to
+     * global. Only matters if local sessions are enabled.
+     */
+    protected boolean localSessionsUpgradingEnabled = true;
+
     /**
      * Minimum number of milliseconds to allow for session timeout.
      * A value of -1 indicates unset, use default.
@@ -1142,6 +1154,28 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         return fac.getMaxClientCnxnsPerHost();
     }
 
+    /** Whether local sessions are enabled */
+    public boolean areLocalSessionsEnabled() {
+        return localSessionsEnabled;
+    }
+
+    /** Whether to enable local sessions */
+    public void enableLocalSessions(boolean flag) {
+        LOG.info("Local sessions " + (flag ? "enabled" : "disabled"));
+        localSessionsEnabled = flag;
+    }
+
+    /** Whether local sessions are allowed to upgrade to global sessions */
+    public boolean isLocalSessionsUpgradingEnabled() {
+        return localSessionsUpgradingEnabled;
+    }
+
+    /** Whether to allow local sessions to upgrade to global sessions */
+    public void enableLocalSessionsUpgrading(boolean flag) {
+        LOG.info("Local session upgrading " + (flag ? "enabled" : "disabled"));
+        localSessionsUpgradingEnabled = flag;
+    }
+
     /** minimum session timeout in milliseconds */
     public int getMinSessionTimeout() {
         return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
@@ -1158,7 +1192,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
         return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
     }
 
-    /** minimum session timeout in milliseconds */
+    /** maximum session timeout in milliseconds */
     public void setMaxSessionTimeout(int max) {
         LOG.info("maxSessionTimeout set to " + max);
         this.maxSessionTimeout = max;

+ 13 - 3
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java

@@ -62,6 +62,8 @@ public class QuorumPeerConfig {
     protected int minSessionTimeout = -1;
     /** defaults to -1 if not set explicitly */
     protected int maxSessionTimeout = -1;
+    protected boolean localSessionsEnabled = false;
+    protected boolean localSessionsUpgradingEnabled = false;
 
     protected int initLimit;
     protected int syncLimit;
@@ -196,6 +198,10 @@ public class QuorumPeerConfig {
                 dataLogDir = vff.create(value);
             } else if (key.equals("clientPort")) {
                 clientPort = Integer.parseInt(value);
+            } else if (key.equals("localSessionsEnabled")) {
+                localSessionsEnabled = Boolean.parseBoolean(value);
+            } else if (key.equals("localSessionsUpgradingEnabled")) {
+                localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
             } else if (key.equals("clientPortAddress")) {
                 clientPortAddress = value.trim();
             } else if (key.equals("tickTime")) {
@@ -503,12 +509,16 @@ public class QuorumPeerConfig {
     public int getMaxClientCnxns() { return maxClientCnxns; }
     public int getMinSessionTimeout() { return minSessionTimeout; }
     public int getMaxSessionTimeout() { return maxSessionTimeout; }
+    public boolean areLocalSessionsEnabled() { return localSessionsEnabled; }
+    public boolean isLocalSessionsUpgradingEnabled() {
+        return localSessionsUpgradingEnabled;
+    }
 
     public int getInitLimit() { return initLimit; }
     public int getSyncLimit() { return syncLimit; }
     public int getElectionAlg() { return electionAlg; }
-    public int getElectionPort() { return electionPort; }    
-    
+    public int getElectionPort() { return electionPort; }
+
     public int getSnapRetainCount() {
         return snapRetainCount;
     }
@@ -521,7 +531,7 @@ public class QuorumPeerConfig {
         return syncEnabled;
     }
 
-    public QuorumVerifier getQuorumVerifier() {   
+    public QuorumVerifier getQuorumVerifier() {
         return quorumVerifier;
     }
     

+ 7 - 4
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java

@@ -110,7 +110,7 @@ public class QuorumPeerMain {
                 .getDataDir(), config.getDataLogDir(), config
                 .getSnapRetainCount(), config.getPurgeInterval());
         purgeMgr.start();
-        
+
         if (args.length == 1 && config.isDistributed()) {
             runFromConfig(config);
         } else {
@@ -127,17 +127,20 @@ public class QuorumPeerMain {
       } catch (JMException e) {
           LOG.warn("Unable to register log4j JMX control", e);
       }
-  
+
       LOG.info("Starting quorum peer");
       try {
           ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
           cnxnFactory.configure(config.getClientPortAddress(),
                                 config.getMaxClientCnxns());
-  
-          quorumPeer = new QuorumPeer();          
+
+          quorumPeer = new QuorumPeer();
           quorumPeer.setTxnFactory(new FileTxnSnapLog(
                       config.getDataLogDir(),
                       config.getDataDir()));
+          quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
+          quorumPeer.enableLocalSessionsUpgrading(
+              config.isLocalSessionsUpgradingEnabled());
           //quorumPeer.setQuorumPeers(config.getAllMembers());
           quorumPeer.setElectionType(config.getElectionAlg());
           quorumPeer.setMyid(config.getServerId());

+ 99 - 0
src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java

@@ -17,8 +17,16 @@
  */
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.io.PrintWriter;
+import java.nio.ByteBuffer;
 
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -28,7 +36,9 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
  * a quorum.
  */
 public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
+
     public final QuorumPeer self;
+    protected UpgradeableSessionTracker upgradeableSessionTracker;
 
     protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
             int minSessionTimeout, int maxSessionTimeout,
@@ -38,6 +48,95 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
         this.self = self;
     }
 
+    @Override
+    protected void startSessionTracker() {
+        upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker;
+        upgradeableSessionTracker.start();
+    }
+
+    public Request checkUpgradeSession(Request request)
+            throws IOException, KeeperException {
+        // If this is a request for a local session and it is to
+        // create an ephemeral node, then upgrade the session and return
+        // a new session request for the leader.
+        // This is called by the request processor thread (either follower
+        // or observer request processor), which is unique to a learner.
+        // So it will not be called concurrently by two threads.
+        if (request.type != OpCode.create ||
+            !upgradeableSessionTracker.isLocalSession(request.sessionId)) {
+            return null;
+        }
+        CreateRequest createRequest = new CreateRequest();
+        request.request.rewind();
+        ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
+        request.request.rewind();
+        CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+        if (!createMode.isEphemeral()) {
+            return null;
+        }
+        // Uh oh.  We need to upgrade before we can proceed.
+        if (!self.isLocalSessionsUpgradingEnabled()) {
+            throw new KeeperException.EphemeralOnLocalSessionException();
+        }
+
+        return makeUpgradeRequest(request.sessionId);
+    }
+
+    private Request makeUpgradeRequest(long sessionId) {
+        // Make sure to atomically check local session status, upgrade
+        // session, and make the session creation request.  This is to
+        // avoid another thread upgrading the session in parallel.
+        synchronized (upgradeableSessionTracker) {
+            if (upgradeableSessionTracker.isLocalSession(sessionId)) {
+                int timeout = upgradeableSessionTracker.upgradeSession(sessionId);
+                ByteBuffer to = ByteBuffer.allocate(4);
+                to.putInt(timeout);
+                return new Request(
+                        null, sessionId, 0, OpCode.createSession, to, null);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Implements the SessionUpgrader interface.
+     *
+     * @param sessionId
+     */
+    public void upgrade(long sessionId) {
+        Request request = makeUpgradeRequest(sessionId);
+        if (request != null) {
+            LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
+            // This must be a global request
+            submitRequest(request);
+        }
+    }
+
+    @Override
+    protected void setLocalSessionFlag(Request si) {
+        // We need to set isLocalSession to true for these types of requests
+        // so that the request processor can process them correctly.
+        switch (si.type) {
+        case OpCode.createSession:
+            if (self.areLocalSessionsEnabled()) {
+                // All new sessions local by default.
+                si.setLocalSession(true);
+            }
+            break;
+        case OpCode.closeSession:
+            String reqType = "global";
+            if (upgradeableSessionTracker.isLocalSession(si.sessionId)) {
+                si.setLocalSession(true);
+                reqType = "local";
+            }
+            LOG.info("Submitting " + reqType + " closeSession request"
+                    + " for session 0x" + Long.toHexString(si.sessionId));
+            break;
+        default:
+            break;
+        }
+    }
+
     @Override
     public void dumpConf(PrintWriter pwriter) {
         super.dumpConf(pwriter);

+ 27 - 3
src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java

@@ -18,6 +18,8 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.PrintWriter;
+
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
@@ -36,11 +38,16 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
  * The very first processor in the chain of request processors is a
  * ReadOnlyRequestProcessor which drops state-changing requests.
  */
-public class ReadOnlyZooKeeperServer extends QuorumZooKeeperServer {
+public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
 
+    protected final QuorumPeer self;
     private volatile boolean shutdown = false;
-    ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) {
-        super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
+
+    ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
+                            ZKDatabase zkDb) {
+        super(logFactory, self.tickTime, self.minSessionTimeout,
+              self.maxSessionTimeout, zkDb);
+        this.self = self;
     }
 
     @Override
@@ -141,4 +148,21 @@ public class ReadOnlyZooKeeperServer extends QuorumZooKeeperServer {
         super.shutdown();
     }
 
+    @Override
+    public void dumpConf(PrintWriter pwriter) {
+        super.dumpConf(pwriter);
+
+        pwriter.print("initLimit=");
+        pwriter.println(self.getInitLimit());
+        pwriter.print("syncLimit=");
+        pwriter.println(self.getSyncLimit());
+        pwriter.print("electionAlg=");
+        pwriter.println(self.getElectionType());
+        pwriter.print("electionPort=");
+        pwriter.println(self.getElectionAddress().getPort());
+        pwriter.print("quorumPort=");
+        pwriter.println(self.getQuorumAddress().getPort());
+        pwriter.print("peerType=");
+        pwriter.println(self.getLearnerType().ordinal());
+    }
 }

+ 89 - 0
src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.SessionTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A session tracker that supports upgradeable local sessions.
+ */
+public abstract class UpgradeableSessionTracker implements SessionTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(UpgradeableSessionTracker.class);
+
+    private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
+    protected LocalSessionTracker localSessionTracker;
+
+    public void start() {}
+
+    public void createLocalSessionTracker(SessionExpirer expirer,
+            int tickTime, long id) {
+        this.localSessionsWithTimeouts =
+            new ConcurrentHashMap<Long, Integer>();
+        this.localSessionTracker = new LocalSessionTracker(
+            expirer, this.localSessionsWithTimeouts, tickTime, id);
+    }
+
+    public boolean isTrackingSession(long sessionId) {
+        return isLocalSession(sessionId) || isGlobalSession(sessionId);
+    }
+
+    public boolean isLocalSession(long sessionId) {
+        return localSessionTracker != null &&
+            localSessionTracker.isTrackingSession(sessionId);
+    }
+
+    abstract public boolean isGlobalSession(long sessionId);
+
+    /**
+     * Upgrades the session to a global session.
+     * This simply removes the session from the local tracker and marks
+     * it as global.  It is up to the caller to actually
+     * queue up a transaction for the session.
+     *
+     * @param sessionId
+     * @return session timeout (-1 if not a local session)
+     */
+    public int upgradeSession(long sessionId) {
+        if (localSessionsWithTimeouts == null) {
+            return -1;
+        }
+        // We won't race another upgrade attempt because only one thread
+        // will get the timeout from the map
+        Integer timeout = localSessionsWithTimeouts.remove(sessionId);
+        if (timeout != null) {
+            LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
+            // Add as global before removing from the local session tracker
+            addGlobalSession(sessionId, timeout);
+            localSessionTracker.removeSession(sessionId);
+            return timeout;
+        }
+        return -1;
+    }
+
+    public void checkGlobalSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException {
+        throw new UnsupportedOperationException();
+    }
+}

+ 25 - 7
src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java

@@ -76,15 +76,19 @@ public class PrepRequestProcessorTest extends ClientBase {
     
     private class MySessionTracker implements SessionTracker {
         @Override
-        public void addSession(long id, int to) {
+        public boolean addGlobalSession(long id, int to) {
             // TODO Auto-generated method stub
-            
+            return false;
+        }
+        @Override
+        public boolean addSession(long id, int to) {
+            // TODO Auto-generated method stub
+            return false;
         }
         @Override
         public void checkSession(long sessionId, Object owner)
                 throws SessionExpiredException, SessionMovedException {
             // TODO Auto-generated method stub
-            
         }
         @Override
         public long createSession(int sessionTimeout) {
@@ -94,23 +98,27 @@ public class PrepRequestProcessorTest extends ClientBase {
         @Override
         public void dumpSessions(PrintWriter pwriter) {
             // TODO Auto-generated method stub
-            
+
         }
          @Override
         public void removeSession(long sessionId) {
             // TODO Auto-generated method stub
-            
+
+        }
+        public int upgradeSession(long sessionId) {
+             // TODO Auto-generated method stub
+             return 0;
         }
         @Override
         public void setOwner(long id, Object owner)
                 throws SessionExpiredException {
             // TODO Auto-generated method stub
-            
+
         }
         @Override
         public void shutdown() {
             // TODO Auto-generated method stub
-            
+
         }
         @Override
         public boolean touchSession(long sessionId, int sessionTimeout) {
@@ -121,5 +129,15 @@ public class PrepRequestProcessorTest extends ClientBase {
         public void setSessionClosing(long sessionId) {
           // TODO Auto-generated method stub
         }
+        @Override
+        public boolean isTrackingSession(long sessionId) {
+            // TODO Auto-generated method stub
+            return false;
+        }
+        @Override
+        public void checkGlobalSession(long sessionId, Object owner)
+                throws SessionExpiredException, SessionMovedException {
+            // TODO Auto-generated method stub
+        }
     }
 }

+ 15 - 7
src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java

@@ -113,7 +113,6 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     @Test
     public void testEarlyLeaderAbandonment() throws Exception {
         ClientBase.setupTestEnv();
-
         final int SERVER_COUNT = 3;
         final int clientPorts[] = new int[SERVER_COUNT];
         StringBuilder sb = new StringBuilder();
@@ -143,10 +142,12 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
 
         for (int i = 0; i < SERVER_COUNT; i++) {
             mt[i].start();
-        }
+            // Recreate a client session since the previous session was not persisted.
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+         }
+
+        waitForAll(zk, States.CONNECTED);
 
-        waitForAll(zk, States.CONNECTED);          
-                          
 
         // ok lets find the leader and kill everything else, we have a few
         // seconds, so it should be plenty of time
@@ -182,6 +183,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         }
         for (int i = 0; i < SERVER_COUNT; i++) {
             if (i != leader) {
+                // Recreate a client session since the previous session was not persisted.
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
                 waitForOne(zk[i], States.CONNECTED);
                 zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             }
@@ -306,24 +309,29 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
     }
 
     private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
         while (zk.getState() != state) {
+            if (iterations-- == 0) {
+                throw new RuntimeException("Waiting too long");
+            }
             Thread.sleep(500);
         }
     }
 
     private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
-        int iterations = 10;
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
         boolean someoneNotConnected = true;
-        while (someoneNotConnected) {           
+        while (someoneNotConnected) {
             if (iterations-- == 0) {
                 ClientBase.logAllStackTraces();
                 throw new RuntimeException("Waiting too long");
             }
 
             someoneNotConnected = false;
-            for (ZooKeeper zk : zks) {                
+            for (ZooKeeper zk : zks) {
                 if (zk.getState() != state) {
                     someoneNotConnected = true;
+                    break;
                 }
             }
             Thread.sleep(1000);

+ 113 - 0
src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * When requests are routed incorrectly, both the follower and the leader perform
+ * the local session upgrade, so CreateSession shows up twice in the txnlog. This
+ * doesn't affect correctness, but it causes the ensemble to see more load than
+ * necessary.
+ */
+public class DuplicateLocalSessionUpgradeTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(DuplicateLocalSessionUpgradeTest.class);
+
+    private final QuorumBase qb = new QuorumBase();
+
+    private static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionUpgradeOnFollower() throws Exception {
+        testLocalSessionUpgrade(false);
+    }
+
+    @Test
+    public void testLocalSessionUpgradeOnLeader() throws Exception {
+        testLocalSessionUpgrade(true);
+    }
+
+    private void testLocalSessionUpgrade(boolean testLeader) throws Exception {
+
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+                CONNECTION_TIMEOUT);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        final String firstPath = "/first";
+        final String secondPath = "/ephemeral";
+
+        // Just create some node so that we know the current zxid
+        zk.create(firstPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // Now, try an ephemeral node. This will trigger a session upgrade,
+        // so a createSession request will be injected into the pipeline
+        // prior to this request
+        zk.create(secondPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+
+        Stat firstStat = zk.exists(firstPath, null);
+        Assert.assertNotNull(firstStat);
+
+        Stat secondStat = zk.exists(secondPath, null);
+        Assert.assertNotNull(secondStat);
+
+        long zxidDiff = secondStat.getCzxid() - firstStat.getCzxid();
+
+        // If there is only one createSession request in between, zxid diff
+        // will be exactly 2. The alternative way of checking is to actually
+        // read txnlog but this should be sufficient
+        Assert.assertEquals(2L, zxidDiff);
+
+    }
+}

+ 176 - 0
src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java

@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to a race condition or bad client code, the leader may get a request from
+ * an expired session. We need to make sure that we never allow an ephemeral node
+ * to be created in that case, but we do allow a normal node to be created.
+ */
+public class LeaderSessionTrackerTest extends ZKTestCase implements Watcher {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(LeaderSessionTrackerTest.class);
+
+    QuorumUtil qu;
+
+    @Before
+    public void setUp() throws Exception {
+        qu = new QuorumUtil(1);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        qu.shutdownAll();
+    }
+
+    @Test
+    public void testExpiredSessionWithLocalSession() throws Exception {
+        testCreateEphemeral(true);
+    }
+
+    @Test
+    public void testExpiredSessionWithoutLocalSession() throws Exception {
+        testCreateEphemeral(false);
+    }
+
+    /**
+     * Submits an ephemeral create carrying a fabricated, untracked session id
+     * straight to the leader and checks that the node is never created: the
+     * leader must validate ephemeral creates against the global sessions.
+     * Unlike SessionInvalidationTest, the session is not in closing state.
+     *
+     * @param localSessionEnabled whether to enable local sessions on the peers
+     */
+    public void testCreateEphemeral(boolean localSessionEnabled) throws Exception {
+        // Use the qu field from setUp() so that tearDown() stops this quorum.
+        if (localSessionEnabled) {
+            qu.enableLocalSession(true);
+        }
+        qu.startAll();
+
+        QuorumPeer leader = qu.getLeaderQuorumPeer();
+
+        ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader),
+                CONNECTION_TIMEOUT, this);
+
+        CreateRequest createRequest = new CreateRequest("/impossible",
+                new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        createRequest.serialize(boa, "request");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+        // Mimic sessionId generated by follower's local session tracker
+        long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer()
+                .getServerId();
+        long fakeSessionId = (sid << 56) + 1;
+
+        LOG.info("Fake session Id: " + Long.toHexString(fakeSessionId));
+
+        Request request = new Request(null, fakeSessionId, 0, OpCode.create,
+                bb, new ArrayList<Id>());
+
+        // Submit request directly to leader
+        leader.getActiveServer().submitRequest(request);
+
+        // Make sure that previous request is finished
+        zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        Stat stat = zk.exists("/impossible", null);
+        zk.close();
+        Assert.assertEquals("Node from fake session get created", null, stat);
+    }
+
+    /**
+     * When local sessions are enabled, the leader allows a persistent node
+     * to be created on behalf of a session it does not track.
+     */
+    @Test
+    public void testCreatePersistent() throws Exception {
+        // Use the qu field from setUp() so that tearDown() stops this quorum.
+        qu.enableLocalSession(true);
+        qu.startAll();
+
+        QuorumPeer leader = qu.getLeaderQuorumPeer();
+
+        ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader),
+                CONNECTION_TIMEOUT, this);
+
+        CreateRequest createRequest = new CreateRequest("/success",
+                new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag());
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        createRequest.serialize(boa, "request");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+        // Mimic sessionId generated by follower's local session tracker
+        long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer()
+                .getServerId();
+        long localSessionId = (sid << 56) + 1;
+
+        LOG.info("Local session Id: " + Long.toHexString(localSessionId));
+
+        Request request = new Request(null, localSessionId, 0, OpCode.create,
+                bb, new ArrayList<Id>());
+
+        // Submit request directly to leader
+        leader.getActiveServer().submitRequest(request);
+
+        // Make sure that previous request is finished
+        zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        Stat stat = zk.exists("/success", null);
+        zk.close();
+        Assert.assertTrue("Request from local sesson failed", stat != null);
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+    }
+
+}

+ 148 - 0
src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java

@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.TraceFormatter;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Validate that open/close session requests of a local session do not propagate
+ * to other machines in the quorum. We verify this by checking that
+ * these requests don't show up in the committedLog on other machines.
+ */
+public class LocalSessionRequestTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(LocalSessionRequestTest.class);
+    // Need to be short since we need to wait for session to expire
+    public static final int CONNECTION_TIMEOUT = 4000;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionsOnFollower() throws Exception {
+        testOpenCloseSession(false);
+    }
+
+    @Test
+    public void testLocalSessionsOnLeader() throws Exception {
+        testOpenCloseSession(true);
+    }
+
+    /**
+     * Walks the target peer's committedLog; fails if a txn of the session is found.
+     * @param sessionId id of the local session that must not appear in the log
+     * @param peerId index into qb.getPeerList() of the peer to inspect
+     */
+    private void validateRequestLog(long sessionId, int peerId) {
+        String session = Long.toHexString(sessionId);
+        // No space after "0x", matching the assertion message below.
+        LOG.info("Searching for txn of session 0x" + session + " on peer " + peerId);
+        String peerType = peerId == qb.getLeaderIndex() ? "leader" : "follower";
+        QuorumPeer peer = qb.getPeerList().get(peerId);
+        ZKDatabase db = peer.getActiveServer().getZKDatabase();
+        for (Proposal p : db.getCommittedLog()) {
+            Assert.assertFalse("Should not see " +
+                               TraceFormatter.op2String(p.request.type) +
+                               " request from local session 0x" + session +
+                               " on the " + peerType,
+                               p.request.sessionId == sessionId);
+        }
+    }
+
+    /**
+     * Test that a CloseSession request generated either by the server (client
+     * disconnect) or by the client (client explicitly issues close()) doesn't
+     * get committed by the ensemble
+     */
+    public void testOpenCloseSession(boolean onLeader) throws Exception {
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int testPeerIdx = onLeader ? leaderIdx : followerIdx;
+        int verifyPeerIdx = onLeader ? followerIdx : leaderIdx;
+
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper client = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        long localSessionId1 = client.getSessionId();
+
+        // Cut the connection, so the server will create closeSession as part
+        // of expiring the session.
+        client.dontReconnect();
+        client.disconnect();
+        watcher.reset();
+
+        // We don't validate right away, will do another session create first
+
+        ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+                CONNECTION_TIMEOUT);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        long localSessionId2 = zk.getSessionId();
+
+        // Send closeSession request.
+        zk.close();
+        watcher.reset();
+
+        // This should be enough time for the first session to expire and for
+        // the closeSession request to propagate to other machines (if there is a bug)
+        // Since it is time sensitive, we have false negative when test
+        // machine is under load
+        Thread.sleep(CONNECTION_TIMEOUT * 2);
+
+        // Validate that we don't see any txn from the first session
+        validateRequestLog(localSessionId1, verifyPeerIdx);
+
+        // Validate that we don't see any txn from the second session
+        validateRequestLog(localSessionId2, verifyPeerIdx);
+
+        qb.shutdownServers();
+
+    }
+}

+ 137 - 0
src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests learners configured to use local sessions only. Expected
+ * behavior is that sessions created on the learner will never be
+ * made global.  Operations requiring a global session (e.g.
+ * creation of ephemeral nodes) will fail with an error.
+ */
+public class LocalSessionsOnlyTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(LocalSessionsOnlyTest.class);
+    public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = false;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionsOnFollower() throws Exception {
+        testLocalSessions(false);
+    }
+
+    @Test
+    public void testLocalSessionsOnLeader() throws Exception {
+        testLocalSessions(true);
+    }
+
+    private void testLocalSessions(boolean testLeader) throws Exception {
+        String nodePrefix = "/testLocalSessions-"
+            + (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+                                       CONNECTION_TIMEOUT);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        long localSessionId = zk.getSessionId();
+
+        // Try creating some data.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        // Now, try an ephemeral node.  This should fail since we
+        // cannot create ephemeral nodes on a local session.
+        try {
+            zk.create(nodePrefix + "ephemeral", new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            Assert.fail("Ephemeral node creation should fail.");
+        } catch (KeeperException.EphemeralOnLocalSessionException e) {
+        }
+
+        // Close the session.
+        zk.close();
+
+        // Validate data on both follower and leader
+        HashMap<String, Integer> peers = new HashMap<String, Integer>();
+        peers.put("leader", leaderIdx);
+        peers.put("follower", followerIdx);
+        for (Entry<String, Integer> entry: peers.entrySet()) {
+            watcher.reset();
+            // Try reconnecting with a new session.
+            // The data should be persisted, even though the session was not.
+            zk = qb.createClient(watcher, hostPorts[entry.getValue()],
+                                 CONNECTION_TIMEOUT);
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+            long newSessionId = zk.getSessionId();
+            Assert.assertFalse(newSessionId == localSessionId);
+
+            for (int i = 0; i < 5; i++) {
+                Assert.assertNotNull("Data not exists in " + entry.getKey(),
+                        zk.exists(nodePrefix + i, null));
+            }
+
+            // We may get the correct exception but the txn may go through
+            Assert.assertNull("Data exists in " + entry.getKey(),
+                    zk.exists(nodePrefix + "ephemeral", null));
+
+            zk.close();
+        }
+        qb.shutdownServers();
+    }
+}

+ 48 - 5
src/java/test/org/apache/zookeeper/test/QuorumBase.java

@@ -21,6 +21,7 @@ package org.apache.zookeeper.test;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Set;
@@ -33,6 +34,7 @@ import org.apache.zookeeper.server.quorum.Election;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.junit.Assert;
 import org.junit.Test;
@@ -60,7 +62,10 @@ public class QuorumBase extends ClientBase {
     protected int portClient3;
     protected int portClient4;
     protected int portClient5;
-    
+
+    protected boolean localSessionsEnabled = false;
+    protected boolean localSessionsUpgradingEnabled = false;
+
     @Test
     // This just avoids complaints by junit
     public void testNull() {
@@ -188,6 +193,17 @@ public class QuorumBase extends ClientBase {
         LOG.info("QuorumPeer 4 voting view: " + s4.getVotingView());
         LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView());       
         
+        s1.enableLocalSessions(localSessionsEnabled);
+        s2.enableLocalSessions(localSessionsEnabled);
+        s3.enableLocalSessions(localSessionsEnabled);
+        s4.enableLocalSessions(localSessionsEnabled);
+        s5.enableLocalSessions(localSessionsEnabled);
+        s1.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s2.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s3.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s4.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s5.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+
         LOG.info("start QuorumPeer 1");
         s1.start();
         LOG.info("start QuorumPeer 2");
@@ -230,9 +246,33 @@ public class QuorumBase extends ClientBase {
         }
         JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
     }
-    
-    
-    public void setupServers() throws IOException {        
+
+    /**
+     * Returns the zero-based position (0 for s1 through 4 for s5) of the
+     * first peer whose state is LEADING, checking the peers in that order.
+     * Returns -1 when none of the five peers is currently leading.
+     */
+    public int getLeaderIndex() {
+        QuorumPeer[] candidates = { s1, s2, s3, s4, s5 };
+        for (int idx = 0; idx < candidates.length; idx++) {
+            if (candidates[idx].getPeerState() == ServerState.LEADING) {
+                return idx;
+            }
+        }
+        return -1;
+    }
+
+    public ArrayList<QuorumPeer> getPeerList() {
+        ArrayList<QuorumPeer> peers = new ArrayList<QuorumPeer>();
+        peers.add(s1);
+        peers.add(s2);
+        peers.add(s3);
+        peers.add(s4);
+        peers.add(s5);
+        return peers;
+    }
+
+    public void setupServers() throws IOException {
         setupServer(1);
         setupServer(2);
         setupServer(3);
@@ -303,7 +343,7 @@ public class QuorumBase extends ClientBase {
             Assert.assertEquals(portClient5, s5.getClientPort());
         }
     }
-    
+
     @Override
     public void tearDown() throws Exception {
         LOG.info("TearDown started");
@@ -334,6 +374,9 @@ public class QuorumBase extends ClientBase {
     }
 
     public static void shutdown(QuorumPeer qp) {
+        if (qp == null) {
+            return;
+        }
         try {
             LOG.info("Shutting down quorum peer " + qp.getName());
             qp.shutdown();

+ 7 - 9
src/java/test/org/apache/zookeeper/test/QuorumTest.java

@@ -304,22 +304,20 @@ public class QuorumTest extends ZKTestCase {
         while(qu.getPeer(index).peer.leader == null)
             index++;
 
-        ZooKeeper zk = new ZooKeeper(
-                "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
-                ClientBase.CONNECTION_TIMEOUT, watcher);
-        watcher.waitForConnected(CONNECTION_TIMEOUT);
-
         // break the quorum
         qu.shutdown(index);
-
-        // Wait until we disconnect to proceed
-        watcher.waitForDisconnected(CONNECTION_TIMEOUT);
         
         // try to reestablish the quorum
         qu.start(index);
+        
+        // Connect the client after services are restarted (otherwise we would get
+        // SessionExpiredException as the previous local session was not persisted).
+        ZooKeeper zk = new ZooKeeper(
+                "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
+                ClientBase.CONNECTION_TIMEOUT, watcher);
 
         try{
-            watcher.waitForConnected(30000);      
+            watcher.waitForConnected(CONNECTION_TIMEOUT);      
         } catch(TimeoutException e) {
             Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
         }

+ 43 - 2
src/java/test/org/apache/zookeeper/test/QuorumUtil.java

@@ -21,13 +21,14 @@ package org.apache.zookeeper.test;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.server.quorum.Election;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -35,6 +36,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utility for quorum testing. Setups 2n+1 peers and allows to start/stop all
@@ -73,6 +76,8 @@ public class QuorumUtil {
 
     private int electionAlg;
 
+    private boolean localSessionEnabled;
+
     /**
      * Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble.
      *
@@ -129,6 +134,11 @@ public class QuorumUtil {
     // This was added to avoid running into the problem of ZOOKEEPER-1539
     public boolean disableJMXTest = false;
     
+
+    public void enableLocalSession(boolean localSessionEnabled) {
+        this.localSessionEnabled = localSessionEnabled;
+    }
+
     public void startAll() throws IOException {
         shutdownAll();
         for (int i = 1; i <= ALL; ++i) {
@@ -191,6 +201,9 @@ public class QuorumUtil {
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
                 ps.id, tickTime, initLimit, syncLimit);
+        if (localSessionEnabled) {
+            ps.peer.enableLocalSessions(true);
+        }
         Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
 
         ps.peer.start();
@@ -207,6 +220,9 @@ public class QuorumUtil {
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
                 ps.id, tickTime, initLimit, syncLimit);
+        if (localSessionEnabled) {
+            ps.peer.enableLocalSessions(true);
+        }
         Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
 
         ps.peer.start();
@@ -252,6 +268,31 @@ public class QuorumUtil {
         return hostPort;
     }
 
+    public String getConnectString(QuorumPeer peer) {
+        return "127.0.0.1:" + peer.getClientPort();
+    }
+
+    public QuorumPeer getLeaderQuorumPeer() {
+        for (PeerStruct ps: peers.values()) {
+            if (ps.peer.leader != null) {
+               return ps.peer;
+            }
+        }
+        throw new RuntimeException("Unable to find a leader peer");
+    }
+
+    public List<QuorumPeer> getFollowerQuorumPeers() {
+        List<QuorumPeer> peerList = new ArrayList<QuorumPeer>(ALL - 1); 
+
+        for (PeerStruct ps: peers.values()) {
+            if (ps.peer.leader == null) {
+               peerList.add(ps.peer);      
+            }
+        }
+
+        return Collections.unmodifiableList(peerList);
+    }
+
     public void tearDown() throws Exception {
         LOG.info("TearDown started");
 

+ 22 - 1
src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java

@@ -123,6 +123,12 @@ public class ReadOnlyModeTest extends ZKTestCase {
 
         watcher.reset();
         qu.shutdown(2);
+        zk.close();
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT);
 
         // read operation during r/o mode
@@ -140,6 +146,13 @@ public class ReadOnlyModeTest extends ZKTestCase {
         qu.start(2);
         Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
                 "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+        zk.close();
+        watcher.reset();
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT);
         zk.setData(node, "We're in the quorum now".getBytes(), -1);
 
@@ -175,6 +188,15 @@ public class ReadOnlyModeTest extends ZKTestCase {
         // kill peer and wait no more than 5 seconds for read-only server
         // to be started (which should take one tickTime (2 seconds))
         qu.shutdown(2);
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                new Watcher() {
+                    public void process(WatchedEvent event) {
+                        states.add(event.getState());
+                    }
+                }, true);
         long start = System.currentTimeMillis();
         while (!(zk.getState() == States.CONNECTEDREADONLY)) {
             Thread.sleep(200);
@@ -228,7 +250,6 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @SuppressWarnings("deprecation")
     @Test(timeout = 90000)
     public void testSeekForRwServer() throws Exception {
-
         // setup the logger to capture all logs
         Layout layout = Logger.getRootLogger().getAppender("CONSOLE")
                 .getLayout();

+ 218 - 0
src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java

@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.SessionTracker.Session;
+import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.quorum.LeaderSessionTracker;
+import org.apache.zookeeper.server.quorum.LearnerSessionTracker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Validates various types of sessions against the leader session tracker and
+ * the learner session tracker.
+ */
+public class SessionTrackerCheckTest extends ZKTestCase {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(SessionTrackerCheckTest.class);
+    public static final int TICK_TIME = 1000;
+    public static final int CONNECTION_TIMEOUT = TICK_TIME * 10;
+
+    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts =
+            new ConcurrentHashMap<Long, Integer>();
+
+    private class Expirer implements SessionExpirer {
+        long sid;
+
+        public Expirer(long sid) {
+            this.sid = sid;
+        }
+
+        public void expire(Session session) {
+        }
+
+        public long getServerId() {
+            return sid;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        sessionsWithTimeouts.clear();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testLearnerSessionTracker() throws Exception {
+        Expirer expirer = new Expirer(1);
+        // With local session on
+        LearnerSessionTracker tracker = new LearnerSessionTracker(expirer,
+                sessionsWithTimeouts, TICK_TIME, expirer.sid, true);
+
+        // Unknown session
+        long sessionId = 0xb100ded;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("Unknown session should have failed");
+        } catch (SessionExpiredException e) {
+            // Got expected exception
+        }
+
+        // Global session
+        sessionsWithTimeouts.put(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail");
+        }
+
+        // Local session
+        sessionId = 0xf005ba11;
+        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Local session should not fail");
+        }
+
+        // During session upgrade
+        sessionsWithTimeouts.put(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Session during upgrade should not fail");
+        }
+
+        // With local session off
+        tracker = new LearnerSessionTracker(expirer, sessionsWithTimeouts,
+                TICK_TIME, expirer.sid, false);
+
+        // Should be noop
+        sessionId = 0xdeadbeef;
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Should not get any exception");
+        }
+
+    }
+
+    @Test
+    public void testLeaderSessionTracker() throws Exception {
+        Expirer expirer = new Expirer(2);
+        // With local session on
+        LeaderSessionTracker tracker = new LeaderSessionTracker(expirer,
+                sessionsWithTimeouts, TICK_TIME, expirer.sid, true);
+
+        // Local session from other server
+        long sessionId = ((expirer.sid + 1) << 56) + 1;
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("local session from other server should not fail");
+        }
+
+        // Global session
+        tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail");
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+
+        // Local session from the leader
+        sessionId = (expirer.sid << 56) + 1;
+        ;
+        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Local session on the leader should not fail");
+        }
+
+        // During session upgrade
+        tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Session during upgrade should not fail");
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+
+        // With local session off
+        tracker = new LeaderSessionTracker(expirer, sessionsWithTimeouts,
+                TICK_TIME, expirer.sid, false);
+
+        // Global session
+        sessionId = 0xdeadbeef;
+        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail");
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail");
+        }
+
+        // Local session from other server
+        sessionId = ((expirer.sid + 1) << 56) + 2;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("local session from other server should fail");
+        } catch (SessionExpiredException e) {
+            // Got expected exception
+        }
+
+        // Local session from the leader
+        sessionId = ((expirer.sid) << 56) + 2;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("local session from the leader should fail");
+        } catch (SessionExpiredException e) {
+            // Got expected exception
+        }
+
+    }
+
+}

+ 232 - 0
src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java

@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that session upgrade works from local to global sessions.
+ * Expected behavior is that if global-only sessions are unset,
+ * and no upgrade interval is specified, then sessions will be
+ * created locally to the host.  They will be upgraded to global
+ * sessions iff an operation is done on that session which requires
+ * persistence, i.e. creating an ephemeral node.
+ */
+public class SessionUpgradeTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(SessionUpgradeTest.class);
+    public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionsWithoutEphemeralOnFollower() throws Exception {
+        testLocalSessionsWithoutEphemeral(false);
+    }
+
+    @Test
+    public void testLocalSessionsWithoutEphemeralOnLeader() throws Exception {
+        testLocalSessionsWithoutEphemeral(true);
+    }
+
+    private void testLocalSessionsWithoutEphemeral(boolean testLeader)
+            throws Exception {
+        String nodePrefix = "/testLocalSessions-"
+            + (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int otherFollowerIdx = (leaderIdx + 2) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // Try creating some data.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        long localSessionId = zk.getSessionId();
+        byte[] localSessionPwd = zk.getSessionPasswd().clone();
+
+        // Try connecting with the same session id on a different
+        // server.  This should fail since it is a local session.
+        try {
+            watcher.reset();
+            DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(
+                    hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+
+            zknew.create(nodePrefix + "5", new byte[0],
+                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.fail("Connection on the same session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+        } catch (KeeperException.ConnectionLossException e) {
+        }
+
+        // If we're testing a follower, also check the session id on the
+        // leader. This should also fail.
+        if (!testLeader) {
+            try {
+                watcher.reset();
+                DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(
+                        hostPorts[leaderIdx], CONNECTION_TIMEOUT,
+                        watcher, localSessionId, localSessionPwd);
+
+                zknew.create(nodePrefix + "5", new byte[0],
+                             ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                             CreateMode.PERSISTENT);
+                Assert.fail("Connection on the same session ID should fail.");
+            } catch (KeeperException.SessionExpiredException e) {
+            } catch (KeeperException.ConnectionLossException e) {
+            }
+        }
+
+        // However, we should be able to disconnect and reconnect to the same
+        // server with the same session id (as long as we do it quickly
+        // before expiration).
+        zk.disconnect();
+
+        watcher.reset();
+        zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher,
+                localSessionId, localSessionPwd);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        zk.create(nodePrefix + "6", new byte[0],
+                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // If we explicitly close the session, then the session id should no
+        // longer be valid.
+        zk.close();
+        try {
+            watcher.reset();
+            zk = new DisconnectableZooKeeper(
+                    hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+
+            zk.create(nodePrefix + "7", new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.fail("Reconnecting to a closed session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+        }
+    }
+
+    @Test
+    public void testUpgradeWithEphemeralOnFollower() throws Exception {
+        testUpgradeWithEphemeral(false);
+    }
+
+    @Test
+    public void testUpgradeWithEphemeralOnLeader() throws Exception {
+        testUpgradeWithEphemeral(true);
+    }
+
+    private void testUpgradeWithEphemeral(boolean testLeader)
+            throws Exception {
+        String nodePrefix = "/testUpgrade-"
+            + (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int otherFollowerIdx = (leaderIdx + 2) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // Create some ephemeral nodes.  This should force the session to
+        // be propagated to the other servers in the ensemble.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+
+        // We should be able to reconnect with the same session id on a
+        // different server, since it has been propagated.
+        long localSessionId = zk.getSessionId();
+        byte[] localSessionPwd = zk.getSessionPasswd().clone();
+
+        zk.disconnect();
+        watcher.reset();
+        zk = new DisconnectableZooKeeper(
+                hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                localSessionId, localSessionPwd);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // The created ephemeral nodes are still around.
+        for (int i = 0; i < 5; i++) {
+            Assert.assertNotNull(zk.exists(nodePrefix + i, null));
+        }
+
+        // When we explicitly close the session, we should not be able to
+        // reconnect with the same session id.
+        zk.close();
+
+        try {
+            watcher.reset();
+            zk = new DisconnectableZooKeeper(
+                    hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+            zk.exists(nodePrefix + "0", null);
+            Assert.fail("Reconnecting to a closed session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+        }
+
+        watcher.reset();
+        // And the ephemeral nodes will be gone since the session died.
+        zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+        for (int i = 0; i < 5; i++) {
+            Assert.assertNull(zk.exists(nodePrefix + i, null));
+        }
+    }
+}