Browse Source

ZOOKEEPER-938. Support Kerberos authentication of clients. (Eugene Koontz via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1159432 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 13 years ago
parent
commit
f650291795
23 changed files with 2450 additions and 300 deletions
  1. 3 0
      CHANGES.txt
  2. 47 2
      src/java/main/org/apache/zookeeper/ClientCnxn.java
  3. 313 0
      src/java/main/org/apache/zookeeper/Login.java
  4. 475 0
      src/java/main/org/apache/zookeeper/Shell.java
  5. 7 0
      src/java/main/org/apache/zookeeper/Watcher.java
  6. 2 0
      src/java/main/org/apache/zookeeper/ZooDefs.java
  7. 348 0
      src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
  8. 4 1
      src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
  9. 19 0
      src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
  10. 3 0
      src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
  11. 19 1
      src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
  12. 2 0
      src/java/main/org/apache/zookeeper/server/ServerCnxn.java
  13. 7 3
      src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
  14. 131 0
      src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java
  15. 56 4
      src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
  16. 61 0
      src/java/main/org/apache/zookeeper/server/auth/DigestLoginModule.java
  17. 418 0
      src/java/main/org/apache/zookeeper/server/auth/KerberosName.java
  18. 70 0
      src/java/main/org/apache/zookeeper/server/auth/SASLAuthenticationProvider.java
  19. 160 0
      src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java
  20. 0 289
      src/java/test/org/apache/zookeeper/test/CnxManagerTest.java.orig
  21. 123 0
      src/java/test/org/apache/zookeeper/test/SaslAuthFailTest.java
  22. 173 0
      src/java/test/org/apache/zookeeper/test/SaslAuthTest.java
  23. 9 0
      src/zookeeper.jute

+ 3 - 0
CHANGES.txt

@@ -424,6 +424,9 @@ NEW FEATURES:
   ZOOKEEPER-784. Server-side functionality for read-only mode (Sergey Doroshenko via henryr)
 
   ZOOKEEPER-992. MT Native Version of Windows C Client (Dheeraj Agrawal via michim)
+ 
+  ZOOKEEPER-938. Support Kerberos authentication of clients. (Eugene Koontz
+  via mahadev)
 
 Release 3.3.0 - 2010-03-24
 

+ 47 - 2
src/java/main/org/apache/zookeeper/ClientCnxn.java

@@ -53,6 +53,7 @@ import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.ZooKeeper.WatchRegistration;
 import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.proto.AuthPacket;
 import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.proto.CreateResponse;
@@ -65,11 +66,15 @@ import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetACLResponse;
 import org.apache.zookeeper.proto.SetDataResponse;
+import org.apache.zookeeper.proto.SetSASLResponse;
 import org.apache.zookeeper.proto.SetWatches;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.ZooTrace;
 
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.SaslException;
+
 /**
  * This class manages the socket i/o for the client. ClientCnxn maintains a list
  * of available servers to connect to and "transparently" switches servers it is
@@ -182,6 +187,9 @@ public class ClientCnxn {
      */
     volatile boolean seenRwServerBefore = false;
 
+
+    public ZooKeeperSaslClient zooKeeperSaslClient;
+
     public long getSessionId() {
         return sessionId;
     }
@@ -368,6 +376,13 @@ public class ClientCnxn {
 
         sendThread = new SendThread(clientCnxnSocket);
         eventThread = new EventThread();
+
+    }
+
+    // used by ZooKeeperSaslClient.queueSaslPacket().
+    public void queuePacket(RequestHeader h, ReplyHeader r, Record request,
+            Record response, AsyncCallback cb) {
+        queuePacket(h,r,request,response, cb, null, null, this, null);
     }
 
     /**
@@ -379,7 +394,7 @@ public class ClientCnxn {
     }
     /**
      * tests use this to set the auto reset
-     * @param b the vaued to set disable watches to
+     * @param b the value to set disable watches to
      */
     public static void setDisableAutoResetWatch(boolean b) {
         disableAutoWatchReset = b;
@@ -537,6 +552,11 @@ public class ClientCnxn {
                       } else {
                           cb.processResult(rc, clientPath, p.ctx, null);
                       }
+                  } else if (p.cb instanceof ZooKeeperSaslClient.ServerSaslResponseCallback) {
+                      ZooKeeperSaslClient.ServerSaslResponseCallback cb = (ZooKeeperSaslClient.ServerSaslResponseCallback) p.cb;
+                      SetSASLResponse rsp = (SetSASLResponse) p.response;
+                      // TODO : check rc (== 0, etc) as with other packet types.
+                      cb.processResult(rc,null,p.ctx,rsp.getToken(),null);
                   } else if (p.response instanceof GetDataResponse) {
                       DataCallback cb = (DataCallback) p.cb;
                       GetDataResponse rsp = (GetDataResponse) p.response;
@@ -890,6 +910,17 @@ public class ClientCnxn {
             setName(getName().replaceAll("\\(.*\\)",
                     "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
 
+            if (System.getProperty("java.security.auth.login.config") != null) {
+                try {
+                    zooKeeperSaslClient = new ZooKeeperSaslClient(ClientCnxn.this, "zookeeper"+"/"+ addr.getHostName());
+                }
+                catch (LoginException e) {
+                    LOG.warn("Zookeeper client cannot authenticate using the Client section of the supplied "
+                      + "configuration file: '" + System.getProperty("java.security.auth.login.config")
+                      + "'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper "
+                      + "server allows it.");
+                }
+            }
             clientCnxnSocket.connect(addr);
         }
 
@@ -913,8 +944,22 @@ public class ClientCnxn {
                         startConnect();
                         clientCnxnSocket.updateLastSendAndHeard();
                     }
-                   
+
                     if (state.isConnected()) {
+                        if ((zooKeeperSaslClient != null) && (zooKeeperSaslClient.isComplete() != true)) {
+                            try {
+                                zooKeeperSaslClient.initialize();
+                            }
+                            catch (SaslException e) {
+                                LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
+                                state = States.AUTH_FAILED;
+                            }
+                            if (zooKeeperSaslClient.readyToSendSaslAuthEvent()) {
+                                eventThread.queueEvent(new WatchedEvent(
+                                  Watcher.Event.EventType.None,
+                                  Watcher.Event.KeeperState.SaslAuthenticated, null));
+                            }
+                        }
                         to = readTimeout - clientCnxnSocket.getIdleRecv();
                     } else {
                         to = connectTimeout - clientCnxnSocket.getIdleRecv();

+ 313 - 0
src/java/main/org/apache/zookeeper/Login.java

@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+/** 
+ * This class is responsible for refreshing Kerberos credentials for
+ * logins for both Zookeeper client and server.
+ * See ZooKeeperSaslServer for server-side usage.
+ * See ZooKeeperSaslClient for client-side usage.
+ */
+
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.callback.CallbackHandler;
+
+import org.apache.log4j.Logger;
+
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.Subject;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Set;
+
+public class Login {
+    Logger LOG = Logger.getLogger(Login.class);
+    public CallbackHandler callbackHandler;
+
+    // LoginThread will sleep until 80% of time from last refresh to
+    // ticket's expiry has been reached, at which time it will wake
+    // and try to renew the ticket.
+    private static final float TICKET_RENEW_WINDOW = 0.80f;
+
+    // Regardless of TICKET_RENEW_WINDOW setting above and the ticket expiry time,
+    // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
+    // Change the '1' to e.g. 5, to change this to 5 minutes.
+    private static final long MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
+
+    private Subject subject = null;
+    private Thread t = null;
+    private boolean isKrbTicket = false;
+    private boolean isUsingTicketCache = false;
+    /**
+     * LoginThread constructor. The constructor starts the thread used
+     * to periodically re-login to the Kerberos Ticket Granting Server.
+     * @param loginContextName
+     *               name of section in JAAS file that will be use to login.
+     *               Passed as first param to javax.security.auth.login.LoginContext().
+     *
+     * @param callbackHandler
+     *               Passed as second param to javax.security.auth.login.LoginContext().
+     * @throws javax.security.auth.login.LoginException
+     *               Thrown if authentication fails.
+     */
+    public Login(final String loginContextName, CallbackHandler callbackHandler)
+      throws LoginException {
+        this.callbackHandler = callbackHandler;
+        final LoginContext loginContext = login(loginContextName);
+        subject = loginContext.getSubject();
+        isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+        AppConfigurationEntry entries[] = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        for (AppConfigurationEntry entry: entries) {
+            if (entry.getOptions().get("useTicketCache") != null) {
+                String val = (String)entry.getOptions().get("useTicketCache");
+                if (val.equals("true")) {
+                    isUsingTicketCache = true;
+                }
+                break;
+            }
+        }
+        if (isKrbTicket && isUsingTicketCache) {
+            // Refresh the Ticket Granting Ticket (TGT) cache periodically. How often to refresh is determined by the
+            // TGT's existing expiry date and the configured MIN_TIME_BEFORE_RELOGIN. For testing and development,
+            // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running :
+            //  "modprinc -maxlife 3mins <principal>" in kadmin.
+            t = new Thread(new Runnable() {
+                public void run() {
+                    LOG.info("TGT refresh thread started.");
+                    while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
+                        KerberosTicket tgt = getTGT();
+                        long now = System.currentTimeMillis();
+                        long nextRefresh;
+                        Date nextRefreshDate;
+                        if (tgt == null) {
+                            nextRefresh = now + MIN_TIME_BEFORE_RELOGIN;
+                            nextRefreshDate = new Date(nextRefresh);
+                            LOG.warn("No TGT found: will try again at " + nextRefreshDate);
+                        }
+                        else {
+                            // determine how long to sleep from looking at ticket's expiry.
+                            // We must not allow the ticket to expire, but we should take into consideration
+                            // MIN_TIME_BEFORE_RELOGIN. Will not sleep less than MIN_TIME_BEFORE_RELOGIN, except when
+                            // unless it would cause ticket expiration.
+                            nextRefresh = getRefreshTime(tgt);
+                            long expiry = tgt.getEndTime().getTime();
+
+                            if ((nextRefresh > expiry) ||
+                              ((now + MIN_TIME_BEFORE_RELOGIN) > expiry)) {
+                                // expiry is before next scheduled refresh).
+                                LOG.info("refreshing now because expiry is before next scheduled refresh time.");
+                                nextRefresh = now;
+                            }
+                            else {
+                                if (nextRefresh < (now + MIN_TIME_BEFORE_RELOGIN)) {
+                                    // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+                                    Date until = new Date(nextRefresh);
+                                    Date newuntil = new Date(now + MIN_TIME_BEFORE_RELOGIN);
+                                    LOG.warn("TGT refresh thread time adjusted from : " + until + " to : " + newuntil + " since "
+                                      + "the former is sooner than the minimum refresh interval ("
+                                      + MIN_TIME_BEFORE_RELOGIN / 1000 + " seconds) from now.");
+                                }
+                                nextRefresh = Math.max(nextRefresh, now + MIN_TIME_BEFORE_RELOGIN);
+                            }
+                            nextRefreshDate = new Date(nextRefresh);
+                            if (nextRefresh > expiry) {
+                                Date expiryDate = new Date(expiry);
+                                LOG.error("next refresh: " + nextRefreshDate + " is later than expiry " + expiryDate
+                                  + ". This may indicated a clock skew problem. Check that this host and the KDC's "
+                                  + "hosts' clocks are in sync.");
+                                return;
+                            }
+                        }
+
+                        if (now < nextRefresh) {
+                            Date until = new Date(nextRefresh);
+                            LOG.info("TGT refresh thread sleeping until: " + until.toString());
+                            try {
+                                Thread.sleep(nextRefresh - now);
+                            }
+                            catch (InterruptedException ie) {
+                                LOG.warn("TGT renewal thread has been interrupted and will exit.");
+                                break;
+                            }
+                        }
+                        else {
+                            LOG.error("nextRefresh:" + nextRefreshDate + " is in the past: exiting refresh thread. Check"
+                              + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)."
+                              + " Manual intervention will be required for this client to successfully authenticate.");
+                            // TODO: if we have a keytab, we can use that to re-initialize and avoid the need for
+                            // manual intervention.
+                            return;
+                        }
+
+                        String cmd = "/usr/bin/kinit";
+                        if (System.getProperty("zookeeper.kinit") != null) {
+                            cmd = System.getProperty("zookeeper.kinit");
+                        }
+                        String kinitArgs = "-R";
+                        try {
+                            Shell.execCommand(cmd,kinitArgs);
+                        }
+                        catch (Shell.ExitCodeException e) {
+                            LOG.error("Could not renew TGT due to problem running shell command: '" + cmd
+                              + " " + kinitArgs + "'" + "; exception was:" + e + ". Will try shell command again at: "
+                              + nextRefreshDate);
+                        }
+                        catch (IOException e) {
+                            LOG.error("Could not renew TGT due to problem running shell command: '" + cmd
+                              + " " + kinitArgs + "'; exception was:" + e + ". Will try shell command again at: "
+                              + nextRefreshDate);
+                        }
+                        try {
+                            reloginFromTicketCache(loginContextName, loginContext);
+                            LOG.debug("renewed TGT successfully.");
+                        }
+                        catch (LoginException e) {
+                            LOG.error("Could not renew TGT due to LoginException: " + e + "."
+                              + " Will try again at: "
+                              + nextRefreshDate);
+                        }
+                    }
+                }
+            });
+            t.setDaemon(true);
+        }
+        else {
+            LOG.error("Not using Ticket Granting Ticket cache: will not start a TGT renewal thread.");
+        }
+    }
+
+    public void startThreadIfNeeded() {
+        // thread object 't' will be null if a refresh thread is not needed.
+        if (t != null) {
+            t.start();
+        }
+    }
+
+
+    private synchronized LoginContext login(final String loginContextName) throws LoginException {
+        if (loginContextName == null) {
+            throw new LoginException("loginContext name (JAAS file section header) was null. " +
+              "Please check your java.security.login.auth.config setting.");
+        }
+        LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
+        loginContext.login();
+        LOG.info("successfully logged in.");
+        return loginContext;
+    }
+
+    public Subject getSubject() {
+        return subject;
+    }
+
+    // c.f. org.apache.hadoop.security.UserGroupInformation.
+    private long getRefreshTime(KerberosTicket tgt) {
+        long start = tgt.getStartTime().getTime();
+        long expires = tgt.getEndTime().getTime();
+        LOG.info("TGT valid starting at: " + tgt.getStartTime().toString());
+        LOG.info("TGT expires: " + tgt.getEndTime().toString());
+        long proposedRefresh = start + (long) ((expires - start) * TICKET_RENEW_WINDOW);
+        if (proposedRefresh > expires) {
+            // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
+            return System.currentTimeMillis();
+        }
+        else {
+            return proposedRefresh;
+        }
+    }
+
+    private synchronized KerberosTicket getTGT() {
+        Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+        for(KerberosTicket ticket: tickets) {
+            KerberosPrincipal server = ticket.getServer();
+            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+                LOG.debug("Found tgt " + ticket + ".");
+                return ticket;
+            }
+        }
+        return null;
+    }
+
+    // TODO : refactor this with login() to maximize code-sharing.
+    public synchronized void reloginFromTicketCache(final String loginContextName, LoginContext loginContext)
+        throws LoginException {
+        if (!(isKrbTicket && isUsingTicketCache)) {
+            return;
+        }
+        if (loginContext == null) {
+            throw new LoginException("login must be done first");
+        }
+        String principalName = getPrincipalName();
+        try {
+            LOG.info("Logging out " + principalName);
+            //clear up the Kerberos state. But the tokens are not cleared! As per
+            //the Java kerberos login module code, only the kerberos credentials
+            //are cleared.
+            loginContext.logout();
+            //login and also update the subject field of this instance to
+            //have the new credentials (pass it to the LoginContext constructor)
+            if (loginContextName == null) {
+                throw new LoginException("loginContext name (JAAS file section header) was null. " +
+                  "Please check your java.security.login.auth.config setting.");
+            }
+            if (subject == null) {
+                throw new LoginException("login subject was null.");
+            }
+            LOG.info("Logging in " + principalName);
+            loginContext.login();
+            if (principalName.equals("(no principal name)")) {
+                // try again to get the principal name, in case the ticket cache was manually refreshed.
+                principalName = getPrincipalName();
+            }
+            LOG.info("Login successful for " + principalName);
+        } catch (LoginException le) {
+            throw new LoginException("Login failure for " + principalName);
+        }
+    }
+
+    private String getPrincipalName() {
+        try {
+            return getSubject().getPrincipals(KerberosPrincipal.class).toArray()[0].toString();
+        }
+        catch (NullPointerException e) {
+            LOG.warn("could not display principal name because login was null or login's subject was null: returning '(no principal found)'.");
+        }
+        catch (ArrayIndexOutOfBoundsException e) {
+            LOG.warn("could not display principal name because login's subject had no principals: returning '(no principal found)'.");
+        }
+        return "(no principal found)";
+    }
+
+    public void shutdown() {
+        if ((t != null) && (t.isAlive())) {
+            t.interrupt();
+            try {
+                t.join();
+            }
+            catch (InterruptedException e) {
+                LOG.error("error while waiting for Login thread to shutdown: " + e);
+            }
+        }
+    }
+
+}
+

+ 475 - 0
src/java/main/org/apache/zookeeper/Shell.java

@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file copied from Hadoop's security branch,
+  * with the following changes:
+  * 1. package changed from org.apache.hadoop.util to
+  *    org.apache.zookeeper.
+  * 2. Usage of Hadoop's Configuration class removed since
+  *    it is not available in Zookeeper: instead, system properties
+  *    are used.
+  * 3. The deprecated getUlimitMemoryCommand() method removed since
+  *    it is not needed.
+  */
+
+
+package org.apache.zookeeper;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.log4j.Logger;
+
+/** 
+ * A base class for running a Unix command.
+ * 
+ * <code>Shell</code> can be used to run unix commands like <code>du</code> or
+ * <code>df</code>. It also offers facilities to gate commands by 
+ * time-intervals.
+ */
+abstract public class Shell {
+  
+  Logger LOG = Logger.getLogger(Shell.class);
+  
+  /** a Unix command to get the current user's name */
+  public final static String USER_NAME_COMMAND = "whoami";
+  /** a Unix command to get the current user's groups list */
+  public static String[] getGroupsCommand() {
+    return new String[]{"bash", "-c", "groups"};
+  }
+  /** a Unix command to get a given user's groups list */
+  public static String[] getGroupsForUserCommand(final String user) {
+    //'groups username' command return is non-consistent across different unixes
+    return new String [] {"bash", "-c", "id -Gn " + user};
+  }
+  /** a Unix command to set permission */
+  public static final String SET_PERMISSION_COMMAND = "chmod";
+  /** a Unix command to set owner */
+  public static final String SET_OWNER_COMMAND = "chown";
+  public static final String SET_GROUP_COMMAND = "chgrp";
+  /** Return a Unix command to get permission information. */
+  public static String[] getGET_PERMISSION_COMMAND() {
+    //force /bin/ls, except on windows.
+    return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
+  }
+
+  /**Time after which the executing script would be timedout*/
+  protected long timeOutInterval = 0L;
+  /** If or not script timed out*/
+  private AtomicBoolean timedOut;
+
+  /** a Unix command to get ulimit of a process. */
+  public static final String ULIMIT_COMMAND = "ulimit";
+  
+  /** 
+   * Get the Unix command for setting the maximum virtual memory available
+   * to a given child process. This is only relevant when we are forking a
+   * process from within the Mapper or the Reducer implementations.
+   * Also see Hadoop Pipes and Hadoop Streaming.
+   * 
+   * It also checks to ensure that we are running on a *nix platform else 
+   * (e.g. in Cygwin/Windows) it returns <code>null</code>.
+   * @param memoryLimit virtual memory limit
+   * @return a <code>String[]</code> with the ulimit command arguments or 
+   *         <code>null</code> if we are running on a non *nix platform or
+   *         if the limit is unspecified.
+   */
+  public static String[] getUlimitMemoryCommand(int memoryLimit) {
+    // ulimit isn't supported on Windows
+    if (WINDOWS) {
+      return null;
+    }
+    
+    return new String[] {ULIMIT_COMMAND, "-v", String.valueOf(memoryLimit)};
+  }
+
+  /** Set to true on Windows platforms */
+  public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
+                = System.getProperty("os.name").startsWith("Windows");
+  
+  private long    interval;   // refresh interval in msec
+  private long    lastTime;   // last time the command was performed
+  private Map<String, String> environment; // env for the command execution
+  private File dir;
+  private Process process; // sub process used to execute the command
+  private int exitCode;
+
+  /**If or not script finished executing*/
+  private volatile AtomicBoolean completed;
+  
+  public Shell() {
+    this(0L);
+  }
+  
+  /**
+   * @param interval the minimum duration to wait before re-executing the 
+   *        command.
+   */
+  public Shell( long interval ) {
+    this.interval = interval;
+    this.lastTime = (interval<0) ? 0 : -interval;
+  }
+  
+  /** set the environment for the command 
+   * @param env Mapping of environment variables
+   */
+  protected void setEnvironment(Map<String, String> env) {
+    this.environment = env;
+  }
+
+  /** set the working directory 
+   * @param dir The directory where the command would be executed
+   */
+  protected void setWorkingDirectory(File dir) {
+    this.dir = dir;
+  }
+
+  /** check to see if a command needs to be executed and execute if needed */
+  protected void run() throws IOException {
+    if (lastTime + interval > System.currentTimeMillis())
+      return;
+    exitCode = 0; // reset for next run
+    runCommand();
+  }
+
+  /** Run a command */
+  private void runCommand() throws IOException { 
+    ProcessBuilder builder = new ProcessBuilder(getExecString());
+    Timer timeOutTimer = null;
+    ShellTimeoutTimerTask timeoutTimerTask = null;
+    timedOut = new AtomicBoolean(false);
+    completed = new AtomicBoolean(false);
+    
+    if (environment != null) {
+      builder.environment().putAll(this.environment);
+    }
+    if (dir != null) {
+      builder.directory(this.dir);
+    }
+    
+    process = builder.start();
+    if (timeOutInterval > 0) {
+      timeOutTimer = new Timer();
+      timeoutTimerTask = new ShellTimeoutTimerTask(
+          this);
+      //One time scheduling.
+      timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
+    }
+    final BufferedReader errReader = 
+            new BufferedReader(new InputStreamReader(process
+                                                     .getErrorStream()));
+    BufferedReader inReader = 
+            new BufferedReader(new InputStreamReader(process
+                                                     .getInputStream()));
+    final StringBuffer errMsg = new StringBuffer();
+    
+    // read error and input streams as this would free up the buffers
+    // free the error stream buffer
+    Thread errThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          String line = errReader.readLine();
+          while((line != null) && !isInterrupted()) {
+            errMsg.append(line);
+            errMsg.append(System.getProperty("line.separator"));
+            line = errReader.readLine();
+          }
+        } catch(IOException ioe) {
+          LOG.warn("Error reading the error stream", ioe);
+        }
+      }
+    };
+    try {
+      errThread.start();
+    } catch (IllegalStateException ise) { }
+    try {
+      parseExecResult(inReader); // parse the output
+      // clear the input stream buffer
+      String line = inReader.readLine();
+      while(line != null) { 
+        line = inReader.readLine();
+      }
+      // wait for the process to finish and check the exit code
+      exitCode  = process.waitFor();
+      try {
+        // make sure that the error thread exits
+        errThread.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted while reading the error stream", ie);
+      }
+      completed.set(true);
+      //the timeout thread handling
+      //taken care in finally block
+      if (exitCode != 0) {
+        throw new ExitCodeException(exitCode, errMsg.toString());
+      }
+    } catch (InterruptedException ie) {
+      throw new IOException(ie.toString());
+    } finally {
+      if ((timeOutTimer!=null) && !timedOut.get()) {
+        timeOutTimer.cancel();
+      }
+      // close the input stream
+      try {
+        inReader.close();
+      } catch (IOException ioe) {
+        LOG.warn("Error while closing the input stream", ioe);
+      }
+      if (!completed.get()) {
+        errThread.interrupt();
+      }
+      try {
+        errReader.close();
+      } catch (IOException ioe) {
+        LOG.warn("Error while closing the error stream", ioe);
+      }
+      process.destroy();
+      lastTime = System.currentTimeMillis();
+    }
+  }
+
+  /** return an array containing the command name & its parameters */ 
+  protected abstract String[] getExecString();
+  
+  /** Parse the execution result */
+  protected abstract void parseExecResult(BufferedReader lines)
+  throws IOException;
+
+  /** get the current sub-process executing the given command 
+   * @return process executing the command
+   */
+  public Process getProcess() {
+    return process;
+  }
+
+  /** get the exit code 
+   * @return the exit code of the process
+   */
+  public int getExitCode() {
+    return exitCode;
+  }
+
+  /**
+   * This is an IOException with exit code added.
+   */
+  public static class ExitCodeException extends IOException {
+    int exitCode;
+    
+    public ExitCodeException(int exitCode, String message) {
+      super(message);
+      this.exitCode = exitCode;
+    }
+    
+    public int getExitCode() {
+      return exitCode;
+    }
+  }
+  
+  /**
+   * A simple shell command executor.
+   * 
+   * <code>ShellCommandExecutor</code>should be used in cases where the output 
+   * of the command needs no explicit parsing and where the command, working 
+   * directory and the environment remains unchanged. The output of the command 
+   * is stored as-is and is expected to be small.
+   */
+  public static class ShellCommandExecutor extends Shell {
+    
+    private String[] command;
+    private StringBuffer output;
+    
+    
+    public ShellCommandExecutor(String[] execString) {
+      this(execString, null);
+    }
+    
+    public ShellCommandExecutor(String[] execString, File dir) {
+      this(execString, dir, null);
+    }
+   
+    public ShellCommandExecutor(String[] execString, File dir, 
+                                 Map<String, String> env) {
+      this(execString, dir, env , 0L);
+    }
+
+    /**
+     * Create a new instance of the ShellCommandExecutor to execute a command.
+     * 
+     * @param execString The command to execute with arguments
+     * @param dir If not-null, specifies the directory which should be set
+     *            as the current working directory for the command.
+     *            If null, the current working directory is not modified.
+     * @param env If not-null, environment of the command will include the
+     *            key-value pairs specified in the map. If null, the current
+     *            environment is not modified.
+     * @param timeout Specifies the time in milliseconds, after which the
+     *                command will be killed and the status marked as timedout.
+     *                If 0, the command will not be timed out. 
+     */
+    public ShellCommandExecutor(String[] execString, File dir, 
+        Map<String, String> env, long timeout) {
+      command = execString.clone();
+      if (dir != null) {
+        setWorkingDirectory(dir);
+      }
+      if (env != null) {
+        setEnvironment(env);
+      }
+      timeOutInterval = timeout;
+    }
+        
+
+    /** Execute the shell command. */
+    public void execute() throws IOException {
+      this.run();    
+    }
+
+    protected String[] getExecString() {
+      return command;
+    }
+
+    protected void parseExecResult(BufferedReader lines) throws IOException {
+      output = new StringBuffer();
+      char[] buf = new char[512];
+      int nRead;
+      while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {
+        output.append(buf, 0, nRead);
+      }
+    }
+    
+    /** Get the output of the shell command.*/
+    public String getOutput() {
+      return (output == null) ? "" : output.toString();
+    }
+
+    /**
+     * Returns the commands of this instance.
+     * Arguments with spaces in are presented with quotes round; other
+     * arguments are presented raw
+     *
+     * @return a string representation of the object.
+     */
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      String[] args = getExecString();
+      for (String s : args) {
+        if (s.indexOf(' ') >= 0) {
+          builder.append('"').append(s).append('"');
+        } else {
+          builder.append(s);
+        }
+        builder.append(' ');
+      }
+      return builder.toString();
+    }
+  }
+  
+  /**
+   * To check if the passed script to shell command executor timed out or
+   * not.
+   * 
+   * @return if the script timed out.
+   */
+  public boolean isTimedOut() {
+    return timedOut.get();
+  }
+  
+  /**
+   * Set if the command has timed out.
+   * 
+   */
+  private void setTimedOut() {
+    this.timedOut.set(true);
+  }
+  
+  /** 
+   * Static method to execute a shell command. 
+   * Covers most of the simple cases without requiring the user to implement  
+   * the <code>Shell</code> interface.
+   * @param cmd shell command to execute.
+   * @return the output of the executed command.
+   */
+  public static String execCommand(String ... cmd) throws IOException {
+    return execCommand(null, cmd, 0L);
+  }
+  
+  /** 
+   * Static method to execute a shell command. 
+   * Covers most of the simple cases without requiring the user to implement  
+   * the <code>Shell</code> interface.
+   * @param env the map of environment key=value
+   * @param cmd shell command to execute.
+   * @param timeout time in milliseconds after which script should be marked timeout
+   * @return the output of the executed command.o
+   */
+  
+  public static String execCommand(Map<String, String> env, String[] cmd,
+      long timeout) throws IOException {
+    ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, 
+                                                          timeout);
+    exec.execute();
+    return exec.getOutput();
+  }
+
+  /** 
+   * Static method to execute a shell command. 
+   * Covers most of the simple cases without requiring the user to implement  
+   * the <code>Shell</code> interface.
+   * @param env the map of environment key=value
+   * @param cmd shell command to execute.
+   * @return the output of the executed command.
+   */
+  public static String execCommand(Map<String,String> env, String ... cmd) 
+  throws IOException {
+    return execCommand(env, cmd, 0L);
+  }
+  
+  /**
+   * Timer which is used to timeout scripts spawned off by shell.
+   */
+  private static class ShellTimeoutTimerTask extends TimerTask {
+
+    private Shell shell;
+
+    public ShellTimeoutTimerTask(Shell shell) {
+      this.shell = shell;
+    }
+
+    @Override
+    public void run() {
+      Process p = shell.getProcess();
+      try {
+        p.exitValue();
+      } catch (Exception e) {
+        //Process has not terminated.
+        //So check if it has completed 
+        //if not just destroy it.
+        if (p != null && !shell.completed.get()) {
+          shell.setTimedOut();
+          p.destroy();
+        }
+      }
+    }
+  }
+}

+ 7 - 0
src/java/main/org/apache/zookeeper/Watcher.java

@@ -69,6 +69,12 @@ public interface Watcher {
              */
             ConnectedReadOnly (5),
 
+            /**
+              * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
+              * so that they can perform Zookeeper actions with their SASL-authorized permissions.
+              */
+            SaslAuthenticated(6),
+
             /** The serving cluster has expired this session. The ZooKeeper
              * client connection (the session) is no longer valid. You must
              * create a new client connection (instantiate a new ZooKeeper
@@ -94,6 +100,7 @@ public interface Watcher {
                     case    3: return KeeperState.SyncConnected;
                     case    4: return KeeperState.AuthFailed;
                     case    5: return KeeperState.ConnectedReadOnly;
+                    case    6: return KeeperState.SaslAuthenticated;
                     case -112: return KeeperState.Expired;
 
                     default:

+ 2 - 0
src/java/main/org/apache/zookeeper/ZooDefs.java

@@ -58,6 +58,8 @@ public class ZooDefs {
 
         public final int setWatches = 101;
 
+        public final int sasl = 102;
+
         public final int createSession = -10;
 
         public final int closeSession = -11;

+ 348 - 0
src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java

@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.ClientCnxn;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.GetSASLRequest;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.proto.SetSASLResponse;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+/**
+ * This class manages SASL authentication for the client. It
+ * allows ClientCnxn to authenticate using SASL with a Zookeeper server.
+ */
+public class ZooKeeperSaslClient {
+    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSaslClient.class);
+    private static Login login = null;
+    private SaslClient saslClient;
+
+    private byte[] saslToken = new byte[0];
+    private ClientCnxn cnxn;
+
+    private enum SaslState {
+        INITIAL,INTERMEDIATE,COMPLETE
+    }
+
+    private SaslState saslState = SaslState.INITIAL;
+
+    public ZooKeeperSaslClient(ClientCnxn cnxn, String serverPrincipal) throws LoginException {
+        this.cnxn = cnxn;
+        this.saslClient = createSaslClient(serverPrincipal);
+    }
+
+    public boolean isComplete() {
+        return (saslState == SaslState.COMPLETE);
+    }
+
+    public static class ServerSaslResponseCallback implements AsyncCallback.DataCallback {
+        public void processResult(int rc, String path, Object ctx, byte data[], Stat stat) {
+            // processResult() is used by ClientCnxn's sendThread to respond to
+            // data[] contains the Zookeeper Server's SASL token.
+            // ctx is the ZooKeeperSaslClient object. We use this object's prepareSaslResponseToServer() method
+            // to reply to the Zookeeper Server's SASL token
+            ZooKeeperSaslClient client = ((ClientCnxn)ctx).zooKeeperSaslClient;
+            if (client == null) {
+                LOG.warn("sasl client was unexpectedly null: cannot respond to Zookeeper server.");
+                return;
+            }
+            byte[] usedata = data;
+            if (data != null) {
+                LOG.debug("ServerSaslResponseCallback(): saslToken server response: (length="+usedata.length+")");
+            }
+            else {
+                usedata = new byte[0];
+                LOG.debug("ServerSaslResponseCallback(): using empty data[] as server response (length="+usedata.length+")");
+            }
+            client.prepareSaslResponseToServer(usedata);
+        }
+    }
+
+    synchronized private SaslClient createSaslClient(final String servicePrincipal) throws LoginException {
+        try {
+            if (login == null) {
+                // note that the login object is static: it's shared amongst all zookeeper-related connections.
+                // createSaslClient() must be declared synchronized so that login is initialized only once.
+                login = new Login("Client",new ClientCallbackHandler(null));
+                login.startThreadIfNeeded();
+            }
+            Subject subject = login.getSubject();
+            SaslClient saslClient;
+            // Use subject.getPrincipals().isEmpty() as an indication of which SASL mechanism to use:
+            // if empty, use DIGEST-MD5; otherwise, use GSSAPI.
+            if (subject.getPrincipals().isEmpty()) {
+                // no principals: must not be GSSAPI: use DIGEST-MD5 mechanism instead.
+                LOG.info("Client will use DIGEST-MD5 as SASL mechanism.");
+                String[] mechs = {"DIGEST-MD5"};
+                String username = (String)(subject.getPublicCredentials().toArray()[0]);
+                String password = (String)(subject.getPrivateCredentials().toArray()[0]);
+                // "zk-sasl-md5" is a hard-wired 'domain' parameter shared with zookeeper server code (see ServerCnxnFactory.java)
+                saslClient = Sasl.createSaslClient(mechs, username, "zookeeper", "zk-sasl-md5", null, new ClientCallbackHandler(password));
+                return saslClient;
+            }
+            else { // GSSAPI.
+                final Object[] principals = subject.getPrincipals().toArray();
+                // determine client principal from subject.
+                final Principal clientPrincipal = (Principal)principals[0];
+                final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
+                // assume that server and client are in the same realm (by default; unless the system property
+                // "zookeeper.server.realm" is set).
+                String serverRealm = System.getProperty("zookeeper.server.realm",clientKerberosName.getRealm());
+                KerberosName serviceKerberosName = new KerberosName(servicePrincipal+"@"+serverRealm);
+                final String serviceName = serviceKerberosName.getServiceName();
+                final String serviceHostname = serviceKerberosName.getHostName();
+                final String clientPrincipalName = clientKerberosName.toString();
+                try {
+                    saslClient = Subject.doAs(subject,new PrivilegedExceptionAction<SaslClient>() {
+                        public SaslClient run() throws SaslException {
+                            LOG.info("Client will use GSSAPI as SASL mechanism.");
+                            String[] mechs = {"GSSAPI"};
+                            LOG.debug("creating sasl client: client="+clientPrincipalName+";service="+serviceName+";serviceHostname="+serviceHostname);
+                            SaslClient saslClient = Sasl.createSaslClient(mechs,clientPrincipalName,serviceName,serviceHostname,null,new ClientCallbackHandler(null));
+                            return saslClient;
+                        }
+                    });
+                    return saslClient;
+                }
+                catch (Exception e) {
+                    LOG.error("Error creating SASL client:" + e);
+                    e.printStackTrace();
+                    return null;
+                }
+            }
+        }
+        catch (LoginException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            LOG.error("Exception while trying to create SASL client: " + e);
+            return null;
+        }
+    }
+
+    private void prepareSaslResponseToServer(byte[] serverToken) {
+        saslToken = serverToken;
+
+        if (saslClient == null) {
+            LOG.error("saslClient is unexpectedly null. Cannot respond to server's SASL message; ignoring.");
+            return;
+        }
+
+        LOG.debug("saslToken (server) length: " + saslToken.length);
+        if (!(saslClient.isComplete())) {
+            try {
+                saslToken = createSaslToken(saslToken);
+                if (saslToken != null) {
+                    LOG.debug("saslToken (client) length: " + saslToken.length);
+                    queueSaslPacket(saslToken);
+                }
+            } catch (SaslException e) {
+                // TODO sendThread should set state to AUTH_FAILED; but currently only sendThread modifies state.
+                LOG.error("SASL authentication failed.");
+            }
+        }
+    }
+
+    public byte[] createSaslToken() throws SaslException {
+        saslState = SaslState.INTERMEDIATE;
+        return createSaslToken(saslToken);
+    }
+
+    private byte[] createSaslToken(final byte[] saslToken) throws SaslException {
+        if (saslToken == null) {
+            // TODO: introspect about runtime environment (such as jaas.conf)
+            throw new SaslException("Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null.");
+        }
+
+        Subject subject = login.getSubject();
+        if (subject != null) {
+            synchronized(login) {
+                try {
+                    final byte[] retval =
+                        Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                                public byte[] run() throws SaslException {
+                                    LOG.debug("saslClient.evaluateChallenge(len="+saslToken.length+")");
+                                    return saslClient.evaluateChallenge(saslToken);
+                                }
+                            });
+                    return retval;
+                }
+                catch (PrivilegedActionException e) {
+                    String error = "An error: (" + e + ") occurred when evaluating Zookeeper Quorum Member's " +
+                      " received SASL token.";
+                    // Try to provide hints to use about what went wrong so they can fix their configuration.
+                    // TODO: introspect about e: look for GSS information.
+                    final String UNKNOWN_SERVER_ERROR_TEXT =
+                      "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
+                    if (e.toString().indexOf(UNKNOWN_SERVER_ERROR_TEXT) > -1) {
+                        error += " This may be caused by Java's being unable to resolve the Zookeeper Quorum Member's" +
+                          " hostname correctly. You may want to try to adding" +
+                          " '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment.";
+                    }
+                    error += " Zookeeper Client will go to AUTH_FAILED state.";
+                    LOG.error(error);
+                    throw new SaslException(error);
+                }
+            }
+        }
+        else {
+            throw new SaslException("Cannot make SASL token without subject defined. " +
+              "For diagnosis, please look for WARNs and ERRORs in your log related to the Login class.");
+        }
+    }
+
+    public void queueSaslPacket(byte[] saslToken) {
+        LOG.debug("ClientCnxn:sendSaslPacket:length="+saslToken.length);
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.sasl);
+        GetSASLRequest request = new GetSASLRequest();
+        request.setToken(saslToken);
+        SetSASLResponse response = new SetSASLResponse();
+        ServerSaslResponseCallback cb = new ServerSaslResponseCallback();
+        ReplyHeader r = new ReplyHeader();
+        cnxn.queuePacket(h,r,request,response,cb);
+    }
+
+    public void queueSaslPacket() throws SaslException {
+        queueSaslPacket(createSaslToken());
+    }
+
+    // used by ClientCnxn to know when to emit SaslAuthenticated event.
+    // transitions internally from INTERMEDIATE to COMPLETE as a side effect if
+    // it's ready to emit this event.
+    public boolean readyToSendSaslAuthEvent() {
+        if (saslClient != null) {
+            if (saslClient.isComplete()) {
+                if (saslState == SaslState.INTERMEDIATE) {
+                    saslState = SaslState.COMPLETE;
+                    return true;
+                }
+            }
+        }
+        else {
+            LOG.warn("saslClient is null: client could not authenticate properly.");
+        }
+        return false;
+    }
+
+    public void initialize() throws SaslException {
+        if (saslState == SaslState.INITIAL) {
+            if (saslClient.hasInitialResponse()) {
+                queueSaslPacket();
+            }
+            else {
+                byte[] emptyToken = new byte[0];
+                queueSaslPacket(emptyToken);
+            }
+            saslState = SaslState.INTERMEDIATE;
+        }
+    }
+
+    // The CallbackHandler interface here refers to
+    // javax.security.auth.callback.CallbackHandler.
+    // It should not be confused with Zookeeper packet callbacks like
+    //  org.apache.zookeeper.server.auth.SaslServerCallbackHandler.
+    public static class ClientCallbackHandler implements CallbackHandler {
+        private String password = null;
+
+        public ClientCallbackHandler(String password) {
+            this.password = password;
+        }
+
+        public void handle(Callback[] callbacks) throws
+          UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    NameCallback nc = (NameCallback) callback;
+                    nc.setName(nc.getDefaultName());
+                }
+                else {
+                    if (callback instanceof PasswordCallback) {
+                        PasswordCallback pc = (PasswordCallback)callback;
+                        if (password != null) {
+                            pc.setPassword(this.password.toCharArray());
+                        } else {
+                            LOG.warn("Could not login: the client is being asked for a password, but the Zookeeper" +
+                              " client code does not currently support obtaining a password from the user." +
+                              " Make sure that the client is configured to use a ticket cache (using" +
+                              " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
+                              " you still get this message after that, the TGT in the ticket cache has expired and must" +
+                              " be manually refreshed. To do so, first determine if you are using a password or a" +
+                              " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
+                              " is running this Zookeeper client using the command" +
+                              " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
+                              " If the latter, do" +
+                              " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
+                              " <keytab> is the location of the keytab file). After manually refreshing your cache," +
+                              " restart this client. If you continue to see this message after manually refreshing" +
+                              " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
+                        }
+                    }
+                    else {
+                        if (callback instanceof RealmCallback) {
+                            RealmCallback rc = (RealmCallback) callback;
+                            rc.setText(rc.getDefaultText());
+                        }
+                        else {
+                            if (callback instanceof AuthorizeCallback) {
+                                AuthorizeCallback ac = (AuthorizeCallback) callback;
+                                String authid = ac.getAuthenticationID();
+                                String authzid = ac.getAuthorizationID();
+                                if (authid.equals(authzid)) {
+                                    ac.setAuthorized(true);
+                                } else {
+                                    ac.setAuthorized(false);
+                                }
+                                if (ac.isAuthorized()) {
+                                    ac.setAuthorizedID(authzid);
+                                }
+                            }
+                            else {
+                                throw new UnsupportedCallbackException(callback,"Unrecognized SASL ClientCallback");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

+ 4 - 1
src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -98,6 +98,9 @@ public class NIOServerCnxn extends ServerCnxn {
         this.sock = sock;
         this.sk = sk;
         this.factory = factory;
+        if (this.factory.login != null) {
+            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
+        }
         if (zk != null) { 
             outstandingLimit = zk.getGlobalOutstandingLimit();
         }
@@ -478,7 +481,7 @@ public class NIOServerCnxn extends ServerCnxn {
     /**
      * Set of threads for commmand ports. All the 4
      * letter commands are run via a thread. Each class
-     * maps to a correspoding 4 letter command. CommandThread
+     * maps to a corresponding 4 letter command. CommandThread
      * is the abstract class from which all the others inherit.
      */
     private abstract class CommandThread extends Thread {

+ 19 - 0
src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java

@@ -32,9 +32,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.SaslServerCallbackHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+
 public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxnFactory.class);
 
@@ -86,6 +91,17 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
     Thread thread;
     @Override
     public void configure(InetSocketAddress addr, int maxcc) throws IOException {
+        if (System.getProperty("java.security.auth.login.config") != null) {
+            try {
+                saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
+                login = new Login("Server",saslServerCallbackHandler);
+                login.startThreadIfNeeded();
+            }
+            catch (LoginException e) {
+                throw new IOException("Could not configure server because SASL configuration did not allow the "
+                  + " Zookeeper server to authenticate itself properly: " + e);
+            }
+        }
         thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
         thread.setDaemon(true);
         maxClientCnxns = maxcc;
@@ -254,6 +270,9 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
             closeAll();
             thread.interrupt();
             thread.join();
+            if (login != null) {
+                login.shutdown();
+            }
         } catch (InterruptedException e) {
             LOG.warn("Ignoring interrupted exception during shutdown", e);
         } catch (Exception e) {

+ 3 - 0
src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -81,6 +81,9 @@ public class NettyServerCnxn extends ServerCnxn {
         this.channel = channel;
         this.zkServer = zks;
         this.factory = factory;
+        if (this.factory.login != null) {
+            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
+        }
     }
     
     @Override

+ 19 - 1
src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java

@@ -28,6 +28,8 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Executors;
 
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.SaslServerCallbackHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -45,6 +47,9 @@ import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+
 public class NettyServerCnxnFactory extends ServerCnxnFactory {
     Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
 
@@ -302,6 +307,17 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     public void configure(InetSocketAddress addr, int maxClientCnxns)
             throws IOException
     {
+        if (System.getProperty("java.security.auth.login.config") != null) {
+            try {
+                saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
+                login = new Login("Server",saslServerCallbackHandler);
+                login.startThreadIfNeeded();
+            }
+            catch (LoginException e) {
+                throw new IOException("Could not configure server because SASL configuration did not allow the "
+                  + " Zookeeper server to authenticate itself properly: " + e);
+            }
+        }
         localAddress = addr;
         this.maxClientCnxns = maxClientCnxns;
     }
@@ -334,7 +350,9 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     @Override
     public void shutdown() {
         LOG.info("shutdown called " + localAddress);
-        
+        if (login != null) {
+            login.shutdown();
+        }
         // null if factory never started
         if (parentChannel != null) {
             parentChannel.close().awaitUninterruptibly();

+ 2 - 0
src/java/main/org/apache/zookeeper/server/ServerCnxn.java

@@ -94,6 +94,8 @@ public abstract class ServerCnxn implements Stats, Watcher {
 
     abstract void setSessionTimeout(int sessionTimeout);
 
+    protected ZooKeeperSaslServer zooKeeperSaslServer = null;
+
     protected static class CloseRequestException extends IOException {
         private static final long serialVersionUID = -7854505709816442681L;
 

+ 7 - 3
src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java

@@ -24,13 +24,14 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 
 import javax.management.JMException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.SaslServerCallbackHandler;
 
 public abstract class ServerCnxnFactory {
-    
+
     public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
 
     public interface PacketProcessor {
@@ -51,7 +52,10 @@ public abstract class ServerCnxnFactory {
     public abstract void closeSession(long sessionId);
 
     public abstract void configure(InetSocketAddress addr,
-            int maxClientCnxns) throws IOException;
+                                   int maxClientCnxns) throws IOException;
+
+    protected SaslServerCallbackHandler saslServerCallbackHandler;
+    public Login login;
 
     /** Maximum number of connections allowed from particular host (ip) */
     public abstract int getMaxClientCnxnsPerHost();

+ 131 - 0
src/java/main/org/apache/zookeeper/server/ZooKeeperSaslServer.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.Subject;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import org.apache.zookeeper.Login;
+
+public class ZooKeeperSaslServer {
+    Logger LOG = LoggerFactory.getLogger(ZooKeeperSaslServer.class);
+    private SaslServer saslServer;
+
+    ZooKeeperSaslServer(final Login login) {
+        saslServer = createSaslServer(login);
+    }
+
+    private SaslServer createSaslServer(final Login login) {
+        synchronized (login) {
+            Subject subject = login.getSubject();
+            if (subject != null) {
+                // server is using a JAAS-authenticated subject: determine service principal name and hostname from zk server's subject.
+                if (subject.getPrincipals().size() > 0) {
+                    try {
+                        final Object[] principals = subject.getPrincipals().toArray();
+                        final Principal servicePrincipal = (Principal)principals[0];
+
+                        // e.g. servicePrincipalNameAndHostname := "zookeeper/myhost.foo.com@FOO.COM"
+                        final String servicePrincipalNameAndHostname = servicePrincipal.getName();
+
+                        int indexOf = servicePrincipalNameAndHostname.indexOf("/");
+
+                        // e.g. servicePrincipalName := "zookeeper"
+                        final String servicePrincipalName = servicePrincipalNameAndHostname.substring(0, indexOf);
+
+                        // e.g. serviceHostnameAndKerbDomain := "myhost.foo.com@FOO.COM"
+                        final String serviceHostnameAndKerbDomain = servicePrincipalNameAndHostname.substring(indexOf+1,servicePrincipalNameAndHostname.length());
+
+                        indexOf = serviceHostnameAndKerbDomain.indexOf("@");
+                        // e.g. serviceHostname := "myhost.foo.com"
+                        final String serviceHostname = serviceHostnameAndKerbDomain.substring(0,indexOf);
+
+                        final String mech = "GSSAPI";   // TODO: should depend on zoo.cfg specified mechs, but if subject is non-null, it can be assumed to be GSSAPI.
+
+                        LOG.debug("serviceHostname is '"+ serviceHostname + "'");
+                        LOG.debug("servicePrincipalName is "+ servicePrincipalName + "'");
+                        LOG.debug("SASL mechanism(mech) is "+ mech +"'");
+
+                        try {
+                            return Subject.doAs(subject,new PrivilegedExceptionAction<SaslServer>() {
+                                public SaslServer run() {
+                                    try {
+                                        SaslServer saslServer;
+                                        saslServer = Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, login.callbackHandler);
+                                        return saslServer;
+                                    }
+                                    catch (SaslException e) {
+                                        LOG.error("Zookeeper Server failed to create a SaslServer to interact with a client during session initiation: " + e);
+                                        e.printStackTrace();
+                                        return null;
+                                    }
+                                }
+                            }
+                            );
+                        }
+                        catch (PrivilegedActionException e) {
+                            // TODO: exit server at this point(?)
+                            LOG.error("Zookeeper Quorum member experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context:" + e);
+                            e.printStackTrace();
+                        }
+                    }
+                    catch (Exception e) {
+                        LOG.error("server principal name/hostname determination error: " + e);
+                    }
+                }
+                else {
+                    // JAAS non-GSSAPI authentication: assuming and supporting only DIGEST-MD5 mechanism for now.
+                    // TODO: use 'authMech=' value in zoo.cfg.
+                    try {
+                        SaslServer saslServer = Sasl.createSaslServer("DIGEST-MD5","zookeeper","zk-sasl-md5",null, login.callbackHandler);
+                        return saslServer;
+                    }
+                    catch (SaslException e) {
+                        LOG.error("Zookeeper Quorum member failed to create a SaslServer to interact with a client during session initiation: " + e);
+                    }
+                }
+            }
+        }
+        LOG.error("failed to create saslServer object.");
+        return null;
+    }
+
+    public byte[] evaluateResponse(byte[] response) throws SaslException {
+        return saslServer.evaluateResponse(response);
+    }
+
+    public boolean isComplete() {
+        return saslServer.isComplete();
+    }
+
+    public String getAuthorizationID() {
+        return saslServer.getAuthorizationID();
+    }
+
+}
+
+
+
+

+ 56 - 4
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -57,8 +57,10 @@ import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.proto.AuthPacket;
 import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.proto.GetSASLRequest;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.proto.SetSASLResponse;
 import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
 import org.apache.zookeeper.server.SessionTracker.Session;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
@@ -67,6 +69,7 @@ import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import javax.security.sasl.SaslException;
 
 /**
  * This class implements a simple standalone ZooKeeperServer. It sets up the
@@ -888,13 +891,62 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             }
             return;
         } else {
-            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
-                    h.getType(), incomingBuffer, cnxn.getAuthInfo());
-            si.setOwner(ServerCnxn.me);
-            submitRequest(si);
+            if (h.getType() == OpCode.sasl) {
+                Record rsp = processSasl(incomingBuffer,cnxn);
+                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
+                cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
+            }
+            else {
+                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
+                  h.getType(), incomingBuffer, cnxn.getAuthInfo());
+                si.setOwner(ServerCnxn.me);
+                submitRequest(si);
+            }
         }
         cnxn.incrOutstandingRequests(h);
     }
 
+    private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException {
+        LOG.debug("Responding to client SASL token.");
+        GetSASLRequest clientTokenRecord = new GetSASLRequest();
+        byteBuffer2Record(incomingBuffer,clientTokenRecord);
+        byte[] clientToken = clientTokenRecord.getToken();
+        LOG.debug("Size of client SASL token: " + clientToken.length);
+        byte[] responseToken = null;
+        try {
+            ZooKeeperSaslServer saslServer  = cnxn.zooKeeperSaslServer;
+            try {
+                // note that clientToken might be empty (clientToken.length == 0):
+                // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
+                // SASL negotiation process.
+                responseToken = saslServer.evaluateResponse(clientToken);
+                if (saslServer.isComplete() == true) {
+                    String authorizationID = saslServer.getAuthorizationID();
+                    LOG.info("adding SASL authorization for authorizationID: " + authorizationID);
+                    cnxn.addAuthInfo(new Id("sasl",authorizationID));
+                }
+            }
+            catch (SaslException e) {
+                LOG.warn("Client failed to SASL authenticate: " + e);
+                if ((System.getProperty("zookeeper.allowSaslFailedClients") != null)
+                  &&
+                  (System.getProperty("zookeeper.allowSaslFailedClients").equals("true"))) {
+                    LOG.warn("Maintaining client connection despite SASL authentication failure.");
+                } else {
+                    LOG.warn("Closing client connection due to SASL authentication failure.");
+                    cnxn.close();
+                }
+            }
+        }
+        catch (NullPointerException e) {
+            LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
+        }
+        if (responseToken != null) {
+            LOG.debug("Size of server SASL response: " + responseToken.length);
+        }
+        // wrap SASL response token to client inside a Response object.
+        return new SetSASLResponse(responseToken);
+    }
+
 
 }

+ 61 - 0
src/java/main/org/apache/zookeeper/server/auth/DigestLoginModule.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.auth;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.spi.LoginModule;
+import java.util.Map;
+
+public class DigestLoginModule implements LoginModule {
+    private Subject subject;
+
+    public boolean abort() {
+        return false;
+    }
+
+    public boolean commit() {
+        return true;
+    }
+
+    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String,?> sharedState, Map<String,?> options) {
+        if (options.containsKey("username")) {
+            // Zookeeper client: get username and password from JAAS conf (only used if using DIGEST-MD5).
+            this.subject = subject;
+            String username = (String)options.get("username");
+            this.subject.getPublicCredentials().add((Object)username);
+            String password = (String)options.get("password");
+            this.subject.getPrivateCredentials().add((Object)password);
+        }
+        return;
+    }
+
+    public boolean logout() {
+        return true;
+    }
+
+    public boolean login() {
+        // Unlike with Krb5LoginModule, we don't do any actual login or credential passing here: authentication to Zookeeper
+        // is done later, through the SASLClient object.
+        return true;
+    }
+
+}
+
+

+ 418 - 0
src/java/main/org/apache/zookeeper/server/auth/KerberosName.java

@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /* This file copied from Hadoop's security branch,
+  * with the following changes:
+  * 1. package changed from org.apache.hadoop.security to
+  *    org.apache.zookeeper.server.auth.
+  * 2. Usage of Hadoop's Configuration class removed since
+  *    it is not available in Zookeeper: instead, system property
+  *    "zookeeper.security.auth_to_local" is used.
+  */
+
+package org.apache.zookeeper.server.auth;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import sun.security.krb5.Config;
+import sun.security.krb5.KrbException;
+
+/**
+ * This class implements parsing and handling of Kerberos principal names. In 
+ * particular, it splits them apart and translates them down into local
+ * operating system names.
+ */
+public class KerberosName {
+  /** The first component of the name */
+  private final String serviceName;
+  /** The second component of the name. It may be null. */
+  private final String hostName;
+  /** The realm of the name. */
+  private final String realm;
+
+  /**
+   * A pattern that matches a Kerberos name with at most 2 components.
+   */
+  private static final Pattern nameParser = 
+    Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
+
+  /** 
+   * A pattern that matches a string with out '$' and then a single
+   * parameter with $n.
+   */
+  private static Pattern parameterPattern = 
+    Pattern.compile("([^$]*)(\\$(\\d*))?");
+
+  /**
+   * A pattern for parsing a auth_to_local rule.
+   */
+  private static final Pattern ruleParser =
+    Pattern.compile("\\s*((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?"+
+                    "(s/([^/]*)/([^/]*)/(g)?)?))");
+  
+  /**
+   * A pattern that recognizes simple/non-simple names.
+   */
+  private static final Pattern nonSimplePattern = Pattern.compile("[/@]");
+  
+  /**
+   * The list of translation rules.
+   */
+  private static List<Rule> rules;
+
+  private static String defaultRealm;
+  private static Config kerbConf;
+  
+  static {
+    try {
+      kerbConf = Config.getInstance();
+      defaultRealm = kerbConf.getDefaultRealm();
+    } catch (KrbException ke) {
+      if ((System.getProperty("zookeeper.requireKerberosConfig") != null) &&
+          (System.getProperty("zookeeper.requireKerberosConfig").equals("true"))) {
+        throw new IllegalArgumentException("Can't get Kerberos configuration",ke);
+      }
+    }
+    try {
+      // setConfiguration() will work even if the above try() fails due
+      // to a missing Kerberos configuration (unless zookeeper.requireKerberosConfig
+      // is set to true, which would not allow execution to reach here due to the
+      // throwing of an IllegalArgumentException above).
+      setConfiguration();
+    }
+    catch (IOException e) {
+      throw new IllegalArgumentException("Could not configure Kerberos principal name mapping.");
+    }
+  }
+
+  /**
+   * Create a name from the full Kerberos principal name.
+   * @param name
+   */
+  public KerberosName(String name) {
+    Matcher match = nameParser.matcher(name);
+    if (!match.matches()) {
+      if (name.contains("@")) {
+        throw new IllegalArgumentException("Malformed Kerberos name: " + name);
+      } else {
+        serviceName = name;
+        hostName = null;
+        realm = null;
+      }
+    } else {
+      serviceName = match.group(1);
+      hostName = match.group(3);
+      realm = match.group(4);
+    }
+  }
+
+  /**
+   * Get the configured default realm.
+   * @return the default realm from the krb5.conf
+   */
+  public String getDefaultRealm() {
+    return defaultRealm;
+  }
+
+  /**
+   * Put the name back together from the parts.
+   */
+  @Override
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append(serviceName);
+    if (hostName != null) {
+      result.append('/');
+      result.append(hostName);
+    }
+    if (realm != null) {
+      result.append('@');
+      result.append(realm);
+    }
+    return result.toString();
+  }
+
+  /**
+   * Get the first component of the name.
+   * @return the first section of the Kerberos principal name
+   */
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  /**
+   * Get the second component of the name.
+   * @return the second section of the Kerberos principal name, and may be null
+   */
+  public String getHostName() {
+    return hostName;
+  }
+  
+  /**
+   * Get the realm of the name.
+   * @return the realm of the name, may be null
+   */
+  public String getRealm() {
+    return realm;
+  }
+  
+  /**
+   * An encoding of a rule for translating kerberos names.
+   */
+  private static class Rule {
+    private final boolean isDefault;
+    private final int numOfComponents;
+    private final String format;
+    private final Pattern match;
+    private final Pattern fromPattern;
+    private final String toPattern;
+    private final boolean repeat;
+
+    Rule() {
+      isDefault = true;
+      numOfComponents = 0;
+      format = null;
+      match = null;
+      fromPattern = null;
+      toPattern = null;
+      repeat = false;
+    }
+
+    Rule(int numOfComponents, String format, String match, String fromPattern,
+         String toPattern, boolean repeat) {
+      isDefault = false;
+      this.numOfComponents = numOfComponents;
+      this.format = format;
+      this.match = match == null ? null : Pattern.compile(match);
+      this.fromPattern = 
+        fromPattern == null ? null : Pattern.compile(fromPattern);
+      this.toPattern = toPattern;
+      this.repeat = repeat;
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      if (isDefault) {
+        buf.append("DEFAULT");
+      } else {
+        buf.append("RULE:[");
+        buf.append(numOfComponents);
+        buf.append(':');
+        buf.append(format);
+        buf.append(']');
+        if (match != null) {
+          buf.append('(');
+          buf.append(match);
+          buf.append(')');
+        }
+        if (fromPattern != null) {
+          buf.append("s/");
+          buf.append(fromPattern);
+          buf.append('/');
+          buf.append(toPattern);
+          buf.append('/');
+          if (repeat) {
+            buf.append('g');
+          }
+        }
+      }
+      return buf.toString();
+    }
+    
+    /**
+     * Replace the numbered parameters of the form $n where n is from 1 to 
+     * the length of params. Normal text is copied directly and $n is replaced
+     * by the corresponding parameter.
+     * @param format the string to replace parameters again
+     * @param params the list of parameters
+     * @return the generated string with the parameter references replaced.
+     * @throws BadFormatString
+     */
+    static String replaceParameters(String format, 
+                                    String[] params) throws BadFormatString {
+      Matcher match = parameterPattern.matcher(format);
+      int start = 0;
+      StringBuilder result = new StringBuilder();
+      while (start < format.length() && match.find(start)) {
+        result.append(match.group(1));
+        String paramNum = match.group(3);
+        if (paramNum != null) {
+          try {
+            int num = Integer.parseInt(paramNum);
+            if (num < 0 || num > params.length) {
+              throw new BadFormatString("index " + num + " from " + format +
+                                        " is outside of the valid range 0 to " +
+                                        (params.length - 1));
+            }
+            result.append(params[num]);
+          } catch (NumberFormatException nfe) {
+            throw new BadFormatString("bad format in username mapping in " + 
+                                      paramNum, nfe);
+          }
+          
+        }
+        start = match.end();
+      }
+      return result.toString();
+    }
+
+    /**
+     * Replace the matches of the from pattern in the base string with the value
+     * of the to string.
+     * @param base the string to transform
+     * @param from the pattern to look for in the base string
+     * @param to the string to replace matches of the pattern with
+     * @param repeat whether the substitution should be repeated
+     * @return
+     */
+    static String replaceSubstitution(String base, Pattern from, String to, 
+                                      boolean repeat) {
+      Matcher match = from.matcher(base);
+      if (repeat) {
+        return match.replaceAll(to);
+      } else {
+        return match.replaceFirst(to);
+      }
+    }
+
+    /**
+     * Try to apply this rule to the given name represented as a parameter
+     * array.
+     * @param params first element is the realm, second and later elements are
+     *        are the components of the name "a/b@FOO" -> {"FOO", "a", "b"}
+     * @return the short name if this rule applies or null
+     * @throws IOException throws if something is wrong with the rules
+     */
+    String apply(String[] params) throws IOException {
+      String result = null;
+      if (isDefault) {
+        if (defaultRealm.equals(params[0])) {
+          result = params[1];
+        }
+      } else if (params.length - 1 == numOfComponents) {
+        String base = replaceParameters(format, params);
+        if (match == null || match.matcher(base).matches()) {
+          if (fromPattern == null) {
+            result = base;
+          } else {
+            result = replaceSubstitution(base, fromPattern, toPattern,  repeat);
+          }
+        }
+      }
+      if (result != null && nonSimplePattern.matcher(result).find()) {
+        throw new NoMatchingRule("Non-simple name " + result +
+                                 " after auth_to_local rule " + this);
+      }
+      return result;
+    }
+  }
+
+  static List<Rule> parseRules(String rules) {
+    List<Rule> result = new ArrayList<Rule>();
+    String remaining = rules.trim();
+    while (remaining.length() > 0) {
+      Matcher matcher = ruleParser.matcher(remaining);
+      if (!matcher.lookingAt()) {
+        throw new IllegalArgumentException("Invalid rule: " + remaining);
+      }
+      if (matcher.group(2) != null) {
+        result.add(new Rule());
+      } else {
+        result.add(new Rule(Integer.parseInt(matcher.group(4)),
+                            matcher.group(5),
+                            matcher.group(7),
+                            matcher.group(9),
+                            matcher.group(10),
+                            "g".equals(matcher.group(11))));
+      }
+      remaining = remaining.substring(matcher.end());
+    }
+    return result;
+  }
+
+  /**
+   * Set the static configuration to get the rules.
+   * @param conf the new configuration
+   * @throws IOException
+   */
+  public static void setConfiguration() throws IOException {
+    String ruleString = System.getProperty("zookeeper.security.auth_to_local", "DEFAULT");
+    rules = parseRules(ruleString);
+  }
+
+  @SuppressWarnings("serial")
+  public static class BadFormatString extends IOException {
+    BadFormatString(String msg) {
+      super(msg);
+    }
+    BadFormatString(String msg, Throwable err) {
+      super(msg, err);
+    }
+  }
+
+  @SuppressWarnings("serial")
+  public static class NoMatchingRule extends IOException {
+    NoMatchingRule(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * Get the translation of the principal name into an operating system
+   * user name.
+   * @return the short name
+   * @throws IOException
+   */
+  public String getShortName() throws IOException {
+    String[] params;
+    if (hostName == null) {
+      // if it is already simple, just return it
+      if (realm == null) {
+        return serviceName;
+      }
+      params = new String[]{realm, serviceName};
+    } else {
+      params = new String[]{realm, serviceName, hostName};
+    }
+    for(Rule r: rules) {
+      String result = r.apply(params);
+      if (result != null) {
+        return result;
+      }
+    }
+    throw new NoMatchingRule("No rules applied to " + toString());
+  }
+
+  static void printRules() throws IOException {
+    int i = 0;
+    for(Rule r: rules) {
+      System.out.println(++i + " " + r);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    for(String arg: args) {
+      KerberosName name = new KerberosName(arg);
+      System.out.println("Name: " + name + " to " + name.getShortName());
+    }
+  }
+}

+ 70 - 0
src/java/main/org/apache/zookeeper/server/auth/SASLAuthenticationProvider.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.auth;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.ServerCnxn;
+
+public class SASLAuthenticationProvider implements AuthenticationProvider {
+
+    public String getScheme() {
+        return "sasl";
+    }
+
+    public KeeperException.Code
+        handleAuthentication(ServerCnxn cnxn, byte[] authData)
+    {
+        // Should never call this: SASL authentication is negotiated at session initiation.
+        // TODO: consider substituting current implementation of direct ClientCnxn manipulation with
+        // a call to this method (SASLAuthenticationProvider:handleAuthentication()) at session initiation.
+        return KeeperException.Code.AUTHFAILED;
+
+    }
+
+    public boolean matches(String id,String aclExpr) {
+        if (System.getProperty("zookeeper.superUser") != null) {
+            return (id.equals(System.getProperty("zookeeper.superUser")) || id.equals(aclExpr));
+        }
+        return (id.equals("super") || id.equals(aclExpr));
+    }
+
+    public boolean isAuthenticated() {
+        return true;
+    }
+
+    public boolean isValid(String id) {
+        // Since the SASL authenticator will usually be used with Kerberos authentication,
+        // it should enforce that these names are valid according to Kerberos's
+        // syntax for principals.
+        //
+        // Use the KerberosName(id) constructor to define validity:
+        // if KerberosName(id) throws IllegalArgumentException, then id is invalid.
+        // otherwise, it is valid.
+        //
+        try {
+            new KerberosName(id);
+            return true;
+        }
+        catch (IllegalArgumentException e) {
+            return false;
+        }
+   }
+
+
+}

+ 160 - 0
src/java/main/org/apache/zookeeper/server/auth/SaslServerCallbackHandler.java

@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.auth;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+public class SaslServerCallbackHandler implements CallbackHandler {
+    private String userName = null;
+    private Map<String,String> credentials = new HashMap<String,String>();
+    Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
+
+    public SaslServerCallbackHandler(Configuration configuration) throws IOException {
+        AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry("Server");
+
+        if (configurationEntries == null) {
+            String errorMessage = "could not find a 'Server' entry in this configuration: server cannot start.";
+            LOG.error(errorMessage);
+            throw(new IOException(errorMessage));
+        }
+        credentials.clear();
+        for(AppConfigurationEntry entry: configurationEntries) {
+            Map<String,?> options = entry.getOptions();
+            // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
+            // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
+            Iterator it = options.entrySet().iterator();
+            while (it.hasNext()) {
+            Map.Entry pair = (Map.Entry)it.next();
+                    String key = (String)pair.getKey();
+                    if (key.substring(0,5).equals("user_")) {
+                        String userName = key.substring(5);
+                        credentials.put(userName,(String)pair.getValue());
+                    }
+                }
+            }
+            return;
+        }
+
+        public void handle(Callback[] callbacks) throws
+                UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    NameCallback nc = (NameCallback) callback;
+                    // check to see if this user is in the user password database.
+                    if (credentials.get(nc.getDefaultName()) != null) {
+                        nc.setName(nc.getDefaultName());
+                        this.userName = nc.getDefaultName();
+                    }
+                    else { // no such user.
+                        LOG.warn("User '" + nc.getDefaultName() + "' not found in list of DIGEST-MD5 authenticateable users.");
+                    }
+                }
+                else {
+                    if (callback instanceof PasswordCallback) {
+                        PasswordCallback pc = (PasswordCallback) callback;
+
+                        if ((this.userName.equals("super")
+                              &&
+                              (System.getProperty("zookeeper.SASLAuthenticationProvider.superPassword") != null))) {
+                            // superuser: use Java system property for password, if available.
+                            pc.setPassword(System.getProperty("zookeeper.SASLAuthenticationProvider.superPassword").toCharArray());
+                        }
+                        else {
+                            if (this.credentials.get(this.userName) != null) {
+                                pc.setPassword(this.credentials.get(this.userName).toCharArray());
+                            }
+                            else {
+                                LOG.warn("No password found for user: " + this.userName);
+                            }
+                        }
+                    }
+                    else {
+                        if (callback instanceof RealmCallback) {
+                            RealmCallback rc = (RealmCallback) callback;
+                            LOG.debug("client supplied realm: " + rc.getDefaultText());
+                            rc.setText(rc.getDefaultText());
+                        }
+                        else {
+                            if (callback instanceof AuthorizeCallback) {
+                                AuthorizeCallback ac = (AuthorizeCallback) callback;
+
+                                String authenticationID = ac.getAuthenticationID();
+                                String authorizationID = ac.getAuthorizationID();
+
+                                LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + ";  authorizationID=" + authorizationID + ".");
+                                if (authenticationID.equals(authorizationID)) {
+                                    LOG.debug("setAuthorized(true) since " + authenticationID + "==" + authorizationID);
+                                    ac.setAuthorized(true);
+                                } else {
+                                    LOG.debug("setAuthorized(true), even though " + authenticationID + "!=" + authorizationID + ".");
+                                    ac.setAuthorized(true);
+                                }
+                                if (ac.isAuthorized()) {
+                                    LOG.debug("isAuthorized() since ac.isAuthorized() == true");
+                                    // canonicalize authorization id according to system properties:
+                                    // kerberos.removeRealmFromPrincipal(={true,false})
+                                    // kerberos.removeHostFromPrincipal(={true,false})
+                                    KerberosName kerberosName = new KerberosName(authenticationID);
+                                    try {
+                                        String userName = kerberosName.getShortName();
+                                        if (!removeHost() && (kerberosName.getHostName() != null)) {
+                                            userName += "/" + kerberosName.getServiceName();
+                                        }
+                                        if (!removeRealm() && (kerberosName.getRealm() != null)) {
+                                            userName += "@" + kerberosName.getRealm();
+                                        }
+                                        LOG.info("Setting authorizedID: " + userName);
+                                        ac.setAuthorizedID(userName);
+                                    }
+                                    catch (IOException e) {
+                                        LOG.error("Failed to set name based on Kerberos authentication rules.");
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+    private boolean removeRealm() {
+        return ((System.getProperty("zookeeper.kerberos.removeRealmFromPrincipal") != null) &&
+          (System.getProperty("zookeeper.kerberos.removeRealmFromPrincipal").equals("true")));
+    }
+
+    private boolean removeHost() {
+        return ((System.getProperty("zookeeper.kerberos.removeHostFromPrincipal") != null) &&
+          (System.getProperty("zookeeper.kerberos.removeHostFromPrincipal").equals("true")));
+    }
+}

+ 0 - 289
src/java/test/org/apache/zookeeper/test/CnxManagerTest.java.orig

@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.test;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.io.*;
-
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.quorum.QuorumCnxManager;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CnxManagerTest extends ZKTestCase {
-    protected static final Logger LOG = Logger.getLogger(FLENewEpochTest.class);
-    protected static final int THRESHOLD = 4;
-
-    int count;
-    HashMap<Long,QuorumServer> peers;
-    File tmpdir[];
-    int port[];
-
-    @Before
-    public void setUp() throws Exception {
-
-        this.count = 3;
-        this.peers = new HashMap<Long,QuorumServer>(count);
-        tmpdir = new File[count];
-        port = new int[count];
-
-        for(int i = 0; i < count; i++) {
-            int clientport = PortAssignment.unique();
-            peers.put(Long.valueOf(i),
-                    new QuorumServer(i,
-                            new InetSocketAddress(clientport),
-                    new InetSocketAddress(PortAssignment.unique())));
-            tmpdir[i] = ClientBase.createTmpDir();
-            port[i] = clientport;
-        }
-    }
-
-    ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
-        byte requestBytes[] = new byte[28];
-        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
-        /*
-         * Building notification packet to send
-         */
-
-        requestBuffer.clear();
-        requestBuffer.putInt(state);
-        requestBuffer.putLong(leader);
-        requestBuffer.putLong(zxid);
-        requestBuffer.putLong(epoch);
-
-        return requestBuffer;
-    }
-
-    class CnxManagerThread extends Thread {
-
-        boolean failed;
-        CnxManagerThread(){
-            failed = false;
-        }
-
-        public void run(){
-            try {
-                QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
-                QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
-                QuorumCnxManager.Listener listener = cnxManager.listener;
-                if(listener != null){
-                    listener.start();
-                } else {
-                    LOG.error("Null listener when initializing cnx manager");
-                }
-
-                long sid = 1;
-                cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
-
-                Message m = null;
-                int numRetries = 1;
-                while((m == null) && (numRetries++ <= THRESHOLD)){
-                    m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
-                    if(m == null) cnxManager.connectAll();
-                }
-
-                if(numRetries > THRESHOLD){
-                    failed = true;
-                    return;
-                }
-
-                cnxManager.testInitiateConnection(sid);
-
-                m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
-                if(m == null){
-                    failed = true;
-                    return;
-                }
-            } catch (Exception e) {
-                LOG.error("Exception while running mock thread", e);
-                Assert.fail("Unexpected exception");
-            }
-        }
-    }
-
-    @Test
-    public void testCnxManager() throws Exception {
-        CnxManagerThread thread = new CnxManagerThread();
-
-        thread.start();
-
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
-        QuorumCnxManager.Listener listener = cnxManager.listener;
-        if(listener != null){
-            listener.start();
-        } else {
-            LOG.error("Null listener when initializing cnx manager");
-        }
-
-        cnxManager.toSend(new Long(0), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
-
-        Message m = null;
-        int numRetries = 1;
-        while((m == null) && (numRetries++ <= THRESHOLD)){
-            m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
-            if(m == null) cnxManager.connectAll();
-        }
-
-        Assert.assertTrue("Exceeded number of retries", numRetries <= THRESHOLD);
-
-        thread.join(5000);
-        if (thread.isAlive()) {
-            Assert.fail("Thread didn't join");
-        } else {
-            if(thread.failed)
-                Assert.fail("Did not receive expected message");
-        }
-        
-    }
-
-    @Test
-    public void testCnxManagerTimeout() throws Exception {
-        Random rand = new Random();
-        byte b = (byte) rand.nextInt();
-        int deadPort = PortAssignment.unique();
-        String deadAddress = new String("10.1.1." + b);
-            
-        LOG.info("This is the dead address I'm trying: " + deadAddress);
-            
-        peers.put(Long.valueOf(2),
-                new QuorumServer(2,
-                        new InetSocketAddress(deadAddress, deadPort),
-                        new InetSocketAddress(deadAddress, PortAssignment.unique())));
-        tmpdir[2] = ClientBase.createTmpDir();
-        port[2] = deadPort;
-            
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
-        QuorumCnxManager.Listener listener = cnxManager.listener;
-        if(listener != null){
-            listener.start();
-        } else {
-            LOG.error("Null listener when initializing cnx manager");
-        }
-
-        long begin = System.currentTimeMillis();
-        cnxManager.toSend(new Long(2), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
-        long end = System.currentTimeMillis();
-            
-        if((end - begin) > 6000) Assert.fail("Waited more than necessary");
-        
-    }       
-    
-    /**
-     * Tests a bug in QuorumCnxManager that causes a spin lock
-     * when a negative value is sent. This test checks if the 
-     * connection is being closed upon a message with negative
-     * length.
-     * 
-     * @throws Exception
-     */
-    @Test
-    public void testCnxManagerSpinLock() throws Exception {               
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
-        QuorumCnxManager.Listener listener = cnxManager.listener;
-        if(listener != null){
-            listener.start();
-        } else {
-            LOG.error("Null listener when initializing cnx manager");
-        }
-        
-        int port = peers.get(peer.getId()).electionAddr.getPort();
-        LOG.info("Election port: " + port);
-        InetSocketAddress addr = new InetSocketAddress(port);
-        
-        Thread.sleep(1000);
-        
-        SocketChannel sc = SocketChannel.open();
-        sc.socket().connect(peers.get(new Long(1)).electionAddr, 5000);
-        
-        /*
-         * Write id first then negative length.
-         */
-        byte[] msgBytes = new byte[8];
-        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-        msgBuffer.putLong(new Long(2));
-        msgBuffer.position(0);
-        sc.write(msgBuffer);
-        
-        msgBuffer = ByteBuffer.wrap(new byte[4]);
-        msgBuffer.putInt(-20);
-        msgBuffer.position(0);
-        sc.write(msgBuffer);
-        
-        Thread.sleep(1000);
-        
-        try{
-            /*
-             * Write a number of times until it
-             * detects that the socket is broken.
-             */
-            for(int i = 0; i < 100; i++){
-                msgBuffer.position(0);
-                sc.write(msgBuffer);
-            }
-            Assert.fail("Socket has not been closed");
-        } catch (Exception e) {
-            LOG.info("Socket has been closed as expected");
-        }
-    }
-
-    /*
-     * Test if a receiveConnection is able to timeout on socket errors
-     */
-    @Test
-    public void testSocketTimeout() throws Exception {
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2000, 2, 2);
-        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
-        QuorumCnxManager.Listener listener = cnxManager.listener;
-        if(listener != null){
-            listener.start();
-        } else {
-            LOG.error("Null listener when initializing cnx manager");
-        }
-        int port = peers.get(peer.getId()).electionAddr.getPort();
-        LOG.info("Election port: " + port);
-        InetSocketAddress addr = new InetSocketAddress(port);
-        Thread.sleep(1000);
-        
-        Socket sock = new Socket();
-        sock.connect(peers.get(new Long(1)).electionAddr, 5000);
-        long begin = System.currentTimeMillis();
-        // Read without sending data. Verify timeout.
-        cnxManager.receiveConnection(sock);
-        long end = System.currentTimeMillis();
-        if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary");
-    }
-}

+ 123 - 0
src/java/test/org/apache/zookeeper/test/SaslAuthFailTest.java

@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class SaslAuthFailTest extends ClientBase {
+    static {
+        System.setProperty("zookeeper.authProvider.1","org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+        System.setProperty("zookeeper.allowSaslFailedClients","true");
+
+        try {
+            File tmpDir = createTmpDir();
+            File saslConfFile = new File(tmpDir, "jaas.conf");
+            FileWriter fwriter = new FileWriter(saslConfFile);
+
+            fwriter.write("" +
+                    "Server {\n" +
+                    "          org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
+                    "          user_super=\"test\";\n" +
+                    "};\n" +
+                    "Client {\n" +
+                    "       org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
+                    "       username=\"super\"\n" +
+                    "       password=\"test1\";\n" + // NOTE: wrong password ('test' != 'test1') : this is to test SASL authentication failure.
+                    "};" + "\n");
+            fwriter.close();
+            System.setProperty("java.security.auth.login.config",saslConfFile.getAbsolutePath());
+        }
+        catch (IOException e) {
+            // could not create tmp directory to hold JAAS conf file.
+        }
+    }
+
+    private AtomicInteger authFailed = new AtomicInteger(0);
+    
+    @Override
+    protected TestableZooKeeper createClient(String hp)
+    throws IOException, InterruptedException
+    {
+        File tmpDir = ClientBase.createTmpDir();
+        File saslConfFile = new File(tmpDir, "jaas_bad_password.conf");
+        FileWriter fwriter = new FileWriter(saslConfFile);
+
+        fwriter.write("" +
+                "Server {\n" +
+                "          org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
+                "          user_super=\"test\";\n" +
+                "};\n" +
+                "Client {\n" +
+                "       org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
+                "       username=\"super\"\n" +
+                "       password=\"test1\";\n" + // NOTE: wrong password to cause authentication failure : 'test' != 'test1'.
+                "};" + "\n");
+        fwriter.close();
+        System.setProperty("java.security.auth.login.config",saslConfFile.getAbsolutePath());
+        MyWatcher watcher = new MyWatcher();
+        return createClient(watcher, hp);
+    }
+
+    private class MyWatcher extends CountdownWatcher {
+        @Override
+        public synchronized void process(WatchedEvent event) {
+            if (event.getState() == KeeperState.AuthFailed) {
+                authFailed.incrementAndGet();
+            }
+            else {
+                super.process(event);
+            }
+        }
+    }
+
+    @Test
+    public void testBadSaslAuthNotifiesWatch() throws Exception {
+        ZooKeeper zk = createClient();
+        Thread.sleep(1000);
+        zk.close();
+    }
+
+    
+    @Test
+    public void testAuthFail() throws Exception {
+        ZooKeeper zk = createClient();
+        Thread.sleep(1000);
+        try {
+            zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
+            Assert.fail("Should have gotten exception.");
+        } catch(Exception e ) {
+            // ok, exception as expected.
+            LOG.info("Got exception as expected: " + e);
+        } finally {
+            zk.close();
+        }
+    }
+}

+ 173 - 0
src/java/test/org/apache/zookeeper/test/SaslAuthTest.java

@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SaslAuthTest extends ClientBase {
+    static {
+        System.setProperty("zookeeper.authProvider.1","org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+
+        try {
+            File tmpDir = createTmpDir();
+            File saslConfFile = new File(tmpDir, "jaas.conf");
+            FileWriter fwriter = new FileWriter(saslConfFile);
+
+            fwriter.write("" +
+                    "Server {\n" +
+                    "          org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
+                    "          user_super=\"test\";\n" +
+                    "};\n" +
+                    "Client {\n" +
+                    "       org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
+                    "       username=\"super\"\n" +
+                    "       password=\"test\";\n" +
+                    "};" + "\n");
+            fwriter.close();
+            System.setProperty("java.security.auth.login.config",saslConfFile.getAbsolutePath());
+        }
+        catch (IOException e) {
+            // could not create tmp directory to hold JAAS conf file : test will fail now.
+        }
+    }
+
+    private AtomicInteger authFailed = new AtomicInteger(0);
+    
+    @Override
+    protected TestableZooKeeper createClient(String hp)
+    throws IOException, InterruptedException
+    {
+        File tmpDir = ClientBase.createTmpDir();
+        File saslConfFile = new File(tmpDir, "jaas.conf");
+        FileWriter fwriter = new FileWriter(saslConfFile);
+
+        fwriter.write("" +
+                "Server {\n" +
+                "          org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
+                "          user_super=\"test\";\n" +
+                "};\n" +
+                "Client {\n" +
+                "       org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
+                "       username=\"super\"\n" +
+                "       password=\"test\";\n" +
+                "};" + "\n");
+        fwriter.close();
+        System.setProperty("java.security.auth.login.config",saslConfFile.getAbsolutePath());
+        MyWatcher watcher = new MyWatcher();
+        return createClient(watcher, hp);
+    }
+
+    private class MyWatcher extends CountdownWatcher {
+        @Override
+        public synchronized void process(WatchedEvent event) {
+            if (event.getState() == KeeperState.AuthFailed) {
+                authFailed.incrementAndGet();
+            }
+            else {
+                super.process(event);
+            }
+        }
+    }
+
+    @Test
+    public void testBadSaslAuthNotifiesWatch() throws Exception {
+        ZooKeeper zk = createClient();
+        Thread.sleep(1000);
+        zk.close();
+    }
+
+    
+    @Test
+    public void testAuth() throws Exception {
+        ZooKeeper zk = createClient();
+        Thread.sleep(1000);
+        try {
+            zk.create("/path1", null, Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
+            Thread.sleep(1000);
+        } finally {
+            zk.close();
+        }
+    }
+
+    @Test
+    public void testValidSaslIds() throws Exception {
+        ZooKeeper zk = createClient();
+        Thread.sleep(1000);
+
+        List<String> validIds = new ArrayList<String>();
+        validIds.add("user");
+        validIds.add("service/host.name.com");
+        validIds.add("user@KERB.REALM");
+        validIds.add("service/host.name.com@KERB.REALM");
+
+        int i = 0;
+        for(String validId: validIds) {
+            List<ACL> aclList = new ArrayList<ACL>();
+            ACL acl = new ACL(0,new Id("sasl",validId));
+            aclList.add(acl);
+            zk.create("/valid"+i,null,aclList,CreateMode.PERSISTENT);
+            i++;
+        }
+    }
+
+    @Test
+    public void testInvalidSaslIds() throws Exception {
+        ZooKeeper zk = createClient();
+        Thread.sleep(1000);
+
+        List<String> invalidIds = new ArrayList<String>();
+        invalidIds.add("user@KERB.REALM/server.com");
+        invalidIds.add("user@KERB.REALM1@KERB.REALM2");
+
+        int i = 0;
+        for(String invalidId: invalidIds) {
+            List<ACL> aclList = new ArrayList<ACL>();
+            try {
+                ACL acl = new ACL(0,new Id("sasl",invalidId));
+                aclList.add(acl);
+                zk.create("/invalid"+i,null,aclList,CreateMode.PERSISTENT);
+                Assert.fail("SASLAuthenticationProvider.isValid() failed to catch invalid Id.");
+            }
+            catch (KeeperException.InvalidACLException e) {
+                // ok.
+            }
+            finally {
+                i++;
+            }
+        }
+    }
+
+}

+ 9 - 0
src/zookeeper.jute

@@ -116,6 +116,15 @@ module org.apache.zookeeper.proto {
     class SetDataResponse {
         org.apache.zookeeper.data.Stat stat;
     }
+    class GetSASLRequest {
+        buffer token;
+    }
+    class SetSASLRequest {
+        buffer token;
+    }
+    class SetSASLResponse {
+        buffer token;
+    }
     class CreateRequest {
         ustring path;
         buffer data;