Source code for sevenbridges.models.volume

import logging

from sevenbridges.decorators import inplace_reload
from sevenbridges.errors import ResourceNotModified
from sevenbridges.meta.collection import Collection, VolumeCollection
from sevenbridges.meta.fields import (
    HrefField, StringField, CompoundField, DateTimeField, BooleanField
)
from sevenbridges.meta.resource import Resource
from sevenbridges.meta.transformer import Transform
from sevenbridges.models.compound.volumes.service import VolumeService
from sevenbridges.models.compound.volumes.volume_object import VolumeObject
from sevenbridges.models.compound.volumes.volume_prefix import VolumePrefix
from sevenbridges.models.enums import VolumeType
from sevenbridges.models.link import Link, VolumeLink
from sevenbridges.models.member import Member

logger = logging.getLogger(__name__)


# noinspection PyProtectedMember
[docs]class Volume(Resource): """ Central resource for managing volumes. """ _URL = { 'query': '/storage/volumes', 'get': '/storage/volumes/{id}', 'delete': '/storage/volumes/{id}', 'list': '/storage/volumes/{id}/list', 'object': '/storage/volumes/{id}/object', 'member': '/storage/volumes/{id}/members/{username}', 'members_query': '/storage/volumes/{id}/members', } href = HrefField(read_only=True) id = StringField(read_only=True) name = StringField(read_only=False) description = StringField(read_only=False) access_mode = StringField(read_only=False) service = CompoundField(VolumeService, read_only=False) created_on = DateTimeField(read_only=True) modified_on = DateTimeField(read_only=True) active = BooleanField(read_only=False) def __eq__(self, other): if type(other) is not type(self): return False return self is other or self.id == other.id def __str__(self): return f'<Volume: id={self.id}>'
[docs] @classmethod def query(cls, offset=None, limit=None, api=None): """ Query (List) volumes. :param offset: Pagination offset. :param limit: Pagination limit. :param api: Api instance. :return: Collection object. """ api = api or cls._API return super()._query( url=cls._URL['query'], offset=offset, limit=limit, fields='_all', api=api )
[docs] @classmethod def create_s3_volume(cls, name, bucket, access_key_id, secret_access_key, access_mode, description=None, prefix=None, properties=None, api=None): """ Create s3 volume. :param name: Volume name. :param bucket: Referenced bucket. :param access_key_id: Amazon access key identifier. :param secret_access_key: Amazon secret access key. :param access_mode: Access Mode. :param description: Volume description. :param prefix: Volume prefix. :param properties: Volume properties. :param api: Api instance. :return: Volume object. """ service = {'type': VolumeType.S3, 'bucket': bucket, 'credentials': {'access_key_id': access_key_id, 'secret_access_key': secret_access_key } } if prefix: service['prefix'] = prefix if properties: service['properties'] = properties data = {'name': name, 'service': service, 'access_mode': access_mode } if description: data['description'] = description api = api or cls._API extra = { 'resource': cls.__name__, 'query': data } logger.info('Creating s3 volume', extra=extra) response = api.post(url=cls._URL['query'], data=data).json() return Volume(api=api, **response)
[docs] @classmethod def create_s3_volume_role_auth(cls, name, bucket, role_arn, external_id, access_mode, description=None, prefix=None, properties=None, api=None): """ Create s3 volume using IAM Role auth. :param name: Volume name. :param bucket: Referenced bucket. :param role_arn: Amazon role ARN. :param external_id: Amazon role external id. :param access_mode: Access Mode. :param description: Volume description. :param prefix: Volume prefix. :param properties: Volume properties. :param api: Api instance. :return: Volume object. """ service = {'type': VolumeType.S3, 'bucket': bucket, 'credentials': {'role_arn': role_arn, 'external_id': external_id } } if prefix: service['prefix'] = prefix if properties: service['properties'] = properties data = {'name': name, 'service': service, 'access_mode': access_mode } if description: data['description'] = description api = api or cls._API extra = { 'resource': cls.__name__, 'query': data } logger.info('Creating s3 volume using role auth', extra=extra) response = api.post(url=cls._URL['query'], data=data).json() return Volume(api=api, **response)
[docs] @classmethod def create_google_volume(cls, name, bucket, client_email, private_key, access_mode, description=None, prefix=None, properties=None, api=None): """ Create google volume. :param name: Volume name. :param bucket: Referenced bucket. :param client_email: Google client email. :param private_key: Google client private key. :param access_mode: Access Mode. :param description: Volume description. :param prefix: Volume prefix. :param properties: Volume properties. :param api: Api instance. :return: Volume object. """ service = {'type': VolumeType.GOOGLE, 'bucket': bucket, 'credentials': {'client_email': client_email, 'private_key': private_key } } if prefix: service['prefix'] = prefix if properties: service['properties'] = properties data = {'name': name, 'service': service, 'access_mode': access_mode } if description: data['description'] = description api = api or cls._API extra = { 'resource': cls.__name__, 'query': data } logger.info('Creating google volume', extra=extra) response = api.post(url=cls._URL['query'], data=data).json() return Volume(api=api, **response)
[docs] @classmethod def create_google_iam_volume(cls, name, bucket, configuration, access_mode, description=None, prefix=None, properties=None, api=None): """ Create google volume. :param name: Volume name. :param bucket: Referenced bucket. :param configuration: Google configuration. :param access_mode: Access Mode. :param description: Volume description. :param prefix: Volume prefix. :param properties: Volume properties. :param api: Api instance. :return: Volume object. """ service = {'type': VolumeType.GOOGLE, 'bucket': bucket, 'credentials': {'configuration': configuration} } if prefix: service['prefix'] = prefix if properties: service['properties'] = properties data = {'name': name, 'service': service, 'access_mode': access_mode } if description: data['description'] = description api = api or cls._API extra = { 'resource': cls.__name__, 'query': data } logger.info('Creating google volume', extra=extra) response = api.post(url=cls._URL['query'], data=data).json() return Volume(api=api, **response)
[docs] @classmethod def create_oss_volume(cls, name, bucket, endpoint, access_key_id, secret_access_key, access_mode, description=None, prefix=None, properties=None, api=None): """ Create oss volume. :param name: Volume name. :param bucket: Referenced bucket. :param access_key_id: Access key identifier. :param secret_access_key: Secret access key. :param access_mode: Access Mode. :param endpoint: Volume Endpoint. :param description: Volume description. :param prefix: Volume prefix. :param properties: Volume properties. :param api: Api instance. :return: Volume object. """ service = { 'type': VolumeType.OSS, 'bucket': bucket, 'endpoint': endpoint, 'credentials': { 'access_key_id': access_key_id, 'secret_access_key': secret_access_key } } if prefix: service['prefix'] = prefix if properties: service['properties'] = properties data = { 'name': name, 'service': service, 'access_mode': access_mode } if description: data['description'] = description api = api or cls._API extra = { 'resource': cls.__name__, 'query': data } logger.info('Creating oss volume', extra=extra) response = api.post(url=cls._URL['query'], data=data).json() return Volume(api=api, **response)
@inplace_reload def save(self, inplace=True): """ Saves all modification to the volume on the server. """ modified_data = self._modified_data() if modified_data: extra = { 'resource': type(self).__name__, 'query': { 'id': self.id, 'modified_data': modified_data } } logger.info('Saving volume', extra=extra) data = self._api.patch(url=self._URL['get'].format(id=self.id), data=modified_data).json() volume = Volume(api=self._api, **data) return volume else: raise ResourceNotModified()
[docs] def list(self, prefix=None, limit=None, fields='_all'): params = {} if prefix: params['prefix'] = prefix if limit: params['limit'] = limit params['fields'] = fields data = self._api.get( url=self._URL['list'].format(id=self.id), params=params ).json() href = data['href'] links = [VolumeLink(**link) for link in data['links']] objects = [ VolumeObject(api=self._api, **item) for item in data['items'] ] prefixes = [ VolumePrefix(api=self._api, **prefix) for prefix in # noqa: F812 data['prefixes'] ] return VolumeCollection( href=href, items=objects, links=links, prefixes=prefixes, api=self._api )
[docs] def get_volume_object_info(self, location): """ Fetches information about single volume object - usually file :param location: object location :return: """ param = {'location': location} data = self._api.get(url=self._URL['object'].format( id=self.id), params=param).json() return VolumeObject(api=self._api, **data)
[docs] def get_imports(self, project=None, state=None, offset=None, limit=None): """ Fetches imports for this volume. :param project: Optional project identifier. :param state: Optional state. :param offset: Pagination offset. :param limit: Pagination limit. :return: Collection object. """ return self._api.imports.query(volume=self, project=project, state=state, offset=offset, limit=limit)
[docs] def get_exports(self, state=None, offset=None, limit=None): """ Fetches exports for this volume. :param state: Optional state. :param offset: Pagination offset. :param limit: Pagination limit. :return: Collection object. """ return self._api.exports.query(volume=self, state=state, offset=offset, limit=limit )
[docs] def get_members(self, offset=None, limit=None): """ Retrieves volume members. :param offset: Pagination offset. :param limit: Pagination limit. :return: Collection object. """ extra = { 'resource': type(self).__name__, 'query': {'id': self.id} } logger.info('Get volume members', extra=extra) response = self._api.get( url=self._URL['members_query'].format(id=self.id), params={'offset': offset, 'limit': limit}) data = response.json() total = response.headers['x-total-matching-query'] members = [Member(api=self._api, **member) for member in data['items']] links = [Link(**link) for link in data['links']] href = data['href'] return Collection(resource=Member, href=href, total=total, items=members, links=links, api=self._api)
[docs] def add_member(self, user, permissions): """ Add a member to the volume. :param user: Member username :param permissions: Permissions dictionary. :return: Member object. """ user = Transform.to_user(user) data = {'username': user, 'type': 'USER'} if 'execute' in permissions: permissions.pop('execute') if isinstance(permissions, dict): data.update({ 'permissions': permissions }) extra = { 'resource': type(self).__name__, 'query': { 'id': self.id, 'data': data, } } logger.info('Adding volume member', extra=extra) response = self._api.post( url=self._URL['members_query'].format(id=self.id), data=data) member_data = response.json() return Member(api=self._api, **member_data)
[docs] def add_member_team(self, team, permissions): """ Add a member (team) to a volume. :param team: Team object or team identifier. :param permissions: Permissions dictionary. :return: Member object. """ team = Transform.to_team(team) data = {'username': team, 'type': 'TEAM'} if 'execute' in permissions: permissions.pop('execute') if isinstance(permissions, dict): data.update({ 'permissions': permissions }) extra = { 'resource': type(self).__name__, 'query': { 'id': self.id, 'data': data, } } logger.info('Adding volume team member', extra=extra) response = self._api.post( url=self._URL['members_query'].format(id=self.id), data=data) member_data = response.json() return Member(api=self._api, **member_data)
[docs] def add_member_division(self, division, permissions): """ Add a member (team) to a volume. :param division: Division object or division identifier. :param permissions: Permissions dictionary. :return: Member object. """ division = Transform.to_division(division) if 'execute' in permissions: permissions.pop('execute') data = {'username': division, 'type': 'DIVISION'} if isinstance(permissions, dict): data.update({ 'permissions': permissions }) extra = { 'resource': type(self).__name__, 'query': { 'id': self.id, 'data': data, } } logger.info('Adding volume division member', extra=extra) response = self._api.post( url=self._URL['members_query'].format(id=self.id), data=data) member_data = response.json() return Member(api=self._api, **member_data)
[docs] def get_member(self, username, api=None): """ Fetches information about a single volume member :param username: Member name :param api: Api instance :return: Member object """ api = api if api else self._API response = api.get( url=self._URL['member'].format(id=self.id, username=username), ) data = response.json() return Member(api=api, **data)
[docs] def remove_member(self, user): """ Remove member from the volume. :param user: User to be removed. """ username = Transform.to_user(user) extra = { 'resource': type(self).__name__, 'query': { 'id': self.id, 'user': user, } } logger.info('Removing volume member', extra=extra) self._api.delete( url=self._URL['member'].format(id=self.id, username=username) )