import logging
from sevenbridges.decorators import inplace_reload
from sevenbridges.errors import ResourceNotModified, SbgError
from sevenbridges.meta.fields import (
DictField,
HrefField,
UuidField,
StringField,
CompoundField,
DateTimeField,
BooleanField,
IntegerField,
)
from sevenbridges.meta.resource import Resource
from sevenbridges.meta.transformer import Transform
from sevenbridges.models.enums import AutomationRunActions, RequestParameters
from sevenbridges.models.file import File
from sevenbridges.models.member import Permissions
from sevenbridges.transfer.upload import CodePackageUpload
logger = logging.getLogger(__name__)
[docs]class AutomationPackage(Resource):
"""
Central resource for managing automation packages.
"""
_URL = {
'query': '/automation/automations/{automation_id}/packages',
'get': '/automation/packages/{id}',
'archive': "/automation/automations/{automation_id}"
"/packages/{id}/actions/archive",
'restore': "/automation/automations/{automation_id}"
"/packages/{id}/actions/restore",
}
id = StringField(read_only=True)
automation = UuidField(read_only=True)
version = StringField(read_only=True)
location = StringField(read_only=True)
schema = DictField(read_only=True)
created_by = StringField(read_only=True)
created_on = DateTimeField(read_only=True)
archived = BooleanField(read_only=True)
custom_url = StringField(read_only=False)
memory_limit = IntegerField(read_only=False)
python = StringField(read_only=True)
def __eq__(self, other):
if type(other) is not type(self):
return False
return self is other or self.id == other.id
def __str__(self):
return f'<AutomationPackage: id={self.id}>'
[docs] @classmethod
def query(cls, automation, offset=None, limit=None, api=None):
"""
Query (List) automation packages.
:param automation: Automation id.
:param offset: Pagination offset.
:param limit: Pagination limit.
:param api: Api instance.
:return: collection object
"""
automation_id = Transform.to_automation(automation)
api = api or cls._API
return super()._query(
url=cls._URL['query'].format(automation_id=automation_id),
offset=offset,
limit=limit,
api=api,
)
[docs] @classmethod
def create(cls, automation, version, location, schema, memory_limit=None,
python=None, api=None):
"""
Create a code package.
:param automation: Automation id.
:param version: File ID of the uploaded code package.
:param location: The code package version.
:param schema: IO schema for main step of execution.
:param python: Version of Python to execute Code Package with. Allowed
values are '3.6', '3.7', '3.8'. Default is '3.6'.
:param memory_limit: Memory limit in MB.
:param api: Api instance.
:return:
"""
automation_id = Transform.to_automation(automation)
api = api if api else cls._API
if version is None:
raise SbgError('Code package version is required!')
if location is None:
raise SbgError('Code package location is required!')
if schema is None:
raise SbgError('Schema is required!')
data = {
'version': version,
'location': location,
'schema': schema,
'memory_limit': memory_limit,
'python': python
}
extra = {
'resource': cls.__name__,
'query': data
}
package_data = api.post(
cls._URL['query'].format(automation_id=automation_id), data=data
).json()
logger.info(
'Add code package to automation with id %s',
automation_id, extra=extra
)
return AutomationPackage(api=api, **package_data)
@inplace_reload
def archive(self):
"""
Archive package
:return: AutomationPackage object.
"""
automation_id = Transform.to_automation(self.automation)
extra = {
'resource': type(self).__name__,
'query': {
'id': self.id,
}
}
logger.info('Archive automation package', extra=extra)
package_data = self._api.post(
url=self._URL['archive'].format(
automation_id=automation_id, id=self.id
)
).json()
return AutomationPackage(api=self._api, **package_data)
@inplace_reload
def restore(self):
"""
Restore archived package
:return: AutomationPackage object.
"""
automation_id = Transform.to_automation(self.automation)
extra = {
'resource': type(self).__name__,
'query': {
'id': self.id,
}
}
logger.info('Restore archived automation package', extra=extra)
package_data = self._api.post(
url=self._URL['restore'].format(
automation_id=automation_id, id=self.id
)
).json()
return AutomationPackage(api=self._api, **package_data)
@inplace_reload
def save(self, inplace=True):
"""
Saves all modification to the automation package on the server.
:param inplace Apply edits on the current instance or get a new one.
:return: AutomationPackage instance.
"""
modified_data = self._modified_data()
if modified_data:
extra = {
'resource': type(self).__name__,
'query': {
'id': self.id,
'modified_data': modified_data
}
}
logger.info('Saving automation package', extra=extra)
data = self._api.patch(url=self._URL['get'].format(id=self.id),
data=modified_data).json()
return AutomationPackage(api=self._api, **data)
else:
raise ResourceNotModified()
[docs]class AutomationMember(Resource):
"""
Central resource for managing automation members.
"""
_URL = {
'query': '/automation/automations/{automation_id}/members',
'get': '/automation/automations/{automation_id}/members/{id}',
}
href = HrefField(read_only=True)
id = StringField(read_only=True)
username = StringField(read_only=True)
email = StringField(read_only=True)
type = StringField(read_only=True)
name = StringField(read_only=True)
permissions = CompoundField(Permissions, read_only=False)
def __eq__(self, other):
if type(other) is not type(self):
return False
return self is other or self.username == other.username
def __str__(self):
return f'<AutomationMember: username={self.username}>'
[docs] @classmethod
def query(cls, automation=None, offset=None, limit=None, api=None):
"""
Query (List) apps.
:param automation: Automation id.
:param offset: Pagination offset.
:param limit: Pagination limit.
:param api: Api instance.
:return: collection object
"""
automation_id = Transform.to_automation(automation)
api = api or cls._API
return super()._query(
url=cls._URL['query'].format(automation_id=automation_id),
automation_id=automation_id,
offset=offset,
limit=limit,
api=api,
)
# noinspection PyMethodOverriding
[docs] @classmethod
def get(cls, id, automation, api=None):
"""
Fetches the resource from the server.
:param id: Automation member username
:param automation: Automation id or object
:param api: sevenbridges Api instance.
:return: AutomationMember object.
"""
username = Transform.to_resource(id)
automation = Transform.to_automation(automation)
api = api or cls._API
member = api.get(url=cls._URL['get'].format(
automation_id=automation,
id=username
)).json()
return AutomationMember(api=api, **member)
[docs] @classmethod
def add(cls, user, permissions, automation, api=None):
"""
Add a member to the automation.
:param user: Member username
:param permissions: Permissions dictionary.
:param automation: Automation object or id
:param api: sevenbridges Api instance
:return: Automation member object.
"""
user = Transform.to_user(user)
automation = Transform.to_automation(automation)
api = api or cls._API
data = {'username': user}
if isinstance(permissions, dict):
data.update({
'permissions': permissions
})
member_data = api.post(
url=cls._URL['query'].format(automation_id=automation),
data=data
).json()
return AutomationMember(api=api, **member_data)
[docs] @classmethod
def add_team(cls, team, permissions, automation, api=None):
"""
Add a team as a member to the automation.
:param team: Team object or team id
:param permissions: Permissions dictionary.
:param automation: Automation object or id
:param api: sevenbridges Api instance
:return: Automation member object.
"""
team = Transform.to_team(team)
automation = Transform.to_automation(automation)
api = api or cls._API
data = {'id': team,
'type': 'team'}
if isinstance(permissions, dict):
data.update({
'permissions': permissions
})
member_data = api.post(
url=cls._URL['query'].format(automation_id=automation),
data=data
).json()
return AutomationMember(api=api, **member_data)
[docs] @classmethod
def remove(cls, user, automation, api=None):
"""
Remove a member from the automation.
:param user: Member username
:param automation: Automation id
:param api: sevenbridges Api instance
:return: None
"""
user = Transform.to_user(user)
automation = Transform.to_automation(automation)
api = api or cls._API
api.delete(
cls._URL['get'].format(automation_id=automation, id=user)
)
[docs] @classmethod
def remove_team(cls, team, automation, api=None):
"""
Remove a team member from the automation.
:param team: Team object or team id
:param automation: Automation id
:param api: sevenbridges Api instance
:return: None
"""
team = Transform.to_team(team)
automation = Transform.to_automation(automation)
api = api or cls._API
api.delete(
cls._URL['get'].format(automation_id=automation, id=team)
)
@inplace_reload
def save(self, inplace=True):
"""
Saves modification to the api server.
"""
modified = self._modified_data()
if modified:
new_data = self.permissions.copy()
new_data.update(modified['permissions'])
data = {
'permissions': new_data
}
url = self.href
self._api.patch(url=url, data=data, append_base=False)
else:
raise ResourceNotModified()
[docs]class Automation(Resource):
"""
Central resource for managing automations.
"""
# noinspection PyProtectedMember
_URL = {
'query': '/automation/automations',
'get': '/automation/automations/{id}',
'member': AutomationMember._URL['get'],
'members': AutomationMember._URL['query'],
'packages': AutomationPackage._URL['query'],
'archive': '/automation/automations/{automation_id}/actions/archive',
'restore': '/automation/automations/{automation_id}/actions/restore'
}
href = HrefField(read_only=True)
id = UuidField(read_only=True)
name = StringField(read_only=False)
description = StringField(read_only=False)
billing_group = UuidField(read_only=False)
owner = StringField(read_only=True)
created_by = StringField(read_only=True)
created_on = DateTimeField(read_only=True)
modified_by = StringField(read_only=True)
modified_on = DateTimeField(read_only=False)
archived = BooleanField(read_only=True)
secret_settings = DictField(read_only=False)
memory_limit = IntegerField(read_only=False)
project_based = BooleanField(read_only=False)
def __eq__(self, other):
if type(other) is not type(self):
return False
return self is other or self.id == other.id
def __str__(self):
return f'<Automation: id={self.id} name={self.name}>'
[docs] @classmethod
def query(
cls, name=None, include_archived=False, project_based=None,
offset=None, limit=None, api=None
):
"""
Query (List) automations.
:param name: Automation name.
:param include_archived: Include archived automations also
:param project_based: Search project based automations
:param offset: Pagination offset.
:param limit: Pagination limit.
:param api: Api instance.
:return: collection object
"""
api = api or cls._API
return super()._query(
url=cls._URL['query'],
name=name,
include_archived=include_archived,
project_based=project_based,
offset=offset,
limit=limit,
api=api,
)
[docs] @classmethod
def create(cls, name, description=None, billing_group=None,
secret_settings=None, project_based=None, memory_limit=None,
api=None):
"""
Create a automation template.
:param name: Automation name.
:param billing_group: Automation billing group.
:param description: Automation description.
:param secret_settings: Automation settings.
:param project_based: Create project based automation template.
:param memory_limit: Memory limit in MB.
:param api: Api instance.
:return:
"""
api = api if api else cls._API
if name is None:
raise SbgError('Automation name is required!')
data = {
'name': name,
}
if billing_group:
data['billing_group'] = Transform.to_billing_group(billing_group)
if description:
data['description'] = description
if secret_settings:
data['secret_settings'] = secret_settings
if project_based:
data['project_based'] = project_based
if memory_limit:
data['memory_limit'] = memory_limit
extra = {
'resource': cls.__name__,
'query': data
}
logger.info('Creating automation template', extra=extra)
automation_data = api.post(url=cls._URL['query'], data=data).json()
return Automation(api=api, **automation_data)
@inplace_reload
def save(self, inplace=True):
"""
Saves all modification to the automation template on the server.
:param inplace Apply edits on the current instance or get a new one.
:return: Automation instance.
"""
modified_data = self._modified_data()
if modified_data:
extra = {
'resource': type(self).__name__,
'query': {
'id': self.id,
'modified_data': modified_data
}
}
logger.info('Saving automation template', extra=extra)
data = self._api.patch(url=self._URL['get'].format(id=self.id),
data=modified_data).json()
return Automation(api=self._api, **data)
else:
raise ResourceNotModified()
@inplace_reload
def archive(self):
"""
Archive automation
:return: Automation instance.
"""
extra = {
'resource': type(self).__name__,
'query': {
'id': self.id,
}
}
logger.info('Archive automation', extra=extra)
automation_data = self._api.post(
url=self._URL['archive'].format(automation_id=self.id)
).json()
return Automation(api=self._api, **automation_data)
@inplace_reload
def restore(self):
"""
Restore archived automation
:return: Automation instance.
"""
extra = {
'resource': type(self).__name__,
'query': {
'id': self.id,
}
}
logger.info('Restore archived automation', extra=extra)
automation_data = self._api.post(
url=self._URL['restore'].format(automation_id=self.id)
).json()
return Automation(api=self._api, **automation_data)
[docs] def get_packages(self, offset=None, limit=None, api=None):
"""
Return list of packages that belong to this automation
:param offset: Pagination offset.
:param limit: Pagination limit.
:param api: sevenbridges Api instance.
:return: AutomationPackage collection
"""
api = api or self._API
return AutomationPackage.query(
automation=self.id, offset=offset, limit=limit, api=api
)
[docs] @classmethod
def get_package(cls, package, api=None):
"""
Return specified automation member
:param package: Automation Package Id
:param api: sevenbridges Api instance.
:return: AutomationMember object
"""
package_id = Transform.to_automation_package(package)
api = api or cls._API
return AutomationPackage.get(
id=package_id, api=api
)
[docs] def add_package(
self, version, file_path, schema, file_name=None,
python=None, retry_count=RequestParameters.DEFAULT_RETRY_COUNT,
timeout=RequestParameters.DEFAULT_TIMEOUT, part_size=None,
api=None
):
"""
Add a code package to automation template.
:param version: The code package version.
:param file_path: Path to the code package file to be uploaded.
:param schema: IO schema for main step of execution.
:param part_size: Size of upload part in bytes.
:param file_name: Optional file name.
:param python: Version of Python to execute Code Package with. Allowed
values are '3.6', '3.7', '3.8'. Default is '3.6'.
:param retry_count: Upload retry count.
:param timeout: Timeout for s3/google session.
:param api: sevenbridges Api instance.
:return: AutomationPackage
"""
api = api or self._API
if version is None:
raise SbgError('Code package version is required!')
if file_path is None:
raise SbgError('Code package file path is required!')
# Multipart upload the code package:
upload = CodePackageUpload(
file_path,
self.id,
api=api,
part_size=part_size,
file_name=file_name,
retry_count=retry_count,
timeout=timeout
)
upload.start()
upload.wait()
package_file = upload.result()
# Create the automation package:
return AutomationPackage.create(
self.id,
version=version,
location=package_file.id,
schema=schema,
python=python,
api=api
)
[docs] def get_member(self, username, api=None):
"""
Return specified automation member
:param username: Member username
:param api: sevenbridges Api instance.
:return: AutomationMember object
"""
member = Transform.to_automation_member(username)
api = api or self._API
return AutomationMember.get(
id=member, automation=self.id, api=api
)
[docs] def get_members(self, offset=None, limit=None, api=None):
"""
Return list of automation members
:param offset: Pagination offset.
:param limit: Pagination limit.
:param api: sevenbridges Api instance.
:return: AutomationMember collection
"""
api = api or self._API
return AutomationMember.query(
automation=self.id, offset=offset, limit=limit, api=api
)
[docs] def add_member(self, user, permissions, api=None):
"""
Add member to the automation
:param user: Member username
:param permissions: Member permissions
:param api: sevenbridges Api instance
:return: AutomationMember object
"""
api = api or self._API
return AutomationMember.add(
automation=self.id, user=user, permissions=permissions, api=api
)
[docs] def remove_member(self, user, api=None):
"""
Remove a member from the automation
:param user: Member username
:param api: sevenbridges Api instance
:return: None
"""
api = api or self._API
AutomationMember.remove(automation=self.id, user=user, api=api)
[docs] def add_team_member(self, team, permissions, api=None):
"""
Add team member to the automation
:param team: Team object or team id
:param permissions: Member permissions
:param api: sevenbridges Api instance
:return: AutomationMember object
"""
api = api or self._API
return AutomationMember.add_team(
automation=self.id, team=team, permissions=permissions, api=api
)
[docs] def remove_team_member(self, team, api=None):
"""
Remove a team member from the automation
:param team: Team object or team id
:param api: sevenbridges Api instance
:return: None
"""
api = api or self._API
AutomationMember.remove_team(automation=self.id, team=team, api=api)
[docs] def get_runs(self, package=None, status=None, name=None,
created_by=None, created_from=None, created_to=None,
project_id=None, order_by=None, order=None, offset=None,
limit=None, api=None):
"""
Query automation runs that belong to this automation
:param package: Package id
:param status: Run status
:param name: Automation run name
:param created_by: Username of member that created the run
:param created_from: Date the run was created after
:param created_to: Date the run was created before
:param project_id: Search runs by project id, if run is project based
:param order_by: Property by which to order results
:param order: Ascending or Descending ("asc" or "desc")
:param offset: Pagination offset.
:param limit: Pagination limit.
:param api: sevenbridges Api instance
:return: AutomationRun collection
"""
api = api or self._API
return AutomationRun.query(
automation=self.id, package=package, status=status, name=name,
created_by=created_by, created_from=created_from,
created_to=created_to, project_id=project_id, order_by=order_by,
order=order, offset=offset, limit=limit, api=api
)
[docs]class AutomationRun(Resource):
"""
Central resource for managing automation runs.
"""
_URL = {
'query': '/automation/runs',
'get': '/automation/runs/{id}',
'actions': '/automation/runs/{id}/actions/{action}',
'state': '/automation/runs/{id}/state',
}
href = HrefField(read_only=True)
id = StringField(read_only=True)
name = StringField(read_only=False)
automation = CompoundField(Automation, read_only=True)
package = CompoundField(AutomationPackage, read_only=True)
inputs = DictField(read_only=False)
outputs = DictField(read_only=True)
settings = DictField(read_only=False)
created_on = DateTimeField(read_only=True)
start_time = DateTimeField(read_only=True)
end_time = DateTimeField(read_only=True)
resumed_from = StringField(read_only=True)
created_by = StringField(read_only=True)
status = StringField(read_only=True)
message = StringField(read_only=True)
execution_details = DictField(read_only=True)
memory_limit = IntegerField(read_only=False)
project_id = StringField(read_only=True)
def __eq__(self, other):
if type(other) is not type(self):
return False
return self is other or self.id == other.id
def __str__(self):
return f'<AutomationRun: id={self.id}>'
[docs] @classmethod
def query(cls, automation=None, package=None, status=None, name=None,
created_by=None, created_from=None, created_to=None,
project_id=None, order_by=None, order=None, offset=None,
limit=None, api=None):
"""
Query (List) automation runs.
:param name: Automation run name
:param automation: Automation template
:param package: Package
:param status: Run status
:param created_by: Username of user that created the run
:param order_by: Property by which to order results
:param order: Ascending or descending ("asc" or "desc")
:param created_from: Date the run is created after
:param created_to: Date the run is created before
:param project_id: Id of project if Automation run is project based
:param offset: Pagination offset.
:param limit: Pagination limit.
:param api: Api instance.
:return: collection object
"""
if automation:
automation = Transform.to_automation(automation)
if package:
package = Transform.to_automation_package(package)
api = api or cls._API
return super()._query(
url=cls._URL['query'],
name=name,
automation_id=automation,
package_id=package,
status=status,
created_by=created_by,
created_from=created_from,
created_to=created_to,
project_id=project_id,
order_by=order_by,
order=order,
offset=offset,
limit=limit,
api=api,
)
[docs] @classmethod
def create(cls, package, inputs=None, settings=None, resume_from=None,
name=None, secret_settings=None, memory_limit=None, api=None):
"""
Create and start a new run.
:param package: Automation package id
:param inputs: Input dictionary
:param settings: Settings override dictionary
:param resume_from: Run to resume from
:param name: Automation run name
:param secret_settings: dict to override secret_settings from
automation template
:param memory_limit: Memory limit in MB.
:param api: sevenbridges Api instance
:return: AutomationRun object
"""
package = Transform.to_automation_package(package)
data = {'package': package}
if inputs:
data['inputs'] = inputs
else:
data['inputs'] = dict()
if settings:
data['settings'] = settings
if resume_from:
data['resume_from'] = resume_from
if name:
data['name'] = name
if secret_settings:
data['secret_settings'] = secret_settings
if memory_limit:
data['memory_limit'] = memory_limit
api = api or cls._API
automation_run = api.post(
url=cls._URL['query'],
data=data,
).json()
return AutomationRun(api=api, **automation_run)
@inplace_reload
def save(self, inplace=True):
"""
Saves all modification to the automation run on the server.
:param inplace Apply edits on the current instance or get a new one.
:return: Automation run instance.
"""
modified_data = self._modified_data()
if modified_data:
extra = {
'resource': type(self).__name__,
'query': {
'id': self.id,
'modified_data': modified_data
}
}
logger.info('Saving automation run', extra=extra)
data = self._api.patch(url=self._URL['get'].format(id=self.id),
data=modified_data).json()
return AutomationRun(api=self._api, **data)
else:
raise ResourceNotModified()
[docs] @classmethod
def rerun(
cls, id, package=None, inputs=None, settings=None, resume_from=None,
name=None, secret_settings=None, merge=True, api=None
):
"""
Create and start rerun of existing automation.
:param id: Automation id to rerun
:param package: Automation package id
:param inputs: Input dictionary
:param settings: Settings override dictionary
:param resume_from: Run to resume from
:param name: Automation run name
:param secret_settings: dict to override secret_settings from
automation template
:param merge: merge settings and inputs of run
:param api: sevenbridges Api instance
:return: AutomationRun object
"""
data = {'merge': merge}
if package:
data['package'] = package
if inputs:
data['inputs'] = inputs
if settings:
data['settings'] = settings
if resume_from:
data['resume_from'] = resume_from
if name:
data['name'] = name
if secret_settings:
data['secret_settings'] = secret_settings
api = api or cls._API
automation_run = api.post(
url=cls._URL['actions'].format(
id=id, action=AutomationRunActions.RERUN
)
).json()
return AutomationRun(api=api, **automation_run)
[docs] def stop(self, api=None):
"""
Stop automation run.
:param api: sevenbridges Api instance.
:return: AutomationRun object
"""
api = api or self._API
return api.post(
url=self._URL['actions'].format(
id=self.id, action=AutomationRunActions.STOP
)
).content
[docs] def get_log_file(self, api=None):
"""
Retrieve automation run log.
:param api: sevenbridges Api instance
:return: Log string
"""
api = api or self._API
log_file_data = self.execution_details.get('log_file')
return File(api=api, **log_file_data) if log_file_data else None
[docs] def get_state(self, api=None):
"""
Retrieve automation run state.
:param api: sevenbridges Api instance
:return: State file json contents as string
"""
api = api or self._API
return api.get(self._URL['state'].format(id=self.id)).json()