TestClientRetry.cc 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <cppunit/extensions/HelperMacros.h>
  19. #include "CppAssertHelper.h"
  20. #include <signal.h>
  21. #include <stdlib.h>
  22. #include <unistd.h>
  23. #include "Vector.h"
  24. using namespace std;
  25. #include <zookeeper.h>
  26. #include "Util.h"
  27. #include "WatchUtil.h"
  28. class Zookeeper_clientretry : public CPPUNIT_NS::TestFixture
  29. {
  30. CPPUNIT_TEST_SUITE(Zookeeper_clientretry);
  31. #ifdef THREADED
  32. CPPUNIT_TEST(testRetry);
  33. #endif
  34. CPPUNIT_TEST_SUITE_END();
  35. static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
  36. watchctx_t *ctx = (watchctx_t*)v;
  37. if (state == ZOO_CONNECTED_STATE) {
  38. ctx->connected = true;
  39. } else {
  40. ctx->connected = false;
  41. }
  42. if (type != ZOO_SESSION_EVENT) {
  43. evt_t evt;
  44. evt.path = path;
  45. evt.type = type;
  46. ctx->putEvent(evt);
  47. }
  48. }
  49. static const char hostPorts[];
  50. const char *getHostPorts() {
  51. return hostPorts;
  52. }
  53. zhandle_t *createClient(watchctx_t *ctx) {
  54. zhandle_t *zk = zookeeper_init(hostPorts, watcher, 10000, 0,
  55. ctx, 0);
  56. ctx->zh = zk;
  57. sleep(1);
  58. return zk;
  59. }
  60. FILE *logfile;
  61. public:
  62. Zookeeper_clientretry() {
  63. logfile = openlogfile("Zookeeper_clientretry");
  64. }
  65. ~Zookeeper_clientretry() {
  66. if (logfile) {
  67. fflush(logfile);
  68. fclose(logfile);
  69. logfile = 0;
  70. }
  71. }
  72. void setUp()
  73. {
  74. zoo_set_log_stream(logfile);
  75. char cmd[1024];
  76. sprintf(cmd, "%s stop %s", ZKSERVER_CMD, getHostPorts());
  77. CPPUNIT_ASSERT(system(cmd) == 0);
  78. /* we are testing that if max cnxns is exceeded the server does the right thing */
  79. sprintf(cmd, "ZKMAXCNXNS=1 %s startClean %s", ZKSERVER_CMD, getHostPorts());
  80. CPPUNIT_ASSERT(system(cmd) == 0);
  81. struct sigaction act;
  82. act.sa_handler = SIG_IGN;
  83. sigemptyset(&act.sa_mask);
  84. act.sa_flags = 0;
  85. CPPUNIT_ASSERT(sigaction(SIGPIPE, &act, NULL) == 0);
  86. }
  87. void tearDown()
  88. {
  89. char cmd[1024];
  90. sprintf(cmd, "%s stop %s", ZKSERVER_CMD, getHostPorts());
  91. CPPUNIT_ASSERT(system(cmd) == 0);
  92. /* restart the server in "normal" mode */
  93. sprintf(cmd, "%s startClean %s", ZKSERVER_CMD, getHostPorts());
  94. CPPUNIT_ASSERT(system(cmd) == 0);
  95. struct sigaction act;
  96. act.sa_handler = SIG_IGN;
  97. sigemptyset(&act.sa_mask);
  98. act.sa_flags = 0;
  99. CPPUNIT_ASSERT(sigaction(SIGPIPE, &act, NULL) == 0);
  100. }
  101. bool waitForEvent(zhandle_t *zh, watchctx_t *ctx, int seconds) {
  102. time_t expires = time(0) + seconds;
  103. while(ctx->countEvents() == 0 && time(0) < expires) {
  104. yield(zh, 1);
  105. }
  106. return ctx->countEvents() > 0;
  107. }
  108. static zhandle_t *async_zk;
  109. void testRetry()
  110. {
  111. watchctx_t ctx1, ctx2;
  112. zhandle_t *zk1 = createClient(&ctx1);
  113. CPPUNIT_ASSERT_EQUAL(true, ctx1.waitForConnected(zk1));
  114. zhandle_t *zk2 = createClient(&ctx2);
  115. zookeeper_close(zk1);
  116. CPPUNIT_ASSERT_EQUAL(true, ctx2.waitForConnected(zk2));
  117. ctx1.zh = 0;
  118. }
  119. };
  120. zhandle_t *Zookeeper_clientretry::async_zk;
  121. const char Zookeeper_clientretry::hostPorts[] = "127.0.0.1:22181";
  122. CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_clientretry);