Source code for sevenbridges.models.task

import time
import logging

from sevenbridges.models.bulk import BulkRecord
from sevenbridges.decorators import inplace_reload
from sevenbridges.errors import (
    SbgError, TaskValidationError
)

from sevenbridges.meta.fields import (
    HrefField, UuidField, StringField,
    CompoundField, DateTimeField,
    BooleanField, DictField
)
from sevenbridges.meta.resource import Resource
from sevenbridges.meta.transformer import Transform

from sevenbridges.models.app import App
from sevenbridges.models.file import File
from sevenbridges.models.enums import FileApiFormats, TaskStatus
from sevenbridges.models.compound.price import Price
from sevenbridges.models.compound.tasks.batch_by import BatchBy
from sevenbridges.models.compound.tasks.batch_group import BatchGroup
from sevenbridges.models.compound.tasks.execution_status import ExecutionStatus
from sevenbridges.models.compound.tasks.input import Input
from sevenbridges.models.compound.tasks.output import Output
from sevenbridges.models.execution_details import ExecutionDetails

logger = logging.getLogger(__name__)


[docs]class Task(Resource): """ Central resource for managing tasks. """ _URL = { 'query': '/tasks', 'get': '/tasks/{id}', 'delete': '/tasks/{id}', 'run': '/tasks/{id}/actions/run', 'clone': '/tasks/{id}/actions/clone', 'abort': '/tasks/{id}/actions/abort', 'execution_details': "/tasks/{id}/execution_details", 'bulk_get': '/bulk/tasks/get', } href = HrefField(read_only=True) id = UuidField(read_only=True) name = StringField(read_only=False) status = StringField(read_only=True) description = StringField(read_only=False) project = StringField(read_only=False) app = StringField(read_only=False) type = StringField(read_only=True) created_by = StringField(read_only=True) executed_by = StringField(read_only=True) start_time = DateTimeField(read_only=True) created_time = DateTimeField(read_only=True) end_time = DateTimeField(read_only=True) batch = BooleanField(read_only=False) batch_by = CompoundField(BatchBy, read_only=False) batch_group = CompoundField(BatchGroup, read_only=True) batch_input = StringField(read_only=False) parent = StringField(read_only=True) execution_status = CompoundField(ExecutionStatus, read_only=True) errors = DictField(read_only=True) warnings = DictField(read_only=True) price = CompoundField(Price, read_only=True) inputs = CompoundField(Input, read_only=False) outputs = CompoundField(Output, read_only=True) execution_settings = DictField(read_only=True) output_location = DictField(read_only=True) use_interruptible_instances = BooleanField(read_only=False) origin = StringField(read_only=True, name='origin_id') def __str__(self): return f'<Task: id={self.id}>' def __eq__(self, other): if type(other) is not type(self): return False return self is other or self.id == other.id
[docs] @classmethod def query(cls, project=None, status=None, batch=None, parent=None, created_from=None, created_to=None, started_from=None, started_to=None, ended_from=None, ended_to=None, offset=None, limit=None, order_by=None, order=None, origin=None, api=None): """ Query (List) tasks. Date parameters may be both strings and python date objects. :param project: Target project. optional. :param status: Task status. :param batch: Only batch tasks. :param parent: Parent batch task identifier. :param ended_to: All tasks that ended until this date. :param ended_from: All tasks that ended from this date. :param started_to: All tasks that were started until this date. :param started_from: All tasks that were started from this date. :param created_to: All tasks that were created until this date. :param created_from: All tasks that were created from this date. :param offset: Pagination offset. :param limit: Pagination limit. :param order_by: Property to order by. :param order: Ascending or descending ordering. :param origin: Entity that created the task, e.g. automation run, if task was created by an automation run. :param api: Api instance. :return: Collection object. """ api = api or cls._API if parent: parent = Transform.to_task(parent) if project: project = Transform.to_project(project) if created_from: created_from = Transform.to_datestring(created_from) if created_to: created_to = Transform.to_datestring(created_to) if started_from: started_from = Transform.to_datestring(started_from) if started_to: started_to = Transform.to_datestring(started_to) if ended_from: ended_from = Transform.to_datestring(ended_from) if ended_to: ended_to = Transform.to_datestring(ended_to) if origin: origin = Transform.to_automation_run(origin) return super()._query( url=cls._URL['query'], project=project, status=status, batch=batch, parent=parent, created_from=created_from, created_to=created_to, started_from=started_from, started_to=started_to, ended_from=ended_from, ended_to=ended_to, offset=offset, limit=limit, order_by=order_by, order=order, fields='_all', origin_id=origin, api=api )
[docs] @classmethod def create(cls, name, project, app, revision=None, batch_input=None, batch_by=None, inputs=None, description=None, run=False, disable_batch=False, interruptible=None, execution_settings=None, output_location=None, api=None): """ Creates a task on server. :param name: Task name. :param project: Project identifier. :param app: CWL app identifier. :param revision: CWL app revision. :param batch_input: Batch input. :param batch_by: Batch criteria. :param inputs: Input map. :param description: Task description. :param run: True if you want to run a task upon creation. :param disable_batch: If True disables batching of a batch task. :param interruptible: If True interruptible instance will be used. :param execution_settings: Execution settings for the task. :param output_location: Dictionary that allows you to define the exact location where your task outputs will be stored. :param api: Api instance. :return: Task object. :raises: TaskValidationError if validation Fails. :raises: SbgError if any exception occurs during request. """ task_data = {} params = {} project = Transform.to_project(project) app_id = Transform.to_app(app) if revision: app_id = f'{app_id}/{revision}' else: if isinstance(app, App): app_id = f'{app_id}/{app.revision}' task_inputs = { 'inputs': Task._serialize_inputs(inputs) if inputs else {} } if batch_input and batch_by: task_data['batch_input'] = batch_input task_data['batch_by'] = batch_by if disable_batch: params.update({'batch': False}) task_meta = { 'name': name, 'project': project, 'app': app_id, 'description': description, } task_data.update(task_meta) task_data.update(task_inputs) if interruptible is not None: task_data['use_interruptible_instances'] = interruptible if execution_settings: task_data.update({'execution_settings': execution_settings}) if output_location: task_data.update({'output_location': output_location}) if run: params.update({'action': 'run'}) api = api if api else cls._API created_task = api.post(cls._URL['query'], data=task_data, params=params).json() if run and 'errors' in created_task and created_task['errors']: raise TaskValidationError( 'Unable to run task! Task contains errors.', task=Task(api=api, **created_task) ) return Task(api=api, **created_task)
@inplace_reload def abort(self, inplace=True): """ Abort task :param inplace Apply action on the current object or return a new one. :return: Task object. """ extra = { 'resource': type(self).__name__, 'query': {'id': self.id} } logger.info('Aborting task', extra=extra) task_data = self._api.post( url=self._URL['abort'].format(id=self.id)).json() return Task(api=self._api, **task_data) @inplace_reload def run(self, batch=True, interruptible=None, inplace=True): """ Run task :param batch if False batching will be disabled. :param interruptible: If true interruptible instance will be used. :param inplace Apply action on the current object or return a new one. :return: Task object. """ params = {} if not batch: params['batch'] = False if interruptible is not None: params['use_interruptible_instances'] = interruptible extra = { 'resource': type(self).__name__, 'query': {'id': self.id, 'batch': batch} } logger.info('Running task', extra=extra) task_data = self._api.post( url=self._URL['run'].format(id=self.id), params=params).json() return Task(api=self._api, **task_data)
[docs] def clone(self, run=True): """ Clone task :param run: run task after cloning :return: Task object. """ params = {} if run: params.update({'action': 'run'}) extra = { 'resource': type(self).__name__, 'query': {'id': self.id, 'run': run} } logger.info('Cloning task', extra=extra) task_data = self._api.post( url=self._URL['clone'].format(id=self.id), params=params).json() return Task(api=self._api, **task_data)
@inplace_reload def save(self, inplace=True): """ Saves all modification to the task on the server. :param inplace Apply edits on the current instance or get a new one. :return: Task instance. """ modified_data = self._modified_data() if modified_data: task_request_data = {} inputs = modified_data.pop('inputs', None) execution_settings = modified_data.pop('execution_settings', None) task_request_data.update(modified_data) if inputs: task_request_data['inputs'] = self._serialize_inputs(inputs) if execution_settings: task_request_data['execution_settings'] = ( self._serialize_execution_settings(execution_settings) ) extra = { 'resource': type(self).__name__, 'query': {'id': self.id, 'data': task_request_data} } logger.info('Saving task', extra=extra) data = self._api.patch(url=self._URL['get'].format(id=self.id), data=task_request_data).json() task = Task(api=self._api, **data) return task def _serialize_execution_settings(self, execution_settings): instance_type = execution_settings.get( 'instance_type', self.execution_settings.get('instance_type', None) ) max_parallel_instances = execution_settings.get( 'max_parallel_instances', self.execution_settings.get('max_parallel_instances', None) ) use_memoization = execution_settings.get( 'use_memoization', self.execution_settings.get('use_memoization', None) ) use_elastic_disk = execution_settings.get( 'use_elastic_disk', self.execution_settings.get('use_elastic_disk', None) ) serialized_es_mapping = { 'instance_type': instance_type, 'max_parallel_instances': max_parallel_instances, 'use_memoization': use_memoization, 'use_elastic_disk': use_elastic_disk } serialized_es = dict() for key, value in serialized_es_mapping.items(): if value is not None: serialized_es[key] = value return serialized_es @staticmethod def _serialize_inputs(input_value): """ Recursively serialises input dictionary. :param input_value: input dictionary to serialize :return: serialized input dictionary """ if isinstance(input_value, list): return_value = [] for elem in input_value: return_value.append(Task._serialize_inputs(elem)) elif isinstance(input_value, dict): return_value = {} for key in input_value: return_value[key] = Task._serialize_inputs( input_value[key] ) elif isinstance(input_value, File): return_value = Task._to_api_file_format(input_value) else: return_value = input_value return return_value @staticmethod def _to_api_file_format(_file): return { 'class': ( FileApiFormats.FOLDER if _file.is_folder() else FileApiFormats.FILE ), 'path': _file.id }
[docs] def get_execution_details(self): """ Retrieves execution details for a task. :return: Execution details instance. """ extra = { 'resource': type(self).__name__, 'query': {'id': self.id} } logger.info('Get execution details', extra=extra) data = self._api.get( self._URL['execution_details'].format(id=self.id)).json() return ExecutionDetails(api=self._api, **data)
[docs] def get_batch_children(self, status=None, created_from=None, created_to=None, started_from=None, started_to=None, ended_from=None, ended_to=None, order_by=None, order=None, offset=None, limit=None, api=None): """ Retrieves batch child tasks for this task if its a batch task. :return: Collection instance. :raises SbError if task is not a batch task. """ api = api or self._api if not self.batch: raise SbgError("This task is not a batch task.") return self.query( parent=self.id, status=status, created_from=created_from, created_to=created_to, started_from=started_from, started_to=started_to, ended_from=ended_from, ended_to=ended_to, order_by=order_by, order=order, offset=offset, limit=limit, api=api, )
[docs] @classmethod def bulk_get(cls, tasks, api=None): """ Retrieve tasks with specified ids in bulk :param tasks: Tasks to be retrieved. :param api: Api instance. :return: List of TaskBulkRecord objects. """ api = api or cls._API task_ids = [Transform.to_task(task) for task in tasks] data = {'task_ids': task_ids} logger.debug('Getting tasks in bulk.') response = api.post(url=cls._URL['bulk_get'], data=data) return TaskBulkRecord.parse_records(response=response, api=api)
[docs] def wait(self=None, period=10, callback=None, *args, **kwargs): """ Wait until task is complete :param period: Time in seconds between reloads :param callback: Function to call after the task has finished, arguments and keyword arguments can be provided for it :return: Return value of provided callback function or None if a callback function was not provided """ while self.status not in TaskStatus.terminal_states: self.reload() time.sleep(period) if callback: return callback(*args, **kwargs)
[docs]class TaskBulkRecord(BulkRecord): resource = CompoundField(cls=Task, read_only=False) def __str__(self): return f'<TaskBulkRecord valid={self.valid}>'