TestReconfig.cc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <cppunit/extensions/HelperMacros.h>
  19. #include <sys/types.h>
  20. #include <netinet/in.h>
  21. #include <errno.h>
  22. #include <iostream>
  23. #include <sstream>
  24. #include <arpa/inet.h>
  25. #include <exception>
  26. #include <stdlib.h>
  27. #include "Util.h"
  28. #include "LibCMocks.h"
  29. #include "ZKMocks.h"
  30. using namespace std;
  // Smallest port a simulated server may use; Client::getServerPort() asserts
  // that every port it reports is at least this value.
  const int portOffset = 2000;
  32. class Client
  33. {
  34. private:
  35. // Member variables
  36. zhandle_t *zh;
  37. unsigned int seed;
  38. public:
  39. /**
  40. * Create a client with given connection host string and add to our internal
  41. * vector of clients. These are disconnected and cleaned up in tearDown().
  42. */
  43. Client(const string hosts, unsigned int seed) :
  44. seed((seed * seed) + 0xAFAFAFAF)
  45. {
  46. reSeed();
  47. zh = zookeeper_init(hosts.c_str(),0,1000,0,0,0);
  48. CPPUNIT_ASSERT(zh);
  49. // Set the flag to disable ZK from reconnecting to a different server.
  50. // Our reconfig test case will do explicit server shuffling through
  51. // zoo_cycle_next_server, and the reconnection attempts would interfere
  52. // with the server states the tests cases assume.
  53. zh->disable_reconnection_attempt = 1;
  54. reSeed();
  55. cycleNextServer();
  56. }
  57. void close()
  58. {
  59. zookeeper_close(zh);
  60. zh = NULL;
  61. }
  62. bool isReconfig()
  63. {
  64. return zh->reconfig != 0;
  65. }
  66. /**
  67. * re-seed this client with it's own previously generated seed so its
  68. * random choices are unique and separate from the other clients
  69. */
  70. void reSeed()
  71. {
  72. srandom(seed);
  73. srand48(seed);
  74. }
  75. /**
  76. * Get the server that this client is currently connected to.
  77. */
  78. string getServer()
  79. {
  80. const char* addrstring = zoo_get_current_server(zh);
  81. return string(addrstring);
  82. }
  83. /**
  84. * Get the server this client is currently connected to with no port
  85. * specification.
  86. */
  87. string getServerNoPort()
  88. {
  89. string addrstring = getServer();
  90. size_t found = addrstring.find_last_of(":");
  91. CPPUNIT_ASSERT(found != string::npos);
  92. // ipv6 address case (to remove leading and trailing bracket)
  93. if (addrstring.find("[") != string::npos)
  94. {
  95. return addrstring.substr(1, found-2);
  96. }
  97. else
  98. {
  99. return addrstring.substr(0, found);
  100. }
  101. }
  102. /**
  103. * Get the port of the server this client is currently connected to.
  104. */
  105. uint32_t getServerPort()
  106. {
  107. string addrstring = getServer();
  108. size_t found = addrstring.find_last_of(":");
  109. CPPUNIT_ASSERT(found != string::npos);
  110. string portStr = addrstring.substr(found+1);
  111. stringstream ss(portStr);
  112. uint32_t port;
  113. ss >> port;
  114. CPPUNIT_ASSERT(port >= portOffset);
  115. return port;
  116. }
  117. /**
  118. * Cycle to the next available server on the next connect attempt. It also
  119. * calls into getServer (above) to return the server connected to.
  120. */
  121. string cycleNextServer()
  122. {
  123. zoo_cycle_next_server(zh);
  124. return getServer();
  125. }
  126. void cycleUntilServer(const string requested)
  127. {
  128. // Call cycleNextServer until the one it's connected to is the one
  129. // specified (disregarding port).
  130. string first;
  131. while(true)
  132. {
  133. string next = cycleNextServer();
  134. if (first.empty())
  135. {
  136. first = next;
  137. }
  138. // Else we've looped around!
  139. else if (first == next)
  140. {
  141. CPPUNIT_ASSERT(false);
  142. }
  143. // Strip port off
  144. string server = getServerNoPort();
  145. // If it matches the requested host we're now 'connected' to the right host
  146. if (server == requested)
  147. {
  148. break;
  149. }
  150. }
  151. }
  152. /**
  153. * Set servers for this client.
  154. */
  155. void setServers(const string new_hosts)
  156. {
  157. int rc = zoo_set_servers(zh, new_hosts.c_str());
  158. CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
  159. }
  160. /**
  161. * Set servers for this client and validate reconfig value matches expected.
  162. */
  163. void setServersAndVerifyReconfig(const string new_hosts, bool is_reconfig)
  164. {
  165. setServers(new_hosts);
  166. CPPUNIT_ASSERT_EQUAL(is_reconfig, isReconfig());
  167. }
  168. /**
  169. * Sets the server list this client is connecting to AND if this requires
  170. * the client to be reconfigured (as dictated by internal client policy)
  171. * then it will trigger a call to cycleNextServer.
  172. */
  173. void setServersAndCycleIfNeeded(const string new_hosts)
  174. {
  175. setServers(new_hosts);
  176. if (isReconfig())
  177. {
  178. cycleNextServer();
  179. }
  180. }
  181. };
  182. class Zookeeper_reconfig : public CPPUNIT_NS::TestFixture
  183. {
  184. CPPUNIT_TEST_SUITE(Zookeeper_reconfig);
  185. // Test cases
  186. CPPUNIT_TEST(testcycleNextServer);
  187. CPPUNIT_TEST(testMigrateOrNot);
  188. CPPUNIT_TEST(testMigrationCycle);
  189. // In threaded mode each 'create' is a thread -- it's not practical to create
  190. // 10,000 threads to test load balancing. The load balancing code can easily
  191. // be tested in single threaded mode as concurrency doesn't affect the algorithm.
  192. #ifndef THREADED
  193. CPPUNIT_TEST(testMigrateProbability);
  194. CPPUNIT_TEST(testLoadBalancing);
  195. #endif
  196. CPPUNIT_TEST_SUITE_END();
  197. FILE *logfile;
  198. double slackPercent;
  199. static const int numClients = 10000;
  200. static const int portOffset = 2000;
  201. vector<Client> clients;
  202. vector<uint32_t> numClientsPerHost;
  203. public:
  204. Zookeeper_reconfig() :
  205. slackPercent(10.0)
  206. {
  207. logfile = openlogfile("Zookeeper_reconfig");
  208. }
  209. ~Zookeeper_reconfig()
  210. {
  211. if (logfile)
  212. {
  213. fflush(logfile);
  214. fclose(logfile);
  215. logfile = 0;
  216. }
  217. }
  218. void setUp()
  219. {
  220. zoo_set_log_stream(logfile);
  221. zoo_deterministic_conn_order(1);
  222. numClientsPerHost.resize(numClients);
  223. }
  224. void tearDown()
  225. {
  226. for (int i = 0; i < clients.size(); i++)
  227. {
  228. clients.at(i).close();
  229. }
  230. }
  231. /**
  232. * Create a client with given connection host string and add to our internal
  233. * vector of clients. These are disconnected and cleaned up in tearDown().
  234. */
  235. Client& createClient(const string hosts)
  236. {
  237. Client client(hosts, clients.size());
  238. clients.push_back(client);
  239. return clients.back();
  240. }
  241. /**
  242. * Same as createClient(hosts) only it takes a specific host that this client
  243. * should simulate being connected to.
  244. */
  245. Client& createClient(const string hosts, const string host)
  246. {
  247. // Ensure requested host is in the list
  248. size_t found = hosts.find(host);
  249. CPPUNIT_ASSERT(found != hosts.npos);
  250. Client client(hosts, clients.size());
  251. client.cycleUntilServer(host);
  252. clients.push_back(client);
  253. return clients.back();
  254. }
  255. /**
  256. * Create a connection host list starting at 'start' and stopping at 'stop'
  257. * where start >= stop. This creates a connection string with host:port pairs
  258. * separated by commas. The given 'octet' is the starting octet that is used
  259. * as the last octet in the host's IP. This is decremented on each iteration.
  260. * Each port will be portOffset + octet.
  261. */
  262. string createHostList(uint32_t start, uint32_t stop = 1, uint32_t octet = 0)
  263. {
  264. if (octet == 0)
  265. {
  266. octet = start;
  267. }
  268. stringstream ss;
  269. for (int i = start; i >= stop; i--, octet--)
  270. {
  271. ss << "10.10.10." << octet << ":" << portOffset + octet;
  272. if (i > stop)
  273. {
  274. ss << ", ";
  275. }
  276. }
  277. return ss.str();
  278. }
  279. /**
  280. * Gets the lower bound of the number of clients per server that we expect
  281. * based on the probabilistic load balancing algorithm implemented by the
  282. * client code.
  283. */
  284. double lowerboundClientsPerServer(int numClients, int numServers)
  285. {
  286. return (1 - slackPercent/100.0) * numClients / numServers;
  287. }
  288. /**
  289. * Gets the upper bound of the number of clients per server that we expect
  290. * based on the probabilistic load balancing algorithm implemented by the
  291. * client code.
  292. */
  293. double upperboundClientsPerServer(int numClients, int numServers)
  294. {
  295. return (1 + slackPercent/100.0) * numClients / numServers;
  296. }
  297. /**
  298. * Update all the clients to use a new list of servers. This will also cause
  299. * the client to cycle to the next server as needed (e.g. due to a reconfig).
  300. * It then updates the number of clients connected to the server based on
  301. * this change.
  302. *
  303. * Afterwards it validates that all of the servers have the correct amount of
  304. * clients based on the probabilistic load balancing algorithm.
  305. */
  306. void updateAllClientsAndServers(int start, int stop = 1)
  307. {
  308. string newServers = createHostList(start, stop);
  309. int numServers = start - stop + 1;
  310. for (int i = 0; i < numClients; i++) {
  311. Client &client = clients.at(i);
  312. client.reSeed();
  313. client.setServersAndCycleIfNeeded(newServers);
  314. numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
  315. }
  316. int offset = stop - 1;
  317. for (int index = offset; index < numServers; index++) {
  318. if (numClientsPerHost.at(index) > upperboundClientsPerServer(numClients, numServers))
  319. {
  320. cout << "INDEX=" << index << " too many -- actual=" << numClientsPerHost.at(index)
  321. << " expected=" << upperboundClientsPerServer(numClients, numServers) << endl;
  322. }
  323. CPPUNIT_ASSERT(numClientsPerHost.at(index) <= upperboundClientsPerServer(numClients, numServers));
  324. if (numClientsPerHost.at(index) < lowerboundClientsPerServer(numClients, numServers))
  325. {
  326. cout << "INDEX=" << index << " too few -- actual=" << numClientsPerHost.at(index)
  327. << " expected=" << lowerboundClientsPerServer(numClients, numServers) << endl;
  328. }
  329. CPPUNIT_ASSERT(numClientsPerHost.at(index) >= lowerboundClientsPerServer(numClients, numServers));
  330. numClientsPerHost.at(index) = 0; // prepare for next test
  331. }
  332. }
  333. /*-------------------------------------------------------------------------*
  334. * TESTCASES
  335. *------------------------------------------------------------------------*/
  336. /**
  337. * Very basic sunny day test to ensure basic functionality of zoo_set_servers
  338. * and zoo_cycle_next_server.
  339. */
  340. void testcycleNextServer()
  341. {
  342. const string initial_hosts = createHostList(10); // 2010..2001
  343. const string new_hosts = createHostList(4); // 2004..2001
  344. Client &client = createClient(initial_hosts);
  345. client.setServersAndVerifyReconfig(new_hosts, true);
  346. for (int i = 0; i < 10; i++)
  347. {
  348. string next = client.cycleNextServer();
  349. }
  350. }
  351. /**
  352. * Test the migration policy implicit within the probabilistic load balancing
  353. * algorithm the Client implements. Tests all the corner cases whereby the
  354. * list of servers is decreased, increased, and stays the same. Also combines
  355. * various combinations of the currently connected server being in the new
  356. * configuration and not.
  357. */
  358. void testMigrateOrNot()
  359. {
  360. const string initial_hosts = createHostList(4); // 2004..2001
  361. Client &client = createClient(initial_hosts, "10.10.10.3");
  362. // Ensemble size decreasing, my server is in the new list
  363. client.setServersAndVerifyReconfig(createHostList(3), false);
  364. // Ensemble size decreasing, my server is NOT in the new list
  365. client.setServersAndVerifyReconfig(createHostList(2), true);
  366. // Ensemble size stayed the same, my server is NOT in the new list
  367. client.setServersAndVerifyReconfig(createHostList(2), true);
  368. // Ensemble size increased, my server is not in the new ensemble
  369. client.setServers(createHostList(4));
  370. client.cycleUntilServer("10.10.10.1");
  371. client.setServersAndVerifyReconfig(createHostList(7,2), true);
  372. }
  373. /**
  374. * This tests that as a client is in reconfig mode it will properly try to
  375. * connect to all the new servers first. Then it will try to connect to all
  376. * the 'old' servers that are staying in the new configuration. Finally it
  377. * will fallback to the normal behavior of trying servers in round-robin.
  378. */
  379. void testMigrationCycle()
  380. {
  381. int num_initial = 4;
  382. const string initial_hosts = createHostList(num_initial); // {2004..2001}
  383. int num_new = 10;
  384. string new_hosts = createHostList(12, 3); // {2012..2003}
  385. // servers from the old list that appear in the new list {2004..2003}
  386. int num_staying = 2;
  387. string oldStaying = createHostList(4, 3);
  388. // servers in the new list that are not in the old list {2012..2005}
  389. int num_coming = 8;
  390. string newComing = createHostList(12, 5);
  391. // Ensemble in increasing in size, my server is not in the new ensemble
  392. // load on the old servers must be decreased, so must connect to one of
  393. // new servers (pNew = 1)
  394. Client &client = createClient(initial_hosts, "10.10.10.1");
  395. client.setServersAndVerifyReconfig(new_hosts, true);
  396. // Since we're in reconfig mode, next connect should be from new list
  397. // We should try all the new servers *BEFORE* trying any old servers
  398. string seen;
  399. for (int i = 0; i < num_coming; i++) {
  400. client.cycleNextServer();
  401. // Assert next server is in the 'new' list
  402. stringstream next;
  403. next << client.getServerNoPort() << ":" << client.getServerPort();
  404. size_t found = newComing.find(next.str());
  405. CPPUNIT_ASSERT_MESSAGE(next.str() + " not in newComing list",
  406. found != string::npos);
  407. // Assert not in seen list then append
  408. found = seen.find(next.str());
  409. CPPUNIT_ASSERT_MESSAGE(next.str() + " in seen list",
  410. found == string::npos);
  411. seen += found + ", ";
  412. }
  413. // Now it should start connecting to the old servers
  414. seen.clear();
  415. for (int i = 0; i < num_staying; i++) {
  416. client.cycleNextServer();
  417. // Assert it's in the old list
  418. stringstream next;
  419. next << client.getServerNoPort() << ":" << client.getServerPort();
  420. size_t found = oldStaying.find(next.str());
  421. CPPUNIT_ASSERT(found != string::npos);
  422. // Assert not in seen list then append
  423. found = seen.find(next.str());
  424. CPPUNIT_ASSERT(found == string::npos);
  425. seen += found + ", ";
  426. }
  427. // NOW it goes back to normal as we've tried all the new and old
  428. string first = client.cycleNextServer();
  429. for (int i = 0; i < num_new - 1; i++) {
  430. client.cycleNextServer();
  431. }
  432. CPPUNIT_ASSERT_EQUAL(first, client.cycleNextServer());
  433. }
  434. /**
  435. * Test the migration probability to ensure that it conforms to our expected
  436. * lower and upper bounds of the number of clients per server as we are
  437. * reconfigured.
  438. *
  439. * In this case, the list of servers is increased and the client's server is
  440. * in the new list. Whether to move or not depends on the difference of
  441. * server sizes with probability 1 - |old|/|new| the client disconnects.
  442. *
  443. * In the test below 1-9/10 = 1/10 chance of disconnecting
  444. */
  445. void testMigrateProbability()
  446. {
  447. const string initial_hosts = createHostList(9); // 10.10.10.9:2009...10.10.10.1:2001
  448. string new_hosts = createHostList(10); // 10.10.10.10:2010...10.10.10.1:2001
  449. uint32_t numDisconnects = 0;
  450. for (int i = 0; i < numClients; i++) {
  451. Client &client = createClient(initial_hosts, "10.10.10.3");
  452. client.setServers(new_hosts);
  453. if (client.isReconfig())
  454. {
  455. numDisconnects++;
  456. }
  457. }
  458. // should be numClients/10 in expectation, we test that it's numClients/10 +- slackPercent
  459. CPPUNIT_ASSERT(numDisconnects < upperboundClientsPerServer(numClients, 10));
  460. }
  461. /**
  462. * Tests the probabilistic load balancing algorithm implemented by the Client
  463. * code.
  464. *
  465. * Test strategy:
  466. *
  467. * (1) Start with 9 servers and 10,000 clients. Remove a server, update
  468. * everything, and ensure that the clients are redistributed properly.
  469. *
  470. * (2) Remove two more nodes and repeat the same validations of proper client
  471. * redistribution. Ensure no clients are connected to the two removed
  472. * nodes.
  473. *
  474. * (3) Remove the first server in the list and simultaneously add the three
  475. * previously removed servers. Ensure everything is redistributed and
  476. * no clients are connected to the one missing node.
  477. *
  478. * (4) Add the one missing server back into the mix and validate.
  479. */
  480. void testLoadBalancing()
  481. {
  482. zoo_deterministic_conn_order(0);
  483. int rc = ZOK;
  484. uint32_t numServers = 9;
  485. const string initial_hosts = createHostList(numServers); // 10.10.10.9:2009...10.10.10.1:2001
  486. // Create connections to servers
  487. for (int i = 0; i < numClients; i++) {
  488. Client &client = createClient(initial_hosts);
  489. numClientsPerHost.at(client.getServerPort() - portOffset - 1)++;
  490. }
  491. for (int i = 0; i < numServers; i++) {
  492. CPPUNIT_ASSERT(numClientsPerHost.at(i) <= upperboundClientsPerServer(numClients, numServers));
  493. CPPUNIT_ASSERT(numClientsPerHost.at(i) >= lowerboundClientsPerServer(numClients, numServers));
  494. numClientsPerHost.at(i) = 0; // prepare for next test
  495. }
  496. // remove last server
  497. numServers = 8;
  498. updateAllClientsAndServers(numServers);
  499. CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
  500. // Remove two more nodes
  501. numServers = 6;
  502. updateAllClientsAndServers(numServers);
  503. CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers));
  504. CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+1));
  505. CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(numServers+2));
  506. // remove host 0 (first one in list) and add back 6, 7, and 8
  507. numServers = 8;
  508. updateAllClientsAndServers(numServers, 1);
  509. CPPUNIT_ASSERT_EQUAL((uint32_t)0, numClientsPerHost.at(0));
  510. // add back host number 0
  511. numServers = 9;
  512. updateAllClientsAndServers(numServers);
  513. }
  514. };
  515. CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_reconfig);