cluster_utils.py

#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from subprocess import call
import subprocess
import logging
import time
import re
import os
import yaml

logger = logging.getLogger(__name__)

class ClusterUtils(object):
  """
  This class contains all cluster related operations.
  """

  @classmethod
  def cluster_setup(cls, docker_compose_file, datanode_count,
                    destroy_existing_cluster=True):
    """Start a blockade cluster."""
    logger.info("compose file :%s", docker_compose_file)
    logger.info("number of DNs :%d", datanode_count)
    if destroy_existing_cluster:
      call(["docker-compose", "-f", docker_compose_file, "down"])
    call(["docker-compose", "-f", docker_compose_file, "up", "-d",
          "--scale", "datanode=" + str(datanode_count)])
    logger.info("Waiting 30s for cluster start up...")
    time.sleep(30)
    output = subprocess.check_output(["docker-compose", "-f",
                                      docker_compose_file, "ps"])
    # `docker-compose ps` prints two header lines and ends with a newline,
    # so keep only the container rows.
    output_array = output.split("\n")[2:-1]
    container_list = []
    for out in output_array:
      container = out.split(" ")[0]
      container_list.append(container)
      call(["blockade", "add", container])
      time.sleep(2)
    assert container_list, "no container found!"
    logger.info("blockade created with containers %s",
                ' '.join(container_list))
    return container_list

  @classmethod
  def cluster_destroy(cls, docker_compose_file):
    logger.info("Running docker-compose -f %s down", docker_compose_file)
    call(["docker-compose", "-f", docker_compose_file, "down"])

  @classmethod
  def run_freon(cls, docker_compose_file, num_volumes, num_buckets,
                num_keys, key_size, replication_type, replication_factor,
                freon_client='om'):
    # run freon
    cmd = "docker-compose -f %s " \
          "exec %s /opt/hadoop/bin/ozone " \
          "freon rk " \
          "--numOfVolumes %s " \
          "--numOfBuckets %s " \
          "--numOfKeys %s " \
          "--keySize %s " \
          "--replicationType %s " \
          "--factor %s" % (docker_compose_file, freon_client, num_volumes,
                           num_buckets, num_keys, key_size,
                           replication_type, replication_factor)
    exit_code, output = cls.run_cmd(cmd)
    return exit_code, output
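
  # For illustration, with hypothetical argument values run_freon builds a
  # shell command such as:
  #   docker-compose -f docker-compose.yaml exec om /opt/hadoop/bin/ozone \
  #       freon rk --numOfVolumes 1 --numOfBuckets 1 --numOfKeys 1 \
  #       --keySize 10240 --replicationType RATIS --factor THREE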

  @classmethod
  def run_cmd(cls, cmd):
    command = cmd
    if isinstance(cmd, list):
      command = ' '.join(cmd)
    logger.info(" RUNNING: %s", command)
    all_output = ""
    myprocess = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT, shell=True)
    # Stream and log the output line by line while the process runs, then
    # collect whatever is left in the pipe after it exits.
    while myprocess.poll() is None:
      op = myprocess.stdout.readline()
      if op:
        all_output += op
        logger.info(op)
    other_output = myprocess.communicate()[0].strip()
    if other_output != "":
      all_output += other_output
      for each_line in other_output.split("\n"):
        logger.info(" %s", each_line.strip())
    # Trim a single trailing newline from the combined output.
    reg = re.compile(r"(\r\n|\n)$")
    all_output = reg.sub("", all_output, 1)
    return myprocess.returncode, all_output
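
  # A minimal usage sketch of run_cmd (the command here is illustrative):
  #   exit_code, output = ClusterUtils.run_cmd("echo hello")
  #   # exit_code == 0, output == "hello"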

  @classmethod
  def get_ozone_confkey_value(cls, docker_compose_file, key_name):
    cmd = "docker-compose -f %s " \
          "exec om /opt/hadoop/bin/ozone " \
          "getconf -confKey %s" \
          % (docker_compose_file, key_name)
    exit_code, output = cls.run_cmd(cmd)
    assert exit_code == 0, "getconf of key=[%s] failed with output=[%s]" \
        % (key_name, output)
    return str(output).strip()

  @classmethod
  def find_scm_uuid(cls, docker_compose_file):
    """
    Returns the SCM UUID, read from the SCM's VERSION file.
    """
    ozone_metadata_dir = cls.get_ozone_confkey_value(docker_compose_file,
                                                     "ozone.metadata.dirs")
    cmd = "docker-compose -f %s exec scm cat %s/scm/current/VERSION" % \
          (docker_compose_file, ozone_metadata_dir)
    exit_code, output = cls.run_cmd(cmd)
    assert exit_code == 0, "get scm UUID failed with output=[%s]" % output
    # Keep only the key=value lines and build a dict from them.
    output_list = [x for x in output.split("\n")
                   if re.search(r"\w+=\w+", x)]
    output_dict = dict(x.split("=") for x in output_list)
    return str(output_dict['scmUuid']).strip()
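
  # For reference, the VERSION file parsed above is a plain key=value
  # properties file; a hypothetical example of the one line consumed here:
  #   scmUuid=b6d22ee9-3b09-4b96-b3f7-2d2660b4d075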

  @classmethod
  def find_container_status(cls, docker_compose_file, datanode_index):
    """
    Returns the datanode's container replica state.
    It finds all <containerID>.container files on the datanode, then opens
    each file and checks the state of the container replica it describes.
    It returns 'None' as the container state if it cannot find any
    <containerID>.container file on the datanode.
    A sample <containerID>.container file records the state as follows:
    state: <STATE OF THE CONTAINER REPLICA>
    """
    datanode_dir = cls.get_ozone_confkey_value(docker_compose_file,
                                               "hdds.datanode.dir")
    scm_uuid = cls.find_scm_uuid(docker_compose_file)
    container_parent_path = "%s/hdds/%s/current/containerDir0" % \
                            (datanode_dir, scm_uuid)
    cmd = "docker-compose -f %s exec --index=%s datanode find %s -type f " \
          "-name '*.container'" \
          % (docker_compose_file, datanode_index, container_parent_path)
    exit_code, output = cls.run_cmd(cmd)
    container_state = "None"
    if exit_code == 0 and output:
      container_list = map(str.strip, output.split("\n"))
      for container_path in container_list:
        cmd = "docker-compose -f %s exec --index=%s datanode cat %s" \
              % (docker_compose_file, datanode_index, container_path)
        exit_code, output = cls.run_cmd(cmd)
        assert exit_code == 0, \
            "command=[%s] failed with output=[%s]" % (cmd, output)
        # Keep only the lines that have a YAML representation,
        # i.e. "key: value".
        container_db_list = [x for x in output.split("\n")
                             if re.search(r"\w+:\s\w+", x)]
        container_db_info = "\n".join(container_db_list)
        container_db_dict = yaml.safe_load(container_db_info)
        for key, value in container_db_dict.items():
          container_db_dict[key] = str(value).lstrip()
        if container_state == "None":
          container_state = container_db_dict['state']
        else:
          assert container_db_dict['state'] == container_state, \
              "all containers are not in the same state"
    return container_state
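
  # A hypothetical fragment of a <containerID>.container file, showing the
  # "key: value" lines the filter above keeps (only 'state' is consumed):
  #   containerID: 1
  #   state: CLOSED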

  @classmethod
  def findall_container_status(cls, docker_compose_file, scale):
    """
    Returns the container replica states of all datanodes.
    """
    all_datanode_container_status = []
    for index in range(scale):
      all_datanode_container_status.append(
          cls.find_container_status(docker_compose_file, index + 1))
    logger.info("All datanodes container status: %s",
                ' '.join(all_datanode_container_status))
    return all_datanode_container_status

  @classmethod
  def create_volume(cls, docker_compose_file, volume_name):
    command = "docker-compose -f %s " \
              "exec ozone_client /opt/hadoop/bin/ozone " \
              "sh volume create /%s --user root" % \
              (docker_compose_file, volume_name)
    logger.info("Creating Volume %s", volume_name)
    exit_code, output = cls.run_cmd(command)
    assert exit_code == 0, "Ozone volume create failed with output=[%s]" \
        % output

  @classmethod
  def delete_volume(cls, docker_compose_file, volume_name):
    command = "docker-compose -f %s " \
              "exec ozone_client /opt/hadoop/bin/ozone " \
              "sh volume delete /%s" % (docker_compose_file, volume_name)
    logger.info("Deleting Volume %s", volume_name)
    exit_code, output = cls.run_cmd(command)
    return exit_code, output

  @classmethod
  def create_bucket(cls, docker_compose_file, bucket_name, volume_name):
    command = "docker-compose -f %s " \
              "exec ozone_client /opt/hadoop/bin/ozone " \
              "sh bucket create /%s/%s" % (docker_compose_file,
                                           volume_name, bucket_name)
    logger.info("Creating Bucket %s in volume %s",
                bucket_name, volume_name)
    exit_code, output = cls.run_cmd(command)
    assert exit_code == 0, "Ozone bucket create failed with output=[%s]" \
        % output

  @classmethod
  def delete_bucket(cls, docker_compose_file, bucket_name, volume_name):
    command = "docker-compose -f %s " \
              "exec ozone_client /opt/hadoop/bin/ozone " \
              "sh bucket delete /%s/%s" % (docker_compose_file,
                                           volume_name, bucket_name)
    logger.info("Running delete bucket of %s/%s", volume_name, bucket_name)
    exit_code, output = cls.run_cmd(command)
    return exit_code, output

  @classmethod
  def put_key(cls, docker_compose_file, bucket_name, volume_name,
              filepath, key_name=None, replication_factor=None):
    # Verify that the source file exists in the client container first.
    command = "docker-compose -f %s " \
              "exec ozone_client ls %s" % (docker_compose_file, filepath)
    exit_code, output = cls.run_cmd(command)
    assert exit_code == 0, "%s does not exist" % filepath
    if key_name is None:
      key_name = os.path.basename(filepath)
    command = "docker-compose -f %s " \
              "exec ozone_client /opt/hadoop/bin/ozone " \
              "sh key put /%s/%s/%s %s" % (docker_compose_file,
                                           volume_name, bucket_name,
                                           key_name, filepath)
    if replication_factor:
      command = "%s --replication=%s" % (command, replication_factor)
    logger.info("Creating key %s in %s/%s", key_name,
                volume_name, bucket_name)
    exit_code, output = cls.run_cmd(command)
    assert exit_code == 0, "Ozone put Key failed with output=[%s]" % output

  @classmethod
  def delete_key(cls, docker_compose_file, bucket_name, volume_name,
                 key_name):
    command = "docker-compose -f %s " \
              "exec ozone_client /opt/hadoop/bin/ozone " \
              "sh key delete /%s/%s/%s" \
              % (docker_compose_file, volume_name, bucket_name, key_name)
    logger.info("Running delete key %s in %s/%s",
                key_name, volume_name, bucket_name)
    exit_code, output = cls.run_cmd(command)
    return exit_code, output

  @classmethod
  def get_key(cls, docker_compose_file, bucket_name, volume_name,
              key_name, filepath=None):
    if filepath is None:
      filepath = '.'
    command = "docker-compose -f %s " \
              "exec ozone_client /opt/hadoop/bin/ozone " \
              "sh key get /%s/%s/%s %s" % (docker_compose_file,
                                           volume_name, bucket_name,
                                           key_name, filepath)
    logger.info("Running get key %s in %s/%s", key_name,
                volume_name, bucket_name)
    exit_code, output = cls.run_cmd(command)
    assert exit_code == 0, "Ozone get Key failed with output=[%s]" % output

  @classmethod
  def find_checksum(cls, docker_compose_file, filepath,
                    client="ozone_client"):
    """
    This function finds the checksum of a file present in a docker container.
    Before running any 'putKey' operation, this function is called to store
    the original checksum of the file. The file is then uploaded as a key.
    """
    command = "docker-compose -f %s " \
              "exec %s md5sum %s" % \
              (docker_compose_file, client, filepath)
    exit_code, output = cls.run_cmd(command)
    assert exit_code == 0, "Can't find checksum"
    myoutput = output.split("\n")
    finaloutput = ""
    for line in myoutput:
      # Skip docker-compose tty warnings interleaved with the output.
      if line.find("Warning") >= 0 or line.find("is not a tty") >= 0:
        logger.info("skip this line: %s", line)
      else:
        finaloutput = finaloutput + line
    checksum = finaloutput.split(" ")
    logger.info("Checksum of %s is : %s", filepath, checksum[0])
    return checksum[0]
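
  # md5sum prints "<digest>  <path>"; taking the first space-separated field
  # yields the digest, e.g. (digest of an empty file, hypothetical path):
  #   d41d8cd98f00b204e9800998ecf8427e  /tmp/testfile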

  @classmethod
  def get_pipelines(cls, docker_compose_file):
    command = "docker-compose -f %s " \
              "exec ozone_client /opt/hadoop/bin/ozone scmcli " \
              "listPipelines" % docker_compose_file
    exit_code, output = cls.run_cmd(command)
    assert exit_code == 0, "list pipeline command failed"
    return output

  @classmethod
  def find_om_scm_client_datanodes(cls, container_list):
    om = filter(lambda x: 'om_1' in x, container_list)
    scm = filter(lambda x: 'scm' in x, container_list)
    datanodes = sorted(
        list(filter(lambda x: 'datanode' in x, container_list)))
    client = filter(lambda x: 'ozone_client' in x, container_list)
    return om, scm, client, datanodes
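

if __name__ == "__main__":
  # A minimal usage sketch, not part of the module's API: the compose file
  # path and the freon argument values below are illustrative assumptions.
  logging.basicConfig(level=logging.INFO)
  compose_file = "docker-compose.yaml"  # hypothetical path
  containers = ClusterUtils.cluster_setup(compose_file, datanode_count=3)
  ClusterUtils.run_freon(compose_file, 1, 1, 1, 10240, "RATIS", "THREE")
  ClusterUtils.cluster_destroy(compose_file)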