/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
- #include <cppunit/extensions/HelperMacros.h>
- #include <sys/types.h>
- #include <netinet/in.h>
- #include <errno.h>
- #include <iostream>
- #include <sstream>
- #include <arpa/inet.h>
- #include <exception>
- #include <stdlib.h>
- extern "C" {
- #include <src/addrvec.h>
- }
- #include "Util.h"
- #include "LibCMocks.h"
- #include "ZKMocks.h"
- using namespace std;
- static const int portOffset = 2000;
- class Client
- {
- private:
- // Member variables
- zhandle_t *zh;
- unsigned int seed;
- public:
- /**
- * Create a client with given connection host string and add to our internal
- * vector of clients. These are disconnected and cleaned up in tearDown().
- */
- Client(const string hosts, unsigned int seed) :
- seed((seed * seed) + 0xAFAFAFAF)
- {
- reSeed();
- zh = zookeeper_init(hosts.c_str(),0,1000,0,0,0);
- CPPUNIT_ASSERT(zh);
- // Set the flag to disable ZK from reconnecting to a different server.
- // Our reconfig test case will do explicit server shuffling through
- // zoo_cycle_next_server, and the reconnection attempts would interfere
- // with the server states the tests cases assume.
- zh->disable_reconnection_attempt = 1;
- reSeed();
- cycleNextServer();
- }
- void close()
- {
- zookeeper_close(zh);
- zh = NULL;
- }
- bool isReconfig()
- {
- return zh->reconfig != 0;
- }
- /**
- * re-seed this client with it's own previously generated seed so its
- * random choices are unique and separate from the other clients
- */
- void reSeed()
- {
- srandom(seed);
- srand48(seed);
- }
- /**
- * Get the server that this client is currently connected to.
- */
- string getServer()
- {
- const char* addrstring = zoo_get_current_server(zh);
- return string(addrstring);
- }
- /**
- * Get the server this client is currently connected to with no port
- * specification.
- */
- string getServerNoPort()
- {
- string addrstring = getServer();
- size_t found = addrstring.find_last_of(":");
- CPPUNIT_ASSERT(found != string::npos);
- // ipv6 address case (to remove leading and trailing bracket)
- if (addrstring.find("[") != string::npos)
- {
- return addrstring.substr(1, found-2);
- }
- else
- {
- return addrstring.substr(0, found);
- }
- }
- /**
- * Get the port of the server this client is currently connected to.
- */
- uint32_t getServerPort()
- {
- string addrstring = getServer();
- size_t found = addrstring.find_last_of(":");
- CPPUNIT_ASSERT(found != string::npos);
- string portStr = addrstring.substr(found+1);
- stringstream ss(portStr);
- uint32_t port;
- ss >> port;
- CPPUNIT_ASSERT(port >= portOffset);
- return port;
- }
- /**
- * Cycle to the next available server on the next connect attempt. It also
- * calls into getServer (above) to return the server connected to.
- */
- string cycleNextServer()
- {
- zoo_cycle_next_server(zh);
- return getServer();
- }
- void cycleUntilServer(const string requested)
- {
- // Call cycleNextServer until the one it's connected to is the one
- // specified (disregarding port).
- string first;
- while(true)
- {
- string next = cycleNextServer();
- if (first.empty())
- {
- first = next;
- }
- // Else we've looped around!
- else if (first == next)
- {
- CPPUNIT_ASSERT(false);
- }
- // Strip port off
- string server = getServerNoPort();
- // If it matches the requested host we're now 'connected' to the right host
- if (server == requested)
- {
- break;
- }
- }
- }
- /**
- * Set servers for this client.
- */
- void setServers(const string new_hosts)
- {
- int rc = zoo_set_servers(zh, new_hosts.c_str());
- CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
- }
- /**
- * Set servers for this client and validate reconfig value matches expected.
- */
- void setServersAndVerifyReconfig(const string new_hosts, bool is_reconfig)
- {
- setServers(new_hosts);
- CPPUNIT_ASSERT_EQUAL(is_reconfig, isReconfig());
- }
- /**
- * Sets the server list this client is connecting to AND if this requires
- * the client to be reconfigured (as dictated by internal client policy)
- * then it will trigger a call to cycleNextServer.
- */
- void setServersAndCycleIfNeeded(const string new_hosts)
- {
- setServers(new_hosts);
- if (isReconfig())
- {
- cycleNextServer();
- }
- }
- };
- class Zookeeper_reconfig : public CPPUNIT_NS::TestFixture
- {
- CPPUNIT_TEST_SUITE(Zookeeper_reconfig);
- // Test cases
- CPPUNIT_TEST(testcycleNextServer);
- CPPUNIT_TEST(testMigrateOrNot);
- CPPUNIT_TEST(testMigrationCycle);
- CPPUNIT_TEST(testAddrVecContainsIPv4);
- #ifdef AF_INET6
- CPPUNIT_TEST(testAddrVecContainsIPv6);
- #endif
- // In threaded mode each 'create' is a thread -- it's not practical to create
- // 10,000 threads to test load balancing. The load balancing code can easily
- // be tested in single threaded mode as concurrency doesn't affect the algorithm.
- #ifndef THREADED
- CPPUNIT_TEST(testMigrateProbability);
- CPPUNIT_TEST(testLoadBalancing);
- #endif
- CPPUNIT_TEST_SUITE_END();
- FILE *logfile;
- double slackPercent;
- static const int numClients = 10000;
- static const int portOffset = 2000;
- vector<Client> clients;
- vector<uint32_t> numClientsPerHost;
- public:
- Zookeeper_reconfig() :
- slackPercent(10.0)
- {
- logfile = openlogfile("Zookeeper_reconfig");
- }
- ~Zookeeper_reconfig()
- {
- if (logfile)
- {
- fflush(logfile);
- fclose(logfile);
- logfile = 0;
- }
- }
- void setUp()
- {
- zoo_set_log_stream(logfile);
- zoo_deterministic_conn_order(1);
- numClientsPerHost.resize(numClients);
- }
- void tearDown()
- {
- for (unsigned int i = 0; i < clients.size(); i++)
- {
- clients.at(i).close();
- }
- }
- /**
- * Create a client with given connection host string and add to our internal
- * vector of clients. These are disconnected and cleaned up in tearDown().
- */
- Client& createClient(const string hosts)
- {
- Client client(hosts, clients.size());
- clients.push_back(client);
- return clients.back();
- }
- /**
- * Same as createClient(hosts) only it takes a specific host that this client
- * should simulate being connected to.
- */
- Client& createClient(const string hosts, const string host)
- {
- // Ensure requested host is in the list
- size_t found = hosts.find(host);
- CPPUNIT_ASSERT(found != hosts.npos);
- Client client(hosts, clients.size());
- client.cycleUntilServer(host);
- clients.push_back(client);
- return clients.back();
- }
- /**
- * Create a connection host list starting at 'start' and stopping at 'stop'
- * where start >= stop. This creates a connection string with host:port pairs
- * separated by commas. The given 'octet' is the starting octet that is used
- * as the last octet in the host's IP. This is decremented on each iteration.
- * Each port will be portOffset + octet.
- */
- string createHostList(uint32_t start, uint32_t stop = 1, uint32_t octet = 0)
- {
- if (octet == 0)
- {
- octet = start;
- }
- stringstream ss;
- for (uint32_t i = start; i >= stop; i--, octet--)
- {
- ss << "10.10.10." << octet << ":" << portOffset + octet;
- if (i > stop)
- {
- ss << ", ";
- }
- }
- return ss.str();
- }
- /**
- * Gets the lower bound of the number of clients per server that we expect
- * based on the probabilistic load balancing algorithm implemented by the
- * client code.
- */
- double lowerboundClientsPerServer(int numClients, int numServers)
- {
- return (1 - slackPercent/100.0) * numClients / numServers;
- }
- /**
- * Gets the upper bound of the number of clients per server that we expect
- * based on the probabilistic load balancing algorithm implemented by the
- * client code.
- */
- double upperboundClientsPerServer(int numClients, int numServers)
- {
- return (1 + slackPercent/100.0) * numClients / numServers;
- }
- /**
- * Update all the clients to use a new list of servers. This will also cause
- * the client to cycle to the next server as needed (e.g. due to a reconfig).
- * It then updates the number of clients connected to the server based on
- * this change.
- *
- * Afterwards it validates that all of the servers have the correct amount of
- * clients based on the probabilistic load balancing algorithm.
- */
- void updateAllClientsAndServers(int start, int stop = 1)
- {
- string newServers = createHostList(start, stop);
- int numServers = start - stop + 1;
- for (int i = 0; i < numClients; i++) {
- Client &client = clients.at(i);
- client.reSeed();
- client.setServersAndCycleIfNeeded(newServers);
- numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
- }
- int offset = stop - 1;
- for (int index = offset; index < numServers; index++) {
- if (numClientsPerHost.at(index) > upperboundClientsPerServer(numClients, numServers))
- {
- cout << "INDEX=" << index << " too many -- actual=" << numClientsPerHost.at(index)
- << " expected=" << upperboundClientsPerServer(numClients, numServers) << endl;
- }
- CPPUNIT_ASSERT(numClientsPerHost.at(index) <= upperboundClientsPerServer(numClients, numServers));
- if (numClientsPerHost.at(index) < lowerboundClientsPerServer(numClients, numServers))
- {
- cout << "INDEX=" << index << " too few -- actual=" << numClientsPerHost.at(index)
- << " expected=" << lowerboundClientsPerServer(numClients, numServers) << endl;
- }
- CPPUNIT_ASSERT(numClientsPerHost.at(index) >= lowerboundClientsPerServer(numClients, numServers));
- numClientsPerHost.at(index) = 0; // prepare for next test
- }
- }
- /*-------------------------------------------------------------------------*
- * TESTCASES
- *------------------------------------------------------------------------*/
- /**
- * Very basic sunny day test to ensure basic functionality of zoo_set_servers
- * and zoo_cycle_next_server.
- */
- void testcycleNextServer()
- {
- const string initial_hosts = createHostList(10); // 2010..2001
- const string new_hosts = createHostList(4); // 2004..2001
- Client &client = createClient(initial_hosts);
- client.setServersAndVerifyReconfig(new_hosts, true);
- for (int i = 0; i < 10; i++)
- {
- string next = client.cycleNextServer();
- }
- }
- /**
- * Test the migration policy implicit within the probabilistic load balancing
- * algorithm the Client implements. Tests all the corner cases whereby the
- * list of servers is decreased, increased, and stays the same. Also combines
- * various combinations of the currently connected server being in the new
- * configuration and not.
- */
- void testMigrateOrNot()
- {
- const string initial_hosts = createHostList(4); // 2004..2001
- Client &client = createClient(initial_hosts, "10.10.10.3");
- // Ensemble size decreasing, my server is in the new list
- client.setServersAndVerifyReconfig(createHostList(3), false);
- // Ensemble size decreasing, my server is NOT in the new list
- client.setServersAndVerifyReconfig(createHostList(2), true);
- // Ensemble size stayed the same, my server is NOT in the new list
- client.setServersAndVerifyReconfig(createHostList(2), true);
- // Ensemble size increased, my server is not in the new ensemble
- client.setServers(createHostList(4));
- client.cycleUntilServer("10.10.10.1");
- client.setServersAndVerifyReconfig(createHostList(7,2), true);
- }
- /**
- * This tests that as a client is in reconfig mode it will properly try to
- * connect to all the new servers first. Then it will try to connect to all
- * the 'old' servers that are staying in the new configuration. Finally it
- * will fallback to the normal behavior of trying servers in round-robin.
- */
- void testMigrationCycle()
- {
- int num_initial = 4;
- const string initial_hosts = createHostList(num_initial); // {2004..2001}
- int num_new = 10;
- string new_hosts = createHostList(12, 3); // {2012..2003}
- // servers from the old list that appear in the new list {2004..2003}
- int num_staying = 2;
- string oldStaying = createHostList(4, 3);
- // servers in the new list that are not in the old list {2012..2005}
- int num_coming = 8;
- string newComing = createHostList(12, 5);
- // Ensemble in increasing in size, my server is not in the new ensemble
- // load on the old servers must be decreased, so must connect to one of
- // new servers (pNew = 1)
- Client &client = createClient(initial_hosts, "10.10.10.1");
- client.setServersAndVerifyReconfig(new_hosts, true);
- // Since we're in reconfig mode, next connect should be from new list
- // We should try all the new servers *BEFORE* trying any old servers
- string seen;
- for (int i = 0; i < num_coming; i++) {
- client.cycleNextServer();
- // Assert next server is in the 'new' list
- stringstream next;
- next << client.getServerNoPort() << ":" << client.getServerPort();
- size_t found = newComing.find(next.str());
- CPPUNIT_ASSERT_MESSAGE(next.str() + " not in newComing list",
- found != string::npos);
- // Assert not in seen list then append
- found = seen.find(next.str());
- CPPUNIT_ASSERT_MESSAGE(next.str() + " in seen list",
- found == string::npos);
- seen += next.str() + ", ";
- }
- // Now it should start connecting to the old servers
- seen.clear();
- for (int i = 0; i < num_staying; i++) {
- client.cycleNextServer();
- // Assert it's in the old list
- stringstream next;
- next << client.getServerNoPort() << ":" << client.getServerPort();
- size_t found = oldStaying.find(next.str());
- CPPUNIT_ASSERT(found != string::npos);
- // Assert not in seen list then append
- found = seen.find(next.str());
- CPPUNIT_ASSERT(found == string::npos);
- seen += next.str() + ", ";
- }
- // NOW it goes back to normal as we've tried all the new and old
- string first = client.cycleNextServer();
- for (int i = 0; i < num_new - 1; i++) {
- client.cycleNextServer();
- }
- CPPUNIT_ASSERT_EQUAL(first, client.cycleNextServer());
- }
- /**
- * Test the migration probability to ensure that it conforms to our expected
- * lower and upper bounds of the number of clients per server as we are
- * reconfigured.
- *
- * In this case, the list of servers is increased and the client's server is
- * in the new list. Whether to move or not depends on the difference of
- * server sizes with probability 1 - |old|/|new| the client disconnects.
- *
- * In the test below 1-9/10 = 1/10 chance of disconnecting
- */
- void testMigrateProbability()
- {
- const string initial_hosts = createHostList(9); // 10.10.10.9:2009...10.10.10.1:2001
- string new_hosts = createHostList(10); // 10.10.10.10:2010...10.10.10.1:2001
- uint32_t numDisconnects = 0;
- for (int i = 0; i < numClients; i++) {
- Client &client = createClient(initial_hosts, "10.10.10.3");
- client.setServers(new_hosts);
- if (client.isReconfig())
- {
- numDisconnects++;
- }
- }
- // should be numClients/10 in expectation, we test that it's numClients/10 +- slackPercent
- CPPUNIT_ASSERT(numDisconnects < upperboundClientsPerServer(numClients, 10));
- }
- /**
- * Tests the probabilistic load balancing algorithm implemented by the Client
- * code.
- *
- * Test strategy:
- *
- * (1) Start with 9 servers and 10,000 clients. Remove a server, update
- * everything, and ensure that the clients are redistributed properly.
- *
- * (2) Remove two more nodes and repeat the same validations of proper client
- * redistribution. Ensure no clients are connected to the two removed
- * nodes.
- *
- * (3) Remove the first server in the list and simultaneously add the three
- * previously removed servers. Ensure everything is redistributed and
- * no clients are connected to the one missing node.
- *
- * (4) Add the one missing server back into the mix and validate.
- */
- void testLoadBalancing()
- {
- zoo_deterministic_conn_order(0);
- uint32_t numServers = 9;
- const string initial_hosts = createHostList(numServers); // 10.10.10.9:2009...10.10.10.1:2001
- // Create connections to servers
- for (int i = 0; i < numClients; i++) {
- Client &client = createClient(initial_hosts);
- numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
- }
- for (uint32_t i = 0; i < numServers; i++) {
- CPPUNIT_ASSERT(numClientsPerHost.at(i) <= upperboundClientsPerServer(numClients, numServers));
- CPPUNIT_ASSERT(numClientsPerHost.at(i) >= lowerboundClientsPerServer(numClients, numServers));
- numClientsPerHost.at(i) = 0; // prepare for next test
- }
- // remove last server
- numServers = 8;
- updateAllClientsAndServers(numServers);
- CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
- // Remove two more nodes
- numServers = 6;
- updateAllClientsAndServers(numServers);
- CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
- CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+1));
- CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+2));
- // remove host 0 (first one in list) and add back 6, 7, and 8
- numServers = 8;
- updateAllClientsAndServers(numServers, 1);
- CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(0));
- // add back host number 0
- numServers = 9;
- updateAllClientsAndServers(numServers);
- }
- /**
- * This tests that client can detect server's ipv4 address change.
- *
- * (1) We generate some address and put in addr, which saddr point to
- * (2) Add all addresses that differ by one bit from the source
- * (3) Add same address, but set ipv6 protocol
- * (4) Ensure, that our address is not equal to any of generated,
- * and that it equals to itself
- */
- void testAddrVecContainsIPv4() {
- addrvec_t vec;
- addrvec_init(&vec);
- sockaddr_storage addr;
- sockaddr_in* saddr = (sockaddr_in*)&addr;
- saddr->sin_family = AF_INET;
- saddr->sin_port = htons((u_short)1234);
- saddr->sin_addr.s_addr = INADDR_ANY;
- CPPUNIT_ASSERT(sizeof(saddr->sin_addr.s_addr) == 4);
- for (int i = 0; i < 32; i++) {
- saddr->sin_addr.s_addr ^= (1 << i);
- addrvec_append(&vec, &addr);
- saddr->sin_addr.s_addr ^= (1 << i);
- }
- saddr->sin_family = AF_INET6;
- addrvec_append(&vec, &addr);
- saddr->sin_family = AF_INET;
- CPPUNIT_ASSERT(!addrvec_contains(&vec, &addr));
- addrvec_append(&vec, &addr);
- CPPUNIT_ASSERT(addrvec_contains(&vec, &addr));
- addrvec_free(&vec);
- }
- /**
- * This tests that client can detect server's ipv6 address change.
- *
- * Same logic as in previous testAddrVecContainsIPv4 method,
- * but we keep in mind, that ipv6 is 128-bit long.
- */
- #ifdef AF_INET6
- void testAddrVecContainsIPv6() {
- addrvec_t vec;
- addrvec_init(&vec);
- sockaddr_storage addr;
- sockaddr_in6* saddr = (sockaddr_in6*)&addr;
- saddr->sin6_family = AF_INET6;
- saddr->sin6_port = htons((u_short)1234);
- saddr->sin6_addr = in6addr_any;
- CPPUNIT_ASSERT(sizeof(saddr->sin6_addr.s6_addr) == 16);
- for (int i = 0; i < 16; i++) {
- for (int j = 0; j < 8; j++) {
- saddr->sin6_addr.s6_addr[i] ^= (1 << j);
- addrvec_append(&vec, &addr);
- saddr->sin6_addr.s6_addr[i] ^= (1 << j);
- }
- }
- saddr->sin6_family = AF_INET;
- addrvec_append(&vec, &addr);
- saddr->sin6_family = AF_INET6;
- CPPUNIT_ASSERT(!addrvec_contains(&vec, &addr));
- addrvec_append(&vec, &addr);
- CPPUNIT_ASSERT(addrvec_contains(&vec, &addr));
- addrvec_free(&vec);
- }
- #endif
- };
- CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_reconfig);
|