import functools
import requests
from requests.adapters import HTTPAdapter, Retry
# Default value of ``max_retry`` in create_requests_session(); used there for
# both the ``total`` and ``connect`` retry budgets.
MAX_REQUEST_RETRIES = 3
# Default value of ``retry_backoff_factor`` in create_requests_session().
RETRY_BACKOFF_FACTOR = 1
# HTTP status codes presumably intended to trigger a retry.
# NOTE(review): not referenced by the code visible in this part of the file --
# confirm whether it should be wired into the Retry policy or is used elsewhere.
RETRY_ON_STATUS = [408, 429, 500, 502, 503, 504]
class HTTPSAdaptor(HTTPAdapter):
    """HTTPAdapter that tags its connections with a private-key password.

    The password given to :meth:`set_key_password` is copied onto every
    connection object the adapter hands out, as the ``key_password``
    attribute.  While no password is set, the adapter behaves exactly like
    a plain ``HTTPAdapter``.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``HTTPAdapter`` and start with no password."""
        super().__init__(*args, **kwargs)
        self._key_password = None

    def _apply_key_password(self, conn):
        """Set ``conn.key_password`` if a password is stored; return *conn*."""
        if self._key_password is not None:
            conn.key_password = self._key_password
        return conn

    def get_connection(self, url, proxies=None):
        """Return the connection for *url*, tagged with the key password.

        This is the hook used by Requests releases before 2.32; it is kept
        so the adapter continues to work there.
        """
        return self._apply_key_password(super().get_connection(url, proxies))

    def get_connection_with_tls_context(self, *args, **kwargs):
        """Return the connection for a request, tagged with the key password.

        Requests >= 2.32 deprecates ``get_connection`` and has
        ``HTTPAdapter.send`` call this hook instead, so without this
        override the password would never be applied on those releases.
        Arguments are passed through untouched so the override does not pin
        the upstream signature.  On older Requests this method is simply
        never called.
        """
        return self._apply_key_password(
            super().get_connection_with_tls_context(*args, **kwargs)
        )

    def set_key_password(self, password):
        """Store *password* for connections obtained from now on.

        Passing ``None`` disables the tagging again.
        """
        self._key_password = password
def create_requests_session(
    username=None, password=None,
    ssl_context=None,
    timeout=None,
    max_retry=MAX_REQUEST_RETRIES,
    retry_backoff_factor=RETRY_BACKOFF_FACTOR
):
    """Build a ``requests.Session`` with retries and optional auth / client TLS.

    Args:
        username: User name for basic auth.  Auth is only configured when
            both ``username`` and ``password`` are truthy.
        password: Password for basic auth (see ``username``).
        ssl_context: Optional object exposing ``ssl_ca_file``,
            ``ssl_certfile``, ``ssl_keyfile`` and ``ssl_keyfile_password``.
            When truthy, HTTPS traffic goes through ``HTTPSAdaptor`` and
            every request defaults to that CA file, client certificate and
            a fixed Firefox ``User-Agent`` header.
        timeout: Default timeout bound into every request made through the
            session.  ``None`` leaves the requests default in place.
        max_retry: Used for both the ``total`` and ``connect`` retry limits.
        retry_backoff_factor: Backoff factor handed to ``Retry``.

    Returns:
        The configured ``requests.Session``.  When any per-request defaults
        apply, ``session.request`` is replaced by a ``functools.partial``
        carrying them; keyword arguments passed by the caller on an
        individual request still override those defaults.
    """
    session = requests.Session()
    retries = Retry(
        total=max_retry,
        connect=max_retry,
        backoff_factor=retry_backoff_factor
    )
    if username and password:
        session.auth = (username, password)
    session.mount('http://', HTTPAdapter(max_retries=retries))

    # Defaults bound into session.request.  The timeout is collected here,
    # outside the ssl_context branch, so that it is honoured for sessions
    # without an SSL context too (previously it was silently dropped there).
    request_defaults = {}
    if timeout is not None:
        request_defaults['timeout'] = timeout

    if ssl_context:
        https_adaptor = HTTPSAdaptor(max_retries=retries)
        https_adaptor.set_key_password(ssl_context.ssl_keyfile_password)
        session.mount('https://', https_adaptor)
        request_defaults.update(
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 6.0; WOW64; rv:24.0) Gecko/20100101 '
                              'Firefox/24.0'},
            verify=ssl_context.ssl_ca_file,
            cert=(ssl_context.ssl_certfile, ssl_context.ssl_keyfile),
        )
    else:
        session.mount('https://', HTTPAdapter(max_retries=retries))

    if request_defaults:
        # Session.get/post/... all route through self.request, so wrapping
        # the bound method on the instance applies the defaults everywhere.
        session.request = functools.partial(session.request, **request_defaults)
    return session