Source code for sevenbridges.transfer.download

import os
import time
import logging
import hashlib
import threading

import requests

from sevenbridges.errors import SbgError
from sevenbridges.http.client import generate_session
from sevenbridges.models.enums import (
    PartSize, TransferState, RequestParameters
)
from sevenbridges.transfer.utils import Part, Progress, total_parts


logger = logging.getLogger(__name__)


def _download_part(path, session, url, timeout, start_byte, end_byte):
    """
    Downloads a single part.
    :param path: File path.
    :param session: Requests session.
    :param url: Url of the resource.
    :param timeout: Session timeout.
    :param start_byte: Start byte of the part.
    :param end_byte: End byte of the part.
    :return:
    """
    try:
        fp = os.open(path, os.O_CREAT | os.O_WRONLY)
    except IOError:
        raise SbgError('Unable to open file %s' % path)

    # Prepare range headers.
    headers = {}
    if end_byte is not None:
        headers['Range'] = 'bytes=%d-%d' % (int(start_byte), int(end_byte))

    try:
        response = session.get(
            url=url, headers=headers, timeout=timeout, stream=True
        )
        response.raise_for_status()
        part_size = response.headers.get('Content-Length')
        os.lseek(fp, start_byte, os.SEEK_SET)
        for part in response.iter_content(32 * PartSize.KB):
            os.write(fp, part)
        os.close(fp)
        return Part(start=start_byte, size=float(part_size))
    except (requests.HTTPError, requests.RequestException) as e:
        raise SbgError(f'Failed to download file. Response: {e}')


def _get_content_length(session, url, timeout):
    try:
        response = session.get(url, timeout=timeout, stream=True)
    except requests.RequestException as e:
        raise SbgError(str(e))

    file_size = response.headers.get('Content-Length', None)
    if file_size is None:
        raise SbgError('Server did not provide Content-Length Headers!')

    return file_size


[docs]class DPartedFile: def __init__( self, file_path, session, url, file_size, part_size, timeout, pool ): """ Emulates the partitioned file. Uses the download pool attached to the api session to download file parts. :param file_path: Full path to the new file. :param session: Requests session. :param url: Resource url. :param file_size: Resource file size. :param part_size: Part size. :param timeout: Session timeout. :param pool: Download pool. """ self.url = url self.file_path = file_path self.session = session self.file_size = file_size self.part_size = part_size self.timeout = timeout self.submitted = 0 self.total_submitted = 0 self.total = total_parts(self.file_size, self.part_size) self.pool = pool self.parts = self.get_parts()
[docs] def submit(self): """ Partitions the file into chunks and submits them into group of 4 for download on the api download pool. """ futures = [] while self.submitted < 4 and not self.done(): part = self.parts.pop(0) futures.append( self.pool.submit( _download_part, self.file_path, self.session, self.url, self.timeout, *part ) ) self.submitted += 1 self.total_submitted += 1 return futures
[docs] def done(self): return self.total_submitted == self.total
def __iter__(self): futures = self.submit() while futures: future = futures[0] self.submitted -= 1 futures.remove(future) futures.extend(self.submit()) yield future.result()
[docs] def get_parts(self): """ Partitions the file and saves the part information in memory. """ parts = [] start_b = 0 end_byte = start_b + self.part_size - 1 for _ in range(self.total): parts.append([start_b, end_byte]) start_b = end_byte + 1 end_byte = start_b + self.part_size - 1 return parts
# noinspection PyCallingNonCallable,PyTypeChecker,PyProtectedMember
[docs]class Download(threading.Thread): def __init__( self, url, file_path, part_size=None, retry_count=None, timeout=None, api=None ): """ File multipart downloader. :param url: URL of the file. :param file_path: Local file path. :param retry_count: Number of times to retry on error. :param timeout: Connection timeout in seconds. :param part_size: Size of the parts in bytes. :param api: Api instance. """ threading.Thread.__init__(self) self.daemon = True if api is None: raise SbgError('Api instance missing.') if part_size and part_size < PartSize.DOWNLOAD_MINIMUM_PART_SIZE: self._status = TransferState.FAILED raise SbgError( f'Part size is too small! Minimum get_parts size ' f'is {PartSize.DOWNLOAD_MINIMUM_PART_SIZE}' ) self.url = url self._file_path = file_path # append unique suffix to the file suffix = hashlib.sha1(self._file_path.encode('utf-8')).hexdigest()[:10] self._temp_file = f'{self._file_path}.{suffix}' self._retry_count = ( retry_count or RequestParameters.DEFAULT_RETRY_COUNT ) self._timeout = timeout or RequestParameters.DEFAULT_TIMEOUT self._part_size = part_size or PartSize.DOWNLOAD_MINIMUM_PART_SIZE self._api = api self._bytes_done = 0 self._running = threading.Event() self._callback = None self._errorback = None self._progress_callback = None self._time_started = 0 self._session = generate_session( pool_connections=self._api.pool_connections, pool_maxsize=self._api.pool_maxsize, pool_block=self._api.pool_block, proxies=self._api.session.proxies, retry_count=self._retry_count, ) try: self._file_size = self._get_file_size() except SbgError as error: if self._errorback: self._errorback(error) else: raise error self._status = TransferState.PREPARING self._stop_signal = False def __repr__(self): return f'<Download: status={self.status}>' @property def progress(self): return (self._bytes_done / self._file_size) * 100 @property def status(self): return self._status @property def start_time(self): return self._time_started @property def duration(self): return (time.time() - self._time_started) * 1000 @property def path(self): return self._file_path
[docs] def add_callback(self, callback=None, errorback=None): """ Adds a callback that will be called when the download finishes successfully or when error is raised. """ self._callback = callback self._errorback = errorback
[docs] def add_progress_callback(self, callback=None): """ Adds a progress callback that will be called each time a get_parts is successfully downloaded. The first argument of the progress callback will be a progress object described in sevenbridges.transfer.utils :param callback: Callback function """ self._progress_callback = callback
[docs] def pause(self): """ Pauses the download. :raises SbgError: If upload is not in RUNNING state. """ if self._status == TransferState.RUNNING: self._running.clear() self._status = TransferState.PAUSED else: raise SbgError('Can not pause. Download not in RUNNING state.')
[docs] def stop(self): """ Stops the download. :raises SbgError: If download is not in PAUSED or RUNNING state. """ if self.status in (TransferState.PAUSED, TransferState.RUNNING): self._stop_signal = True self.join() self._status = TransferState.STOPPED if self._callback: return self._callback(self._status) else: raise SbgError( 'Can not stop. Download not in PAUSED or RUNNING state.' )
[docs] def resume(self): """ Resumes the download. :raises SbgError: If download is not in RUNNING state. """ if self._status != TransferState.PAUSED: self._running.set() self._status = TransferState.RUNNING else: raise SbgError('Can not pause. Download not in PAUSED state.')
[docs] def wait(self): """ Blocks until download is completed. """ self.join()
[docs] def start(self): """ Starts the download. :raises SbgError: If download is not in PREPARING state. """ if self._status == TransferState.PREPARING: self._running.set() super().start() self._status = TransferState.RUNNING self._time_started = time.time() else: raise SbgError( 'Unable to start. Download not in PREPARING state.' )
[docs] def run(self): """ Runs the thread! Should not be used use start() method instead. """ self._running.set() self._status = TransferState.RUNNING self._time_started = time.time() parted_file = DPartedFile( file_path=self._temp_file, session=self._session, url=self.url, file_size=self._file_size, part_size=self._part_size, timeout=self._timeout, pool=self._api.download_pool, ) try: for part in parted_file: if self._stop_signal: return self._running.wait() self._bytes_done += part.size if self._progress_callback: progress = Progress( parted_file.total, parted_file.total_submitted, self._bytes_done, self._file_size, self.duration ) self._progress_callback(progress) except Exception as exc: if self._errorback: return self._errorback(exc) else: raise SbgError('Download failed! %s' % str(exc)) self._status = TransferState.COMPLETED try: os.rename(self._temp_file, self._file_path) except Exception as e: raise SbgError(f'Unable to rename the file due to an error: {e}.') if self._callback: return self._callback(self._status)
def _get_file_size(self): """ Fetches file size by reading the Content-Length header for the resource. :return: File size. """ file_size = int( _get_content_length(self._session, self.url, self._timeout) ) if file_size == 0: with open(self._temp_file, 'a', encoding='utf-8'): # Create file if empty pass return file_size