"""
User Views, both for specific user-name provided as request path variable and special keyword for logged session user.
"""
from pyramid.httpexceptions import HTTPBadRequest, HTTPForbidden, HTTPNotFound, HTTPOk
from pyramid.settings import asbool
from pyramid.view import view_config
from ziggurat_foundations.models.services.group import GroupService
from ziggurat_foundations.models.services.resource import ResourceService
from magpie import models
from magpie.api import exception as ax
from magpie.api import requests as ar
from magpie.api import schemas as s
from magpie.api.management.service import service_formats as sf
from magpie.api.management.service import service_utils as su
from magpie.api.management.user import user_formats as uf
from magpie.api.management.user import user_utils as uu
from magpie.api.webhooks import WebhookAction, process_webhook_requests
from magpie.constants import MAGPIE_CONTEXT_PERMISSION, MAGPIE_LOGGED_PERMISSION, get_constant
from magpie.models import UserGroupStatus
from magpie.permissions import PermissionType, format_permissions
from magpie.utils import get_logger
[docs]
LOGGER = get_logger(__name__)
@s.UsersAPI.get(schema=s.Users_GET_RequestSchema, tags=[s.UsersTag], response_schemas=s.Users_GET_responses)
@view_config(route_name=s.UsersAPI.name, request_method="GET")
[docs]
def get_users_view(request):
"""
List all registered user names or details.
"""
query = request.params.get("status")
status = None
if query is not None:
status = models.UserStatuses.get(query)
allowed = models.UserStatuses.allowed()
ax.verify_param(status, not_none=True, param_name="status",
param_content={"compare": allowed}, # provide literals in error response
http_error=HTTPBadRequest, msg_on_fail=s.Users_GET_BadRequestSchema.description)
detail = asbool(request.params.get("detail", False))
user_list = ax.evaluate_call(lambda: models.UserSearchService.by_status(status, db_session=request.db),
fallback=lambda: request.db.rollback(), http_error=HTTPForbidden,
msg_on_fail=s.Users_GET_ForbiddenResponseSchema.description)
if detail:
data = {"users": list(sorted([uf.format_user(user, basic_info=True) for user in user_list],
key=lambda user: user["user_name"]))}
else:
data = {"user_names": list(sorted(user.user_name for user in user_list))}
return ax.valid_http(http_success=HTTPOk, content=data, detail=s.Users_GET_OkResponseSchema.description)
@s.UsersAPI.post(schema=s.Users_POST_RequestSchema, tags=[s.UsersTag], response_schemas=s.Users_POST_responses)
@view_config(route_name=s.UsersAPI.name, request_method="POST")
[docs]
def create_user_view(request):
"""
Create a new user.
"""
user_name = ar.get_multiformat_body(request, "user_name")
email = ar.get_multiformat_body(request, "email")
password = ar.get_multiformat_body(request, "password")
group_name = ar.get_multiformat_body(request, "group_name")
return uu.create_user(user_name, password, email, group_name, db_session=request.db)
@s.UserAPI.patch(schema=s.User_PATCH_RequestSchema, tags=[s.UsersTag], response_schemas=s.User_PATCH_responses)
@s.LoggedUserAPI.patch(schema=s.User_PATCH_RequestSchema, tags=[s.LoggedUserTag],
response_schemas=s.LoggedUser_PATCH_responses)
@view_config(route_name=s.UserAPI.name, request_method="PATCH", permission=MAGPIE_LOGGED_PERMISSION)
[docs]
def update_user_view(request):
"""
Update user information by user name.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
new_user_name = ar.get_multiformat_body(request, "user_name", default=user.user_name)
new_email = ar.get_multiformat_body(request, "email", default=user.email)
new_password = ar.get_multiformat_body(request, "password", default=user.user_password)
new_status = models.UserStatuses.get(ar.get_multiformat_body(request, "status", default=None))
uu.update_user(user, request, new_user_name, new_password, new_email, new_status)
return ax.valid_http(http_success=HTTPOk, detail=s.Users_PATCH_OkResponseSchema.description)
@s.UserAPI.get(schema=s.User_GET_RequestSchema, tags=[s.UsersTag],
response_schemas=s.User_GET_responses, api_security=s.SecurityAuthenticatedAPI)
@s.LoggedUserAPI.get(schema=s.User_GET_RequestSchema, tags=[s.LoggedUserTag],
response_schemas=s.LoggedUser_GET_responses, api_security=s.SecurityAuthenticatedAPI)
@view_config(route_name=s.UserAPI.name, request_method="GET", permission=MAGPIE_CONTEXT_PERMISSION)
[docs]
def get_user_view(request):
"""
Get user information by name.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
return ax.valid_http(http_success=HTTPOk, content={"user": uf.format_user(user)},
detail=s.User_GET_OkResponseSchema.description)
@s.UserAPI.delete(schema=s.User_DELETE_RequestSchema, tags=[s.UsersTag], response_schemas=s.User_DELETE_responses)
@s.LoggedUserAPI.delete(schema=s.User_DELETE_RequestSchema, tags=[s.LoggedUserTag],
response_schemas=s.LoggedUser_DELETE_responses)
@view_config(route_name=s.UserAPI.name, request_method="DELETE", permission=MAGPIE_LOGGED_PERMISSION)
[docs]
def delete_user_view(request):
"""
Delete a user by name.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
ax.verify_param(user.user_name, not_equal=True, with_param=False, # avoid leaking username details
param_compare=get_constant("MAGPIE_ADMIN_USER", request),
http_error=HTTPForbidden, msg_on_fail=s.User_DELETE_ForbiddenResponseSchema.description)
ax.evaluate_call(lambda: request.db.delete(user), fallback=lambda: request.db.rollback(),
http_error=HTTPForbidden, msg_on_fail=s.User_DELETE_ForbiddenResponseSchema.description)
# Process any webhook requests
webhook_params = {"user.name": user.user_name, "user.id": user.id, "user.email": user.email}
process_webhook_requests(WebhookAction.DELETE_USER, webhook_params)
return ax.valid_http(http_success=HTTPOk, detail=s.User_DELETE_OkResponseSchema.description)
@s.UserGroupsAPI.get(schema=s.UserGroups_GET_RequestSchema, tags=[s.UsersTag],
response_schemas=s.UserGroups_GET_responses, api_security=s.SecurityAuthenticatedAPI)
@s.LoggedUserGroupsAPI.get(schema=s.UserGroups_GET_RequestSchema, tags=[s.LoggedUserTag],
response_schemas=s.LoggedUserGroups_GET_responses, api_security=s.SecurityAuthenticatedAPI)
@view_config(route_name=s.UserGroupsAPI.name, request_method="GET", permission=MAGPIE_CONTEXT_PERMISSION)
[docs]
def get_user_groups_view(request):
"""
List all groups a user belongs to.
Groups can be filtered by status depending of input arguments.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
status = ar.get_query_param(request, "status", default=UserGroupStatus.ACTIVE.value)
ax.verify_param(status, is_in=True, param_compare=UserGroupStatus.values(), param_name="status",
msg_on_fail=s.UserGroup_Check_Status_BadRequestResponseSchema.description,
http_error=HTTPBadRequest)
status = UserGroupStatus.get(status)
group_names = user.get_groups_by_status(status, request.db)
return ax.valid_http(http_success=HTTPOk, content={"group_names": sorted(group_names)},
detail=s.UserGroups_GET_OkResponseSchema.description)
@s.UserGroupsAPI.post(schema=s.UserGroups_POST_RequestSchema, tags=[s.UsersTag],
response_schemas=s.UserGroups_POST_responses)
@s.LoggedUserGroupsAPI.post(schema=s.UserGroups_POST_RequestSchema, tags=[s.LoggedUserTag],
response_schemas=s.LoggedUserGroups_POST_responses)
@view_config(route_name=s.UserGroupsAPI.name, request_method="POST")
[docs]
def assign_user_group_view(request):
"""
Assign a user to a group.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
group_name = ar.get_value_multiformat_body_checked(request, "group_name")
group = ax.evaluate_call(lambda: GroupService.by_group_name(group_name, db_session=request.db),
fallback=lambda: request.db.rollback(), http_error=HTTPForbidden,
msg_on_fail=s.UserGroups_POST_ForbiddenResponseSchema.description)
ax.verify_param(group, not_none=True, http_error=HTTPNotFound,
msg_on_fail=s.UserGroups_POST_GroupNotFoundResponseSchema.description)
return uu.create_pending_or_assign_user_group(user, group, db_session=request.db)
@s.UserGroupAPI.delete(schema=s.UserGroup_DELETE_RequestSchema, tags=[s.UsersTag],
response_schemas=s.UserGroup_DELETE_responses)
@s.LoggedUserGroupAPI.delete(schema=s.UserGroup_DELETE_RequestSchema, tags=[s.LoggedUserTag],
response_schemas=s.LoggedUserGroup_DELETE_responses)
@view_config(route_name=s.UserGroupAPI.name, request_method="DELETE")
[docs]
def delete_user_group_view(request):
"""
Removes a user from a group.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
group = ar.get_group_matchdict_checked(request)
uu.delete_user_group(user, group, request.db)
return ax.valid_http(http_success=HTTPOk, detail=s.UserGroup_DELETE_OkResponseSchema.description)
@s.UserResourcesAPI.get(schema=s.UserResources_GET_RequestSchema(),
tags=[s.UsersTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.UserResources_GET_responses)
@s.LoggedUserResourcesAPI.get(schema=s.UserResources_GET_RequestSchema(),
tags=[s.LoggedUserTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.LoggedUserResources_GET_responses)
@view_config(route_name=s.UserResourcesAPI.name, request_method="GET", permission=MAGPIE_CONTEXT_PERMISSION)
[docs]
def get_user_resources_view(request):
"""
List all resources a user has permissions on.
"""
inherit_groups_perms = asbool(ar.get_query_param(request, ["inherit", "inherited"]))
resolve_groups_perms = asbool(ar.get_query_param(request, ["resolve", "resolved"]))
filtered_perms = asbool(ar.get_query_param(request, ["filter", "filtered"]))
service_types = ar.get_query_param(request, ["type", "types"], default="")
service_types = su.filter_service_types(service_types, default_services=True)
user = ar.get_user_matchdict_checked_or_logged(request)
db = request.db
# skip admin-only full listing of resources if filtered view is requested
is_admin = False
if not filtered_perms and request.user is not None:
admin_group = get_constant("MAGPIE_ADMIN_GROUP", settings_container=request)
is_admin = admin_group in [group.group_name for group in request.user.groups]
def build_json_user_resource_tree(usr):
json_res = {}
perm_type = PermissionType.INHERITED if inherit_groups_perms else PermissionType.DIRECT
services = ResourceService.all(models.Service, db_session=db)
services = services.filter(models.Service.type.in_(service_types)) # pylint: disable=E1101,no-member
# add service-types so they are ordered and listed if no service of that type was defined
for svc_type in sorted(service_types):
json_res[svc_type] = {}
for svc in services:
svc_perms = uu.get_user_service_permissions(
user=usr, service=svc, request=request,
inherit_groups_permissions=inherit_groups_perms, resolve_groups_permissions=resolve_groups_perms)
res_perms_dict = uu.get_user_service_resources_permissions_dict(
user=usr, service=svc, request=request,
inherit_groups_permissions=inherit_groups_perms, resolve_groups_permissions=resolve_groups_perms)
# always allow admin to view full resource tree, unless explicitly requested to be filtered
# otherwise (non-admin), only add details if there is at least one resource permission (any level)
if (is_admin and not filtered_perms) or (svc_perms or res_perms_dict):
json_res[svc.type][svc.resource_name] = sf.format_service_resources(
svc,
db_session=db,
service_perms=svc_perms,
resources_perms_dict=res_perms_dict,
permission_type=perm_type,
show_all_children=False,
show_private_url=False,
)
return json_res
usr_res_dict = ax.evaluate_call(lambda: build_json_user_resource_tree(user),
fallback=lambda: db.rollback(), http_error=HTTPNotFound,
msg_on_fail=s.UserResources_GET_NotFoundResponseSchema.description,
content={"user_name": user.user_name,
"resource_types": [models.Service.resource_type_name]})
return ax.valid_http(http_success=HTTPOk, content={"resources": usr_res_dict},
detail=s.UserResources_GET_OkResponseSchema.description)
@s.UserResourcePermissionsAPI.get(schema=s.UserResourcePermissions_GET_RequestSchema(),
tags=[s.UsersTag, s.PermissionTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.UserResourcePermissions_GET_responses)
@s.LoggedUserResourcePermissionsAPI.get(schema=s.UserResourcePermissions_GET_RequestSchema(),
tags=[s.LoggedUserTag, s.PermissionTag],
api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.LoggedUserResourcePermissions_GET_responses)
@view_config(route_name=s.UserResourcePermissionsAPI.name, request_method="GET", permission=MAGPIE_CONTEXT_PERMISSION)
[docs]
def get_user_resource_permissions_view(request):
"""
List all permissions a user has on a specific resource.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
resource = ar.get_resource_matchdict_checked(request, "resource_id")
inherit_groups_perms = asbool(ar.get_query_param(request, ["inherit", "inherited"]))
resolve_groups_perms = asbool(ar.get_query_param(request, ["resolve", "resolved"]))
effective_perms = asbool(ar.get_query_param(request, "effective"))
return uu.get_user_resource_permissions_response(user, resource, request,
inherit_groups_permissions=inherit_groups_perms,
resolve_groups_permissions=resolve_groups_perms,
effective_permissions=effective_perms)
@s.UserResourcePermissionsAPI.post(schema=s.UserResourcePermissions_POST_RequestSchema,
tags=[s.UsersTag, s.PermissionTag],
response_schemas=s.UserResourcePermissions_POST_responses)
@s.LoggedUserResourcePermissionsAPI.post(schema=s.UserResourcePermissions_POST_RequestSchema,
tags=[s.LoggedUserTag, s.PermissionTag],
response_schemas=s.LoggedUserResourcePermissions_POST_responses)
@view_config(route_name=s.UserResourcePermissionsAPI.name, request_method="POST")
[docs]
def create_user_resource_permissions_view(request):
"""
Create a permission on specific resource for a user.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
resource = ar.get_resource_matchdict_checked(request)
permission = ar.get_permission_multiformat_body_checked(request, resource)
return uu.create_user_resource_permission_response(user, resource, permission, request.db, overwrite=False)
@s.UserResourcePermissionsAPI.put(schema=s.UserResourcePermissions_PUT_RequestSchema,
tags=[s.UsersTag, s.PermissionTag],
response_schemas=s.UserResourcePermissions_PUT_responses)
@s.LoggedUserResourcePermissionsAPI.put(schema=s.UserResourcePermissions_PUT_RequestSchema,
tags=[s.LoggedUserTag, s.PermissionTag],
response_schemas=s.LoggedUserResourcePermissions_PUT_responses)
@view_config(route_name=s.UserResourcePermissionsAPI.name, request_method="PUT")
[docs]
def replace_user_resource_permissions_view(request):
"""
Create or modify an existing permission on a resource for a user.
Can be used to adjust permission modifiers.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
resource = ar.get_resource_matchdict_checked(request)
permission = ar.get_permission_multiformat_body_checked(request, resource)
return uu.create_user_resource_permission_response(user, resource, permission, request.db, overwrite=True)
@s.UserResourcePermissionsAPI.delete(schema=s.UserResourcePermissions_DELETE_RequestSchema,
tags=[s.UsersTag, s.PermissionTag],
response_schemas=s.UserResourcePermissions_DELETE_responses)
@s.LoggedUserResourcePermissionsAPI.delete(schema=s.UserResourcePermissions_DELETE_RequestSchema,
tags=[s.LoggedUserTag, s.PermissionTag],
response_schemas=s.LoggedUserResourcePermissions_DELETE_responses)
@view_config(route_name=s.UserResourcePermissionsAPI.name, request_method="DELETE")
[docs]
def delete_user_resource_permissions_view(request):
"""
Delete a permission from a specific resource for a user (not including his groups permissions).
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
resource = ar.get_resource_matchdict_checked(request)
permission = ar.get_permission_multiformat_body_checked(request, resource)
return uu.delete_user_resource_permission_response(user, resource, permission, request.db)
@s.UserResourcePermissionAPI.delete(schema=s.UserResourcePermissionName_DELETE_RequestSchema,
tags=[s.UsersTag, s.PermissionTag],
response_schemas=s.UserResourcePermissionName_DELETE_responses)
@s.LoggedUserResourcePermissionAPI.delete(schema=s.UserResourcePermissionName_DELETE_RequestSchema,
tags=[s.LoggedUserTag, s.PermissionTag],
response_schemas=s.LoggedUserResourcePermissionName_DELETE_responses)
@view_config(route_name=s.UserResourcePermissionAPI.name, request_method="DELETE")
[docs]
def delete_user_resource_permission_name_view(request):
"""
Delete a permission by name from a resource for a user (not including his groups permissions).
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
resource = ar.get_resource_matchdict_checked(request)
permission = ar.get_permission_matchdict_checked(request, resource)
return uu.delete_user_resource_permission_response(user, resource, permission, request.db)
@s.UserServicesAPI.get(schema=s.UserServices_GET_RequestSchema,
tags=[s.UsersTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.UserServices_GET_responses)
@s.LoggedUserServicesAPI.get(schema=s.UserServices_GET_RequestSchema,
tags=[s.LoggedUserTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.LoggedUserServices_GET_responses)
@view_config(route_name=s.UserServicesAPI.name, request_method="GET", permission=MAGPIE_CONTEXT_PERMISSION)
[docs]
def get_user_services_view(request):
"""
List all services a user has permissions on.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
cascade_resources = asbool(ar.get_query_param(request, "cascade"))
inherit_groups_perms = asbool(ar.get_query_param(request, ["inherit", "inherited"]))
resolve_groups_perms = asbool(ar.get_query_param(request, ["resolve", "resolved"]))
format_as_list = asbool(ar.get_query_param(request, ["flatten", "flattened", "list"]))
service_types = ar.get_query_param(request, ["type", "types"], default="")
service_types = su.filter_service_types(service_types) # don't use default service types to populate response
svc_json = uu.get_user_services(user, request=request,
cascade_resources=cascade_resources,
inherit_groups_permissions=inherit_groups_perms,
resolve_groups_permissions=resolve_groups_perms,
format_as_list=format_as_list,
service_types=service_types)
return ax.valid_http(http_success=HTTPOk, content={"services": svc_json},
detail=s.UserServices_GET_OkResponseSchema.description)
@s.UserServicePermissionsAPI.get(schema=s.UserServicePermissions_GET_RequestSchema(),
tags=[s.UsersTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.UserServicePermissions_GET_responses)
@s.LoggedUserServicePermissionsAPI.get(schema=s.UserServicePermissions_GET_RequestSchema(),
tags=[s.LoggedUserTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.LoggedUserServicePermissions_GET_responses)
@view_config(route_name=s.UserServicePermissionsAPI.name, request_method="GET", permission=MAGPIE_CONTEXT_PERMISSION)
[docs]
def get_user_service_permissions_view(request):
"""
List all permissions a user has on a service.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
service = ar.get_service_matchdict_checked(request)
inherit_groups_perms = asbool(ar.get_query_param(request, ["inherit", "inherited"]))
resolve_groups_perms = asbool(ar.get_query_param(request, ["resolve", "resolved"]))
perm_type = PermissionType.INHERITED if inherit_groups_perms else PermissionType.DIRECT
perms = ax.evaluate_call(lambda: uu.get_user_service_permissions(service=service, user=user, request=request,
inherit_groups_permissions=inherit_groups_perms,
resolve_groups_permissions=resolve_groups_perms),
fallback=lambda: request.db.rollback(), http_error=HTTPNotFound,
msg_on_fail=s.UserServicePermissions_GET_NotFoundResponseSchema.description,
content={"service_name": str(service.resource_name), "user_name": str(user.user_name)})
return ax.valid_http(http_success=HTTPOk, content=format_permissions(perms, perm_type),
detail=s.UserServicePermissions_GET_OkResponseSchema.description)
@s.UserServicePermissionsAPI.post(schema=s.UserServicePermissions_POST_RequestSchema,
tags=[s.UsersTag, s.PermissionTag],
response_schemas=s.UserServicePermissions_POST_responses)
@s.LoggedUserServicePermissionsAPI.post(schema=s.UserServicePermissions_POST_RequestSchema,
tags=[s.LoggedUserTag, s.PermissionTag],
response_schemas=s.LoggedUserServicePermissions_POST_responses)
@view_config(route_name=s.UserServicePermissionsAPI.name, request_method="POST")
[docs]
def create_user_service_permissions_view(request):
"""
Create a permission on a service for a user.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
service = ar.get_service_matchdict_checked(request)
permission = ar.get_permission_multiformat_body_checked(request, service)
return uu.create_user_resource_permission_response(user, service, permission, request.db, overwrite=False)
@s.UserServicePermissionsAPI.put(schema=s.UserServicePermissions_PUT_RequestSchema,
tags=[s.UsersTag, s.PermissionTag],
response_schemas=s.UserServicePermissions_PUT_responses)
@s.LoggedUserServicePermissionsAPI.put(schema=s.UserServicePermissions_PUT_RequestSchema,
tags=[s.LoggedUserTag, s.PermissionTag],
response_schemas=s.LoggedUserServicePermissions_PUT_responses)
@view_config(route_name=s.UserServicePermissionsAPI.name, request_method="PUT")
[docs]
def replace_user_service_permissions_view(request):
"""
Create or modify an existing permission on a service for a user.
Can be used to adjust permission modifiers.
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
service = ar.get_service_matchdict_checked(request)
permission = ar.get_permission_multiformat_body_checked(request, service)
return uu.create_user_resource_permission_response(user, service, permission, request.db, overwrite=True)
@s.UserServicePermissionsAPI.delete(schema=s.UserServicePermissions_DELETE_RequestSchema,
tags=[s.UsersTag, s.PermissionTag],
response_schemas=s.UserServicePermissions_DELETE_responses)
@s.LoggedUserServicePermissionsAPI.delete(schema=s.UserServicePermissions_DELETE_RequestSchema,
tags=[s.LoggedUserTag, s.PermissionTag],
response_schemas=s.LoggedUserServicePermissions_DELETE_responses)
@view_config(route_name=s.UserServicePermissionsAPI.name, request_method="DELETE")
[docs]
def delete_user_service_permissions_view(request):
"""
Delete a permission from a service for a user (not including his groups permissions).
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
service = ar.get_service_matchdict_checked(request)
permission = ar.get_permission_multiformat_body_checked(request, service)
return uu.delete_user_resource_permission_response(user, service, permission, request.db)
@s.UserServicePermissionAPI.delete(schema=s.UserServicePermissionName_DELETE_RequestSchema,
tags=[s.UsersTag, s.PermissionTag],
response_schemas=s.UserServicePermissionName_DELETE_responses)
@s.LoggedUserServicePermissionAPI.delete(schema=s.UserServicePermissionName_DELETE_RequestSchema,
tags=[s.LoggedUserTag, s.PermissionTag],
response_schemas=s.LoggedUserServicePermissionName_DELETE_responses)
@view_config(route_name=s.UserServicePermissionAPI.name, request_method="DELETE")
[docs]
def delete_user_service_permission_name_view(request):
"""
Delete a permission by name from a service for a user (not including his groups permissions).
"""
user = ar.get_user_matchdict_checked_or_logged(request)
uu.check_user_editable(user, request)
service = ar.get_service_matchdict_checked(request)
permission = ar.get_permission_matchdict_checked(request, service)
return uu.delete_user_resource_permission_response(user, service, permission, request.db)
@s.UserServiceResourcesAPI.get(schema=s.UserServiceResources_GET_RequestSchema,
tags=[s.UsersTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.UserServiceResources_GET_responses)
@s.LoggedUserServiceResourcesAPI.get(schema=s.UserServiceResources_GET_RequestSchema,
tags=[s.LoggedUserTag], api_security=s.SecurityAuthenticatedAPI,
response_schemas=s.LoggedUserServiceResources_GET_responses)
@view_config(route_name=s.UserServiceResourcesAPI.name, request_method="GET", permission=MAGPIE_CONTEXT_PERMISSION)
[docs]
def get_user_service_resources_view(request):
"""
List all resources under a service a user has permission on.
"""
inherit_groups_perms = asbool(ar.get_query_param(request, ["inherit", "inherited"]))
resolve_groups_perms = asbool(ar.get_query_param(request, ["resolve", "resolved"]))
user = ar.get_user_matchdict_checked_or_logged(request)
service = ar.get_service_matchdict_checked(request)
service_perms = uu.get_user_service_permissions(
user, service, request=request,
inherit_groups_permissions=inherit_groups_perms,
resolve_groups_permissions=resolve_groups_perms)
resources_perms_dict = uu.get_user_service_resources_permissions_dict(
user, service, request=request,
inherit_groups_permissions=inherit_groups_perms,
resolve_groups_permissions=resolve_groups_perms)
user_svc_res_json = sf.format_service_resources(
service=service,
db_session=request.db,
service_perms=service_perms,
resources_perms_dict=resources_perms_dict,
permission_type=PermissionType.INHERITED if inherit_groups_perms else PermissionType.DIRECT,
show_all_children=False,
show_private_url=False,
)
return ax.valid_http(http_success=HTTPOk, content={"service": user_svc_res_json},
detail=s.UserServiceResources_GET_OkResponseSchema.description)