#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import os
import time
import sys
import urllib2
import socket
import re

from ambari_commons import OSCheck
from functools import wraps

from exceptions import FatalException, NonFatalException, TimeoutError

if OSCheck.is_windows_family():
  from ambari_commons.os_windows import os_run_os_command
else:
  # MacOS not supported
  from ambari_commons.os_linux import os_run_os_command
  pass

from logging_utils import *
from os_check import OSCheck

def openurl(url, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, *args, **kwargs):
  """
  Opens the given url, raising TimeoutError if the open operation times out.

  :param url: url to open
  :param timeout: open timeout in seconds; TimeoutError is raised on timeout
  :return: the response object returned by urllib2.urlopen
  """
  try:
    return urllib2.urlopen(url, timeout=timeout, *args, **kwargs)
  except urllib2.URLError as e:
    # Python 2.6 wraps the socket timeout inside URLError.reason
    if hasattr(e, "reason") and isinstance(e.reason, socket.timeout):
      raise TimeoutError(e.reason)
    else:
      raise e  # re-throw the original exception
  except socket.timeout as e:  # Python 2.7 raises socket.timeout directly
    raise TimeoutError(e)
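
# Illustrative usage sketch (not part of the original module; the URL is hypothetical).
# openurl() behaves like urllib2.urlopen() but converts socket timeouts into TimeoutError
# on both the Python 2.6 and Python 2.7 code paths:
#
#   try:
#     response = openurl("http://example.com/api/status", timeout=10)
#     body = response.read()
#   except TimeoutError:
#     print_error_msg("Request timed out")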

def download_file(link, destination, chunk_size=16 * 1024, progress_func=None):
  print_info_msg("Downloading {0} to {1}".format(link, destination))
  if os.path.exists(destination):
    print_warning_msg("File {0} already exists, assuming it was downloaded before".format(destination))
    return

  force_download_file(link, destination, chunk_size, progress_func=progress_func)

def download_file_anyway(link, destination, chunk_size=16 * 1024, progress_func=None):
  print_info_msg("Trying to download {0} to {1} with python lib [urllib2].".format(link, destination))
  if os.path.exists(destination):
    print_warning_msg("File {0} already exists, assuming it was downloaded before".format(destination))
    return
  try:
    force_download_file(link, destination, chunk_size, progress_func=progress_func)
  except:
    print_error_msg("Download {0} with python lib [urllib2] failed with error: {1}".format(link, str(sys.exc_info())))

  if not os.path.exists(destination):
    print "Trying to download {0} to {1} with [curl] command.".format(link, destination)
    #print_info_msg("Trying to download {0} to {1} with [curl] command.".format(link, destination))
    curl_command = "curl --fail -k -o %s %s" % (destination, link)
    retcode, out, err = os_run_os_command(curl_command)
    if retcode != 0:
      print_error_msg("Download file {0} with [curl] command failed with error: {1}".format(link, out + err))

  if not os.path.exists(destination):
    print_error_msg("Unable to download file {0}!".format(link))
    print "ERROR: unable to download file %s!" % (link)

def download_progress(file_name, downloaded_size, blockSize, totalSize):
  percent = int(downloaded_size * 100 / totalSize)
  status = "\r" + file_name

  if totalSize < blockSize:
    status += "... %d%%" % (100)
  else:
    status += "... %d%% (%.1f MB of %.1f MB)" % (
      percent, downloaded_size / 1024 / 1024.0, totalSize / 1024 / 1024.0)
  sys.stdout.write(status)
  sys.stdout.flush()

def wait_for_port_opened(hostname, port, tries_count, try_sleep):
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.settimeout(2)

  for i in range(tries_count):
    if sock.connect_ex((hostname, port)) == 0:
      return True
    time.sleep(try_sleep)

  return False

def find_range_components(meta):
  file_size = 0
  seek_pos = 0

  hdr_range = meta.getheaders("Content-Range")
  if len(hdr_range) > 0:
    range_comp1 = hdr_range[0].split('/')
    if len(range_comp1) > 1:
      range_comp2 = range_comp1[0].split(' ')  # split away the "bytes" prefix
      if len(range_comp2) == 0:
        raise FatalException(12, 'Malformed Content-Range response header: "{0}".'.format(hdr_range))
      range_comp3 = range_comp2[1].split('-')
      seek_pos = int(range_comp3[0])
      if range_comp1[1] != '*':  # '*' == unknown length
        file_size = int(range_comp1[1])

  if file_size == 0:
    # Try the old-fashioned way
    hdrLen = meta.getheaders("Content-Length")
    if len(hdrLen) == 0:
      raise FatalException(12, "Response header doesn't contain Content-Length. Chunked Transfer-Encoding is not supported for now.")
    file_size = int(hdrLen[0])

  return (file_size, seek_pos)
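
# Illustrative sketch (assumed example values, not original code): for a response whose
# headers contain 'Content-Range: bytes 1024-2047/4096', find_range_components() returns
# (4096, 1024) - the total file size and the offset to seek to before resuming a download:
#
#   file_size, seek_pos = find_range_components(response.info())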

def force_download_file(link, destination, chunk_size=16 * 1024, progress_func=None):
  request = urllib2.Request(link)

  if os.path.exists(destination) and not os.path.isfile(destination):
    # Directory specified as target? Must be a mistake. Bail out, don't assume anything.
    err = 'Download target {0} is a directory.'.format(destination)
    raise FatalException(1, err)

  (dest_path, file_name) = os.path.split(destination)

  temp_dest = destination + ".tmpdownload"
  partial_size = 0

  if os.path.exists(temp_dest):
    # Support for resuming downloads, in case the process is killed while downloading a file.
    # Set the resume range.
    # See http://stackoverflow.com/questions/6963283/python-urllib2-resume-download-doesnt-work-when-network-reconnects
    partial_size = os.stat(temp_dest).st_size
    if partial_size > chunk_size:
      # Re-download the last chunk, to minimize the possibilities of file corruption
      resume_pos = partial_size - chunk_size
      request.add_header("Range", "bytes=%s-" % resume_pos)
  else:
    # Make sure the full dir structure is in place, otherwise file open will fail
    if not os.path.exists(dest_path):
      os.makedirs(dest_path)
  response = urllib2.urlopen(request)
  (file_size, seek_pos) = find_range_components(response.info())

  print_info_msg("Downloading to: %s Bytes: %s" % (destination, file_size))

  if partial_size < file_size:
    if seek_pos == 0:
      # New file, create it
      open_mode = 'wb'
    else:
      # Resuming download of an existing file
      open_mode = 'rb+'  # rb+ doesn't create the file, using wb to create it

    f = open(temp_dest, open_mode)
    try:
      # Resume the download from where it left off
      if seek_pos > 0:
        f.seek(seek_pos)

      file_size_dl = seek_pos
      while True:
        buffer = response.read(chunk_size)
        if not buffer:
          break

        file_size_dl += len(buffer)
        f.write(buffer)
        if progress_func is not None:
          progress_func(file_name, file_size_dl, chunk_size, file_size)
    finally:
      f.close()

    sys.stdout.write('\n')
    sys.stdout.flush()
- print_info_msg("Finished downloading {0} to {1}".format(link, destination))
- downloaded_size = os.stat(temp_dest).st_size
- if downloaded_size != file_size:
- err = 'Size of downloaded file {0} is {1} bytes, it is probably damaged or incomplete' % (destination, downloaded_size)
- raise FatalException(1, err)
- # when download is complete -> mv temp_dest destination
- if os.path.exists(destination):
- #Windows behavior: rename fails if the destination file exists
- os.unlink(destination)
- os.rename(temp_dest, destination)
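
# Illustrative usage sketch (the URL and paths are hypothetical): download to a temporary
# ".tmpdownload" file, resuming any previous partial download, and report progress through
# the download_progress() callback defined above:
#
#   force_download_file("http://example.com/files/archive.tar.gz",
#                       "/tmp/archive.tar.gz",
#                       progress_func=download_progress)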

def resolve_address(address):
  """
  Resolves an address to a usable one in special cases, for example 0.0.0.0 to 127.0.0.1 on Windows.

  :param address: address to resolve
  :return: resulting address
  """
  if OSCheck.is_windows_family():
    if address == '0.0.0.0':
      return '127.0.0.1'
  return address
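
# Illustrative sketch (not original code): the wildcard bind address is mapped to loopback
# on Windows, every other value is returned unchanged:
#
#   resolve_address('0.0.0.0')   # -> '127.0.0.1' on Windows, '0.0.0.0' elsewhere
#   resolve_address('10.0.0.5')  # -> '10.0.0.5'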

def ensure_ssl_using_protocol(protocol="PROTOCOL_TLSv1", ca_certs=None):
  """
  Monkey-patches the ssl module to force it to use the given protocol. Done in a common module
  to avoid problems with PythonReflectiveExecutor.

  :param protocol: one of ("PROTOCOL_SSLv2", "PROTOCOL_SSLv3", "PROTOCOL_SSLv23", "PROTOCOL_TLSv1", "PROTOCOL_TLSv1_1", "PROTOCOL_TLSv1_2")
  :param ca_certs: path to ca_certs file
  :return:
  """
  from functools import wraps
  import ssl

  if not hasattr(ssl.wrap_socket, "_ambari_patched"):
    def sslwrap(func):
      @wraps(func)
      def bar(*args, **kw):
        import ssl
        kw['ssl_version'] = getattr(ssl, protocol)
        if ca_certs and 'ca_certs' not in kw:
          kw['ca_certs'] = ca_certs
          kw['cert_reqs'] = ssl.CERT_REQUIRED
        return func(*args, **kw)
      bar._ambari_patched = True
      return bar

    ssl.wrap_socket = sslwrap(ssl.wrap_socket)

  # ssl._create_default_https_context exists on newer Python 2.7 releases; patch it as well
  if hasattr(ssl, "_create_default_https_context"):
    if not hasattr(ssl._create_default_https_context, "_ambari_patched"):
      @wraps(ssl._create_default_https_context)
      def _create_default_https_context_patched():
        context = ssl.SSLContext(protocol=getattr(ssl, protocol))
        if ca_certs:
          context.load_verify_locations(ca_certs)
          context.verify_mode = ssl.CERT_REQUIRED
          context.check_hostname = False
        return context

      _create_default_https_context_patched._ambari_patched = True
      ssl._create_default_https_context = _create_default_https_context_patched
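
# Illustrative usage sketch (the CA bundle path is hypothetical): force all subsequent
# ssl.wrap_socket() calls and the default HTTPS context in this process to use TLSv1 and
# to verify peers against the given CA bundle:
#
#   ensure_ssl_using_protocol("PROTOCOL_TLSv1", ca_certs="/path/to/ca-bundle.crt")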
- """
- See RFC3986, Appendix B
- Tested on the following cases:
- "192.168.54.1"
- "192.168.54.2:7661
- "hdfs://192.168.54.3/foo/bar"
- "ftp://192.168.54.4:7842/foo/bar"
- Returns None if only a port is passed in
- """
- def get_host_from_url(uri):
- if uri is None:
- return None
- # if not a string, return None
- if not isinstance(uri, basestring):
- return None
- # RFC3986, Appendix B
- parts = re.findall('^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?', uri)
- # index of parts
- # scheme = 1
- # authority = 3
- # path = 4
- # query = 6
- # fragment = 8
- host_and_port = uri
- if 0 == len(parts[0][1]):
- host_and_port = parts[0][4]
- elif 0 == len(parts[0][2]):
- host_and_port = parts[0][1]
- elif parts[0][2].startswith("//"):
- host_and_port = parts[0][3]
- if -1 == host_and_port.find(':'):
- if host_and_port.isdigit():
- return None
- return host_and_port
- else:
- return host_and_port.split(':')[0]
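
# Illustrative sketch (not original code), matching the cases listed in the docstring above:
#
#   get_host_from_url("192.168.54.1")                     # -> '192.168.54.1'
#   get_host_from_url("192.168.54.2:7661")                # -> '192.168.54.2'
#   get_host_from_url("hdfs://192.168.54.3/foo/bar")      # -> '192.168.54.3'
#   get_host_from_url("ftp://192.168.54.4:7842/foo/bar")  # -> '192.168.54.4'
#   get_host_from_url("7661")                             # -> None (port only)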