Source code for magpie.api.management.group.group_utils

from magpie.services import SERVICE_TYPE_DICT
from magpie.api import exception as ax, schemas as s
from magpie.api.management.resource.resource_utils import check_valid_service_or_resource_permission
from magpie.api.management.resource.resource_formats import format_resource
from magpie.api.management.service.service_formats import format_service_resources, format_service
from magpie.api.management.group.group_formats import format_group
from magpie.definitions.ziggurat_definitions import GroupService, GroupResourcePermissionService, ResourceService
from magpie.definitions.pyramid_definitions import (
    HTTPOk,
    HTTPCreated,
    HTTPForbidden,
    HTTPNotFound,
    HTTPConflict,
    HTTPInternalServerError
)
from magpie import models
from magpie.permissions import format_permissions, convert_permission
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from magpie.definitions.pyramid_definitions import HTTPException  # noqa: F401
    from magpie.definitions.sqlalchemy_definitions import Session  # noqa: F401
    from magpie.definitions.typedefs import Str, Iterable, List, Optional, JSON, ServiceOrResourceType  # noqa: F401
    from magpie.permissions import Permission  # noqa: F401


def get_all_group_names(db_session):
    # type: (Session) -> List[Str]
    """
    Retrieve the name of every group stored in the database.

    The query is evaluated through ``ax.evaluate_call`` with ``HTTPForbidden`` configured as the error to report
    on failure.

    :param db_session: database session used to list the groups.
    :returns: list containing the ``group_name`` of each group found.
    """
    return ax.evaluate_call(
        lambda: [group.group_name for group in GroupService.all(models.Group, db_session=db_session)],
        httpError=HTTPForbidden,
        msgOnFail=s.Groups_GET_ForbiddenResponseSchema.description,
    )
def get_group_resources(group, db_session):
    # type: (models.Group, Session) -> JSON
    """
    Build the JSON body describing the services and resources that ``group`` has permissions on.

    Every service found in the database is formatted with the permissions of the group on the service itself and
    on the resources located under it. The result is nested first by service type, then by service name.

    :param group: group for which permissions are looked up.
    :param db_session: database session used for all queries.
    :returns: dictionary of the form ``{<service-type>: {<service-name>: <formatted-service-resources>}}``.
    """
    services_by_type = {}
    all_services = list(ResourceService.all(models.Service, db_session=db_session))
    for service in all_services:
        service_perms = get_group_service_permissions(group=group, service=service, db_session=db_session)
        service_name = str(service.resource_name)
        service_type = str(service.type)
        # fetch (or create on first encounter) the section that groups services of this type
        type_section = services_by_type.setdefault(service_type, {})
        children_perms = get_group_service_resources_permissions_dict(
            group=group, service=service, db_session=db_session
        )
        type_section[service_name] = format_service_resources(
            service,
            db_session=db_session,
            service_perms=service_perms,
            resources_perms_dict=children_perms,
            show_all_children=False,
            show_private_url=False,
        )
    return services_by_type
def create_group(group_name, db_session):
    # type: (Str, Session) -> HTTPException
    """
    Create a group named ``group_name`` if it is permitted and no group of that name already exists.

    The steps are, in order: look up an existing group with the same name (reported with ``HTTPConflict`` when
    one is found), instantiate the new group model, then add it to the database session. The last two steps
    roll back the session and report ``HTTPForbidden`` on failure.

    :param group_name: name of the group to create.
    :param db_session: database session used to look up and add the group.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    existing_group = GroupService.by_group_name(group_name, db_session=db_session)
    error_content = {u"group_name": str(group_name)}

    # the name must not already be in use
    ax.verify_param(
        existing_group,
        isNone=True,
        httpError=HTTPConflict,
        withParam=False,
        msgOnFail=s.Groups_POST_ConflictResponseSchema.description,
        content=error_content,
    )

    # noinspection PyArgumentList
    created_group = ax.evaluate_call(
        lambda: models.Group(group_name=group_name),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPForbidden,
        content=error_content,
        msgOnFail=s.Groups_POST_ForbiddenCreateResponseSchema.description,
    )
    ax.evaluate_call(
        lambda: db_session.add(created_group),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPForbidden,
        content=error_content,
        msgOnFail=s.Groups_POST_ForbiddenAddResponseSchema.description,
    )

    success_content = {u"group": format_group(created_group, basic_info=True)}
    return ax.valid_http(
        httpSuccess=HTTPCreated,
        detail=s.Groups_POST_CreatedResponseSchema.description,
        content=success_content,
    )
def create_group_resource_permission_response(group, resource, permission, db_session):
    # type: (models.Group, ServiceOrResourceType, Permission, Session) -> HTTPException
    """
    Create ``permission`` for the ``group`` on the ``resource`` if it is permitted and not conflicting.

    The permission is first validated against the resource, then an existing identical group/resource
    permission is searched for (reported with ``HTTPConflict`` when found). Finally, the new permission entry
    is instantiated and added to the database session.

    :param group: group that receives the permission.
    :param resource: service or resource on which the permission applies.
    :param permission: permission to create; its ``value`` is used as permission name.
    :param db_session: database session used for lookups and for adding the permission.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    res_id = resource.resource_id
    perm_name = permission.value
    check_valid_service_or_resource_permission(perm_name, resource, db_session)

    # same content body is reported by both the error responses and the success response
    response_content = {
        u"permission_name": str(perm_name),
        u"resource": format_resource(resource, basic_info=True),
        u"group": format_group(group, basic_info=True),
    }

    found_perm = ax.evaluate_call(
        lambda: GroupResourcePermissionService.get(group.id, res_id, perm_name, db_session=db_session),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPForbidden,
        msgOnFail=s.GroupResourcePermissions_POST_ForbiddenGetResponseSchema.description,
        content=response_content,
    )
    ax.verify_param(
        found_perm,
        isNone=True,
        httpError=HTTPConflict,
        msgOnFail=s.GroupResourcePermissions_POST_ConflictResponseSchema.description,
        content=response_content,
    )

    # noinspection PyArgumentList
    created_perm = ax.evaluate_call(
        lambda: models.GroupResourcePermission(resource_id=res_id, group_id=group.id, perm_name=perm_name),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPForbidden,
        content=response_content,
        msgOnFail=s.GroupResourcePermissions_POST_ForbiddenCreateResponseSchema.description,
    )
    ax.evaluate_call(
        lambda: db_session.add(created_perm),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPForbidden,
        content=response_content,
        msgOnFail=s.GroupResourcePermissions_POST_ForbiddenAddResponseSchema.description,
    )
    return ax.valid_http(
        httpSuccess=HTTPCreated,
        content=response_content,
        detail=s.GroupResourcePermissions_POST_CreatedResponseSchema.description,
    )
def get_group_resources_permissions_dict(group, db_session, resource_ids=None, resource_types=None):
    # type: (models.Group, Session, Optional[Iterable[int]], Optional[Iterable[Str]]) -> JSON
    """
    Get a mapping of resource IDs to the permission names that ``group`` has on each of those resources.

    The search is forwarded to ``GroupService.resources_with_possible_perms`` and is filtered by ``resource_ids``
    and/or ``resource_types`` when they are specified.

    :param group: group for which permissions are searched.
    :param db_session: database session used for the query (rolled back if the search fails).
    :param resource_ids: optional resource IDs to restrict the search to.
    :param resource_types: optional resource types to restrict the search to.
    :returns: dictionary of the form ``{<resource-id>: [<permission-name>, ...]}``.
    """
    def _collect_perms_by_resource(grp, db, res_ids, res_types):
        found = GroupService.resources_with_possible_perms(
            grp, resource_ids=res_ids, resource_types=res_types, db_session=db
        )
        perms_by_res_id = {}
        for entry in found:
            # one result per (resource, permission) pair: regroup permission names under their resource
            perms_by_res_id.setdefault(entry.resource.resource_id, []).append(entry.perm_name)
        return perms_by_res_id

    failure_content = {
        u"group": repr(group),
        u"resource_ids": repr(resource_ids),
        u"resource_types": repr(resource_types),
    }
    return ax.evaluate_call(
        lambda: _collect_perms_by_resource(group, db_session, resource_ids, resource_types),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPInternalServerError,
        msgOnFail=s.GroupResourcesPermissions_InternalServerErrorResponseSchema.description,
        content=failure_content,
    )
def get_group_resource_permissions_response(group, resource, db_session):
    # type: (models.Group, models.Resource, Session) -> HTTPException
    """
    Get validated response with the permissions of ``group`` on ``resource`` as content.

    When the group is the owner of the resource, every permission defined for the resource type is reported.
    Otherwise, only the permissions explicitly stored for this group/resource combination are reported.

    :param group: group for which permissions are looked up.
    :param resource: resource on which permissions are looked up.
    :param db_session: database session used for the query.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    def _find_permissions(grp, res, db):
        if res.owner_group_id == grp.id:
            # NOTE(review): the lookup key is ``res.type`` -- confirm that attribute matches the keys of
            #   ``models.RESOURCE_TYPE_DICT`` for every kind of resource that can reach this function.
            return models.RESOURCE_TYPE_DICT[res.type].permissions
        grp_res_perm = models.GroupResourcePermission
        query = db.query(grp_res_perm)
        query = query.filter(grp_res_perm.resource_id == res.resource_id)
        query = query.filter(grp_res_perm.group_id == grp.id)
        return [convert_permission(found_perm) for found_perm in query]

    perm_names = ax.evaluate_call(
        lambda: format_permissions(_find_permissions(group, resource, db_session)),
        httpError=HTTPInternalServerError,
        msgOnFail=s.GroupResourcePermissions_InternalServerErrorResponseSchema.description,
        content={u"group": repr(group), u"resource": repr(resource)},
    )
    return ax.valid_http(
        httpSuccess=HTTPOk,
        detail=s.GroupResourcePermissions_GET_OkResponseSchema.description,
        content={u"permission_names": perm_names},
    )
def delete_group_resource_permission_response(group, resource, permission, db_session):
    # type: (models.Group, ServiceOrResourceType, Permission, Session) -> HTTPException
    """
    Get validated response on deleted group resource permission.

    The permission is first validated against the resource, then the matching group/resource permission entry
    is fetched (reported with ``HTTPNotFound`` when missing) and deleted through the database session.

    :param group: group from which the permission is removed.
    :param resource: service or resource on which the permission applies.
    :param permission: permission to delete; its ``value`` is used as permission name.
    :param db_session: database session used for the lookup and the deletion.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    res_id = resource.resource_id
    perm_name = permission.value
    check_valid_service_or_resource_permission(perm_name, resource, db_session)

    # content body reported by every error response of this operation
    error_content = {
        u"permission_name": perm_name,
        u"resource": format_resource(resource, basic_info=True),
        u"group": format_group(group, basic_info=True),
    }

    perm_to_delete = ax.evaluate_call(
        lambda: GroupResourcePermissionService.get(group.id, res_id, perm_name, db_session=db_session),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPForbidden,
        msgOnFail=s.GroupServicePermission_DELETE_ForbiddenGetResponseSchema.description,
        content=error_content,
    )
    ax.verify_param(
        perm_to_delete,
        notNone=True,
        httpError=HTTPNotFound,
        content=error_content,
        msgOnFail=s.GroupServicePermission_DELETE_NotFoundResponseSchema.description,
    )
    ax.evaluate_call(
        lambda: db_session.delete(perm_to_delete),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPForbidden,
        content=error_content,
        msgOnFail=s.GroupServicePermission_DELETE_ForbiddenResponseSchema.description,
    )
    return ax.valid_http(
        httpSuccess=HTTPOk,
        detail=s.GroupServicePermission_DELETE_OkResponseSchema.description,
    )
def get_group_services(resources_permissions_dict, db_session):
    # type: (JSON, Session) -> JSON
    """
    Nest and regroup the resource permissions under corresponding root service types.

    Each key of ``resources_permissions_dict`` is looked up by resource ID, and the item found is formatted as a
    service together with its permissions.

    :param resources_permissions_dict: mapping of resource IDs to their permissions.
    :param db_session: database session used to retrieve each item by ID.
    :returns: dictionary of the form ``{<service-type>: {<service-name>: <formatted-service>}}``.
    """
    services_by_type = {}
    for resource_id, permissions in resources_permissions_dict.items():
        service = ResourceService.by_resource_id(resource_id=resource_id, db_session=db_session)
        service_type = str(service.type)
        service_name = str(service.resource_name)
        # fetch (or create on first encounter) the section that groups services of this type
        type_section = services_by_type.setdefault(service_type, {})
        type_section[service_name] = format_service(service, permissions, show_private_url=False)
    return services_by_type
def get_group_services_response(group, db_session):
    # type: (models.Group, Session) -> HTTPException
    """
    Get validated response of services the group has permissions on.

    Permissions of the group are searched only among resources of the service resource type, and the result
    is then regrouped by service type and service name.

    :param group: group for which services are looked up.
    :param db_session: database session used for the queries.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    service_type_filter = [models.Service.resource_type_name]
    service_perms_by_id = get_group_resources_permissions_dict(
        group, resource_types=service_type_filter, db_session=db_session
    )
    services_json = ax.evaluate_call(
        lambda: get_group_services(service_perms_by_id, db_session),
        httpError=HTTPInternalServerError,
        msgOnFail=s.GroupServices_InternalServerErrorResponseSchema.description,
        content={u"group": format_group(group, basic_info=True)},
    )
    return ax.valid_http(
        httpSuccess=HTTPOk,
        detail=s.GroupServices_GET_OkResponseSchema.description,
        content={u"services": services_json},
    )
def get_group_service_permissions(group, service, db_session):
    # type: (models.Group, models.Service, Session) -> List[Permission]
    """
    Get all permissions the group has on a specific service.

    When the group is the owner of the service, every permission defined by the service type is returned.
    Otherwise, only the permissions explicitly stored for this group/service combination are returned.

    :param group: group for which permissions are looked up.
    :param service: service on which permissions are looked up.
    :param db_session: database session used for the query.
    :returns: permissions of the group on the service.
    """
    def _find_permissions(grp, svc, db):
        if svc.owner_group_id == grp.id:
            return SERVICE_TYPE_DICT[svc.type].permissions
        grp_res_perm = models.GroupResourcePermission
        query = db.query(grp_res_perm)
        query = query.filter(grp_res_perm.resource_id == svc.resource_id)
        query = query.filter(grp_res_perm.group_id == grp.id)
        return [convert_permission(found_perm) for found_perm in query]

    return ax.evaluate_call(
        lambda: _find_permissions(group, service, db_session),
        httpError=HTTPInternalServerError,
        msgOnFail="Failed to obtain group service permissions",
        content={u"group": repr(group), u"service": repr(service)},
    )
def get_group_service_permissions_response(group, service, db_session):
    # type: (models.Group, models.Service, Session) -> HTTPException
    """
    Get validated response of found group service permissions.

    :param group: group for which permissions are looked up.
    :param service: service on which permissions are looked up.
    :param db_session: database session used for the query.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    perm_names = ax.evaluate_call(
        lambda: format_permissions(get_group_service_permissions(group, service, db_session)),
        httpError=HTTPInternalServerError,
        msgOnFail=s.GroupServicePermissions_GET_InternalServerErrorResponseSchema.description,
        content={
            u"group": format_group(group, basic_info=True),
            u"service": format_service(service),
        },
    )
    return ax.valid_http(
        httpSuccess=HTTPOk,
        detail=s.GroupServicePermissions_GET_OkResponseSchema.description,
        content={u"permission_names": perm_names},
    )
def get_group_service_resources_permissions_dict(group, service, db_session):
    # type: (models.Group, models.Service, Session) -> JSON
    """
    Get all permissions the group has on a specific service's children resources.

    The IDs of the resources returned by the resource tree for the service are collected, and the permission
    search of the group is restricted to those IDs.

    :param group: group for which permissions are looked up.
    :param service: service under which resources are searched.
    :param db_session: database session used for the queries.
    :returns: dictionary of resource IDs mapped to the permissions of the group on them.
    """
    tree_rows = models.resource_tree_service.from_parent_deeper(
        parent_id=service.resource_id, db_session=db_session
    )
    children_ids = [row.Resource.resource_id for row in tree_rows]
    return get_group_resources_permissions_dict(
        group, db_session, resource_types=None, resource_ids=children_ids
    )
def get_group_service_resources_response(group, service, db_session):
    # type: (models.Group, models.Service, Session) -> HTTPException
    """
    Get validated response of all found service resources which the group has permissions on.

    The service is formatted with both the permissions of the group on the service itself and its permissions
    on the resources located under that service.

    :param group: group for which permissions are looked up.
    :param service: service to describe.
    :param db_session: database session used for the queries.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    service_perms = get_group_service_permissions(group=group, service=service, db_session=db_session)
    children_perms = get_group_service_resources_permissions_dict(
        group=group, service=service, db_session=db_session
    )
    service_json = format_service_resources(
        service=service,
        db_session=db_session,
        service_perms=service_perms,
        resources_perms_dict=children_perms,
        show_all_children=False,
        show_private_url=False,
    )
    return ax.valid_http(
        httpSuccess=HTTPOk,
        detail=s.GroupServiceResources_GET_OkResponseSchema.description,
        content={u"service": service_json},
    )