from pyramid.httpexceptions import HTTPBadRequest, HTTPConflict, HTTPForbidden, HTTPInternalServerError, HTTPOk
from pyramid.settings import asbool
from pyramid.view import view_config
from ziggurat_foundations.models.services.group import GroupService
from magpie.api import exception as ax
from magpie.api import requests as ar
from magpie.api import schemas as s
from magpie.api.management.group import group_formats as gf
from magpie.api.management.group import group_utils as gu
from magpie.api.management.service import service_utils as su
from magpie.constants import get_constant, network_enabled, protected_group_name_regex
from magpie.models import TemporaryToken, TokenOperation, UserGroupStatus
@s.GroupsAPI.get(tags=[s.GroupsTag], response_schemas=s.Groups_GET_responses)
@view_config(route_name=s.GroupsAPI.name, request_method="GET")
def get_groups_view(request):
    """
    Get list of group names.

    All group names are retrieved with the request database session and returned
    under the ``group_names`` field of the successful response content.
    """
    response_content = {"group_names": gu.get_all_group_names(request.db)}
    return ax.valid_http(
        http_success=HTTPOk,
        content=response_content,
        detail=s.Groups_GET_OkResponseSchema.description,
    )
@s.GroupsAPI.post(
    schema=s.Groups_POST_RequestSchema,
    tags=[s.GroupsTag],
    response_schemas=s.Groups_POST_responses,
)
@view_config(route_name=s.GroupsAPI.name, request_method="POST")
def create_group_view(request):
    """
    Create a group.

    The ``group_name`` (checked), ``description``, ``discoverable`` and ``terms`` fields are read from the
    request body. Creation of the group and generation of the response are delegated to ``gu.create_group``.
    """
    name = ar.get_value_multiformat_body_checked(request, "group_name")
    description = ar.get_multiformat_body(request, "description", default=None)
    raw_discoverable = ar.get_multiformat_body(request, "discoverable", default=False)
    discoverable = asbool(raw_discoverable)  # normalize whatever representation the body provided
    terms = ar.get_multiformat_body(request, "terms", default=None)
    return gu.create_group(name, description, discoverable, terms, request.db)
@s.GroupAPI.get(
    schema=s.Group_GET_RequestSchema,
    tags=[s.GroupsTag],
    response_schemas=s.Group_GET_responses,
)
@view_config(route_name=s.GroupAPI.name, request_method="GET")
def get_group_view(request):
    """
    Get group information.

    The group referenced by the ``group_name`` request key is formatted with ``gf.format_group``
    and returned under the ``group`` field of the successful response content.
    """
    matched_group = ar.get_group_matchdict_checked(request, group_name_key="group_name")
    group_details = gf.format_group(matched_group, db_session=request.db)
    return ax.valid_http(
        http_success=HTTPOk,
        content={"group": group_details},
        detail=s.Group_GET_OkResponseSchema.description,
    )
@s.GroupAPI.patch(schema=s.Group_PATCH_RequestSchema, tags=[s.GroupsTag], response_schemas=s.Group_PATCH_responses)
@view_config(route_name=s.GroupAPI.name, request_method="PATCH")
def edit_group_view(request):
    """
    Update a group by name.

    The ``group_name``, ``description`` and ``discoverable`` fields of the request body can each update the
    corresponding group attribute. A field is only considered for update when its value is not ``None`` and
    differs from the current value of the group. At least one field must be updated, otherwise the request is
    refused with :class:`HTTPBadRequest`.

    The groups named by the ``MAGPIE_ANONYMOUS_GROUP`` and ``MAGPIE_ADMIN_GROUP`` constants cannot be edited
    (:class:`HTTPForbidden`). A new group name must be non-empty, respect ``MAGPIE_GROUP_NAME_MAX_LENGTH``,
    not conflict with an existing group (:class:`HTTPConflict`) and, when network mode is enabled, not match
    the protected group name pattern.
    """
    group = ar.get_group_matchdict_checked(request, group_name_key="group_name")
    special_groups = [
        get_constant("MAGPIE_ANONYMOUS_GROUP", settings_container=request),
        get_constant("MAGPIE_ADMIN_GROUP", settings_container=request),
    ]
    ax.verify_param(group.group_name, not_in=True, param_compare=special_groups, param_name="group_name",
                    http_error=HTTPForbidden,
                    msg_on_fail=s.Group_PATCH_ReservedKeyword_ForbiddenResponseSchema.description)

    new_group_name = ar.get_multiformat_body(request, "group_name")
    new_description = ar.get_multiformat_body(request, "description")
    new_discoverability = ar.get_multiformat_body(request, "discoverable")
    if new_discoverability is not None:
        # only convert an explicitly provided value, 'None' must remain distinguishable from 'False'
        new_discoverability = asbool(new_discoverability)

    update_name = group.group_name != new_group_name and new_group_name is not None
    update_desc = group.description != new_description and new_description is not None
    update_disc = group.discoverable != new_discoverability and new_discoverability is not None
    ax.verify_param(any([update_name, update_desc, update_disc]), is_true=True,
                    with_param=False,  # params are not useful in response for this case
                    http_error=HTTPBadRequest, content={"group_name": group.group_name},
                    msg_on_fail=s.Group_PATCH_None_BadRequestResponseSchema.description)

    if update_name:
        ax.verify_param(new_group_name, not_none=True, not_empty=True, http_error=HTTPBadRequest,
                        msg_on_fail=s.Group_PATCH_Name_BadRequestResponseSchema.description)
        group_name_size_range = range(1, 1 + get_constant("MAGPIE_GROUP_NAME_MAX_LENGTH", settings_container=request))
        ax.verify_param(len(new_group_name), is_in=True, param_compare=group_name_size_range,
                        http_error=HTTPBadRequest,
                        msg_on_fail=s.Group_PATCH_Size_BadRequestResponseSchema.description)
        ax.verify_param(GroupService.by_group_name(new_group_name, db_session=request.db),
                        is_none=True, http_error=HTTPConflict, with_param=False,  # don't return group as value
                        msg_on_fail=s.Group_PATCH_ConflictResponseSchema.description)
        if network_enabled():
            anonymous_regex = protected_group_name_regex(include_admin=False)
            # A name matching the protected pattern is an invalid name *value*, not an invalid length:
            # report it with the name-related description rather than the size-related one.
            ax.verify_param(new_group_name, not_matches=True, param_compare=anonymous_regex,
                            http_error=HTTPBadRequest,
                            msg_on_fail=s.Group_PATCH_Name_BadRequestResponseSchema.description)
        group.group_name = new_group_name
    if update_desc:
        group.description = new_description
    if update_disc:
        group.discoverable = new_discoverability
    return ax.valid_http(http_success=HTTPOk, detail=s.Group_PATCH_OkResponseSchema.description)
@s.GroupAPI.delete(
    schema=s.Group_DELETE_RequestSchema,
    tags=[s.GroupsTag],
    response_schemas=s.Group_DELETE_responses,
)
@view_config(route_name=s.GroupAPI.name, request_method="DELETE")
def delete_group_view(request):
    """
    Delete a group by name.

    The groups named by the ``MAGPIE_ANONYMOUS_GROUP`` and ``MAGPIE_ADMIN_GROUP`` constants are refused
    with :class:`HTTPForbidden`. A failing deletion triggers a rollback of the database session.
    """
    group = ar.get_group_matchdict_checked(request)
    reserved_names = [
        get_constant(constant_name, settings_container=request)
        for constant_name in ("MAGPIE_ANONYMOUS_GROUP", "MAGPIE_ADMIN_GROUP")
    ]
    ax.verify_param(
        group.group_name,
        not_in=True,
        param_compare=reserved_names,
        param_name="group_name",
        http_error=HTTPForbidden,
        msg_on_fail=s.Group_DELETE_ReservedKeyword_ForbiddenResponseSchema.description,
    )

    def _delete_group():
        return request.db.delete(group)

    def _rollback():
        return request.db.rollback()

    ax.evaluate_call(
        _delete_group,
        fallback=_rollback,
        http_error=HTTPForbidden,
        msg_on_fail=s.Group_DELETE_ForbiddenResponseSchema.description,
    )
    return ax.valid_http(http_success=HTTPOk, detail=s.Group_DELETE_OkResponseSchema.description)
@s.GroupUsersAPI.get(
    schema=s.GroupUsers_GET_RequestSchema,
    tags=[s.GroupsTag],
    response_schemas=s.GroupUsers_GET_responses,
)
@view_config(route_name=s.GroupUsersAPI.name, request_method="GET")
def get_group_users_view(request):
    """
    List all users from a group.

    Users can be filtered with the ``status`` query parameter, which defaults to the active status and must be
    one of the :class:`UserGroupStatus` values. Active users are the current members of the group. Pending users
    are those referenced by a temporary token of the group for the terms acceptation operation.
    The resulting user names are returned sorted.
    """
    group = ar.get_group_matchdict_checked(request)
    status_value = ar.get_query_param(request, "status", default=UserGroupStatus.ACTIVE.value)
    ax.verify_param(
        status_value,
        is_in=True,
        param_compare=UserGroupStatus.values(),
        param_name="status",
        msg_on_fail=s.UserGroup_Check_Status_BadRequestResponseSchema.description,
        http_error=HTTPBadRequest,
    )
    status = UserGroupStatus.get(status_value)

    # members are always resolved since they are also needed to filter out irrelevant pending tokens
    members = ax.evaluate_call(
        lambda: set(member.user_name for member in group.users),
        http_error=HTTPForbidden,
        msg_on_fail=s.GroupUsers_GET_ForbiddenResponseSchema.description,
    )
    with_active = status in [UserGroupStatus.ACTIVE, UserGroupStatus.ALL]
    with_pending = status in [UserGroupStatus.PENDING, UserGroupStatus.ALL]

    selected_names = set(members) if with_active else set()
    if with_pending:
        # temporary tokens of the requested group that have a pending accept terms request
        pending_tokens = (
            request.db.query(TemporaryToken)
            .filter(TemporaryToken.group_id == group.id)
            .filter(TemporaryToken.operation == TokenOperation.GROUP_ACCEPT_TERMS)
        )
        pending_names = set(token.user.user_name for token in pending_tokens)
        # Ignore users that already belong to the group, in case any token is irrelevant.
        # Should not happen since related tokens are deleted upon T&C acceptation.
        selected_names |= pending_names - members

    return ax.valid_http(
        http_success=HTTPOk,
        content={"user_names": sorted(selected_names)},
        detail=s.GroupUsers_GET_OkResponseSchema.description,
    )
@s.GroupServicesAPI.get(
    schema=s.GroupServices_GET_RequestSchema,
    tags=[s.GroupsTag],
    response_schemas=s.GroupServices_GET_responses,
)
@view_config(route_name=s.GroupServicesAPI.name, request_method="GET")
def get_group_services_view(request):
    """
    List all services a group has permission on.

    The ``type`` or ``types`` query parameter is passed through ``su.filter_service_types``
    to select the service types included in the response.
    """
    target_group = ar.get_group_matchdict_checked(request)
    requested_types = ar.get_query_param(request, ["type", "types"], default="")
    filtered_types = su.filter_service_types(requested_types)
    return gu.get_group_services_response(target_group, request.db, service_types=filtered_types)
@s.GroupServicePermissionsAPI.get(
    schema=s.GroupServicePermissions_GET_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupServicePermissions_GET_responses,
)
@view_config(route_name=s.GroupServicePermissionsAPI.name, request_method="GET")
def get_group_service_permissions_view(request):
    """
    List all permissions a group has on a specific service.

    The group is resolved before the service, and the response is generated
    by ``gu.get_group_service_permissions_response``.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_service = ar.get_service_matchdict_checked(request)
    db_session = request.db
    return gu.get_group_service_permissions_response(target_group, target_service, db_session)
@s.GroupServicePermissionsAPI.post(
    schema=s.GroupServicePermissions_POST_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupServicePermissions_POST_responses,
)
@view_config(route_name=s.GroupServicePermissionsAPI.name, request_method="POST")
def create_group_service_permission_view(request):
    """
    Create a permission on a specific service for a group.

    The permission is read from the request body and checked against the service.
    It is created without overwriting (``overwrite=False``).
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_service = ar.get_service_matchdict_checked(request)
    requested_permission = ar.get_permission_multiformat_body_checked(request, target_service)
    db_session = request.db
    return gu.create_group_resource_permission_response(
        target_group, target_service, requested_permission, db_session, overwrite=False
    )
@s.GroupServicePermissionsAPI.put(
    schema=s.GroupServicePermissions_PUT_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupServicePermissions_PUT_responses,
)
@view_config(route_name=s.GroupServicePermissionsAPI.name, request_method="PUT")
def replace_group_service_permissions_view(request):
    """
    Create or modify an existing permission on a service for a group.

    Can be used to adjust permission modifiers, since the permission read from
    the request body is applied with ``overwrite=True``.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_service = ar.get_service_matchdict_checked(request)
    requested_permission = ar.get_permission_multiformat_body_checked(request, target_service)
    db_session = request.db
    return gu.create_group_resource_permission_response(
        target_group, target_service, requested_permission, db_session, overwrite=True
    )
@s.GroupServicePermissionsAPI.delete(
    schema=s.GroupServicePermissions_DELETE_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupServicePermissions_DELETE_responses,
)
@view_config(route_name=s.GroupServicePermissionsAPI.name, request_method="DELETE")
def delete_group_service_permission_view(request):
    """
    Delete a permission from a specific service for a group.

    The permission to delete is read from the request body and checked against the service.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_service = ar.get_service_matchdict_checked(request)
    requested_permission = ar.get_permission_multiformat_body_checked(request, target_service)
    return gu.delete_group_resource_permission_response(
        target_group, target_service, requested_permission, db_session=request.db
    )
@s.GroupServicePermissionAPI.delete(
    schema=s.GroupServicePermissionName_DELETE_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupServicePermissionName_DELETE_responses,
)
@view_config(route_name=s.GroupServicePermissionAPI.name, request_method="DELETE")
def delete_group_service_permission_name_view(request):
    """
    Delete a permission by name from a specific service for a group.

    Contrary to the body-based deletion, the permission is resolved with the matchdict helper
    and checked against the service.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_service = ar.get_service_matchdict_checked(request)
    named_permission = ar.get_permission_matchdict_checked(request, target_service)
    return gu.delete_group_resource_permission_response(
        target_group, target_service, named_permission, db_session=request.db
    )
@s.GroupResourcesAPI.get(
    schema=s.GroupResources_GET_RequestSchema,
    tags=[s.GroupsTag],
    response_schemas=s.GroupResources_GET_responses,
)
@view_config(route_name=s.GroupResourcesAPI.name, request_method="GET")
def get_group_resources_view(request):
    """
    List all resources a group has permission on.

    The ``type`` or ``types`` query parameter is passed through ``su.filter_service_types`` to select the
    service types to consider. If the retrieval of resources fails, the database session is rolled back and
    :class:`HTTPInternalServerError` is reported with the representation of the group as content.
    """
    group = ar.get_group_matchdict_checked(request)
    requested_types = ar.get_query_param(request, ["type", "types"], default="")
    service_types = su.filter_service_types(requested_types)

    def _collect_resources():
        return gu.get_group_resources(group, request.db, service_types=service_types)

    def _rollback():
        return request.db.rollback()

    resources = ax.evaluate_call(
        _collect_resources,
        fallback=_rollback,
        http_error=HTTPInternalServerError,
        content={"group": repr(group)},
        msg_on_fail=s.GroupResources_GET_InternalServerErrorResponseSchema.description,
    )
    return ax.valid_http(
        http_success=HTTPOk,
        content={"resources": resources},
        detail=s.GroupResources_GET_OkResponseSchema.description,
    )
@s.GroupResourcePermissionsAPI.get(
    schema=s.GroupResourcePermissions_GET_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupResourcePermissions_GET_responses,
)
@view_config(route_name=s.GroupResourcePermissionsAPI.name, request_method="GET")
def get_group_resource_permissions_view(request):
    """
    List all permissions a group has on a specific resource.

    The group is resolved before the resource, and the response is generated
    by ``gu.get_group_resource_permissions_response``.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_resource = ar.get_resource_matchdict_checked(request)
    return gu.get_group_resource_permissions_response(target_group, target_resource, db_session=request.db)
@s.GroupResourcePermissionsAPI.post(
    schema=s.GroupResourcePermissions_POST_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupResourcePermissions_POST_responses,
)
@view_config(route_name=s.GroupResourcePermissionsAPI.name, request_method="POST")
def create_group_resource_permissions_view(request):
    """
    Create a permission on a specific resource for a group.

    The permission is read from the request body and checked against the resource.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_resource = ar.get_resource_matchdict_checked(request)
    requested_permission = ar.get_permission_multiformat_body_checked(request, target_resource)
    return gu.create_group_resource_permission_response(
        target_group, target_resource, requested_permission, db_session=request.db
    )
@s.GroupResourcePermissionsAPI.put(
    schema=s.GroupResourcePermissions_PUT_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupResourcePermissions_PUT_responses,
)
@view_config(route_name=s.GroupResourcePermissionsAPI.name, request_method="PUT")
def replace_group_resource_permissions_view(request):
    """
    Create or modify an existing permission on a resource for a group.

    Can be used to adjust permission modifiers, since the permission read from
    the request body is applied with ``overwrite=True``.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_resource = ar.get_resource_matchdict_checked(request)
    requested_permission = ar.get_permission_multiformat_body_checked(request, target_resource)
    db_session = request.db
    return gu.create_group_resource_permission_response(
        target_group, target_resource, requested_permission, db_session, overwrite=True
    )
@s.GroupResourcePermissionsAPI.delete(
    schema=s.GroupResourcePermissions_DELETE_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupResourcePermissions_DELETE_responses,
)
@view_config(route_name=s.GroupResourcePermissionsAPI.name, request_method="DELETE")
def delete_group_resource_permissions_view(request):
    """
    Delete a permission from a specific resource for a group.

    The permission to delete is read from the request body and checked against the resource.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_resource = ar.get_resource_matchdict_checked(request)
    requested_permission = ar.get_permission_multiformat_body_checked(request, target_resource)
    return gu.delete_group_resource_permission_response(
        target_group, target_resource, requested_permission, db_session=request.db
    )
@s.GroupResourcePermissionAPI.delete(
    schema=s.GroupResourcePermissionName_DELETE_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupResourcePermissionName_DELETE_responses,
)
@view_config(route_name=s.GroupResourcePermissionAPI.name, request_method="DELETE")
def delete_group_resource_permission_name_view(request):
    """
    Delete a permission by name from a specific resource for a group.

    Contrary to the body-based deletion, the permission is resolved with the matchdict helper
    and checked against the resource.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_resource = ar.get_resource_matchdict_checked(request)
    named_permission = ar.get_permission_matchdict_checked(request, target_resource)
    return gu.delete_group_resource_permission_response(
        target_group, target_resource, named_permission, db_session=request.db
    )
@s.GroupServiceResourcesAPI.get(
    schema=s.GroupServiceResources_GET_RequestSchema,
    tags=[s.GroupsTag, s.PermissionTag],
    response_schemas=s.GroupServiceResources_GET_responses,
)
@view_config(route_name=s.GroupServiceResourcesAPI.name, request_method="GET")
def get_group_service_resources_view(request):
    """
    List all resources under a service a group has permission on.

    The group is resolved before the service, and the response is generated
    by ``gu.get_group_service_resources_response``.
    """
    target_group = ar.get_group_matchdict_checked(request)
    target_service = ar.get_service_matchdict_checked(request)
    db_session = request.db
    return gu.get_group_service_resources_response(target_group, target_service, db_session)