from typing import TYPE_CHECKING
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPConflict,
HTTPCreated,
HTTPForbidden,
HTTPInternalServerError,
HTTPNotFound,
HTTPOk
)
from pyramid.settings import asbool
from ziggurat_foundations.models.services.group import GroupService
from ziggurat_foundations.models.services.group_resource_permission import GroupResourcePermissionService
from ziggurat_foundations.models.services.resource import ResourceService
from magpie import models
from magpie.api import exception as ax
from magpie.api import schemas as s
from magpie.api.management.group.group_formats import format_group
from magpie.api.management.resource.resource_formats import format_resource
from magpie.api.management.resource.resource_utils import check_valid_service_or_resource_permission
from magpie.api.management.service import service_formats as sf
from magpie.api.webhooks import WebhookAction, get_permission_update_params, process_webhook_requests
from magpie.constants import network_enabled, protected_group_name_regex
from magpie.permissions import PermissionSet, PermissionType, format_permissions
from magpie.services import SERVICE_TYPE_DICT
if TYPE_CHECKING:
# pylint: disable=W0611,unused-import
from typing import Iterable, List, Optional
from pyramid.httpexceptions import HTTPException
from sqlalchemy.orm.session import Session
from magpie.typedefs import JSON, ResourcePermissionMap, ServiceOrResourceType, Str
[docs]
def get_all_group_names(db_session):
    # type: (Session) -> List[Str]
    """
    Retrieve the name of every group stored in the database.

    :param db_session: database connection used to list the groups.
    :returns: list with the ``group_name`` of each group found.
    :raises HTTPException: error response produced by ``ax.evaluate_call`` (configured with ``HTTPForbidden``).
    """
    def list_names():
        all_groups = GroupService.all(models.Group, db_session=db_session)
        return [group.group_name for group in all_groups]

    return ax.evaluate_call(
        lambda: list_names(),
        http_error=HTTPForbidden,
        msg_on_fail=s.Groups_GET_ForbiddenResponseSchema.description,
    )
[docs]
def get_group_resources(group, db_session, service_types=None):
    # type: (models.Group, Session, Optional[List[Str]]) -> JSON
    """
    Build the JSON body describing the services and resources with the permissions applied to the ``group``.

    Every service whose type is part of ``service_types`` is formatted with the permissions of the group on the
    service itself and on its children resources. Results are nested by service type, then by service name.

    :param group: group for which permissions are retrieved.
    :param db_session: database connection.
    :param service_types: service types to retain. When ``None``, every type of ``SERVICE_TYPE_DICT`` is used.
    :returns: mapping of ``{<service-type>: {<service-name>: <formatted-service-resources>}}``.
    """
    allowed_types = list(SERVICE_TYPE_DICT) if service_types is None else service_types
    services_by_type = {}
    all_services = list(ResourceService.all(models.Service, db_session=db_session))
    for service in all_services:
        service_type = str(service.type)
        if service_type not in allowed_types:
            continue
        service_perms = get_group_service_permissions(group=group, service=service, db_session=db_session)
        service_name = str(service.resource_name)
        children_perms = get_group_service_resources_permissions_dict(
            group=group, service=service, db_session=db_session
        )
        formatted_service = sf.format_service_resources(
            service,
            db_session=db_session,
            service_perms=service_perms,
            resources_perms_dict=children_perms,
            permission_type=PermissionType.APPLIED,
            show_all_children=False,
            show_private_url=False,
        )
        # create the per-type section on first use, then register the service under its name
        services_by_type.setdefault(service_type, {})[service_name] = formatted_service
    return services_by_type
[docs]
def create_group(group_name, description, discoverable, terms, db_session):
    # type: (Str, Str, bool, Str, Session) -> HTTPException
    """
    Create a new group once its fields are validated and no group of the same name already exists.

    :param group_name: name of the group to create, validated against ``ax.PARAM_REGEX``.
    :param description: optional description of the group. An empty value is stored as ``None``.
    :param discoverable: whether the group is discoverable, converted using ``asbool``.
    :param terms: optional terms and conditions of the group. An empty value is stored as ``None``.
    :param db_session: database connection.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    # normalize the optional fields before they are validated, reported or stored
    terms = str(terms) if terms else None
    discoverable = asbool(discoverable)
    description = str(description) if description else None
    error_content = {
        "group_name": str(group_name),
        "description": description,
        "discoverable": discoverable
    }
    bad_request_msg = s.Groups_POST_BadRequestResponseSchema.description

    ax.verify_param(group_name, matches=True, param_compare=ax.PARAM_REGEX, param_name="group_name",
                    http_error=HTTPBadRequest, content=error_content, msg_on_fail=bad_request_msg)
    if network_enabled():
        # with network mode enabled, the name must not match the protected group names pattern
        protected_names = protected_group_name_regex(include_admin=False)
        ax.verify_param(group_name, not_matches=True, param_compare=protected_names, param_name="group_name",
                        http_error=HTTPBadRequest, content=error_content, msg_on_fail=bad_request_msg)
    if description:
        ax.verify_param(description, matches=True, param_compare=ax.PARAM_REGEX, param_name="description",
                        http_error=HTTPBadRequest, content=error_content, msg_on_fail=bad_request_msg)

    existing_group = GroupService.by_group_name(group_name, db_session=db_session)
    ax.verify_param(existing_group, is_none=True, param_name="group_name",
                    with_param=False,  # don't return group as value
                    http_error=HTTPConflict, content=error_content,
                    msg_on_fail=s.Groups_POST_ConflictResponseSchema.description)

    def build_group():
        return models.Group(group_name=group_name,
                            description=description,
                            discoverable=discoverable,
                            terms=terms)  # noqa

    created_group = ax.evaluate_call(
        lambda: build_group(),
        fallback=lambda: db_session.rollback(),
        http_error=HTTPForbidden,
        content=error_content,
        msg_on_fail=s.Groups_POST_ForbiddenCreateResponseSchema.description,
    )
    ax.evaluate_call(
        lambda: db_session.add(created_group),
        fallback=lambda: db_session.rollback(),
        http_error=HTTPForbidden,
        content=error_content,
        msg_on_fail=s.Groups_POST_ForbiddenAddResponseSchema.description,
    )
    # re-fetch the created group to update fields with auto-generated group ID
    created_group = ax.evaluate_call(
        lambda: GroupService.by_group_name(group_name, db_session=db_session),
        http_error=HTTPForbidden,
        msg_on_fail=s.Groups_POST_ForbiddenAddResponseSchema.description,
    )
    response_content = {"group": format_group(created_group, basic_info=True)}
    return ax.valid_http(http_success=HTTPCreated, detail=s.Groups_POST_CreatedResponseSchema.description,
                         content=response_content)
[docs]
def get_similar_group_resource_permission(group, resource, permission, db_session):
    # type: (models.Group, ServiceOrResourceType, PermissionSet, Session) -> Optional[PermissionSet]
    """
    Find the permission of the group on the service/resource that is *similar* to the provided one.

    Similarity is evaluated with ``PermissionSet.like`` against the permissions the group has on the resource.
    The first match is returned. Note that the ``type`` of the provided ``permission`` is set to
    ``PermissionType.APPLIED`` as a side effect.

    :param group: group for which to look for the permission.
    :param resource: service or resource on which the permission is looked for.
    :param permission: reference permission to compare against.
    :param db_session: database connection.
    :returns: first similar permission marked as applied, or ``None`` when there is no match.
    """
    permission.type = PermissionType.APPLIED
    resource_id = resource.resource_id
    error_content = {
        "group": format_group(group, basic_info=True),
        "resource": format_resource(resource, basic_info=True),
        "permission_name": str(permission),
        "permission": permission.json(),
    }

    def find_similar_permissions():
        group_perms = get_group_resources_permissions_dict(group, resource_ids=[resource_id], db_session=db_session)
        matches = []
        for candidate in group_perms.get(resource_id, []):
            if candidate.like(permission):
                matches.append(PermissionSet(candidate))
        return matches

    matching_perms = ax.evaluate_call(
        lambda: find_similar_permissions(),
        http_error=HTTPForbidden,
        content=error_content,
        msg_on_fail=s.GroupResourcePermissions_Check_ErrorResponseSchema.description,
    )
    if matching_perms:
        first_match = matching_perms[0]
        first_match.type = PermissionType.APPLIED
        return first_match
    return None
[docs]
def create_group_resource_permission_response(group, resource, permission, db_session, overwrite=False):
    # type: (models.Group, ServiceOrResourceType, PermissionSet, Session, bool) -> HTTPException
    """
    Create the permission for the group on the service/resource, provided it is allowed and does not conflict.

    :param group: group for which to create/update the permission.
    :param resource: service or resource for which to create the permission.
    :param permission: permission with modifiers to be applied.
    :param db_session: database connection.
    :param overwrite:
        Defines how an already existing *similar* `(group, resource, permission)` entry is handled.
        When ``False``, the existing entry is reported as a conflict and nothing is applied.
        When ``True``, the existing entry is deleted and replaced by the new permission,
        or the permission is simply created if there was no such entry.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    resource_id = resource.resource_id
    check_valid_service_or_resource_permission(permission.name, resource, db_session)
    existing_perm = get_similar_group_resource_permission(group, resource, permission, db_session=db_session)
    response_content = {
        "permission_name": str(permission),
        "permission": permission.json(),
        "resource": format_resource(resource, basic_info=True),
        "group": format_group(group, basic_info=True),
    }

    if overwrite and existing_perm:
        # replacement of an existing permission is reported as an update rather than a creation
        success_type = HTTPOk
        success_detail = s.GroupResourcePermissions_PUT_OkResponseSchema.description
        # skip similar permission lookup since we already did it
        delete_group_resource_permission_response(group, resource, existing_perm,
                                                  db_session=db_session, similar=False)
    else:
        success_type = HTTPCreated
        success_detail = s.GroupResourcePermissions_POST_CreatedResponseSchema.description
        ax.verify_param(existing_perm, is_none=True, http_error=HTTPConflict, content=response_content,
                        msg_on_fail=s.GroupResourcePermissions_POST_ConflictResponseSchema.description)

    def build_permission():
        return models.GroupResourcePermission(resource_id=resource_id, group_id=group.id, perm_name=str(permission))

    created_perm = ax.evaluate_call(
        lambda: build_permission(),
        fallback=lambda: db_session.rollback(),
        http_error=HTTPForbidden,
        content=response_content,
        msg_on_fail=s.GroupResourcePermissions_POST_ForbiddenCreateResponseSchema.description,
    )
    ax.evaluate_call(
        lambda: db_session.add(created_perm),
        fallback=lambda: db_session.rollback(),
        http_error=HTTPForbidden,
        content=response_content,
        msg_on_fail=s.GroupResourcePermissions_POST_ForbiddenAddResponseSchema.description,
    )
    hook_params = get_permission_update_params(group, resource, permission, db_session)
    process_webhook_requests(WebhookAction.CREATE_GROUP_PERMISSION, hook_params)
    return ax.valid_http(http_success=success_type, content=response_content, detail=success_detail)
[docs]
def get_group_resources_permissions_dict(group, db_session, resource_ids=None, resource_types=None):
    # type: (models.Group, Session, Optional[Iterable[int]], Optional[Iterable[Str]]) -> ResourcePermissionMap
    """
    Map each resource to the list of permissions that the group has on it.

    :param group: group for which permissions are retrieved.
    :param db_session: database connection.
    :param resource_ids: optional resource IDs forwarded to the lookup to filter the search.
    :param resource_types: optional resource types forwarded to the lookup to filter the search.
    :returns: mapping of ``{<resource-id>: [<PermissionSet>, ...]}``.
    :raises HTTPException: error response produced by ``ax.evaluate_call`` (``HTTPInternalServerError``).
    """
    def collect_permissions():
        found_perms = GroupService.resources_with_possible_perms(
            group, resource_ids=resource_ids, resource_types=resource_types, db_session=db_session
        )
        perms_by_resource = {}
        for perm_tuple in found_perms:
            # group the permissions under the ID of the resource they apply to
            perms_by_resource.setdefault(perm_tuple.resource.resource_id, []).append(PermissionSet(perm_tuple))
        return perms_by_resource

    error_content = {
        "group": repr(group),
        "resource_ids": repr(resource_ids),
        "resource_types": repr(resource_types),
    }
    return ax.evaluate_call(
        lambda: collect_permissions(),
        fallback=lambda: db_session.rollback(),
        http_error=HTTPInternalServerError,
        msg_on_fail=s.GroupResourcesPermissions_InternalServerErrorResponseSchema.description,
        content=error_content,
    )
[docs]
def get_group_resource_permissions_response(group, resource, db_session):
    # type: (models.Group, models.Resource, Session) -> HTTPException
    """
    Produce the validated response listing the permissions of the group on the resource.

    :param group: group for which permissions are retrieved.
    :param resource: resource on which the permissions of the group are looked for.
    :param db_session: database connection.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    def find_permissions():
        if resource.owner_group_id == group.id:
            # FIXME: no 'magpie.models.Resource.permissions' - ok for now because no owner handling...
            return models.RESOURCE_TYPE_DICT[resource.type].permissions
        perm_model = models.GroupResourcePermission
        query = db_session.query(perm_model)
        query = query.filter(perm_model.resource_id == resource.resource_id)
        query = query.filter(perm_model.group_id == group.id)
        return [PermissionSet(entry, typ=PermissionType.APPLIED) for entry in query]

    error_content = {"group": repr(group), "resource": repr(resource)}
    formatted_perms = ax.evaluate_call(
        lambda: format_permissions(find_permissions(), PermissionType.APPLIED),
        http_error=HTTPInternalServerError,
        msg_on_fail=s.GroupResourcePermissions_InternalServerErrorResponseSchema.description,
        content=error_content,
    )
    return ax.valid_http(http_success=HTTPOk, content=formatted_perms,
                         detail=s.GroupResourcePermissions_GET_OkResponseSchema.description)
[docs]
def delete_group_resource_permission_response(group, resource, permission, db_session, similar=True):
    # type: (models.Group, ServiceOrResourceType, PermissionSet, Session, bool) -> HTTPException
    """
    Get validated response on deleted group resource permission.

    :param group: group for which to delete the permission.
    :param resource: service or resource for which to delete the permission.
    :param permission: permission with modifiers to be deleted.
        Its ``type`` is set to ``PermissionType.APPLIED`` as a side effect.
    :param db_session: database connection.
    :param similar:
        Allow matching provided permission against any similar database permission. Otherwise, must match exactly.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
        The permission not being found is reported using ``HTTPNotFound``.
    """
    check_valid_service_or_resource_permission(permission.name, resource, db_session)
    res_id = resource.resource_id
    if similar:
        found_perm = get_similar_group_resource_permission(group, resource, permission, db_session=db_session)
    else:
        found_perm = permission
    # When no similar permission exists, 'found_perm' is None. Skip the database lookup explicitly in that case
    # instead of querying for a permission literally named "None" and relying on that query finding nothing.
    del_perm = None
    if found_perm is not None:
        del_perm = GroupResourcePermissionService.get(group.id, res_id, str(found_perm), db_session=db_session)
    permission.type = PermissionType.APPLIED
    perm_content = {"permission_name": str(permission), "permission": permission.json(),
                    "resource": format_resource(resource, basic_info=True),
                    "group": format_group(group, basic_info=True)}
    ax.verify_param(del_perm, not_none=True, http_error=HTTPNotFound, content=perm_content,
                    msg_on_fail=s.GroupServicePermission_DELETE_NotFoundResponseSchema.description)
    ax.evaluate_call(lambda: db_session.delete(del_perm), fallback=lambda: db_session.rollback(),
                     http_error=HTTPForbidden, content=perm_content,
                     msg_on_fail=s.GroupServicePermission_DELETE_ForbiddenResponseSchema.description)
    # webhooks are notified using the requested permission (not the database entry that was matched)
    webhook_params = get_permission_update_params(group, resource, permission, db_session)
    process_webhook_requests(WebhookAction.DELETE_GROUP_PERMISSION, webhook_params)
    return ax.valid_http(http_success=HTTPOk, detail=s.GroupServicePermission_DELETE_OkResponseSchema.description)
[docs]
def get_group_services(resources_permissions_dict, db_session, service_types=None):
    # type: (JSON, Session, Optional[List[Str]]) -> JSON
    """
    Regroup the permissions of service resources under their service type and service name.

    :param resources_permissions_dict: mapping of resource IDs to the permissions applicable on them.
        Each ID is resolved with ``ResourceService.by_resource_id`` and formatted as a service.
    :param db_session: database connection.
    :param service_types: service types to retain. When ``None``, every type of ``SERVICE_TYPE_DICT`` is used.
    :returns: mapping of ``{<service-type>: {<service-name>: <formatted-service>}}``.
    """
    allowed_types = list(SERVICE_TYPE_DICT) if service_types is None else service_types
    services_by_type = {}
    for resource_id, permissions in resources_permissions_dict.items():
        service = ResourceService.by_resource_id(resource_id=resource_id, db_session=db_session)
        service_type = str(service.type)
        if service_type in allowed_types:
            service_name = str(service.resource_name)
            formatted_service = sf.format_service(service, permissions, show_private_url=False)
            # create the per-type section on first use, then register the service under its name
            services_by_type.setdefault(service_type, {})[service_name] = formatted_service
    return services_by_type
[docs]
def get_group_services_response(group, db_session, service_types=None):
    # type: (models.Group, Session, Optional[List[Str]]) -> HTTPException
    """
    Produce the validated response listing the services on which the group has permissions.

    :param group: group for which services are retrieved.
    :param db_session: database connection.
    :param service_types: optional service types used to filter the services listed in the response.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    # limit the permissions lookup to resources that are services
    service_resource_types = [models.Service.resource_type_name]
    service_perms = get_group_resources_permissions_dict(
        group, resource_types=service_resource_types, db_session=db_session
    )
    error_content = {"group": format_group(group, basic_info=True)}
    services_json = ax.evaluate_call(
        lambda: get_group_services(service_perms, db_session, service_types=service_types),
        http_error=HTTPInternalServerError,
        msg_on_fail=s.GroupServices_InternalServerErrorResponseSchema.description,
        content=error_content,
    )
    return ax.valid_http(http_success=HTTPOk, content={"services": services_json},
                         detail=s.GroupServices_GET_OkResponseSchema.description)
[docs]
def get_group_service_permissions(group, service, db_session):
    # type: (models.Group, models.Service, Session) -> List[PermissionSet]
    """
    Retrieve the permissions that the group has on the given service.

    When the group owns the service, every permission defined by the service type is returned as
    ``PermissionType.OWNED``. Otherwise, the permissions stored for the group on that service are returned as
    ``PermissionType.APPLIED``.

    :param group: group for which permissions are retrieved.
    :param service: service on which the permissions of the group are looked for.
    :param db_session: database connection.
    :returns: list of permissions of the group on the service.
    :raises HTTPException: error response produced by ``ax.evaluate_call`` (``HTTPInternalServerError``).
    """
    def find_permissions():
        if service.owner_group_id == group.id:
            owned_perms = SERVICE_TYPE_DICT[service.type].permissions
            return [PermissionSet(perm, typ=PermissionType.OWNED) for perm in owned_perms]
        perm_model = models.GroupResourcePermission
        query = db_session.query(perm_model)
        query = query.filter(perm_model.resource_id == service.resource_id)
        query = query.filter(perm_model.group_id == group.id)
        return [PermissionSet(perm, typ=PermissionType.APPLIED) for perm in query]

    error_content = {"group": repr(group), "service": repr(service)}
    return ax.evaluate_call(
        lambda: find_permissions(),
        http_error=HTTPInternalServerError,
        msg_on_fail="Failed to obtain group service permissions",
        content=error_content,
    )
[docs]
def get_group_service_permissions_response(group, service, db_session):
    # type: (models.Group, models.Service, Session) -> HTTPException
    """
    Produce the validated response listing the permissions of the group on the service.

    :param group: group for which permissions are retrieved.
    :param service: service on which the permissions of the group are looked for.
    :param db_session: database connection.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    error_content = {
        "group": format_group(group, basic_info=True),
        "service": sf.format_service(service),
    }

    def find_formatted_permissions():
        service_perms = get_group_service_permissions(group, service, db_session)
        return format_permissions(service_perms, PermissionType.APPLIED)

    formatted_perms = ax.evaluate_call(
        lambda: find_formatted_permissions(),
        http_error=HTTPInternalServerError,
        msg_on_fail=s.GroupServicePermissions_GET_InternalServerErrorResponseSchema.description,
        content=error_content,
    )
    return ax.valid_http(http_success=HTTPOk, content=formatted_perms,
                         detail=s.GroupServicePermissions_GET_OkResponseSchema.description)
[docs]
def get_group_service_resources_permissions_dict(group, service, db_session):
    # type: (models.Group, models.Service, Session) -> ResourcePermissionMap
    """
    Get all permissions the group has on a specific service's children resources.

    :param group: group for which permissions are retrieved.
    :param service: service under which the children resources are looked for.
    :param db_session: database connection.
    :returns: mapping of children resource IDs to the permissions of the group on them.
        The mapping is empty when no resource is found under the service.
    """
    res_id = service.resource_id
    res_under_svc = models.RESOURCE_TREE_SERVICE.from_parent_deeper(parent_id=res_id, db_session=db_session)
    res_ids = [resource.Resource.resource_id for resource in res_under_svc]
    if not res_ids:
        # Without children, there cannot be any permission on children resources.
        # Returning early also avoids forwarding an empty ID filter to the permissions lookup.
        # NOTE(review): the underlying lookup presumably treats an empty list as "no filtering", which would
        #   return the permissions of unrelated resources - confirm against ziggurat_foundations.
        return {}
    return get_group_resources_permissions_dict(group, db_session, resource_types=None, resource_ids=res_ids)
[docs]
def get_group_service_resources_response(group, service, db_session):
    # type: (models.Group, models.Service, Session) -> HTTPException
    """
    Produce the validated response describing the service and its resources with the permissions of the group.

    :param group: group for which permissions are retrieved.
    :param service: service to describe along with its children resources.
    :param db_session: database connection.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    permissions_on_service = get_group_service_permissions(group=group, service=service, db_session=db_session)
    permissions_on_children = get_group_service_resources_permissions_dict(
        group=group, service=service, db_session=db_session
    )
    service_json = sf.format_service_resources(
        service=service,
        db_session=db_session,
        service_perms=permissions_on_service,
        resources_perms_dict=permissions_on_children,
        show_all_children=False,
        show_private_url=False,
    )
    response_content = {"service": service_json}
    return ax.valid_http(http_success=HTTPOk, content=response_content,
                         detail=s.GroupServiceResources_GET_OkResponseSchema.description)