Source code for magpie.api.management.user.user_utils

from magpie.api import exception as ax, schemas as s
from magpie.api.management.service.service_formats import format_service
from magpie.api.management.resource.resource_utils import check_valid_service_or_resource_permission
from magpie.api.management.user import user_formats as uf
from magpie.constants import get_constant
from magpie.definitions.ziggurat_definitions import (
    GroupService,
    UserService,
    ResourceService,
    UserResourcePermissionService,
)
from magpie.definitions.pyramid_definitions import (
    HTTPOk,
    HTTPCreated,
    HTTPBadRequest,
    HTTPForbidden,
    HTTPNotFound,
    HTTPConflict,
    HTTPInternalServerError,
)
from magpie.permissions import convert_permission, format_permissions
from magpie.services import service_factory
from magpie import models
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from magpie.services import ServiceInterface  # noqa: F401
    from magpie.definitions.pyramid_definitions import Request, HTTPException  # noqa: F401
    from magpie.definitions.sqlalchemy_definitions import Session  # noqa: F401
    from magpie.definitions.typedefs import (  # noqa: F401
        Any, Str, Dict, Iterable, List, Optional, ResourcePermissionType, UserServicesType, ServiceOrResourceType
    )
    from magpie.permissions import Permission  # noqa: F401


def create_user(user_name, password, email, group_name, db_session):
    # type: (Str, Optional[Str], Str, Str, Session) -> HTTPException
    """
    Registers a new user when the operation is permitted and does not conflict with an existing user.

    The password must be `None` when the user authenticates through an external identity.
    The new user becomes a member of the group named ``group_name`` (which may be ``MAGPIE_ANONYMOUS_GROUP``
    for minimal access). The user is additionally *always* added to ``MAGPIE_ANONYMOUS_GROUP`` (unless that is
    already the requested ``group_name``) so that resources with public permission remain accessible.

    The ``group_name`` **must** refer to a group that already exists.

    :param user_name: name of the user to create.
    :param password: password to set, or `None` for an external identity.
    :param email: email of the user to create.
    :param group_name: name of the existing group to assign the user to.
    :param db_session: database session used for every lookup and insertion.
    :returns: valid HTTP response on successful operation.
    """

    def _fetch_group(name):
        # type: (Str) -> models.Group
        # lookup failure is reported as forbidden, a missing group as bad request
        found = ax.evaluate_call(lambda: GroupService.by_group_name(name, db_session=db_session),
                                 httpError=HTTPForbidden,
                                 msgOnFail=s.UserGroup_GET_ForbiddenResponseSchema.description)
        ax.verify_param(found, notNone=True, httpError=HTTPBadRequest,
                        msgOnFail=s.UserGroup_Check_BadRequestResponseSchema.description)
        return found

    def _join_group(member, group):
        # type: (models.User, models.Group) -> None
        # noinspection PyArgumentList
        membership = models.UserGroup(group_id=group.id, user_id=member.id)
        ax.evaluate_call(lambda: db_session.add(membership), fallback=lambda: db_session.rollback(),
                         httpError=HTTPForbidden,
                         msgOnFail=s.UserGroup_GET_ForbiddenResponseSchema.description)

    # The requested group must exist before anything gets created
    target_group = _fetch_group(group_name)

    # The user name must not be taken yet
    existing_user = ax.evaluate_call(lambda: UserService.by_user_name(user_name=user_name, db_session=db_session),
                                     httpError=HTTPForbidden,
                                     msgOnFail=s.User_Check_ForbiddenResponseSchema.description)
    ax.verify_param(existing_user, isNone=True, httpError=HTTPConflict,
                    msgOnFail=s.User_Check_ConflictResponseSchema.description)

    # Build the user entry; a password is only configured when one was provided
    # noinspection PyArgumentList
    created = models.User(user_name=user_name, email=email)
    if password:
        UserService.set_password(created, password)
        UserService.regenerate_security_code(created)
    ax.evaluate_call(lambda: db_session.add(created), fallback=lambda: db_session.rollback(),
                     httpError=HTTPForbidden,
                     msgOnFail=s.Users_POST_ForbiddenResponseSchema.description)

    # Reload the user from the session so that its fields are updated
    created = ax.evaluate_call(lambda: UserService.by_user_name(user_name, db_session=db_session),
                               httpError=HTTPForbidden,
                               msgOnFail=s.UserNew_POST_ForbiddenResponseSchema.description)

    # Membership in the requested group
    memberships = [group_name]
    _join_group(created, target_group)

    # Membership in the anonymous group, unless it is the one that was just assigned
    anonymous_name = get_constant("MAGPIE_ANONYMOUS_GROUP")
    if target_group.group_name != anonymous_name:
        _join_group(created, _fetch_group(anonymous_name))
        memberships.append(anonymous_name)

    return ax.valid_http(httpSuccess=HTTPCreated,
                         detail=s.Users_POST_CreatedResponseSchema.description,
                         content={u"user": uf.format_user(created, memberships)})
def create_user_resource_permission_response(user, resource, permission, db_session):
    # type: (models.User, ServiceOrResourceType, Permission, Session) -> HTTPException
    """
    Adds a permission for a user/resource pair when it is permitted and not already present.

    :param user: user that receives the permission.
    :param resource: service or resource the permission applies to.
    :param permission: permission to create; its ``value`` is used as permission name.
    :param db_session: database session used for the lookup and the insertion.
    :returns: valid HTTP response on successful operation.
    """
    perm_name = permission.value
    check_valid_service_or_resource_permission(perm_name, resource, db_session)
    res_id = resource.resource_id
    usr_id = user.id

    # An identical user/resource/permission entry is reported as a conflict
    already_set = UserResourcePermissionService.by_resource_user_and_perm(
        user_id=usr_id, resource_id=res_id, perm_name=perm_name, db_session=db_session)
    ax.verify_param(already_set, isNone=True, httpError=HTTPConflict,
                    content={u"resource_id": res_id, u"user_id": usr_id, u"permission_name": perm_name},
                    msgOnFail=s.UserResourcePermissions_POST_ConflictResponseSchema.description)

    # noinspection PyArgumentList
    created = models.UserResourcePermission(resource_id=res_id, user_id=usr_id, perm_name=perm_name)
    response_body = {u"resource_id": res_id, u"user_id": usr_id, u"permission_name": perm_name}
    ax.verify_param(created, notNone=True, httpError=HTTPForbidden,
                    content={u"resource_id": res_id, u"user_id": usr_id},
                    msgOnFail=s.UserResourcePermissions_POST_ForbiddenResponseSchema.description)
    ax.evaluate_call(lambda: db_session.add(created), fallback=lambda: db_session.rollback(),
                     httpError=HTTPForbidden, content=response_body,
                     msgOnFail=s.UserResourcePermissions_POST_ForbiddenResponseSchema.description)
    return ax.valid_http(httpSuccess=HTTPCreated, content=response_body,
                         detail=s.UserResourcePermissions_POST_CreatedResponseSchema.description)
def delete_user_resource_permission_response(user, resource, permission, db_session):
    # type: (models.User, ServiceOrResourceType, Permission, Session) -> HTTPException
    """
    Removes a user permission from a resource and returns the validated response.

    :param user: user that holds the permission.
    :param resource: service or resource the permission applies to.
    :param permission: permission to delete; its ``value`` is used as permission name.
    :param db_session: database session used for the lookup and the deletion.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    perm_name = permission.value
    check_valid_service_or_resource_permission(perm_name, resource, db_session)
    res_id = resource.resource_id
    target = UserResourcePermissionService.get(user.id, res_id, perm_name, db_session)
    # a failing delete is rolled back and reported as not found
    ax.evaluate_call(
        lambda: db_session.delete(target),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPNotFound,
        msgOnFail=s.UserResourcePermissions_DELETE_NotFoundResponseSchema.description,
        content={u"resource_id": res_id, u"user_id": user.id, u"permission_name": permission.value},
    )
    return ax.valid_http(httpSuccess=HTTPOk,
                         detail=s.UserResourcePermissions_DELETE_OkResponseSchema.description)
def get_resource_root_service(resource, request):
    # type: (models.Resource, Request) -> ServiceInterface
    """
    Obtains the service class that corresponds to the root service-resource of the given resource.

    :param resource: a service, or a resource nested under a service.
    :param request: request providing the database session (``request.db``).
    :returns: result of ``service_factory`` for the root service of the resource.
    """
    # a service is its own root; anything else is resolved through its root service id
    root = resource
    if resource.resource_type != models.Service.resource_type_name:
        root = ResourceService.by_resource_id(resource.root_service_id, db_session=request.db)
    return service_factory(root, request)
def filter_user_permission(resource_permission_list, user):
    # type: (List[ResourcePermissionType], models.User) -> Iterable[ResourcePermissionType]
    """
    Keeps only the direct user permissions out of a list of user/group resource/service permissions.

    An entry is kept when it has no group, is of type ``"user"`` and belongs to a user named like ``user``.

    :param resource_permission_list: permission entries exposing ``group``, ``type`` and ``user`` attributes.
    :param user: user whose direct permissions are wanted.
    :returns: result of the built-in ``filter`` applied over the provided entries.
    """
    def _is_direct(entry):
        if entry.group is not None:
            return False
        if entry.type != u"user":
            return False
        return entry.user.user_name == user.user_name

    # noinspection PyTypeChecker
    return filter(_is_direct, resource_permission_list)
def get_user_resource_permissions_response(user, resource, request,
                                           inherit_groups_permissions=True, effective_permissions=False):
    # type: (models.User, ServiceOrResourceType, Request, bool, bool) -> HTTPException
    """
    Builds the response listing the permissions a user has on a resource.

    Permissions are returned with or without the ones inherited from groups. Alternatively, the effective user
    resource permissions are returned, in which case group permissions are implied as `True`.

    :param user: user for which permissions are retrieved.
    :param resource: service or resource to inspect.
    :param request: request providing the database session (``request.db``).
    :param inherit_groups_permissions: when `False` (and not using effective permissions),
        only direct user permissions are kept.
    :param effective_permissions: when `True`, permissions are resolved by the root service of the resource.
    :returns: valid HTTP response on successful operations.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    db_session = request.db

    def _collect_permissions():
        if resource.owner_user_id == user.id:
            # the owner receives every permission registered for the resource type
            found = models.RESOURCE_TYPE_DICT[resource.type].permissions
        elif effective_permissions:
            root_service = get_resource_root_service(resource, request)
            found = root_service.effective_permissions(resource, user)
        else:
            found = ResourceService.perms_for_user(resource, user, db_session=db_session)
            if not inherit_groups_permissions:
                found = filter_user_permission(found, user)
        return format_permissions(found)

    permission_names = ax.evaluate_call(
        lambda: _collect_permissions(),
        fallback=lambda: db_session.rollback(),
        httpError=HTTPInternalServerError,
        msgOnFail=s.UserServicePermissions_GET_NotFoundResponseSchema.description,
        content={u"resource_name": str(resource.resource_name), u"user_name": str(user.user_name)},
    )
    return ax.valid_http(httpSuccess=HTTPOk,
                         content={u"permission_names": permission_names},
                         detail=s.UserResourcePermissions_GET_OkResponseSchema.description)
def get_user_services(user, request, cascade_resources=False,
                      inherit_groups_permissions=False, format_as_list=False):
    # type: (models.User, Request, bool, bool, bool) -> UserServicesType
    """
    Returns services by type with corresponding services by name containing sub-dict information.

    :param user: user for which to find services
    :param request: request with database session connection
    :param cascade_resources:
        If `False`, return only services with *Direct* user permissions on their corresponding service-resource.
        Otherwise, return every service that has at least one sub-resource with user permissions.
    :param inherit_groups_permissions:
        If `False`, return only user-specific service/sub-resources permissions.
        Otherwise, resolve inherited permissions using all groups the user is member of.
    :param format_as_list:
        returns as list of service dict information (not grouped by type and by name)
    :return: only services for which the user has permissions, as selected by `cascade_resources`
        and `inherit_groups_permissions`
    :rtype:
        dict of services by type with corresponding services by name containing sub-dict information,
        unless `format_as_list` is `True`
    """
    db_session = request.db
    # Restrict the search with the resource type *name* (the same string that `resource.resource_type` is
    # compared against below) rather than with the mapped `resource_type` column attribute.
    resource_type = None if cascade_resources else [models.Service.resource_type_name]
    res_perm_dict = get_user_resources_permissions_dict(user, resource_types=resource_type, request=request,
                                                        inherit_groups_permissions=inherit_groups_permissions)

    services = {}
    for resource_id, perms in res_perm_dict.items():
        resource = ResourceService.by_resource_id(resource_id=resource_id, db_session=db_session)
        is_service = resource.resource_type == models.Service.resource_type_name
        if not is_service:
            if not cascade_resources:
                continue
            # a sub-resource exposes its root service with the permissions defined by that service
            perms = get_resource_root_service(resource, request).permissions

        # a service has no root service id, its own resource id is then the service id
        service_id = resource.root_service_id or resource.resource_id
        svc = db_session.query(models.Service).filter_by(resource_id=service_id).first()

        # the first entry found for a given service wins, later ones do not override it
        services_of_type = services.setdefault(svc.type, {})
        if svc.resource_name not in services_of_type:
            services_of_type[svc.resource_name] = format_service(svc, perms, show_private_url=False)

    if not format_as_list:
        return services
    return [svc_info for services_by_name in services.values() for svc_info in services_by_name.values()]
def get_user_service_permissions(user, service, request, inherit_groups_permissions=True):
    # type: (models.User, models.Service, Request, bool) -> List[Permission]
    """
    Lists the permissions a user has on a service.

    When the user is the owner of the service, the permissions of the service implementation obtained from
    ``service_factory`` are used. Otherwise, the permissions are looked up for the user on the service.

    :param user: user for which permissions are retrieved.
    :param service: service to inspect.
    :param request: request providing the database session (``request.db``).
    :param inherit_groups_permissions: when `False`, looked up permissions are reduced to direct user permissions.
    :returns: list of every found permission passed through ``convert_permission``.
    """
    owns_service = service.owner_user_id == user.id
    if owns_service:
        found = service_factory(service, request).permissions
    else:
        found = ResourceService.perms_for_user(service, user, db_session=request.db)
        if not inherit_groups_permissions:
            found = filter_user_permission(found, user)
    converted = []
    for perm in found:
        converted.append(convert_permission(perm))
    return converted
def get_user_resources_permissions_dict(user, request, resource_types=None, resource_ids=None,
                                        inherit_groups_permissions=True):
    # type: (models.User, Request, Optional[List[Str]], Optional[List[int]], bool) -> Dict[Str, Any]
    """
    Builds a mapping of resource id to the permission names the user has on that resource.

    :param user: user for which to find resources (must not be `None`)
    :param request: request with database session connection
    :param resource_types: filter the search query with specified resource types
    :param resource_ids: filter the search query with specified resource ids
    :param inherit_groups_permissions:
        If `False`, return only user-specific resource permissions.
        Otherwise, resolve inherited permissions using all groups the user is member of.
    :return: resources for which the user has permissions, each with a sorted list of unique permission names
    """
    ax.verify_param(user, notNone=True, httpError=HTTPNotFound,
                    msgOnFail=s.UserResourcePermissions_GET_NotFoundResponseSchema.description)
    found = UserService.resources_with_possible_perms(
        user, resource_ids=resource_ids, resource_types=resource_types, db_session=request.db)
    if not inherit_groups_permissions:
        found = filter_user_permission(found, user)

    # group permission names under the id of the resource they apply to
    names_by_resource = {}
    for entry in found:
        names_by_resource.setdefault(entry.resource.resource_id, []).append(entry.perm_name)

    # drop duplicates that multiple groups could have introduced
    return {res_id: sorted(set(names)) for res_id, names in names_by_resource.items()}
def get_user_service_resources_permissions_dict(user, service, request, inherit_groups_permissions=True):
    # type: (models.User, models.Service, Request, bool) -> Dict[Str, Any]
    """
    Builds the mapping of resource id to user permission names for the resources found under a service.

    The resource ids are collected from the resource tree, starting at the resource id of the service,
    and then resolved with :func:`get_user_resources_permissions_dict`.

    :param user: user for which to find permissions
    :param service: service under which resources are searched
    :param request: request with database session connection
    :param inherit_groups_permissions: forwarded as is to :func:`get_user_resources_permissions_dict`
    :return: resources by id with corresponding permissions of the user
    """
    tree_entries = models.resource_tree_service.from_parent_deeper(parent_id=service.resource_id,
                                                                   db_session=request.db)
    nested_ids = []
    for entry in tree_entries:
        nested_ids.append(entry.Resource.resource_id)
    return get_user_resources_permissions_dict(user, request, resource_types=None, resource_ids=nested_ids,
                                               inherit_groups_permissions=inherit_groups_permissions)
def check_user_info(user_name, email, password, group_name):
    # type: (Str, Str, Str, Str) -> None
    """
    Validates the information provided to describe a user.

    The checks are applied in this order, each of them being reported as a bad request on failure:

    - ``user_name`` is neither `None` nor empty
    - ``user_name`` length is within ``1`` and ``MAGPIE_USER_NAME_MAX_LENGTH`` inclusively
    - ``user_name`` differs from the reserved ``MAGPIE_LOGGED_USER`` keyword
    - ``email``, ``password`` and ``group_name`` are neither `None` nor empty

    :param user_name: name of the user to validate.
    :param email: email of the user to validate.
    :param password: password of the user to validate.
    :param group_name: name of the group to validate.
    """
    ax.verify_param(user_name, notNone=True, notEmpty=True, httpError=HTTPBadRequest,
                    paramName=u"user_name",
                    msgOnFail=s.Users_CheckInfo_Name_BadRequestResponseSchema.description)

    allowed_lengths = range(1, 1 + get_constant("MAGPIE_USER_NAME_MAX_LENGTH"))
    ax.verify_param(len(user_name), isIn=True, httpError=HTTPBadRequest,
                    paramName=u"user_name", paramCompare=allowed_lengths,
                    msgOnFail=s.Users_CheckInfo_Size_BadRequestResponseSchema.description)

    ax.verify_param(user_name, paramCompare=get_constant("MAGPIE_LOGGED_USER"), notEqual=True,
                    paramName=u"user_name", httpError=HTTPBadRequest,
                    msgOnFail=s.Users_CheckInfo_ReservedKeyword_BadRequestResponseSchema.description)

    # the remaining fields all share the same "must be provided" requirement
    required_fields = (
        (email, u"email", s.Users_CheckInfo_Email_BadRequestResponseSchema.description),
        (password, u"password", s.Users_CheckInfo_Password_BadRequestResponseSchema.description),
        (group_name, u"group_name", s.Users_CheckInfo_GroupName_BadRequestResponseSchema.description),
    )
    for field_value, field_name, failure_message in required_fields:
        ax.verify_param(field_value, notNone=True, notEmpty=True, httpError=HTTPBadRequest,
                        paramName=field_name, msgOnFail=failure_message)
def get_user_groups_checked(request, user):
    # type: (Request, models.User) -> List[Str]
    """
    Obtains the sorted names of the groups the user is member of.

    :param request: request providing the database session (``request.db``), rolled back on failure.
    :param user: user for which to list groups (must not be `None`).
    :returns: group names of the user, sorted.
    :raises HTTPException: error HTTP response of corresponding situation.
    """
    ax.verify_param(user, notNone=True, httpError=HTTPNotFound,
                    msgOnFail=s.Groups_CheckInfo_NotFoundResponseSchema.description)
    names = ax.evaluate_call(
        lambda: [grp.group_name for grp in user.groups],
        fallback=lambda: request.db.rollback(),
        httpError=HTTPForbidden,
        msgOnFail=s.Groups_CheckInfo_ForbiddenResponseSchema.description,
    )
    ordered = list(names)
    ordered.sort()
    return ordered