inet_utils.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import os
  18. import time
  19. import sys
  20. import urllib2
  21. import socket
  22. import re
  23. from ambari_commons import OSCheck
  24. from functools import wraps
  25. from exceptions import FatalException, NonFatalException, TimeoutError
  26. if OSCheck.is_windows_family():
  27. from ambari_commons.os_windows import os_run_os_command
  28. else:
  29. # MacOS not supported
  30. from ambari_commons.os_linux import os_run_os_command
  31. pass
  32. from logging_utils import *
  33. from os_check import OSCheck
  34. def openurl(url, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, *args, **kwargs):
  35. """
  36. :param url: url to open
  37. :param timeout: open timeout, raise TimeoutError on timeout
  38. :rtype urllib2.Request
  39. """
  40. try:
  41. return urllib2.urlopen(url, timeout=timeout, *args, **kwargs)
  42. except urllib2.URLError as e:
  43. # Python 2.6 timeout handling
  44. if hasattr(e, "reason") and isinstance(e.reason, socket.timeout):
  45. raise TimeoutError(e.reason)
  46. else:
  47. raise e # re-throw exception
  48. except socket.timeout as e: # Python 2.7 timeout handling
  49. raise TimeoutError(e)
  50. def download_file(link, destination, chunk_size=16 * 1024, progress_func = None):
  51. print_info_msg("Downloading {0} to {1}".format(link, destination))
  52. if os.path.exists(destination):
  53. print_warning_msg("File {0} already exists, assuming it was downloaded before".format(destination))
  54. return
  55. force_download_file(link, destination, chunk_size, progress_func = progress_func)
  56. def download_file_anyway(link, destination, chunk_size=16 * 1024, progress_func = None):
  57. print_info_msg("Trying to download {0} to {1} with python lib [urllib2].".format(link, destination))
  58. if os.path.exists(destination):
  59. print_warning_msg("File {0} already exists, assuming it was downloaded before".format(destination))
  60. return
  61. try:
  62. force_download_file(link, destination, chunk_size, progress_func = progress_func)
  63. except:
  64. print_error_msg("Download {0} with python lib [urllib2] failed with error: {1}".format(link, str(sys.exc_info())))
  65. if not os.path.exists(destination):
  66. print "Trying to download {0} to {1} with [curl] command.".format(link, destination)
  67. #print_info_msg("Trying to download {0} to {1} with [curl] command.".format(link, destination))
  68. curl_command = "curl --fail -k -o %s %s" % (destination, link)
  69. retcode, out, err = os_run_os_command(curl_command)
  70. if retcode != 0:
  71. print_error_msg("Download file {0} with [curl] command failed with error: {1}".format(link, out + err))
  72. if not os.path.exists(destination):
  73. print_error_msg("Unable to download file {0}!".format(link))
  74. print "ERROR: unable to donwload file %s!" % (link)
  75. def download_progress(file_name, downloaded_size, blockSize, totalSize):
  76. percent = int(downloaded_size * 100 / totalSize)
  77. status = "\r" + file_name
  78. if totalSize < blockSize:
  79. status += "... %d%%" % (100)
  80. else:
  81. status += "... %d%% (%.1f MB of %.1f MB)" % (
  82. percent, downloaded_size / 1024 / 1024.0, totalSize / 1024 / 1024.0)
  83. sys.stdout.write(status)
  84. sys.stdout.flush()
  85. def wait_for_port_opened(hostname, port, tries_count, try_sleep):
  86. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  87. sock.settimeout(2)
  88. for i in range(tries_count):
  89. if sock.connect_ex((hostname, port)) == 0:
  90. return True
  91. time.sleep(try_sleep)
  92. return False
  93. def find_range_components(meta):
  94. file_size = 0
  95. seek_pos = 0
  96. hdr_range = meta.getheaders("Content-Range")
  97. if len(hdr_range) > 0:
  98. range_comp1 = hdr_range[0].split('/')
  99. if len(range_comp1) > 1:
  100. range_comp2 = range_comp1[0].split(' ') #split away the "bytes" prefix
  101. if len(range_comp2) == 0:
  102. raise FatalException(12, 'Malformed Content-Range response header: "{0}".' % hdr_range)
  103. range_comp3 = range_comp2[1].split('-')
  104. seek_pos = int(range_comp3[0])
  105. if range_comp1[1] != '*': #'*' == unknown length
  106. file_size = int(range_comp1[1])
  107. if file_size == 0:
  108. #Try the old-fashioned way
  109. hdrLen = meta.getheaders("Content-Length")
  110. if len(hdrLen) == 0:
  111. raise FatalException(12, "Response header doesn't contain Content-Length. Chunked Transfer-Encoding is not supported for now.")
  112. file_size = int(hdrLen[0])
  113. return (file_size, seek_pos)
  114. def force_download_file(link, destination, chunk_size = 16 * 1024, progress_func = None):
  115. request = urllib2.Request(link)
  116. if os.path.exists(destination) and not os.path.isfile(destination):
  117. #Directory specified as target? Must be a mistake. Bail out, don't assume anything.
  118. err = 'Download target {0} is a directory.' % destination
  119. raise FatalException(1, err)
  120. (dest_path, file_name) = os.path.split(destination)
  121. temp_dest = destination + ".tmpdownload"
  122. partial_size = 0
  123. if os.path.exists(temp_dest):
  124. #Support for resuming downloads, in case the process is killed while downloading a file
  125. # set resume range
  126. # See http://stackoverflow.com/questions/6963283/python-urllib2-resume-download-doesnt-work-when-network-reconnects
  127. partial_size = os.stat(temp_dest).st_size
  128. if partial_size > chunk_size:
  129. #Re-download the last chunk, to minimize the possibilities of file corruption
  130. resume_pos = partial_size - chunk_size
  131. request.add_header("Range", "bytes=%s-" % resume_pos)
  132. else:
  133. #Make sure the full dir structure is in place, otherwise file open will fail
  134. if not os.path.exists(dest_path):
  135. os.makedirs(dest_path)
  136. response = urllib2.urlopen(request)
  137. (file_size, seek_pos) = find_range_components(response.info())
  138. print_info_msg("Downloading to: %s Bytes: %s" % (destination, file_size))
  139. if partial_size < file_size:
  140. if seek_pos == 0:
  141. #New file, create it
  142. open_mode = 'wb'
  143. else:
  144. #Resuming download of an existing file
  145. open_mode = 'rb+' #rb+ doesn't create the file, using wb to create it
  146. f = open(temp_dest, open_mode)
  147. try:
  148. #Resume the download from where it left off
  149. if seek_pos > 0:
  150. f.seek(seek_pos)
  151. file_size_dl = seek_pos
  152. while True:
  153. buffer = response.read(chunk_size)
  154. if not buffer:
  155. break
  156. file_size_dl += len(buffer)
  157. f.write(buffer)
  158. if progress_func is not None:
  159. progress_func(file_name, file_size_dl, chunk_size, file_size)
  160. finally:
  161. f.close()
  162. sys.stdout.write('\n')
  163. sys.stdout.flush()
  164. print_info_msg("Finished downloading {0} to {1}".format(link, destination))
  165. downloaded_size = os.stat(temp_dest).st_size
  166. if downloaded_size != file_size:
  167. err = 'Size of downloaded file {0} is {1} bytes, it is probably damaged or incomplete' % (destination, downloaded_size)
  168. raise FatalException(1, err)
  169. # when download is complete -> mv temp_dest destination
  170. if os.path.exists(destination):
  171. #Windows behavior: rename fails if the destination file exists
  172. os.unlink(destination)
  173. os.rename(temp_dest, destination)
  174. def resolve_address(address):
  175. """
  176. Resolves address to proper one in special cases, for example 0.0.0.0 to 127.0.0.1 on windows os.
  177. :param address: address to resolve
  178. :return: resulting address
  179. """
  180. if OSCheck.is_windows_family():
  181. if address == '0.0.0.0':
  182. return '127.0.0.1'
  183. return address
def ensure_ssl_using_protocol(protocol="PROTOCOL_TLSv1", ca_certs=None):
  """
  Monkey patching ssl module to force it use tls_v1. Do this in common module to avoid problems with
  PythonReflectiveExecutor.
  :param protocol: one of ("PROTOCOL_SSLv2", "PROTOCOL_SSLv3", "PROTOCOL_SSLv23", "PROTOCOL_TLSv1", "PROTOCOL_TLSv1_1", "PROTOCOL_TLSv1_2")
  :param ca_certs: path to ca_certs file
  :return:
  """
  from functools import wraps
  import ssl
  # The _ambari_patched marker prevents wrapping the same function twice
  # when this helper is called more than once.
  if not hasattr(ssl.wrap_socket, "_ambari_patched"):
    def sslwrap(func):
      # Decorator forcing every wrap_socket() call onto the requested
      # protocol (and CA verification, when ca_certs was supplied).
      @wraps(func)
      def bar(*args, **kw):
        import ssl
        kw['ssl_version'] = getattr(ssl, protocol)
        # Only inject CA settings when the caller did not pass its own.
        if ca_certs and not 'ca_certs' in kw:
          kw['ca_certs'] = ca_certs
          kw['cert_reqs'] = ssl.CERT_REQUIRED
        return func(*args, **kw)
      bar._ambari_patched = True
      return bar
    ssl.wrap_socket = sslwrap(ssl.wrap_socket)
  # python 2.7 stuff goes here
  # (ssl._create_default_https_context exists only on newer Pythons; it is
  # what urllib2/httplib consult for HTTPS by default.)
  if hasattr(ssl, "_create_default_https_context"):
    if not hasattr(ssl._create_default_https_context, "_ambari_patched"):
      @wraps(ssl._create_default_https_context)
      def _create_default_https_context_patched():
        # Build a context pinned to the requested protocol version.
        context = ssl.SSLContext(protocol = getattr(ssl, protocol))
        if ca_certs:
          context.load_verify_locations(ca_certs)
          context.verify_mode = ssl.CERT_REQUIRED
          # NOTE(review): hostname checking is deliberately disabled here;
          # only the certificate chain is verified against ca_certs.
          context.check_hostname = False
        return context
      _create_default_https_context_patched._ambari_patched = True
      ssl._create_default_https_context = _create_default_https_context_patched
  220. """
  221. See RFC3986, Appendix B
  222. Tested on the following cases:
  223. "192.168.54.1"
  224. "192.168.54.2:7661
  225. "hdfs://192.168.54.3/foo/bar"
  226. "ftp://192.168.54.4:7842/foo/bar"
  227. Returns None if only a port is passed in
  228. """
  229. def get_host_from_url(uri):
  230. if uri is None:
  231. return None
  232. # if not a string, return None
  233. if not isinstance(uri, basestring):
  234. return None
  235. # RFC3986, Appendix B
  236. parts = re.findall('^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', uri)
  237. # index of parts
  238. # scheme = 1
  239. # authority = 3
  240. # path = 4
  241. # query = 6
  242. # fragment = 8
  243. host_and_port = uri
  244. if 0 == len(parts[0][1]):
  245. host_and_port = parts[0][4]
  246. elif 0 == len(parts[0][2]):
  247. host_and_port = parts[0][1]
  248. elif parts[0][2].startswith("//"):
  249. host_and_port = parts[0][3]
  250. if -1 == host_and_port.find(':'):
  251. if host_and_port.isdigit():
  252. return None
  253. return host_and_port
  254. else:
  255. return host_and_port.split(':')[0]