cluster.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. #!/usr/bin/python
  2. # Licensed to the Apache Software Foundation (ASF) under one or more
  3. # contributor license agreements. See the NOTICE file distributed with
  4. # this work for additional information regarding copyright ownership.
  5. # The ASF licenses this file to You under the Apache License, Version 2.0
  6. # (the "License"); you may not use this file except in compliance with
  7. # the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. import os
  18. import re
  19. import subprocess
  20. import yaml
  21. from os import environ
  22. from subprocess import call
  23. from ozone import util
  24. from ozone.constants import Command
  25. from ozone.blockade import Blockade
  26. from ozone.client import OzoneClient
  27. from ozone.container import Container
  28. from ozone.exceptions import ContainerNotFoundError
  29. class Configuration:
  30. """
  31. Configurations to be used while starting Ozone Cluster.
  32. Here @property decorators is used to achieve getters, setters and delete
  33. behaviour for 'datanode_count' attribute.
  34. @datanode_count.setter will set the value for 'datanode_count' attribute.
  35. @datanode_count.deleter will delete the current value of 'datanode_count'
  36. attribute.
  37. """
  38. def __init__(self):
  39. if "MAVEN_TEST" in os.environ:
  40. compose_dir = environ.get("MAVEN_TEST")
  41. self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml")
  42. elif "OZONE_HOME" in os.environ:
  43. compose_dir = os.path.join(environ.get("OZONE_HOME"), "compose", "ozoneblockade")
  44. self.docker_compose_file = os.path.join(compose_dir, "docker-compose.yaml")
  45. else:
  46. __parent_dir__ = os.path.dirname(os.path.dirname(os.path.dirname(
  47. os.path.dirname(os.path.realpath(__file__)))))
  48. self.docker_compose_file = os.path.join(__parent_dir__,
  49. "compose", "ozoneblockade",
  50. "docker-compose.yaml")
  51. self._datanode_count = 3
  52. os.environ["DOCKER_COMPOSE_FILE"] = self.docker_compose_file
  53. @property
  54. def datanode_count(self):
  55. return self._datanode_count
  56. @datanode_count.setter
  57. def datanode_count(self, datanode_count):
  58. self._datanode_count = datanode_count
  59. @datanode_count.deleter
  60. def datanode_count(self):
  61. del self._datanode_count
  62. class OzoneCluster(object):
  63. """
  64. This represents Ozone Cluster.
  65. Here @property decorators is used to achieve getters, setters and delete
  66. behaviour for 'om', 'scm', 'datanodes' and 'client' attributes.
  67. """
  68. __logger__ = logging.getLogger(__name__)
  69. def __init__(self, conf):
  70. self.conf = conf
  71. self.docker_compose_file = conf.docker_compose_file
  72. self._om = None
  73. self._scm = None
  74. self._datanodes = None
  75. self._client = None
  76. self.scm_uuid = None
  77. self.datanode_dir = None
  78. @property
  79. def om(self):
  80. return self._om
  81. @om.setter
  82. def om(self, om):
  83. self._om = om
  84. @om.deleter
  85. def om(self):
  86. del self._om
  87. @property
  88. def scm(self):
  89. return self._scm
  90. @scm.setter
  91. def scm(self, scm):
  92. self._scm = scm
  93. @scm.deleter
  94. def scm(self):
  95. del self._scm
  96. @property
  97. def datanodes(self):
  98. return self._datanodes
  99. @datanodes.setter
  100. def datanodes(self, datanodes):
  101. self._datanodes = datanodes
  102. @datanodes.deleter
  103. def datanodes(self):
  104. del self._datanodes
  105. @property
  106. def client(self):
  107. return self._client
  108. @client.setter
  109. def client(self, client):
  110. self._client = client
  111. @client.deleter
  112. def client(self):
  113. del self._client
  114. @classmethod
  115. def create(cls, config=Configuration()):
  116. return OzoneCluster(config)
  117. def start(self):
  118. """
  119. Start Ozone Cluster in docker containers.
  120. """
  121. # check if proper env $HDDS_VERSION and $HADOOP_RUNNER_VERSION
  122. # are set.
  123. # check if docker is up.
  124. self.__logger__.info("Starting Ozone Cluster")
  125. if Blockade.blockade_status() == 0:
  126. Blockade.blockade_destroy()
  127. Blockade.blockade_up()
  128. call([Command.docker_compose, "-f", self.docker_compose_file,
  129. "up", "-d", "--scale",
  130. "datanode=" + str(self.conf.datanode_count)])
  131. self.__logger__.info("Waiting 10s for cluster start up...")
  132. # Remove the sleep and wait only till the cluster is out of safemode
  133. # time.sleep(10)
  134. output = subprocess.check_output([Command.docker_compose, "-f",
  135. self.docker_compose_file, "ps"])
  136. node_list = []
  137. for out in output.split("\n")[2:-1]:
  138. node = out.split(" ")[0]
  139. node_list.append(node)
  140. Blockade.blockade_add(node)
  141. self.om = filter(lambda x: 'om' in x, node_list)[0]
  142. self.scm = filter(lambda x: 'scm' in x, node_list)[0]
  143. self.datanodes = sorted(list(filter(lambda x: 'datanode' in x, node_list)))
  144. self.client = filter(lambda x: 'ozone_client' in x, node_list)[0]
  145. self.scm_uuid = self.__get_scm_uuid__()
  146. self.datanode_dir = self.get_conf_value("hdds.datanode.dir")
  147. assert node_list, "no node found in the cluster!"
  148. self.__logger__.info("blockade created with nodes %s", ' '.join(node_list))
  149. def get_conf_value(self, key):
  150. """
  151. Returns the value of given configuration key.
  152. """
  153. command = [Command.ozone, "getconf -confKey " + key]
  154. exit_code, output = util.run_docker_command(command, self.om)
  155. return str(output).strip()
  156. def scale_datanode(self, datanode_count):
  157. """
  158. Commission new datanodes to the running cluster.
  159. """
  160. call([Command.docker_compose, "-f", self.docker_compose_file,
  161. "up", "-d", "--scale", "datanode=" + datanode_count])
  162. def partition_network(self, *args):
  163. """
  164. Partition the network which is used by the cluster.
  165. """
  166. Blockade.blockade_create_partition(*args)
  167. def restore_network(self):
  168. """
  169. Restores the network partition.
  170. """
  171. Blockade.blockade_join()
  172. def __get_scm_uuid__(self):
  173. """
  174. Returns SCM's UUID.
  175. """
  176. ozone_metadata_dir = self.get_conf_value("ozone.metadata.dirs")
  177. command = "cat %s/scm/current/VERSION" % ozone_metadata_dir
  178. exit_code, output = util.run_docker_command(command, self.scm)
  179. output_list = output.split("\n")
  180. key_value = [x for x in output_list if re.search(r"\w+=\w+", x)]
  181. uuid = [token for token in key_value if 'scmUuid' in token]
  182. return uuid.pop().split("=")[1].strip()
  183. def get_client(self):
  184. return OzoneClient(self)
  185. def get_container(self, container_id):
  186. command = [Command.ozone, "scmcli list -c=1 -s=%s | grep containerID", container_id - 1]
  187. exit_code, output = util.run_docker_command(command, self.om)
  188. if exit_code != 0:
  189. raise ContainerNotFoundError(container_id)
  190. return Container(container_id, self)
  191. def get_containers_on_datanode(self, datanode):
  192. """
  193. Returns all the container on given datanode.
  194. """
  195. container_parent_path = "%s/hdds/%s/current/containerDir0" % \
  196. (self.datanode_dir, self.scm_uuid)
  197. command = "find %s -type f -name '*.container'" % container_parent_path
  198. exit_code, output = util.run_docker_command(command, datanode)
  199. containers = []
  200. container_list = map(str.strip, output.split("\n"))
  201. for container_path in container_list:
  202. # Reading the container file.
  203. exit_code, output = util.run_docker_command(
  204. "cat " + container_path, datanode)
  205. if exit_code is not 0:
  206. continue
  207. data = output.split("\n")
  208. # Reading key value pairs from container file.
  209. key_value = [x for x in data if re.search(r"\w+:\s\w+", x)]
  210. content = "\n".join(key_value)
  211. content_yaml = yaml.load(content)
  212. if content_yaml is None:
  213. continue
  214. containers.append(Container(content_yaml.get('containerID'), self))
  215. return containers
  216. def get_container_state(self, container_id, datanode):
  217. container_parent_path = "%s/hdds/%s/current/containerDir0" % \
  218. (self.datanode_dir, self.scm_uuid)
  219. command = "find %s -type f -name '%s.container'" % (container_parent_path, container_id)
  220. exit_code, output = util.run_docker_command(command, datanode)
  221. container_path = output.strip()
  222. if not container_path:
  223. raise ContainerNotFoundError("Container not found!")
  224. # Reading the container file.
  225. exit_code, output = util.run_docker_command("cat " + container_path, datanode)
  226. if exit_code != 0:
  227. raise ContainerNotFoundError("Container not found!")
  228. data = output.split("\n")
  229. # Reading key value pairs from container file.
  230. key_value = [x for x in data if re.search(r"\w+:\s\w+", x)]
  231. content = "\n".join(key_value)
  232. content_yaml = yaml.load(content)
  233. return str(content_yaml.get('state')).lstrip()
  234. def get_container_datanodes(self, container_id):
  235. result = []
  236. for datanode in self.datanodes:
  237. container_parent_path = "%s/hdds/%s/current/containerDir0" % \
  238. (self.datanode_dir, self.scm_uuid)
  239. command = "find %s -type f -name '%s.container'" % (container_parent_path, container_id)
  240. exit_code, output = util.run_docker_command(command, datanode)
  241. if exit_code == 0:
  242. result.append(datanode)
  243. return result
  244. def stop(self):
  245. """
  246. Stops the Ozone Cluster.
  247. """
  248. self.__logger__.info("Stopping Ozone Cluster")
  249. call([Command.docker_compose, "-f", self.docker_compose_file, "down"])
  250. Blockade.blockade_destroy()