zookeeper_ganglia.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """ Python Ganglia Module for ZooKeeper monitoring
  15. Inspired by: http://gist.github.com/448007
  16. Copy this file to /usr/lib/ganglia/python_plugins
  17. """
  18. import sys
  19. import socket
  20. import time
  21. import re
  22. import copy
  23. from StringIO import StringIO
  24. TIME_BETWEEN_QUERIES = 20
  25. ZK_METRICS = {
  26. 'time' : 0,
  27. 'data' : {}
  28. }
  29. ZK_LAST_METRICS = copy.deepcopy(ZK_METRICS)
  30. class ZooKeeperServer(object):
  31. def __init__(self, host='localhost', port='2181', timeout=1):
  32. self._address = (host, int(port))
  33. self._timeout = timeout
  34. def get_stats(self):
  35. """ Get ZooKeeper server stats as a map """
  36. global ZK_METRICS, ZK_LAST_METRICS
  37. # update cache
  38. ZK_METRICS = {
  39. 'time' : time.time(),
  40. 'data' : {}
  41. }
  42. data = self._send_cmd('mntr')
  43. if data:
  44. parsed_data = self._parse(data)
  45. else:
  46. data = self._send_cmd('stat')
  47. parsed_data = self._parse_stat(data)
  48. ZK_METRICS['data'] = parsed_data
  49. ZK_LAST_METRICS = copy.deepcopy(ZK_METRICS)
  50. return parsed_data
  51. def _create_socket(self):
  52. return socket.socket()
  53. def _send_cmd(self, cmd):
  54. """ Send a 4letter word command to the server """
  55. s = self._create_socket()
  56. s.settimeout(self._timeout)
  57. s.connect(self._address)
  58. s.send(cmd)
  59. data = s.recv(2048)
  60. s.close()
  61. return data
  62. def _parse(self, data):
  63. """ Parse the output from the 'mntr' 4letter word command """
  64. h = StringIO(data)
  65. result = {}
  66. for line in h.readlines():
  67. try:
  68. key, value = self._parse_line(line)
  69. result[key] = value
  70. except ValueError:
  71. pass # ignore broken lines
  72. return result
  73. def _parse_stat(self, data):
  74. """ Parse the output from the 'stat' 4letter word command """
  75. global ZK_METRICS, ZK_LAST_METRICS
  76. h = StringIO(data)
  77. result = {}
  78. version = h.readline()
  79. if version:
  80. result['zk_version'] = version[version.index(':')+1:].strip()
  81. # skip all lines until we find the empty one
  82. while h.readline().strip(): pass
  83. for line in h.readlines():
  84. m = re.match('Latency min/avg/max: (\d+)/(\d+)/(\d+)', line)
  85. if m is not None:
  86. result['zk_min_latency'] = int(m.group(1))
  87. result['zk_avg_latency'] = int(m.group(2))
  88. result['zk_max_latency'] = int(m.group(3))
  89. continue
  90. m = re.match('Received: (\d+)', line)
  91. if m is not None:
  92. cur_packets = int(m.group(1))
  93. packet_delta = cur_packets - ZK_LAST_METRICS['data'].get('zk_packets_received_total', cur_packets)
  94. time_delta = ZK_METRICS['time'] - ZK_LAST_METRICS['time']
  95. time_delta = 10.0
  96. try:
  97. result['zk_packets_received_total'] = cur_packets
  98. result['zk_packets_received'] = packet_delta / float(time_delta)
  99. except ZeroDivisionError:
  100. result['zk_packets_received'] = 0
  101. continue
  102. m = re.match('Sent: (\d+)', line)
  103. if m is not None:
  104. cur_packets = int(m.group(1))
  105. packet_delta = cur_packets - ZK_LAST_METRICS['data'].get('zk_packets_sent_total', cur_packets)
  106. time_delta = ZK_METRICS['time'] - ZK_LAST_METRICS['time']
  107. try:
  108. result['zk_packets_sent_total'] = cur_packets
  109. result['zk_packets_sent'] = packet_delta / float(time_delta)
  110. except ZeroDivisionError:
  111. result['zk_packets_sent'] = 0
  112. continue
  113. m = re.match('Outstanding: (\d+)', line)
  114. if m is not None:
  115. result['zk_outstanding_requests'] = int(m.group(1))
  116. continue
  117. m = re.match('Mode: (.*)', line)
  118. if m is not None:
  119. result['zk_server_state'] = m.group(1)
  120. continue
  121. m = re.match('Node count: (\d+)', line)
  122. if m is not None:
  123. result['zk_znode_count'] = int(m.group(1))
  124. continue
  125. return result
  126. def _parse_line(self, line):
  127. try:
  128. key, value = map(str.strip, line.split('\t'))
  129. except ValueError:
  130. raise ValueError('Found invalid line: %s' % line)
  131. if not key:
  132. raise ValueError('The key is mandatory and should not be empty')
  133. try:
  134. value = int(value)
  135. except (TypeError, ValueError):
  136. pass
  137. return key, value
  138. def metric_handler(name):
  139. if time.time() - ZK_LAST_METRICS['time'] > TIME_BETWEEN_QUERIES:
  140. zk = ZooKeeperServer(metric_handler.host, metric_handler.port, 5)
  141. try:
  142. metric_handler.info = zk.get_stats()
  143. except Exception, e:
  144. print >>sys.stderr, e
  145. metric_handler.info = {}
  146. return metric_handler.info.get(name, 0)
  147. def metric_init(params=None):
  148. params = params or {}
  149. metric_handler.host = params.get('host', 'localhost')
  150. metric_handler.port = int(params.get('port', 2181))
  151. metric_handler.timestamp = 0
  152. metrics = {
  153. 'zk_avg_latency': {'units': 'ms'},
  154. 'zk_max_latency': {'units': 'ms'},
  155. 'zk_min_latency': {'units': 'ms'},
  156. 'zk_packets_received': {
  157. 'units': 'pps',
  158. 'value_type': 'float',
  159. 'format': '%f'
  160. },
  161. 'zk_packets_sent': {
  162. 'units': 'pps',
  163. 'value_type': 'double',
  164. 'format': '%f'
  165. },
  166. 'zk_num_alive_connections': {'units': 'connections'},
  167. 'zk_outstanding_requests': {'units': 'connections'},
  168. 'zk_znode_count': {'units': 'znodes'},
  169. 'zk_watch_count': {'units': 'watches'},
  170. 'zk_ephemerals_count': {'units': 'znodes'},
  171. 'zk_approximate_data_size': {'units': 'bytes'},
  172. 'zk_open_file_descriptor_count': {'units': 'descriptors'},
  173. 'zk_max_file_descriptor_count': {'units': 'descriptors'},
  174. 'zk_learners': {'units': 'nodes'},
  175. 'zk_synced_followers': {'units': 'nodes'},
  176. 'zk_pending_syncs': {'units': 'syncs'},
  177. 'zk_last_proposal_size': {'units': 'bytes'},
  178. 'zk_min_proposal_size': {'units': 'bytes'},
  179. 'zk_max_proposal_size': {'units': 'bytes'}
  180. }
  181. metric_handler.descriptors = {}
  182. for name, updates in metrics.iteritems():
  183. descriptor = {
  184. 'name': name,
  185. 'call_back': metric_handler,
  186. 'time_max': 90,
  187. 'value_type': 'int',
  188. 'units': '',
  189. 'slope': 'both',
  190. 'format': '%d',
  191. 'groups': 'zookeeper',
  192. }
  193. descriptor.update(updates)
  194. metric_handler.descriptors[name] = descriptor
  195. return metric_handler.descriptors.values()
  196. def metric_cleanup():
  197. pass
  198. if __name__ == '__main__':
  199. ds = metric_init({'host':'localhost', 'port': '2181'})
  200. while True:
  201. for d in ds:
  202. print "%s=%s" % (d['name'], metric_handler(d['name']))
  203. time.sleep(10)