Source code for sevenbridges.http.client

import copy
import json
import logging
import platform
import threading
from datetime import datetime

import requests
import urllib3

import sevenbridges
from sevenbridges.errors import SbgError, URITooLong
from sevenbridges.config import Config, format_proxies
from sevenbridges.models.enums import RequestParameters
from sevenbridges.decorators import check_for_error, throttle
from sevenbridges.http.error_handlers import maintenance_sleeper

logger = logging.getLogger(__name__)

client_info = {
    'version': sevenbridges.__version__,
    'os': platform.system(),
    'python': platform.python_version(),
    'requests': requests.__version__,
    'urllib3': urllib3.__version__,
}


[docs]class AAHeader: key = 'X-Sbg-Advance-Access' value = 'Advance'
[docs]class RequestSession(requests.Session): """Client session class"""
[docs] def send(self, request, **kwargs): """Send prepared request :param request: Prepared request to be sent :param kwargs: request keyword arguments :return: Request response """ if len(request.url) > RequestParameters.MAX_URL_LENGTH: raise URITooLong( message=( 'Request url too large, ' 'likely too many query parameters provided.' ) ) return super().send(request, **kwargs)
[docs]def generate_session( pool_connections, pool_maxsize, pool_block, proxies=None, retry_count=None, backoff_factor=None, ): """ Utility method to generate request sessions. :param pool_connections: The number of urllib3 connection pools to cache. :param pool_maxsize: The maximum number of connections to save in the pool. :param pool_block: Whether the connection pool should block for connections. :param proxies: Proxies dictionary. :param retry_count: Number of retries to attempt :param backoff_factor: Backoff factor for retries :return: requests.Session object. """ session = RequestSession() # Retry if no response from server, this applies to failed DNS lookups, # socket connections and connection timeouts retry_count = retry_count or RequestParameters.DEFAULT_RETRY_COUNT backoff_factor = backoff_factor or RequestParameters.DEFAULT_BACKOFF_FACTOR retries = urllib3.Retry(total=retry_count, backoff_factor=backoff_factor) # noinspection PyUnresolvedReferences adapter = requests.adapters.HTTPAdapter( pool_connections=pool_connections, pool_maxsize=pool_maxsize, pool_block=pool_block, max_retries=retries ) session.mount('http://', adapter) session.mount('https://', adapter) session.proxies = proxies return session
# noinspection PyBroadException
[docs]def config_vars(profiles, advance_access): """ Utility method to fetch config vars using ini section profile :param profiles: profile name. :param advance_access: advance_access flag. :return: """ for profile in profiles: try: config = Config(profile, advance_access=advance_access) url = config.api_endpoint token = config.auth_token proxies = config.proxies aa = config.advance_access return url, token, proxies, aa except Exception: pass return None, None, None, None
[docs]def mask_secrets(request_data): masked = copy.deepcopy(request_data) masked['headers']['X-SBG-Auth-Token'] = '*****' masked['headers']['X-SBG-Session-Id'] = '*****' return masked
[docs]class HttpClient: """ Implementation of all low-level API stuff, creating and sending requests, returning raw responses, authorization, etc. """ def __init__( self, url=None, token=None, oauth_token=None, config=None, timeout=None, proxies=None, error_handlers=None, advance_access=False, pool_connections=None, pool_maxsize=None, pool_block=True, max_parallel_requests=None, retry_count=None, backoff_factor=None ): if (url, token, config) == (None, None, None): url, token, proxies, advance_access = config_vars( [None, 'default'], advance_access ) elif config is not None: url = config.api_endpoint token = config.auth_token proxies = config.proxies advance_access = advance_access or config.advance_access else: proxies = format_proxies(proxies) if not url: raise SbgError( 'URL is missing! Configuration may contain errors, ' 'or the url parameter is missing.' ) self.url = url.rstrip('/') self.pool_connections = pool_connections self.pool_maxsize = pool_maxsize self.pool_block = pool_block self._session = generate_session( pool_connections=pool_connections, pool_maxsize=pool_maxsize, pool_block=pool_block, proxies=proxies, retry_count=retry_count, backoff_factor=backoff_factor, ) self.timeout = timeout self._throttle_limit = ( threading.Semaphore(max_parallel_requests) if max_parallel_requests else None ) self._limit = None self._remaining = None self._reset = None self._request_id = None self.headers = { 'Content-Type': 'application/json', 'User-Agent': ( 'sevenbridges-python/{version} ({os}, Python/{python}; ' 'requests/{requests}; urllib3/{urllib3})'.format(**client_info) ) } self.timeout = timeout self.token = token self.oauth_token = oauth_token if self.token: self.headers['X-SBG-Auth-Token'] = token elif self.oauth_token: self.headers['Authorization'] = f'Bearer {oauth_token}' else: raise SbgError( 'Required authorization model not selected!. ' 'Please provide at least one token value.' ) self.aa = advance_access if self.aa: logger.warning( 'Advance access features enabled. ' 'AA API calls can be subject to changes.' ) self.error_handlers = [maintenance_sleeper] if error_handlers and isinstance(error_handlers, list): for handler in error_handlers: if handler not in self.error_handlers: self.error_handlers.append(handler) @property def session(self): return self._session @property def limit(self): self._rate_limit() return int(self._limit) if self._limit else self._limit @property def remaining(self): self._rate_limit() return int(self._remaining) if self._remaining else self._remaining @property def reset_time(self): self._rate_limit() return datetime.fromtimestamp( float(self._reset) ) if self._reset else self._reset @property def request_id(self): return self._request_id
[docs] def add_error_handler(self, handler): if callable(handler) and handler not in self.error_handlers: self.error_handlers.append(handler)
[docs] def remove_error_handler(self, handler): if callable(handler) and handler in self.error_handlers: self.error_handlers.remove(handler)
def _rate_limit(self): self._request('GET', url='/rate_limit', append_base=True) @throttle @check_for_error def _request(self, verb, url, headers=None, params=None, data=None, append_base=False, stream=False): if not url: raise SbgError(message='Request url must be provided') if append_base: url = self.url + url if not headers: headers = self.headers else: headers.update(self.headers) if not (self.token or self.oauth_token): raise SbgError(message='Api instance must be authenticated.') if hasattr(self, '_session_id'): if 'X-SBG-Auth-Token' in self.headers: del self.headers['X-SBG-Auth-Token'] elif 'Authorization' in self.headers: del self.headers['Authorization'] self.headers['X-SBG-Session-Id'] = getattr(self, '_session_id') # If advance access is enabled if self.aa: self.headers[AAHeader.key] = AAHeader.value request_data = { 'verb': verb, 'url': url, 'headers': headers, 'params': params } masked_request_data = mask_secrets(request_data) if not stream: masked_request_data.update({'data': data}) logger.debug( "Request %s", masked_request_data, extra=masked_request_data ) response = self._session.request( verb, url, params=params, data=json.dumps(data), headers=headers, timeout=self.timeout, stream=stream ) else: logger.debug( 'Stream Request %s', masked_request_data, extra=masked_request_data ) response = self._session.request( verb, url, params=params, stream=stream, allow_redirects=True, ) if self.error_handlers: while True: for error_handler in self.error_handlers: handled_response = error_handler(self, response) # if error handler 'is_repeatable', and error handling # occurred, iterate again if hasattr(error_handler, 'is_repeatable'): if response != handled_response: response = handled_response break else: response = handled_response else: break headers = response.headers self._limit = headers.get('X-RateLimit-Limit', self._limit) self._remaining = headers.get('X-RateLimit-Remaining', self._remaining) self._reset = headers.get('X-RateLimit-Reset', self._reset) self._last_response_time = response.elapsed.total_seconds() self._request_id = headers.get('X-Request-Id', self._request_id) return response
[docs] def get(self, url, headers=None, params=None, data=None, append_base=True, stream=False): return self._request( 'GET', url=url, headers=headers, params=params, data=data, append_base=append_base, stream=stream )
[docs] def post(self, url, headers=None, params=None, data=None, append_base=True): return self._request('POST', url=url, headers=headers, params=params, data=data, append_base=append_base)
[docs] def put(self, url, headers=None, params=None, data=None, append_base=True): return self._request('PUT', url=url, headers=headers, params=params, data=data, append_base=append_base)
[docs] def patch(self, url, headers=None, params=None, data=None, append_base=True): return self._request('PATCH', url=url, headers=headers, params=params, data=data, append_base=append_base)
[docs] def delete(self, url, headers=None, params=None, append_base=True): return self._request('DELETE', url=url, headers=headers, params=params, data={}, append_base=append_base)
def __repr__(self): return '<API(%s) - "%s">' % (self.url, self.token) __str__ = __repr__