
ZOOKEEPER-1355. Add zk.updateServerList(newServerList) (Alex Shraer, Marshall McMullen via fpj)


git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1410731 13f79535-47bb-0310-9956-ffa450edef68
Flavio Paiva Junqueira 12 years ago
parent
commit
ed62259873

+ 3 - 0
CHANGES.txt

@@ -6,6 +6,9 @@ BUGFIXES:
 
 Backward compatible changes:
 
+NEW FEATURES:
+  ZOOKEEPER-1355. Add zk.updateServerList(newServerList) (Alex Shraer, Marshall McMullen via fpj)
+
 BUGFIXES:
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString

+ 21 - 7
src/c/Makefile.am

@@ -19,7 +19,8 @@ libhashtable_la_SOURCES = $(HASHTABLE_SRC)
 COMMON_SRC = src/zookeeper.c include/zookeeper.h include/zookeeper_version.h include/zookeeper_log.h\
     src/recordio.c include/recordio.h include/proto.h \
     src/zk_adaptor.h generated/zookeeper.jute.c \
-    src/zookeeper_log.h src/zk_log.c src/zk_hashtable.h src/zk_hashtable.c
+    src/zookeeper_log.h src/zk_log.c src/zk_hashtable.h src/zk_hashtable.c \
+	src/addrvec.h src/addrvec.c
 
 # These are the symbols (classes, mostly) we want to export from our library.
 EXPORT_SYMBOLS = '(zoo_|zookeeper_|zhandle|Z|format_log_message|log_message|logLevel|deallocate_|zerror|is_unrecoverable)'
@@ -70,13 +71,26 @@ endif
 EXTRA_DIST+=$(wildcard ${srcdir}/tests/*.cc) $(wildcard ${srcdir}/tests/*.h) \
     ${srcdir}/tests/wrappers.opt ${srcdir}/tests/wrappers-mt.opt
 
-TEST_SOURCES = tests/TestDriver.cc tests/LibCMocks.cc tests/LibCSymTable.cc \
-    tests/MocksBase.cc  tests/ZKMocks.cc tests/Util.cc tests/ThreadingUtil.cc \
+# These tests are ordered in a logical manner such that each builds upon basic
+# functionality tested in prior tests. E.g. the most basic functionality is
+# tested in TestZookeeperInit and TestZookeeperClose, so those run first as a
+# foundation, with more complex test suites to follow.
+TEST_SOURCES = \
+	tests/TestDriver.cc \
+	tests/LibCMocks.cc \
+	tests/LibCSymTable.cc \
+	tests/MocksBase.cc \
+	tests/ZKMocks.cc \
+	tests/Util.cc \
+	tests/ThreadingUtil.cc \
+	tests/TestZookeeperInit.cc \
+	tests/TestZookeeperClose.cc \
+	tests/TestReconfig.cc \
     tests/TestClientRetry.cc \
-    tests/TestOperations.cc tests/TestZookeeperInit.cc \
-    tests/TestZookeeperClose.cc tests/TestClient.cc \
-    tests/TestMulti.cc tests/TestWatchers.cc
-
+	tests/TestOperations.cc \
+	tests/TestMulti.cc \
+	tests/TestClient.cc \
+	tests/TestWatchers.cc
 
 SYMBOL_WRAPPERS=$(shell cat ${srcdir}/tests/wrappers.opt)
 

+ 58 - 0
src/c/include/zookeeper.h

@@ -184,6 +184,7 @@ extern ZOOAPI const int ZOO_AUTH_FAILED_STATE;
 extern ZOOAPI const int ZOO_CONNECTING_STATE;
 extern ZOOAPI const int ZOO_ASSOCIATING_STATE;
 extern ZOOAPI const int ZOO_CONNECTED_STATE;
+extern ZOOAPI const int ZOO_NOTCONNECTED_STATE;
 // @}
 
 /**
@@ -449,6 +450,63 @@ typedef void (*watcher_fn)(zhandle_t *zh, int type,
 ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
   int recv_timeout, const clientid_t *clientid, void *context, int flags);
 
+/**
+ * \brief update the list of servers this client will connect to.
+ * 
+ * This method allows a client to update the connection string by providing
+ * a new comma separated list of host:port pairs, each corresponding to a
+ * ZooKeeper server. 
+ * 
+ * This function invokes a probabilistic load-balancing algorithm which may cause
+ * the client to disconnect from its current host in order to achieve an expected
+ * uniform number of connections per server in the new list. If the current host
+ * to which the client is connected is not in the new list, this call will always
+ * cause the connection to be dropped. Otherwise, the decision is based on whether
+ * the number of servers has increased or decreased and by how much.
+ * 
+ * If the connection is dropped, the client moves to a special "reconfig" mode
+ * where it chooses a new server to connect to using the probabilistic algorithm.
+ * After finding a server, or exhaustively trying all the servers in the new list,
+ * the client moves back to the normal mode of operation where it will pick an
+ * arbitrary server from the 'host' string.
+ * 
+ * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
+ * protocol and its evaluation.
+ * 
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ *   server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \return ZOK on success or one of the following errcodes on failure:
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZSYSTEMERROR -- a system (OS) error occurred; it's worth checking errno to get details
+ */
+ZOOAPI int zoo_set_servers(zhandle_t *zh, const char *hosts);
+
+/**
+ * \brief cycle to the next server on the next connection attempt.
+ * 
+ * Note: typically this method should NOT be used outside of testing.
+ *
+ * This method allows a client to cycle through the list of servers in its
+ * connection pool to be used on the next connection attempt. This function does
+ * not actually trigger a connection or state change in any way. Its purpose is
+ * to allow testing of changing servers on the fly and of the probabilistic
+ * load-balancing algorithm.
+ */
+ZOOAPI void zoo_cycle_next_server(zhandle_t *zh);
+
+/**
+ * \brief get current host:port this client is connecting/connected to.
+ * 
+ * Note: typically this method should NOT be used outside of testing.
+ *
+ * This method allows a client to get the current host:port that this client
+ * is either in the process of connecting to or is currently connected to. This
+ * is mainly used for testing purposes but might also come in handy as a general
+ * purpose tool to be used by other clients.
+ */
+ZOOAPI const char* zoo_get_current_server(zhandle_t* zh);
+
 /**
  * \brief close the zookeeper handle and free up any resources.
  * 

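The declarations above cover the whole reconfiguration surface added to the C client. As an illustrative sketch (not part of this patch), a caller might exercise them as follows, assuming an installed header and a handle already obtained from zookeeper_init:

    #include <stdio.h>
    #include <zookeeper/zookeeper.h>

    /* Switch the client over to a new ensemble list and report which
     * server the handle is now connecting/connected to. */
    static void update_ensemble(zhandle_t *zh)
    {
        const char *new_hosts = "10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181";

        int rc = zoo_set_servers(zh, new_hosts);
        if (rc != ZOK) {
            fprintf(stderr, "zoo_set_servers failed: %s\n", zerror(rc));
            return;
        }

        /* Intended mainly for tests, but handy to see where we ended up. */
        fprintf(stderr, "current server: %s\n", zoo_get_current_server(zh));
    }
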
+ 219 - 0
src/c/src/addrvec.c

@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+
+#include "addrvec.h"
+
+#define ADDRVEC_DEFAULT_GROW_AMOUNT 16
+
+void addrvec_init(addrvec_t *avec)
+{
+    assert(avec);
+    avec->next = 0;
+    avec->count = 0;
+    avec->capacity = 0;
+    avec->data = NULL;
+}
+
+void addrvec_free(addrvec_t *avec)
+{
+    if (avec == NULL)
+    {
+        return;
+    }
+
+    avec->next = 0;
+    avec->count = 0;
+    avec->capacity = 0;
+    if (avec->data) {
+        free(avec->data);
+        avec->data = NULL;
+    }
+}
+
+int addrvec_alloc(addrvec_t *avec)
+{
+    addrvec_init(avec);
+    return addrvec_grow_default(avec);
+}
+
+int addrvec_alloc_capacity(addrvec_t* avec, uint32_t capacity)
+{
+    addrvec_init(avec);
+    return addrvec_grow(avec, capacity);
+}
+
+int addrvec_grow(addrvec_t *avec, uint32_t grow_amount)
+{
+    assert(avec);
+
+    if (grow_amount == 0)
+    {
+        return 0;
+    }
+
+    // Save off old data and capacity in case there is a realloc failure
+    unsigned int old_capacity = avec->capacity;
+    struct sockaddr_storage *old_data = avec->data;
+
+    avec->capacity += grow_amount;
+    avec->data = realloc(avec->data, sizeof(*avec->data) * avec->capacity);
+    if (avec->data == NULL)
+    {
+        avec->capacity = old_capacity;
+        avec->data = old_data;
+        errno = ENOMEM;
+        return 1;
+    }
+
+    return 0;
+}
+
+int addrvec_grow_default(addrvec_t *avec)
+{
+    return addrvec_grow(avec, ADDRVEC_DEFAULT_GROW_AMOUNT);
+}
+
+static int addrvec_grow_if_full(addrvec_t *avec)
+{
+    assert(avec);
+    if (avec->count == avec->capacity)
+    {
+        int rc = addrvec_grow_default(avec);
+        if (rc != 0)
+        {
+            return rc;
+        }
+    }
+
+    return 0;
+}
+
+int addrvec_contains(const addrvec_t *avec, const struct sockaddr_storage *addr)
+{
+    if (!avec || !addr)
+    { 
+        return 0;
+    }
+
+    int i = 0;
+    for (i = 0; i < avec->count; i++)
+    {
+        if(memcmp(&avec->data[i], addr, INET_ADDRSTRLEN) == 0)
+            return 1;
+    }
+
+    return 0;
+}
+
+int addrvec_append(addrvec_t *avec, const struct sockaddr_storage *addr)
+{
+    assert(avec);
+    assert(addr);
+
+    int rc = addrvec_grow_if_full(avec);
+    if (rc != 0)
+    {
+        return rc;
+    }
+
+    // Copy addrinfo into address list
+    memcpy(avec->data + avec->count, addr, sizeof(*addr));
+    ++avec->count;
+
+    return 0;
+}
+
+int addrvec_append_addrinfo(addrvec_t *avec, const struct addrinfo *addrinfo)
+{
+    assert(avec);
+    assert(addrinfo);
+
+    int rc = addrvec_grow_if_full(avec);
+    if (rc != 0)
+    {
+        return rc;
+    }
+
+    // Copy addrinfo into address list
+    memcpy(avec->data + avec->count, addrinfo->ai_addr, addrinfo->ai_addrlen);
+    ++avec->count;
+
+    return 0;
+}
+
+void addrvec_shuffle(addrvec_t *avec)
+{
+    int i = 0;
+    for (i = avec->count - 1; i > 0; --i) {
+        long int j = random()%(i+1);
+        if (i != j) {
+            struct sockaddr_storage t = avec->data[i];
+            avec->data[i] = avec->data[j];
+            avec->data[j] = t;
+        }
+    }
+}
+
+int addrvec_hasnext(const addrvec_t *avec)
+{
+    return avec->count > 0 && (avec->next < avec->count);
+}
+
+int addrvec_atend(const addrvec_t *avec)
+{
+    return avec->count > 0 && avec->next >= avec->count;
+}
+
+void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next)
+{
+    // If we're at the end of the list, then reset index to start
+    if (addrvec_atend(avec))
+    {
+        avec->next = 0;
+    }
+
+    if (!addrvec_hasnext(avec))
+    {
+        next = NULL;
+        return;
+    }
+
+    *next = avec->data[avec->next++];
+}
+
+int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2)
+{
+    if (a1->count != a2->count)
+    {
+        return 0;
+    }
+
+    int i;
+    for (i = 0; i < a1->count; ++i)
+    {
+        if (!addrvec_contains(a2, &a1->data[i]))
+            return 0;
+    }
+
+    return 1;
+}

+ 130 - 0
src/c/src/addrvec.h

@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ADDRVEC_H_
+#define ADDRVEC_H_
+
+#include <inttypes.h>
+
+#ifndef WIN32
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#endif
+
+/**
+ * This structure represents a list of addresses. It stores the number of
+ * elements that have been inserted via calls to addrvec_append and
+ * addrvec_append_addrinfo. It also has a capacity field for the number of
+ * addresses it can hold without needing to be enlarged.
+ */
+typedef struct _addrvec {
+    unsigned int next;                        // next index to use
+    unsigned int count;                       // number of addresses in this list
+    unsigned int capacity;                    // number of addresses this list can hold
+    struct sockaddr_storage *data;   // list of addresses
+} addrvec_t;
+
+/**
+ * Initialize an addrvec by clearing out all its state.
+ */
+void addrvec_init(addrvec_t *avec);
+
+/**
+ * Free any memory used internally by an addrvec
+ */
+void addrvec_free(addrvec_t *avec);
+
+/**
+ * Allocate an addrvec with a default capacity (16)
+ */
+int addrvec_alloc(addrvec_t *avec);
+
+/**
+ * Allocates an addrvec with a specified capacity
+ */
+int addrvec_alloc_capacity(addrvec_t *avec, uint32_t capacity);
+
+/**
+ * Grow an addrvec by the specified amount. This increases the capacity of the
+ * vector but does not change its contents.
+ */
+int addrvec_grow(addrvec_t *avec, uint32_t grow_amount);
+
+/**
+ * Similar to addrvec_grow but uses a default growth amount of 16.
+ */
+int addrvec_grow_default(addrvec_t *avec);
+
+/**
+ * Check if an addrvec contains the specified sockaddr_storage value.
+ * \returns 1 if it contains the value and 0 otherwise.
+ */
+int addrvec_contains(const addrvec_t *avec, const struct sockaddr_storage *addr);
+
+/**
+ * Append the given sockaddr_storage pointer into the addrvec. The contents of
+ * the given 'addr' are copied into the addrvec via memcpy.
+ */
+int addrvec_append(addrvec_t *avec, const struct sockaddr_storage *addr);
+
+/**
+ * Append the given addrinfo pointer into the addrvec. The contents of the given
+ * 'addrinfo' are copied into the addrvec via memcpy.
+ */
+int addrvec_append_addrinfo(addrvec_t *avec, const struct addrinfo *addrinfo);
+
+/**
+ * Shuffle the addrvec so that its internal list of addresses is randomized.
+ * Uses random() and assumes it has been properly seeded.
+ */
+void addrvec_shuffle(addrvec_t *avec);
+
+/**
+ * Determine if the addrvec has a next element (i.e. it's safe to call addrvec_next)
+ * 
+ * \returns 1 if it has a next element and 0 otherwise
+ */
+int addrvec_hasnext(const addrvec_t *avec);
+
+/**
+ * Determine if the addrvec is at the end or not. Specifically, this means a
+ * subsequent call to addrvec_next will loop around to the start again.
+ */
+int addrvec_atend(const addrvec_t *avec);
+
+/**
+ * Get the next entry from the addrvec and update the associated index. 
+ * 
+ * If the current index points at (or after) the last element in the vector then
+ * it will loop back around and start at the beginning of the list.
+ */
+void addrvec_next(addrvec_t *avec, struct sockaddr_storage *next);
+
+/**
+ * Compare two addrvecs for equality. 
+ * 
+ * \returns 1 if the contents of the two lists are identical and 0 otherwise.
+ */
+int addrvec_eq(const addrvec_t *a1, const addrvec_t *a2);
+
+#endif // ADDRVEC_H_
+
+
+

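Taken together, the functions declared above form a small dynamic-array API over struct sockaddr_storage. A minimal sketch of how they compose, assuming an address list resolved with getaddrinfo (the resolution step itself is not part of this header):

    #include <string.h>
    #include <sys/socket.h>
    #include <netdb.h>
    #include "addrvec.h"

    /* Resolve a host/port pair and walk the resulting address vector in a
     * randomized order. */
    static int walk_addresses(const char *host, const char *port)
    {
        struct addrinfo hints, *res, *cur;
        addrvec_t avec;

        memset(&hints, 0, sizeof(hints));
        hints.ai_socktype = SOCK_STREAM;
        if (getaddrinfo(host, port, &hints, &res) != 0)
            return -1;

        addrvec_init(&avec);
        for (cur = res; cur != NULL; cur = cur->ai_next) {
            if (addrvec_append_addrinfo(&avec, cur) != 0)
                break;                      /* allocation failure */
        }
        freeaddrinfo(res);

        addrvec_shuffle(&avec);             /* randomize connection order */
        while (addrvec_hasnext(&avec)) {
            struct sockaddr_storage next;
            addrvec_next(&avec, &next);     /* advances the internal index */
            /* ... try connecting to 'next' here ... */
        }

        addrvec_free(&avec);
        return 0;
    }
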
+ 14 - 0
src/c/src/mt_adaptor.c

@@ -255,6 +255,7 @@ int adaptor_init(zhandle_t *zh)
     zh->adaptor_priv = adaptor_threads;
     pthread_mutex_init(&zh->to_process.lock,0);
     pthread_mutex_init(&adaptor_threads->zh_lock,0);
+    pthread_mutex_init(&adaptor_threads->reconfig_lock,0);
     // to_send must be recursive mutex    
     pthread_mutexattr_init(&recursive_mx_attr);
     pthread_mutexattr_settype(&recursive_mx_attr, PTHREAD_MUTEX_RECURSIVE);
@@ -515,6 +516,19 @@ __attribute__((constructor)) int32_t get_xid()
     return fetch_and_add(&xid,1);
 }
 
+void lock_reconfig(struct _zhandle *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if(adaptor)
+        pthread_mutex_lock(&adaptor->reconfig_lock);
+}
+void unlock_reconfig(struct _zhandle *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if(adaptor)
+        pthread_mutex_unlock(&adaptor->reconfig_lock);
+}
+
 void enter_critical(zhandle_t* zh)
 {
     struct adaptor_threads *adaptor = zh->adaptor_priv;

+ 4 - 0
src/c/src/st_adaptor.c

@@ -95,5 +95,9 @@ int32_t get_xid()
     }
     return xid++;
 }
+
+void lock_reconfig(struct _zhandle *zh){}
+void unlock_reconfig(struct _zhandle *zh){}
+
 void enter_critical(zhandle_t* zh){}
 void leave_critical(zhandle_t* zh){}

+ 60 - 33
src/c/src/zk_adaptor.h

@@ -28,6 +28,7 @@
 #endif
 #include "zookeeper.h"
 #include "zk_hashtable.h"
+#include "addrvec.h"
 
 /* predefined xid's values recognized as special by the server */
 #define WATCHER_EVENT_XID -1 
@@ -156,10 +157,11 @@ struct prime_struct {
 struct adaptor_threads {
      pthread_t io;
      pthread_t completion;
-     int threadsToWait;         // barrier
-     pthread_cond_t cond;       // barrier's conditional
-     pthread_mutex_t lock;      // ... and a lock
-     pthread_mutex_t zh_lock;   // critical section lock
+     int threadsToWait;             // barrier
+     pthread_cond_t cond;           // barrier's conditional
+     pthread_mutex_t lock;          // ... and a lock
+     pthread_mutex_t zh_lock;       // critical section lock
+     pthread_mutex_t reconfig_lock; // lock for reconfiguring cluster's ensemble
 #ifdef WIN32
      SOCKET self_pipe[2];
 #else
@@ -179,52 +181,71 @@ typedef struct _auth_list_head {
 /**
  * This structure represents the connection to zookeeper.
  */
-
 struct _zhandle {
 #ifdef WIN32
-    SOCKET fd; /* the descriptor used to talk to zookeeper */
+    SOCKET fd;                          // the descriptor used to talk to zookeeper
 #else
-    int fd; /* the descriptor used to talk to zookeeper */
+    int fd;                             // the descriptor used to talk to zookeeper
 #endif
-    char *hostname; /* the hostname of zookeeper */
-    struct sockaddr_storage *addrs; /* the addresses that correspond to the hostname */
-    int addrs_count; /* The number of addresses in the addrs array */
-    watcher_fn watcher; /* the registered watcher */
-    struct timeval last_recv; /* The time that the last message was received */
-    struct timeval last_send; /* The time that the last message was sent */
-    struct timeval last_ping; /* The time that the last PING was sent */
-    struct timeval next_deadline; /* The time of the next deadline */
-    int recv_timeout; /* The maximum amount of time that can go by without 
-     receiving anything from the zookeeper server */
-    buffer_list_t *input_buffer; /* the current buffer being read in */
-    buffer_head_t to_process; /* The buffers that have been read and are ready to be processed. */
-    buffer_head_t to_send; /* The packets queued to send */
-    completion_head_t sent_requests; /* The outstanding requests */
-    completion_head_t completions_to_process; /* completions that are ready to run */
-    int connect_index; /* The index of the address to connect to */
-    clientid_t client_id;
-    long long last_zxid;
-    int outstanding_sync; /* Number of outstanding synchronous requests */
-    struct _buffer_list primer_buffer; /* The buffer used for the handshake at the start of a connection */
-    struct prime_struct primer_storage; /* the connect response */
-    char primer_storage_buffer[40]; /* the true size of primer_storage */
-    volatile int state;
-    void *context;
-    auth_list_head_t auth_h; /* authentication data list */
+
+    // Hostlist and list of addresses
+    char *hostname;                     // hostname contains list of zookeeper servers to connect to
+    struct sockaddr_storage addr_cur;   // address of server we're currently connecting/connected to 
+
+    addrvec_t addrs;                    // current list of addresses we're connected to
+    addrvec_t addrs_old;                // old list of addresses that we are no longer connected to
+    addrvec_t addrs_new;                // new list of addresses to connect to if we're reconfiguring
+
+    int reconfig;                       // Are we in the process of reconfiguring cluster's ensemble
+    double pOld, pNew;                  // Probability for selecting between 'addrs_old' and 'addrs_new'
+    int delay;
+
+    watcher_fn watcher;                 // the registered watcher
+
+    // Message timings
+    struct timeval last_recv;           // time last message was received
+    struct timeval last_send;           // time last message was sent
+    struct timeval last_ping;           // time last PING was sent
+    struct timeval next_deadline;       // time of the next deadline
+    int recv_timeout;                   // max receive timeout for messages from server
+
+    // Buffers
+    buffer_list_t *input_buffer;        // current buffer being read in
+    buffer_head_t to_process;           // buffers that have been read and ready to be processed
+    buffer_head_t to_send;              // packets queued to send
+    completion_head_t sent_requests;    // outstanding requests
+    completion_head_t completions_to_process; // completions that are ready to run
+    int outstanding_sync;               // number of outstanding synchronous requests
+
+    // State info
+    volatile int state;                 // Current zookeeper state
+    void *context;                      // client-side provided context
+    clientid_t client_id;               // client-id
+    long long last_zxid;                // last zookeeper ID
+    auth_list_head_t auth_h;            // authentication data list
+
+    // Primer storage
+    struct _buffer_list primer_buffer;  // The buffer used for the handshake at the start of a connection
+    struct prime_struct primer_storage; // the connect response
+    char primer_storage_buffer[40];     // the true size of primer_storage
+
     /* zookeeper_close is not reentrant because it de-allocates the zhandler. 
      * This guard variable is used to defer the destruction of zhandle till 
      * right before top-level API call returns to the caller */
     int32_t ref_counter;
     volatile int close_requested;
     void *adaptor_priv;
+
     /* Used for debugging only: non-zero value indicates the time when the zookeeper_process
      * call returned while there was at least one unprocessed server response 
      * available in the socket recv buffer */
     struct timeval socket_readable;
-    
+
+    // Watchers
     zk_hashtable* active_node_watchers;   
     zk_hashtable* active_exist_watchers;
     zk_hashtable* active_child_watchers;
+
     /** used for chroot path at the client side **/
     char *chroot;
 };
@@ -246,13 +267,19 @@ void free_duplicate_path(const char* free_path, const char* path);
 void zoo_lock_auth(zhandle_t *zh);
 void zoo_unlock_auth(zhandle_t *zh);
 
+// ensemble reconfigure access guards
+void lock_reconfig(struct _zhandle *zh);
+void unlock_reconfig(struct _zhandle *zh);
+
 // critical section guards
 void enter_critical(zhandle_t* zh);
 void leave_critical(zhandle_t* zh);
+
 // zhandle object reference counting
 void api_prolog(zhandle_t* zh);
 int api_epilog(zhandle_t *zh, int rc);
 int32_t get_xid();
+
 // returns the new value of the ref counter
 int32_t inc_ref_counter(zhandle_t* zh,int i);
 

File diff content too large to display
+ 462 - 116
src/c/src/zookeeper.c


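The zookeeper.c diff above is collapsed, but the new zk_adaptor.h fields and the lock_reconfig/unlock_reconfig guards indicate the intended pattern: any update of the address lists is bracketed by the reconfig lock so the I/O thread never observes a half-installed ensemble. A minimal sketch of that pattern (the resolved address list is assumed to come from elsewhere; this is not the committed implementation):

    #include "zk_adaptor.h"

    /* Install a freshly resolved server list as the candidate "new" ensemble
     * and flag the handle as reconfiguring. The probabilistic selection between
     * addrs_old/addrs_new happens later, on the next connection attempt. */
    static int install_new_ensemble(zhandle_t *zh, const addrvec_t *resolved)
    {
        lock_reconfig(zh);                  /* serialize with the I/O thread */

        addrvec_free(&zh->addrs_new);       /* drop any previous candidate list */
        zh->addrs_new = *resolved;          /* take ownership of the new list */
        zh->reconfig = 1;                   /* enter "reconfig" selection mode */

        unlock_reconfig(zh);
        return ZOK;
    }
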
+ 598 - 0
src/c/tests/TestReconfig.cc

@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <iostream>
+#include <sstream>
+#include <arpa/inet.h>
+#include <exception>
+#include <stdlib.h>
+
+#include "Util.h"
+#include "LibCMocks.h"
+#include "ZKMocks.h"
+
+using namespace std;
+
+static const int portOffset = 2000;
+
+class Client
+{
+
+private:    
+    // Member variables
+    zhandle_t *zh;
+    unsigned int seed;
+
+public:
+    /**
+     * Create a client with the given connection host string and a seed for this
+     * client's random number generators. Clients are disconnected and cleaned up
+     * in tearDown().
+     */
+    Client(const string hosts, unsigned int seed) :
+        seed((seed * seed) + 0xAFAFAFAF)
+    {
+        reSeed();
+
+        zh = zookeeper_init(hosts.c_str(),0,1000,0,0,0);
+        CPPUNIT_ASSERT(zh);
+
+        reSeed();
+
+        cycleNextServer();
+    }
+
+    void close()
+    {
+        zookeeper_close(zh);
+        zh = NULL;
+    }
+
+    bool isReconfig()
+    {
+        return zh->reconfig != 0;
+    }
+
+    /**
+     * Re-seed this client with its own previously generated seed so its
+     * random choices are unique and separate from the other clients.
+     */
+    void reSeed()
+    {
+        srandom(seed);
+        srand48(seed);
+    }
+
+    /**
+     * Get the server that this client is currently connected to.
+     */
+    string getServer()
+    {
+        const char* addrstring = zoo_get_current_server(zh);
+        return string(addrstring);
+    }
+
+    /**
+     * Get the server this client is currently connected to with no port
+     * specification.
+     */
+    string getServerNoPort()
+    {
+        string addrstring = getServer();
+
+        size_t found = addrstring.find(":");
+        CPPUNIT_ASSERT(found != string::npos);
+
+        return addrstring.substr(0, found);
+    }
+
+    /**
+     * Get the port of the server this client is currently connected to.
+     */
+    uint32_t getServerPort()
+    {
+        string addrstring = getServer();
+
+        size_t found = addrstring.find(":");
+        CPPUNIT_ASSERT(found != string::npos);
+
+        string portStr = addrstring.substr(found+1);
+
+        stringstream ss(portStr);
+        uint32_t port;
+        ss >> port;
+
+        CPPUNIT_ASSERT(port >= portOffset);
+
+        return port;
+    }
+
+    /**
+     * Cycle to the next available server on the next connect attempt. It also
+     * calls into getServer (above) to return the server connected to.
+     */ 
+    string cycleNextServer()
+    {
+        zoo_cycle_next_server(zh);
+        return getServer();
+    }
+
+    void cycleUntilServer(const string requested)
+    {
+        // Call cycleNextServer until the one it's connected to is the one
+        // specified (disregarding port).
+        string first;
+
+        while(true)
+        {
+            string next = cycleNextServer();
+            if (first.empty())
+            {
+                first = next;
+            } 
+            // Else we've looped around!
+            else if (first == next)
+            {
+                CPPUNIT_ASSERT(false);
+            }
+
+            // Strip port off
+            string server = getServerNoPort();
+
+            // If it matches the requested host we're now 'connected' to the right host
+            if (server == requested)
+            {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Set servers for this client.
+     */
+    void setServers(const string new_hosts)
+    {
+        int rc = zoo_set_servers(zh, new_hosts.c_str());
+        CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+    }
+
+    /**
+     * Set servers for this client and validate reconfig value matches expected.
+     */
+    void setServersAndVerifyReconfig(const string new_hosts, bool is_reconfig)
+    {
+        setServers(new_hosts);
+        CPPUNIT_ASSERT_EQUAL(is_reconfig, isReconfig());
+    }
+
+    /**
+     * Sets the server list this client is connecting to AND if this requires
+     * the client to be reconfigured (as dictated by internal client policy)
+     * then it will trigger a call to cycleNextServer.
+     */
+    void setServersAndCycleIfNeeded(const string new_hosts)
+    {
+        setServers(new_hosts);
+        if (isReconfig())
+        {
+            cycleNextServer();
+        }
+    }
+};
+
+class Zookeeper_reconfig : public CPPUNIT_NS::TestFixture
+{
+    CPPUNIT_TEST_SUITE(Zookeeper_reconfig);
+
+    // Test cases
+    CPPUNIT_TEST(testcycleNextServer);
+    CPPUNIT_TEST(testMigrateOrNot);
+    CPPUNIT_TEST(testMigrationCycle);
+
+    // In threaded mode each 'create' is a thread -- it's not practical to create
+    // 10,000 threads to test load balancing. The load balancing code can easily
+    // be tested in single threaded mode as concurrency doesn't affect the algorithm.
+#ifndef THREADED
+    CPPUNIT_TEST(testMigrateProbability);
+    CPPUNIT_TEST(testLoadBalancing);
+#endif
+
+    CPPUNIT_TEST_SUITE_END();
+
+    FILE *logfile;
+
+    double slackPercent;
+    static const int numClients = 10000;
+    static const int portOffset = 2000;
+
+    vector<Client> clients;
+    vector<uint32_t> numClientsPerHost;
+
+public:
+    Zookeeper_reconfig() :
+        slackPercent(10.0)
+    {
+      logfile = openlogfile("Zookeeper_reconfig");
+    }
+
+    ~Zookeeper_reconfig() 
+    {
+      if (logfile) 
+      {
+        fflush(logfile);
+        fclose(logfile);
+        logfile = 0;
+      }
+    }
+
+    void setUp()
+    {
+        zoo_set_log_stream(logfile);
+        zoo_deterministic_conn_order(1);
+
+        numClientsPerHost.resize(numClients);
+    }
+
+    void tearDown()
+    {
+        for (int i = 0; i < clients.size(); i++)
+        {
+            clients.at(i).close();
+        }
+    }
+
+    /**
+     * Create a client with given connection host string and add to our internal
+     * vector of clients. These are disconnected and cleaned up in tearDown().
+     */
+    Client& createClient(const string hosts)
+    {
+        Client client(hosts, clients.size());
+        clients.push_back(client);
+
+        return clients.back();
+    }
+
+    /**
+     * Same as createClient(hosts) only it takes a specific host that this client
+     * should simulate being connected to.
+     */
+    Client& createClient(const string hosts, const string host)
+    {
+        // Ensure requested host is in the list
+        size_t found = hosts.find(host);
+        CPPUNIT_ASSERT(found != hosts.npos);
+
+        Client client(hosts, clients.size());
+        client.cycleUntilServer(host);
+        clients.push_back(client);
+
+        return clients.back();
+    }
+
+    /**
+     * Create a connection host list starting at 'start' and stopping at 'stop'
+     * where start >= stop. This creates a connection string with host:port pairs
+     * separated by commas. The given 'octet' is the starting octet that is used
+     * as the last octet in the host's IP. This is decremented on each iteration. 
+     * Each port will be portOffset + octet.
+     */
+    string createHostList(uint32_t start, uint32_t stop = 1, uint32_t octet = 0)
+    {
+        if (octet == 0)
+        {
+            octet = start;
+        }
+
+        stringstream ss;
+
+        for (int i = start; i >= stop; i--, octet--)
+        {
+            ss << "10.10.10." << octet << ":" << portOffset + octet;
+
+            if (i > stop)
+            {
+                ss << ", ";
+            }
+        }
+
+        return ss.str();
+    }
+
+    /**
+     * Gets the lower bound of the number of clients per server that we expect
+     * based on the probabilistic load balancing algorithm implemented by the
+     * client code.
+     */
+    double lowerboundClientsPerServer(int numClients, int numServers)
+    {
+        return (1 - slackPercent/100.0) * numClients / numServers;
+    }
+
+    /**
+     * Gets the upper bound of the number of clients per server that we expect
+     * based on the probabilistic load balancing algorithm implemented by the
+     * client code.
+     */
+    double upperboundClientsPerServer(int numClients, int numServers)
+    {
+        return (1 + slackPercent/100.0) * numClients / numServers;
+    }
+
+    /**
+     * Update all the clients to use a new list of servers. This will also cause
+     * the client to cycle to the next server as needed (e.g. due to a reconfig).
+     * It then updates the number of clients connected to the server based on
+     * this change.
+     * 
+     * Afterwards it validates that all of the servers have the correct amount of
+     * clients based on the probabilistic load balancing algorithm.
+     */
+    void updateAllClientsAndServers(int start, int stop = 1)
+    {
+        string newServers = createHostList(start, stop);
+        int numServers = start - stop + 1;
+
+        for (int i = 0; i < numClients; i++) {
+
+            Client &client = clients.at(i);
+            client.reSeed();
+
+            client.setServersAndCycleIfNeeded(newServers);
+            numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
+        }
+
+        int offset = stop - 1;
+        for (int index = offset; index < numServers; index++) {
+
+            if (numClientsPerHost.at(index) > upperboundClientsPerServer(numClients, numServers))
+            {
+                cout << "INDEX=" << index << " too many -- actual=" << numClientsPerHost.at(index) 
+                     << " expected=" << upperboundClientsPerServer(numClients, numServers) << endl;
+            }
+
+
+            CPPUNIT_ASSERT(numClientsPerHost.at(index) <= upperboundClientsPerServer(numClients, numServers));
+
+            if (numClientsPerHost.at(index) < lowerboundClientsPerServer(numClients, numServers))
+            {
+                cout << "INDEX=" << index << " too few -- actual=" << numClientsPerHost.at(index) 
+                     << " expected=" << lowerboundClientsPerServer(numClients, numServers) << endl;
+            }
+
+            CPPUNIT_ASSERT(numClientsPerHost.at(index) >= lowerboundClientsPerServer(numClients, numServers));
+            numClientsPerHost.at(index) = 0; // prepare for next test
+        }
+    }
+
+    /*-------------------------------------------------------------------------*
+     * TESTCASES
+     *------------------------------------------------------------------------*/
+
+    /**
+     * Very basic sunny day test to ensure basic functionality of zoo_set_servers
+     * and zoo_cycle_next_server.
+     */
+    void testcycleNextServer()
+    {
+        const string initial_hosts = createHostList(10); // 2010..2001
+        const string new_hosts = createHostList(4);      // 2004..2001
+
+        Client &client = createClient(initial_hosts);
+
+        client.setServersAndVerifyReconfig(new_hosts, true);
+
+        for (int i = 0; i < 10; i++)
+        {
+            string next = client.cycleNextServer();
+        }
+    }
+
+    /**
+     * Test the migration policy implicit within the probabilistic load balancing
+     * algorithm the Client implements. Tests all the corner cases whereby the
+     * list of servers is decreased, increased, and stays the same. Also covers
+     * the combinations of the currently connected server being in the new
+     * configuration and not.
+     */
+    void testMigrateOrNot()
+    {
+        const string initial_hosts = createHostList(4); // 2004..2001
+
+        Client &client = createClient(initial_hosts, "10.10.10.3");
+
+        // Ensemble size decreasing, my server is in the new list
+        client.setServersAndVerifyReconfig(createHostList(3), false);
+
+        // Ensemble size decreasing, my server is NOT in the new list
+        client.setServersAndVerifyReconfig(createHostList(2), true);
+
+        // Ensemble size stayed the same, my server is NOT in the new list
+        client.setServersAndVerifyReconfig(createHostList(2), true);
+
+        // Ensemble size increased, my server is not in the new ensemble
+        client.setServers(createHostList(4));
+        client.cycleUntilServer("10.10.10.1");
+        client.setServersAndVerifyReconfig(createHostList(7,2), true);
+    }
+
+    /**
+     * This tests that as a client is in reconfig mode it will properly try to
+     * connect to all the new servers first. Then it will try to connect to all
+     * the 'old' servers that are staying in the new configuration. Finally it
+     * will fallback to the normal behavior of trying servers in round-robin.
+     */
+    void testMigrationCycle()
+    {
+        int num_initial = 4;
+        const string initial_hosts = createHostList(num_initial); // {2004..2001}
+
+        int num_new = 10;
+        string new_hosts = createHostList(12, 3);      // {2012..2003}
+
+        // servers from the old list that appear in the new list {2004..2003}
+        int num_staying = 2;
+        string oldStaying = createHostList(4, 3);
+
+        // servers in the new list that are not in the old list  {2012..2005}
+        int num_coming = 8;
+        string newComing = createHostList(12, 5);
+
+        // Ensemble is increasing in size and my server is not in the new ensemble;
+        // load on the old servers must be decreased, so we must connect to one of
+        // the new servers (pNew = 1)
+        Client &client = createClient(initial_hosts, "10.10.10.1");
+        client.setServersAndVerifyReconfig(new_hosts, true);
+
+        // Since we're in reconfig mode, next connect should be from new list
+        // We should try all the new servers *BEFORE* trying any old servers
+        string seen;
+        for (int i = 0; i < num_coming; i++) {
+            string next = client.cycleNextServer();
+
+            // Assert next server is in the 'new' list
+            size_t found = newComing.find(next);
+            CPPUNIT_ASSERT(found != string::npos);
+
+            // Assert not in seen list then append
+            found = seen.find(next);
+            CPPUNIT_ASSERT(found == string::npos);
+            seen += next + ", ";
+        }
+
+        // Now it should start connecting to the old servers
+        seen.clear();
+        for (int i = 0; i < num_staying; i++) {
+            string next = client.cycleNextServer();
+
+            // Assert it's in the old list
+            size_t found = oldStaying.find(next);
+            CPPUNIT_ASSERT(found != string::npos);
+
+            // Assert not in seen list then append
+            found = seen.find(next);
+            CPPUNIT_ASSERT(found == string::npos);
+            seen += next + ", ";
+        }
+
+        // NOW it goes back to normal as we've tried all the new and old
+        string first = client.cycleNextServer();
+        for (int i = 0; i < num_new - 1; i++) {
+            client.cycleNextServer();
+        }
+
+        CPPUNIT_ASSERT_EQUAL(first, client.cycleNextServer());
+    }
+
+    /**
+     * Test the migration probability to ensure that it conforms to our expected
+     * lower and upper bounds of the number of clients per server as we are 
+     * reconfigured.
+     * 
+     * In this case, the list of servers is increased and the client's server is
+     * in the new list. Whether to move or not depends on the difference in list
+     * sizes: with probability 1 - |old|/|new| the client disconnects.
+     * 
+     * In the test below there is a 1 - 9/10 = 1/10 chance of disconnecting.
+     */
+    void testMigrateProbability()
+    {
+        const string initial_hosts = createHostList(9); // 10.10.10.9:2009...10.10.10.1:2001
+        string new_hosts = createHostList(10); // 10.10.10.10:2010...10.10.10.1:2001
+
+        uint32_t numDisconnects = 0;
+        for (int i = 0; i < numClients; i++) {
+            Client &client = createClient(initial_hosts, "10.10.10.3");
+            client.setServers(new_hosts);
+            if (client.isReconfig())
+            {
+                numDisconnects++;
+            }
+        }
+
+        // should be numClients/10 in expectation, we test that it's numClients/10 +- slackPercent
+        CPPUNIT_ASSERT(numDisconnects < upperboundClientsPerServer(numClients, 10));
+    }
+
+    /**
+     * Tests the probabilistic load balancing algorithm implemented by the Client
+     * code. 
+     * 
+     * Test strategy:
+     * 
+     * (1) Start with 9 servers and 10,000 clients. Remove a server, update
+     *     everything, and ensure that the clients are redistributed properly.
+     * 
+     * (2) Remove two more nodes and repeat the same validations of proper client
+     *     redistribution. Ensure no clients are connected to the two removed
+     *     nodes.
+     * 
+     * (3) Remove the first server in the list and simultaneously add the three
+     *     previously removed servers. Ensure everything is redistributed and
+     *     no clients are connected to the one missing node.
+     * 
+     * (4) Add the one missing server back into the mix and validate.
+     */
+    void testLoadBalancing()
+    {
+        zoo_deterministic_conn_order(0);
+
+        int rc = ZOK;
+
+        uint32_t numServers = 9;
+        const string initial_hosts = createHostList(numServers); // 10.10.10.9:2009...10.10.10.1:2001
+
+        // Create connections to servers
+        for (int i = 0; i < numClients; i++) {
+            Client &client = createClient(initial_hosts);
+            numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
+        }
+
+        for (int i = 0; i < numServers; i++) {
+            CPPUNIT_ASSERT(numClientsPerHost.at(i) <= upperboundClientsPerServer(numClients, numServers));
+            CPPUNIT_ASSERT(numClientsPerHost.at(i) >= lowerboundClientsPerServer(numClients, numServers));
+            numClientsPerHost.at(i) = 0; // prepare for next test
+        }
+
+        // remove last server
+        numServers = 8;
+        updateAllClientsAndServers(numServers);
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
+
+        // Remove two more nodes
+        numServers = 6;
+        updateAllClientsAndServers(numServers);
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+1));
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+2));
+
+        // remove host 0 (first one in list) and add back 6, 7, and 8
+        numServers = 8;
+        updateAllClientsAndServers(numServers, 1);
+        CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(0));
+
+        // add back host number 0
+        numServers = 9;
+        updateAllClientsAndServers(numServers);
+    }
+};
+
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_reconfig);

+ 9 - 8
src/c/tests/TestZookeeperClose.cc

@@ -102,7 +102,7 @@ public:
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
         // This cannot be maintained properly CPPUNIT_ASSERT_EQUAL(9,freeMock.callCounter);
     }
     void testCloseUnconnected1()
@@ -128,7 +128,7 @@ public:
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
         // the close request sent?
         CPPUNIT_ASSERT_EQUAL(1,zkMock.counter);
     }
@@ -140,6 +140,7 @@ public:
 
         zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
         CPPUNIT_ASSERT(zh!=0);
+        CPPUNIT_ASSERT_EQUAL(ZOO_NOTCONNECTED_STATE, zoo_state(zh));
 
         Mock_gettimeofday timeMock;
         
@@ -173,7 +174,7 @@ public:
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
         // the close request sent?
         CPPUNIT_ASSERT_EQUAL(1,(int)zkServer.closeSent);
     }
@@ -218,7 +219,7 @@ public:
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
         // make sure the close request NOT sent
         CPPUNIT_ASSERT_EQUAL(0,(int)zkServer.closeSent);
     }
@@ -249,7 +250,7 @@ public:
         // memory
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+        CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
         CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
         // Cannot be maintained accurately: CPPUNIT_ASSERT_EQUAL(10,freeMock.callCounter);
         // threads
@@ -313,7 +314,7 @@ public:
             // memory
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.hostname));
-            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh.addrs.data));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
             // threads
             CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
@@ -378,7 +379,7 @@ public:
             // memory
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
-            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
             // threads
             CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));
@@ -444,7 +445,7 @@ public:
             // memory
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(lzh));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.hostname));
-            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs));
+            CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(closeAction.lzh.addrs.data));
             CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(adaptor));
             // threads
             CPPUNIT_ASSERT_EQUAL(1,CheckedPthread::getDestroyCounter(adaptor->io));

+ 22 - 22
src/c/tests/TestZookeeperInit.cc

@@ -93,7 +93,7 @@ public:
     void testBasic()
     {
         const string EXPECTED_HOST("127.0.0.1:2121");
-        const int EXPECTED_ADDRS_COUNT =1;
+        const unsigned int EXPECTED_ADDRS_COUNT =1;
         const int EXPECTED_RECV_TIMEOUT=10000;
         clientid_t cid;
         memset(&cid,0xFE,sizeof(cid));
@@ -101,16 +101,16 @@ public:
         zh=zookeeper_init(EXPECTED_HOST.c_str(),watcher,EXPECTED_RECV_TIMEOUT,
                 &cid,(void*)1,0);
 
-        CPPUNIT_ASSERT(zh!=0);
+        CPPUNIT_ASSERT(zh != NULL);
         CPPUNIT_ASSERT(zh->fd == -1);
-        CPPUNIT_ASSERT(zh->hostname!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count);
+        CPPUNIT_ASSERT(zh->hostname != NULL);
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count);
         CPPUNIT_ASSERT_EQUAL(EXPECTED_HOST,string(zh->hostname));
-        CPPUNIT_ASSERT(zh->state == NOTCONNECTED_STATE_DEF);
+        CPPUNIT_ASSERT(zh->state == ZOO_NOTCONNECTED_STATE);
         CPPUNIT_ASSERT(zh->context == (void*)1);
         CPPUNIT_ASSERT_EQUAL(EXPECTED_RECV_TIMEOUT,zh->recv_timeout);
         CPPUNIT_ASSERT(zh->watcher == watcher);
-        CPPUNIT_ASSERT(zh->connect_index==0);
+        CPPUNIT_ASSERT(zh->addrs.next==0);
         CPPUNIT_ASSERT(zh->primer_buffer.buffer==zh->primer_storage_buffer);
         CPPUNIT_ASSERT(zh->primer_buffer.curr_offset ==0);
         CPPUNIT_ASSERT(zh->primer_buffer.len == sizeof(zh->primer_storage_buffer));
@@ -136,15 +136,15 @@ public:
     void testAddressResolution()
     {
         const char EXPECTED_IPS[][4]={{127,0,0,1}};
-        const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
+        const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
 
         zoo_deterministic_conn_order(1);
         zh=zookeeper_init("127.0.0.1:2121",0,10000,0,0,0);
 
         CPPUNIT_ASSERT(zh!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count);
-        for(int i=0;i<zh->addrs_count;i++){
-            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i];
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count);
+        for(int i=0;i<zh->addrs.count;i++){
+            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i];
             CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0);
             CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port));
         }
@@ -153,16 +153,16 @@ public:
     {
         const string EXPECTED_HOST("127.0.0.1:2121,127.0.0.2:3434");
         const char EXPECTED_IPS[][4]={{127,0,0,1},{127,0,0,2}};
-        const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
+        const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
 
         zoo_deterministic_conn_order(1);
         zh=zookeeper_init(EXPECTED_HOST.c_str(),0,1000,0,0,0);
 
         CPPUNIT_ASSERT(zh!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count);
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count);
 
-        for(int i=0;i<zh->addrs_count;i++){
-            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i];
+        for(int i=0;i<zh->addrs.count;i++){
+            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i];
             CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0);
             if(i<1)
                 CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port));
@@ -174,16 +174,16 @@ public:
     { 
         const string EXPECTED_HOST("127.0.0.1:2121,  127.0.0.2:3434");
         const char EXPECTED_IPS[][4]={{127,0,0,1},{127,0,0,2}};
-        const int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
+        const unsigned int EXPECTED_ADDRS_COUNT =COUNTOF(EXPECTED_IPS);
 
         zoo_deterministic_conn_order(1);
         zh=zookeeper_init(EXPECTED_HOST.c_str(),0,1000,0,0,0);
 
         CPPUNIT_ASSERT(zh!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs_count);
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDRS_COUNT,zh->addrs.count);
 
-        for(int i=0;i<zh->addrs_count;i++){
-            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i];
+        for(int i=0;i<zh->addrs.count;i++){
+            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i];
             CPPUNIT_ASSERT(memcmp(EXPECTED_IPS[i],&addr->sin_addr,sizeof(addr->sin_addr))==0);
             if(i<1)
                 CPPUNIT_ASSERT_EQUAL(2121,(int)ntohs(addr->sin_port));
@@ -277,7 +277,7 @@ public:
     void testPermuteAddrsList()
     {
         const char EXPECTED[][5]={"\0\0\0\0","\1\1\1\1","\2\2\2\2","\3\3\3\3"};
-        const int EXPECTED_ADDR_COUNT=COUNTOF(EXPECTED);
+        const unsigned int EXPECTED_ADDR_COUNT=COUNTOF(EXPECTED);
 
         const int RAND_SEQ[]={0,1,1,-1};
         const int RAND_SIZE=COUNTOF(RAND_SEQ);
@@ -286,11 +286,11 @@ public:
         zh=zookeeper_init("0.0.0.0:123,1.1.1.1:123,2.2.2.2:123,3.3.3.3:123",0,1000,0,0,0);
 
         CPPUNIT_ASSERT(zh!=0);
-        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDR_COUNT,zh->addrs_count);
+        CPPUNIT_ASSERT_EQUAL(EXPECTED_ADDR_COUNT,zh->addrs.count);
         const string EXPECTED_SEQ("3210");
         char ACTUAL_SEQ[EXPECTED_ADDR_COUNT+1]; ACTUAL_SEQ[EXPECTED_ADDR_COUNT]=0;
-        for(int i=0;i<zh->addrs_count;i++){
-            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs[i];
+        for(int i=0;i<zh->addrs.count;i++){
+            sockaddr_in* addr=(struct sockaddr_in*)&zh->addrs.data[i];
             // match the first byte of the EXPECTED and of the actual address
             ACTUAL_SEQ[i]=((char*)&addr->sin_addr)[0]+'0';
         }

+ 6 - 0
src/c/tests/ZKMocks.cc

@@ -507,7 +507,13 @@ void ZookeeperServer::notifyBufferSent(const std::string& buffer){
 void forceConnected(zhandle_t* zh){
     // simulate connected state
     zh->state=ZOO_CONNECTED_STATE;
+    
+    // Simulate we're connected to the first host in our host list
     zh->fd=ZookeeperServer::FD;
+    assert(zh->addrs.count > 0);
+    zh->addr_cur = zh->addrs.data[0];
+    zh->addrs.next++;
+
     zh->input_buffer=0;
     gettimeofday(&zh->last_recv,0);    
     gettimeofday(&zh->last_send,0);    

+ 34 - 0
src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml

@@ -541,6 +541,40 @@
       and the second client will be disconnected (causing the pair to attempt to re-establish
       it's connection/session indefinitely).</para>
 
+    <para> <emphasis role="bold">Updating the list of servers</emphasis>.  We allow a client to 
+      update the connection string by providing a new comma separated list of host:port pairs, 
+      each corresponding to a ZooKeeper server. The function invokes a probabilistic load-balancing 
+      algorithm which may cause the client to disconnect from its current host with the goal
+      of achieving an expected uniform number of connections per server in the new list.
+      In case the current host to which the client is connected is not in the new list,
+      this call will always cause the connection to be dropped. Otherwise, the decision
+      is based on whether the number of servers has increased or decreased and by how much.
+    </para>
+
+    <para>
+      For example, if the previous connection string contained 3 hosts and now the list contains
+      these 3 hosts and 2 more hosts, 40% of clients connected to each of the 3 hosts will
+      move to one of the new hosts in order to balance the load. The algorithm will cause the client 
+      to drop its connection to the current host to which it is connected with probability 0.4 and in this 
+	  case cause the client to connect to one of the 2 new hosts, chosen at random.
+    </para>
+
+    <para>
+      Another example: suppose we have 5 hosts and now update the list to remove 2 of the hosts.
+      The clients connected to the 3 remaining hosts will stay connected, whereas all clients connected
+      to the 2 removed hosts will need to move to one of the 3 remaining hosts, chosen at random. If the
+      connection is dropped, the client moves to a special mode where it chooses a new server to connect
+      to using the probabilistic algorithm, and not just round robin.
+    </para>
+
+    <para>
+      In the first example, each client decides to disconnect with probability 0.4, but once the decision
+      is made, it will try to connect to a random new server, and only if it cannot connect to any of the
+      new servers will it try to connect to the old ones. After finding a server, or trying all servers in
+      the new list and failing to connect, the client moves back to the normal mode of operation where it
+      picks an arbitrary server from the connectString and attempts to connect to it. If that fails, it
+      will continue trying different random servers in round robin (see above for the algorithm used to
+      initially choose a server).
+    </para>
+
   </section>
 
   <section id="ch_zkWatches">

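The probabilities in the two examples above follow a single formula. As a sketch in the C client's terms (this is not the committed zookeeper.c code; drand48 for the coin flip is an assumption, matching the srand48 seeding used by the tests), the chance that a client whose current server remains in the new list should drop its connection is:

    #include <stdlib.h>

    /* Probability of disconnecting when the current server is still in the new
     * list: 0 if the list shrank or kept its size, otherwise 1 - |old|/|new|.
     * With 3 old and 5 new hosts this is 0.4; with 9 old and 10 new hosts it is
     * 0.1, matching the examples in the documentation above. */
    static double migrate_probability(int n_old, int n_new)
    {
        if (n_new <= n_old)
            return 0.0;
        return 1.0 - (double)n_old / (double)n_new;
    }

    /* Decide with drand48(); assumes srand48() has been seeded beforehand. */
    static int should_disconnect(int n_old, int n_new)
    {
        return drand48() < migrate_probability(n_old, n_new);
    }
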
+ 63 - 3
src/java/main/org/apache/zookeeper/ZooKeeper.java

@@ -19,6 +19,7 @@
 package org.apache.zookeeper;
 
 import org.apache.zookeeper.AsyncCallback.*;
+import org.apache.zookeeper.ClientCnxn.SendThread;
 import org.apache.zookeeper.OpResult.ErrorResult;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.HostProvider;
@@ -33,7 +34,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.net.UnknownHostException;
 import java.util.*;
 
 /**
@@ -94,6 +98,61 @@ public class ZooKeeper {
         LOG = LoggerFactory.getLogger(ZooKeeper.class);
         Environment.logEnv("Client environment:", LOG);
     }
+    
+    private final StaticHostProvider hostProvider;
+    
+    /**
+     * This function allows a client to update the connection string by providing 
+     * a new comma separated list of host:port pairs, each corresponding to a 
+     * ZooKeeper server. 
+     * <p>
+     * The function invokes a <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">
+     * probabilistic load-balancing algorithm</a> which may cause the client to disconnect from
+     * its current host in order to achieve an expected uniform number of connections per server
+     * in the new list. If the current host to which the client is connected is not in the new
+     * list, this call will always cause the connection to be dropped. Otherwise, the decision
+     * is based on whether the number of servers has increased or decreased and by how much.
+     * For example, if the previous connection string contained 3 hosts and now the list contains
+     * these 3 hosts and 2 more hosts, 40% of clients connected to each of the 3 hosts will
+     * move to one of the new hosts in order to balance the load. The algorithm will disconnect 
+     * from the current host with probability 0.4 and in this case cause the client to connect 
+     * to one of the 2 new hosts, chosen at random.
+     * <p>
+     * If the connection is dropped, the client moves to a special mode, "reconfigMode", in which it
+     * chooses a new server to connect to using the probabilistic algorithm. After finding a server,
+     * or after trying all servers in the new list and failing to connect to any of them,
+     * the client moves back to the normal mode of operation where it will pick an arbitrary server
+     * from the connectString and attempt to connect to it. If establishment of
+     * the connection fails, another server in the connect string will be tried
+     * (the order is non-deterministic, as we randomly shuffle the list), until a
+     * connection is established. The client will continue attempts until the
+     * session is explicitly closed (or the session is expired by the server).
+     *
+     * @param connectString
+     *            comma separated host:port pairs, each corresponding to a zk
+     *            server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+     *            If the optional chroot suffix is used the example would look
+     *            like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+     *            where the client would be rooted at "/app/a" and all paths
+     *            would be relative to this root - ie getting/setting/etc...
+     *            "/foo/bar" would result in operations being run on
+     *            "/app/a/foo/bar" (from the server perspective).     
+     *
+     * @throws IOException in cases of network failure     
+     */
+    public void updateServerList(String connectString) throws IOException {
+        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
+        Collection<InetSocketAddress> serverAddresses = connectStringParser.getServerAddresses();
+
+        ClientCnxnSocket clientCnxnSocket = cnxn.sendThread.getClientCnxnSocket();
+        InetSocketAddress currentHost = (InetSocketAddress) clientCnxnSocket.getRemoteSocketAddress();
+
+        boolean reconfigMode = hostProvider.updateServerList(serverAddresses, currentHost);
+
+        // cause disconnection - this will cause next to be called
+        // which will in turn call nextReconfigMode
+        if (reconfigMode) clientCnxnSocket.testableCloseSocket();
+    }
 
     public ZooKeeperSaslClient getSaslClient() {
         return cnxn.zooKeeperSaslClient;
@@ -442,7 +501,8 @@ public class ZooKeeper {
 
         ConnectStringParser connectStringParser = new ConnectStringParser(
                 connectString);
-        HostProvider hostProvider = new StaticHostProvider(
+        
+        hostProvider = new StaticHostProvider(
                 connectStringParser.getServerAddresses());
         cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                 hostProvider, sessionTimeout, this, watchManager,
@@ -580,10 +640,10 @@ public class ZooKeeper {
                 + (sessionPasswd == null ? "<null>" : "<hidden>"));
 
         watchManager.defaultWatcher = watcher;
-
+       
         ConnectStringParser connectStringParser = new ConnectStringParser(
                 connectString);
-        HostProvider hostProvider = new StaticHostProvider(
+        hostProvider = new StaticHostProvider(
                 connectStringParser.getServerAddresses());
         cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                 hostProvider, sessionTimeout, this, watchManager,

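A minimal usage sketch of the new method, assuming an existing ensemble; the addresses, session timeout, and class name below are placeholders, and error handling is omitted.

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Usage sketch only (addresses and timeout are placeholders, error handling omitted).
public class UpdateServerListExample {
    public static void main(String[] args) throws Exception {
        Watcher noopWatcher = new Watcher() {
            public void process(WatchedEvent event) { /* ignore events in this sketch */ }
        };
        ZooKeeper zk = new ZooKeeper(
                "10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181", 30000, noopWatcher);

        // After the ensemble has been expanded to five servers, hand the client the
        // new connection string. The client may probabilistically drop its current
        // connection and reconnect in order to rebalance load, as described above.
        zk.updateServerList(
                "10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181,10.0.0.4:2181,10.0.0.5:2181");

        zk.close();
    }
}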
+ 11 - 0
src/java/main/org/apache/zookeeper/client/HostProvider.java

@@ -19,6 +19,8 @@
 package org.apache.zookeeper.client;
 
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
 
 /**
  * A set of hosts a ZooKeeper client should connect to.
@@ -58,4 +60,13 @@ public interface HostProvider {
     * The HostProvider may use this notification to reset its inner state.
      */
     public void onConnected();
+
+    /**
+     * Update the list of servers. This returns true if changing connections is necessary for load-balancing, false otherwise.
+     * @param serverAddresses new host list
+     * @param currentHost the host to which this client is currently connected
+     * @return true if changing connections is necessary for load-balancing, false otherwise  
+     */
+    boolean updateServerList(Collection<InetSocketAddress> serverAddresses, InetSocketAddress currentHost)
+            throws UnknownHostException;
 }

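For illustration of the interface contract only, here is a deliberately naive sketch of updateServerList() in a custom HostProvider; it is not the patch's implementation (that is StaticHostProvider, below) and the class name is hypothetical. It requests a reconnect only when the current host is absent from the new list.

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Naive sketch of the updateServerList() contract; not the patch's load-balancing logic.
public final class NaiveHostProvider /* would implement HostProvider */ {
    private List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>();

    public boolean updateServerList(Collection<InetSocketAddress> serverAddresses,
                                    InetSocketAddress currentHost) {
        servers = new ArrayList<InetSocketAddress>(serverAddresses);
        // Returning true means "changing connections is necessary": here only
        // when the server we are connected to no longer appears in the new list.
        return !servers.contains(currentHost);
    }
}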
+ 220 - 15
src/java/main/org/apache/zookeeper/client/StaticHostProvider.java

@@ -25,11 +25,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Most simple HostProvider, resolves only on instantiation.
  * 
@@ -38,13 +38,30 @@ public final class StaticHostProvider implements HostProvider {
     private static final Logger LOG = LoggerFactory
             .getLogger(StaticHostProvider.class);
 
-    private final List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(
+    private List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(
             5);
 
+    private Random sourceOfRandomness;
     private int lastIndex = -1;
 
     private int currentIndex = -1;
 
+    /**
+     * The following fields are used to migrate clients during reconfiguration
+     */
+    private boolean reconfigMode = false;
+
+    private final List<InetSocketAddress> oldServers = new ArrayList<InetSocketAddress>(
+            5);
+
+    private final List<InetSocketAddress> newServers = new ArrayList<InetSocketAddress>(
+            5);
+
+    private int currentIndexOld = -1;
+    private int currentIndexNew = -1;
+
+    private float pOld, pNew;
+
     /**
      * Constructs a SimpleHostSet.
      * 
@@ -56,46 +73,234 @@ public final class StaticHostProvider implements HostProvider {
      */
     public StaticHostProvider(Collection<InetSocketAddress> serverAddresses)
             throws UnknownHostException {
+        sourceOfRandomness = new Random(System.currentTimeMillis() ^ this.hashCode());
+
+        this.serverAddresses = resolveAndShuffle(serverAddresses);
+        if (this.serverAddresses.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "A HostProvider may not be empty!");
+        }       
+        currentIndex = -1;
+        lastIndex = -1;              
+    }
+
+    /**
+     * Constructs a SimpleHostSet. This constructor is used from StaticHostProviderTest to produce deterministic test results
+     * by initializing sourceOfRandomness with the same seed
+     * 
+     * @param serverAddresses
+     *            possibly unresolved ZooKeeper server addresses
+     * @param randomnessSeed a seed used to initialize sourceOfRandomness
+     * @throws UnknownHostException
+     * @throws IllegalArgumentException
+     *             if serverAddresses is empty or resolves to an empty list
+     */
+    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses, long randomnessSeed)
+            throws UnknownHostException {
+        sourceOfRandomness = new Random(randomnessSeed);
+
+        this.serverAddresses = resolveAndShuffle(serverAddresses);
+        if (this.serverAddresses.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "A HostProvider may not be empty!");
+        }       
+        currentIndex = -1;
+        lastIndex = -1;              
+    }
+
+    private List<InetSocketAddress> resolveAndShuffle(Collection<InetSocketAddress> serverAddresses)
+            throws UnknownHostException {
+        List<InetSocketAddress> tmpList = new ArrayList<InetSocketAddress>(serverAddresses.size());       
         for (InetSocketAddress address : serverAddresses) {
             InetAddress resolvedAddresses[] = InetAddress.getAllByName(address
                     .getHostName());
             for (InetAddress resolvedAddress : resolvedAddresses) {
-                this.serverAddresses.add(new InetSocketAddress(resolvedAddress
+                tmpList.add(new InetSocketAddress(resolvedAddress
                         .getHostAddress(), address.getPort()));
             }
         }
+        Collections.shuffle(tmpList, sourceOfRandomness);
+        return tmpList;
+    } 
 
-        if (this.serverAddresses.isEmpty()) {
+
+    /**
+     * Update the list of servers. This returns true if changing connections is necessary for load-balancing, false
+     * otherwise. Changing connections is necessary if one of the following holds:
+     * a) the host to which this client is currently connected is not in serverAddresses.
+     *    Otherwise (if currentHost is in the new list serverAddresses):
+     * b) the number of servers in the cluster is increasing - in this case the load on currentHost should decrease,
+     *    which means that SOME of the clients connected to it will migrate to the new servers. The decision whether
+     *    this client migrates or not (i.e., whether true or false is returned) is probabilistic so that the expected
+     *    number of clients connected to each server is the same.
+     *
+     * If true is returned, the function sets pNew and pOld, which correspond to the probability of migrating to one
+     * of the new servers in serverAddresses or to one of the old servers, respectively (migrating to one of the old
+     * servers happens only if the client's currentHost is not in serverAddresses). See nextHostInReconfigMode for
+     * the selection logic.
+     *
+     * See <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-1355">ZOOKEEPER-1355</a> for the protocol and its
+     * evaluation, and StaticHostProviderTest for the tests that illustrate how load balancing works with this policy.
+     * @param serverAddresses new host list
+     * @param currentHost the host to which this client is currently connected
+     * @return true if changing connections is necessary for load-balancing, false otherwise  
+     */
+
+
+    @Override
+    public boolean updateServerList(Collection<InetSocketAddress> serverAddresses, InetSocketAddress currentHost) throws UnknownHostException {        
+        // Resolve server addresses and shuffle them
+        List<InetSocketAddress> resolvedList = resolveAndShuffle(serverAddresses);
+        if (resolvedList.isEmpty()) {
             throw new IllegalArgumentException(
                     "A HostProvider may not be empty!");
         }
-        Collections.shuffle(this.serverAddresses);
+        // Check if client's current server is in the new list of servers
+        boolean myServerInNewConfig = false;        
+        for (InetSocketAddress addr : resolvedList) {
+            if (addr.getHostName().equals(currentHost.getHostName())
+                    && addr.getPort() == currentHost.getPort()) {
+                myServerInNewConfig = true;
+                break;
+            }
+        }
+
+        synchronized(this) {
+            reconfigMode = true;
+
+            newServers.clear();
+            oldServers.clear();
+            // Divide the new servers into oldServers that were in the previous list
+            // and newServers that were not in the previous list
+            for (InetSocketAddress resolvedAddress : resolvedList) {                
+                if (this.serverAddresses.contains(resolvedAddress)) {
+                    oldServers.add(resolvedAddress);
+                } else {
+                    newServers.add(resolvedAddress);
+                }
+            }        
+
+            int numOld = oldServers.size();
+            int numNew = newServers.size();                        
+
+            // number of servers increased
+            if (numOld + numNew > this.serverAddresses.size()) {
+                if (myServerInNewConfig) {
+                    // my server is in new config, but load should be decreased.
+                    // Need to decide if this client
+                    // is moving to one of the new servers
+                    if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses
+                            .size()) / (numOld + numNew))) {
+                        pNew = 1;
+                        pOld = 0;
+                    } else {
+                        // do nothing special - stay with the current server
+                        reconfigMode = false;
+                    }
+                } else {
+                    // my server is not in new config, and load on old servers must
+                    // be decreased, so connect to
+                    // one of the new servers
+                    pNew = 1;
+                    pOld = 0;
+                }
+            } else { // number of servers stayed the same or decreased
+                if (myServerInNewConfig) {
+                    // my server is in new config, and load should be increased, so
+                    // stay with this server and do nothing special
+                    reconfigMode = false;
+                } else {
+                    pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew))))
+                            / ((numOld + numNew) * (this.serverAddresses.size() - numOld));
+                    pNew = 1 - pOld;
+                }
+            }
+
+            this.serverAddresses = resolvedList;    
+            currentIndexOld = -1;
+            currentIndexNew = -1; 
+            currentIndex = -1;
+            lastIndex = -1;                
+            return reconfigMode;
+        }
     }
 
-    public int size() {
+    public synchronized int size() {
         return serverAddresses.size();
     }
 
+    /**
+     * Get the next server to connect to, when in "reconfigMode", which means that
+     * the server list has just been updated and we are now trying to find a server to connect to.
+     * Once onConnected() is called, reconfigMode is set to false. Similarly, if we tried to connect
+     * to all servers in the new config and failed, reconfigMode is set to false.
+     *
+     * While in reconfigMode, we should connect to a server in newServers with probability pNew and to a server in
+     * oldServers with probability pOld (which is just 1-pNew). If we tried out all servers in either oldServers
+     * or newServers, we continue to try servers from the other set, regardless of pNew or pOld. If we tried all
+     * servers, we give up and go back to the normal round-robin mode.
+     *
+     * When called, this should be protected by synchronized(this).
+     */
+    private InetSocketAddress nextHostInReconfigMode() {
+        boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew);
+
+        // Take one of the new servers if possible (there are still new servers
+        // we haven't tried), and either the probability tells us to connect to
+        // one of the new servers or we have already tried all the old servers.
+        if (((currentIndexNew + 1) < newServers.size())
+                && (takeNew || (currentIndexOld + 1) >= oldServers.size())) {
+            ++currentIndexNew;
+            return newServers.get(currentIndexNew);
+        }
+
+        // start taking old servers
+        if ((currentIndexOld + 1) < oldServers.size()) {
+            ++currentIndexOld;
+            return oldServers.get(currentIndexOld);
+        }
+
+        return null;
+    }
+
     public InetSocketAddress next(long spinDelay) {
-        ++currentIndex;
-        if (currentIndex == serverAddresses.size()) {
-            currentIndex = 0;
+        boolean needToSleep = false;
+        InetSocketAddress addr;
+
+        synchronized(this) {
+            if (reconfigMode) {
+                addr = nextHostInReconfigMode();
+                if (addr != null) return addr;                
+                //tried all servers and couldn't connect
+                reconfigMode = false;
+                needToSleep = (spinDelay > 0);
+            }        
+            ++currentIndex;
+            if (currentIndex == serverAddresses.size()) {
+                currentIndex = 0;
+            }            
+            addr = serverAddresses.get(currentIndex);
+            needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
+            if (lastIndex == -1) { 
+                // We don't want to sleep on the first ever connect attempt.
+                lastIndex = 0;
+            }
         }
-        if (currentIndex == lastIndex && spinDelay > 0) {
+        if (needToSleep) {
             try {
                 Thread.sleep(spinDelay);
             } catch (InterruptedException e) {
                 LOG.warn("Unexpected exception", e);
             }
-        } else if (lastIndex == -1) {
-            // We don't want to sleep on the first ever connect attempt.
-            lastIndex = 0;
         }
 
-        return serverAddresses.get(currentIndex);
+        return addr;
     }
 
-    public void onConnected() {
+    public synchronized void onConnected() {
         lastIndex = currentIndex;
+        reconfigMode = false;
     }
+
 }

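As a sanity check on the probabilities computed by updateServerList() above, here is a standalone sketch (not part of the patch; names are illustrative) that applies the same formulas to the two documentation examples:

// Standalone sketch reproducing the pOld/pNew arithmetic of updateServerList().
public class ReconfigProbabilities {
    public static void main(String[] args) {
        // Example 1: 3 old hosts grow to 5 (3 old + 2 new), current host survives.
        // The client disconnects with probability 1 - 3/5 = 0.4; if it does,
        // pNew = 1 and pOld = 0, so it reconnects to one of the 2 new hosts.
        System.out.println("grow 3 -> 5: disconnect prob = " + (1 - 3.0f / 5));

        // Example 2: 5 hosts shrink to 3, and the current host was removed.
        // numOld = 3 (survivors), numNew = 0, so the patch's formula gives
        // pOld = (numOld * (oldSize - (numOld + numNew)))
        //        / ((numOld + numNew) * (oldSize - numOld)) = (3*2)/(3*2) = 1,
        // i.e. the client moves to one of the 3 remaining (old) servers.
        int oldSize = 5, numOld = 3, numNew = 0;
        float pOld = ((float) (numOld * (oldSize - (numOld + numNew))))
                / ((numOld + numNew) * (oldSize - numOld));
        float pNew = 1 - pOld;
        System.out.println("shrink 5 -> 3: pOld = " + pOld + ", pNew = " + pNew);
    }
}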
+ 228 - 3
src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java

@@ -29,9 +29,12 @@ import org.junit.Test;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
 
 public class StaticHostProviderTest extends ZKTestCase {
-
+    private Random r = new Random(1);
+    
     @Test
     public void testNextGoesRound() throws UnknownHostException {
         HostProvider hostProvider = getHostProvider(2);
@@ -85,14 +88,236 @@ public class StaticHostProviderTest extends ZKTestCase {
         assertNotSame(first, second);
     }
 
+    private final double slackPercent = 10;
+    private final int numClients = 10000;
+
+    @Test
+    public void testUpdateClientMigrateOrNot() throws UnknownHostException {
+        HostProvider hostProvider = getHostProvider(4); // 10.10.10.4:1238, 10.10.10.3:1237, 10.10.10.2:1236, 10.10.10.1:1235
+        Collection<InetSocketAddress> newList = getServerAddresses(3); // 10.10.10.3:1237, 10.10.10.2:1236, 10.10.10.1:1235
+        
+        InetSocketAddress myServer = new InetSocketAddress("10.10.10.3", 1237);
+        
+        // Number of machines becomes smaller, my server is in the new cluster
+        boolean disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(!disconnectRequired);
+
+        // Number of machines stayed the same, my server is in the new cluster
+        disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(!disconnectRequired);
+
+        // Number of machines became smaller, my server is not in the new
+        // cluster
+        newList = getServerAddresses(2); // 10.10.10.2:1236, 10.10.10.1:1235
+        disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(disconnectRequired);
+
+        // Number of machines stayed the same, my server is not in the new
+        // cluster
+        disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(disconnectRequired);
+
+        // Number of machines increased, my server is not in the new cluster
+        newList = new ArrayList<InetSocketAddress>(3);
+        for (int i = 4; i > 1; i--) { // 10.10.10.4:1238, 10.10.10.3:1237, 10.10.10.2:1236
+            newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+        myServer = new InetSocketAddress("10.10.10.1", 1235);
+        disconnectRequired = hostProvider.updateServerList(newList, myServer);
+        assertTrue(disconnectRequired);
+
+        // Number of machines increased, my server is in the new cluster
+        // Here whether to move or not depends on the difference of cluster
+        // sizes
+        // With probability 1 - |old|/|new| the client disconnects
+        // In the test below, 1 - 9/10 = 1/10 chance of disconnecting
+        HostProvider[] hostProviderArray = new HostProvider[numClients];
+        newList = getServerAddresses(10);
+        int numDisconnects = 0;
+        for (int i = 0; i < numClients; i++) {
+            hostProviderArray[i] = getHostProvider(9);
+            disconnectRequired = hostProviderArray[i].updateServerList(newList, myServer);
+            if (disconnectRequired)
+                numDisconnects++;
+        }
+
+        // Should be numClients/10 in expectation; we test that it is numClients/10 +- slackPercent
+        assertTrue(numDisconnects < upperboundCPS(numClients, 10));
+    }
+
+    @Test
+    public void testUpdateMigrationGoesRound() throws UnknownHostException {
+        HostProvider hostProvider = getHostProvider(4);
+        // old list (just the ports): 1238, 1237, 1236, 1235
+        Collection<InetSocketAddress> newList = new ArrayList<InetSocketAddress>(
+                10);
+        for (int i = 12; i > 2; i--) { // 1246, 1245, 1244, 1243, 1242, 1241,
+                                       // 1240, 1239, 1238, 1237
+            newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+
+        // servers from the old list that appear in the new list
+        Collection<InetSocketAddress> oldStaying = new ArrayList<InetSocketAddress>(2);
+        for (int i = 4; i > 2; i--) { // 1238, 1237
+            oldStaying.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+
+        // servers in the new list that are not in the old list
+        Collection<InetSocketAddress> newComing = new ArrayList<InetSocketAddress>(10);
+        for (int i = 12; i > 4; i--) {// 1246, 1245, 1244, 1243, 1242, 1241, 1240, 1239
+            newComing.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+
+        // Number of machines increases, my server is not in the new cluster
+        // load on old servers must be decreased, so must connect to one of the
+        // new servers
+        // i.e., pNew = 1.
+        boolean disconnectRequired = hostProvider.updateServerList(newList, new InetSocketAddress("10.10.10.1", 1235));
+        assertTrue(disconnectRequired);
+
+        // This means reconfigMode = true, and nextHostInReconfigMode will be
+        // called from next
+        // Since pNew = 1 we should first try the new servers
+        ArrayList<InetSocketAddress> seen = new ArrayList<InetSocketAddress>();
+        for (int i = 0; i < newComing.size(); i++) {
+            InetSocketAddress addr = hostProvider.next(0);
+            assertTrue(newComing.contains(addr));
+            assertTrue(!seen.contains(addr));
+            seen.add(addr);
+        }
+
+        // Next the old servers
+        seen.clear();
+        for (int i = 0; i < oldStaying.size(); i++) {
+            InetSocketAddress addr = hostProvider.next(0);
+            assertTrue(oldStaying.contains(addr));
+            assertTrue(!seen.contains(addr));
+            seen.add(addr);
+        }
+
+        // And now it goes back to normal next() so it should be everything
+        // together like in testNextGoesRound()
+        InetSocketAddress first = hostProvider.next(0);
+        assertTrue(first != null);
+        for (int i = 0; i < newList.size() - 1; i++) {
+            hostProvider.next(0);
+        }
+
+        assertEquals(first, hostProvider.next(0));
+    }
+
+    @Test
+    public void testUpdateLoadBalancing() throws UnknownHostException {
+        // Start with 9 servers and 10000 clients
+        boolean disconnectRequired;
+        HostProvider[] hostProviderArray = new HostProvider[numClients];
+        InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients];
+        int[] numClientsPerHost = new int[9];
+
+        // initialization
+        for (int i = 0; i < numClients; i++) {
+            hostProviderArray[i] = getHostProvider(9);
+            curHostForEachClient[i] = hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        for (int i = 0; i < 9; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+
+        // remove host number 8 (the last one in a list of 9 hosts)
+        Collection<InetSocketAddress> newList = getServerAddresses(8);
+
+        for (int i = 0; i < numClients; i++) {
+            disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
+            if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        for (int i = 0; i < 8; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+        assertTrue(numClientsPerHost[8] == 0);
+
+        // remove hosts number 6 and 7 (the currently last two in the list)
+        newList = getServerAddresses(6);
+
+        for (int i = 0; i < numClients; i++) {
+            disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
+            if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        for (int i = 0; i < 6; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 6));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 6));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+        assertTrue(numClientsPerHost[6] == 0);
+        assertTrue(numClientsPerHost[7] == 0);
+        assertTrue(numClientsPerHost[8] == 0);
+
+        // remove host number 0 (the first one in the current list)
+        // and add back hosts 6, 7 and 8
+        newList = new ArrayList<InetSocketAddress>(8);
+        for (int i = 9; i > 1; i--) {
+            newList.add(new InetSocketAddress("10.10.10." + i, 1234 + i));
+        }
+
+        for (int i = 0; i < numClients; i++) {
+            disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
+            if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        assertTrue(numClientsPerHost[0] == 0);
+
+        for (int i = 1; i < 9; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+
+        // add back host number 0
+        newList = getServerAddresses(9);
+
+        for (int i = 0; i < numClients; i++) {
+            disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
+            if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+        }
+
+        for (int i = 0; i < 9; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9));
+        }
+    }
+
     private StaticHostProvider getHostProvider(int size)
             throws UnknownHostException {
+        return new StaticHostProvider(getServerAddresses(size), r.nextLong());
+    }
+
+    private Collection<InetSocketAddress> getServerAddresses(int size) {
         ArrayList<InetSocketAddress> list = new ArrayList<InetSocketAddress>(
                 size);
         while (size > 0) {
-            list.add(new InetSocketAddress("10.10.10." + size, 1234));
+            list.add(new InetSocketAddress("10.10.10." + size, 1234 + size));
             --size;
         }
-        return new StaticHostProvider(list);
+        return list;
+    }
+
+    private double lowerboundCPS(int numClients, int numServers) {
+        return (1 - slackPercent/100.0) * numClients / numServers;
+    }
+
+    private double upperboundCPS(int numClients, int numServers) {
+        return (1 + slackPercent/100.0) * numClients / numServers;
     }
+
 }

Some files were not shown because too many files changed in this diff