from magpie.api.exception import evaluate_call, verify_param
from magpie.api import schemas as s
from magpie.constants import get_constant
from magpie.definitions import ziggurat_definitions as zig
from magpie.definitions.pyramid_definitions import (
IAuthenticationPolicy,
HTTPBadRequest,
HTTPForbidden,
HTTPNotFound,
HTTPUnprocessableEntity,
HTTPInternalServerError,
)
from magpie.utils import CONTENT_TYPE_JSON
from magpie import models
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from magpie.definitions.pyramid_definitions import Request # noqa: F401
from magpie.definitions.typedefs import Any, Str, Optional, ServiceOrResourceType # noqa: F401
from magpie.permissions import Permission # noqa: F401
[docs]def get_request_method_content(request):
# 'request' object stores GET content into 'GET' property, while other methods are in 'POST' property
method_property = "GET" if request.method == "GET" else "POST"
return getattr(request, method_property)
[docs]def get_multiformat_post(request, key, default=None):
return get_multiformat_any(request, key, default)
[docs]def get_permission_multiformat_post_checked(request, service_or_resource, permission_name_key="permission_name"):
# type: (Request, ServiceOrResourceType, Str) -> Permission
"""
Retrieves the permission from the body and validates that it is allowed for the specified `service` or `resource`.
"""
# import here to avoid circular import error with undefined functions between (api_request, resource_utils)
from magpie.api.management.resource.resource_utils import check_valid_service_or_resource_permission
perm_name = get_value_multiformat_post_checked(request, permission_name_key)
return check_valid_service_or_resource_permission(perm_name, service_or_resource, request.db)
[docs]def get_value_multiformat_post_checked(request, key, default=None):
val = get_multiformat_any(request, key, default=default)
verify_param(val, notNone=True, notEmpty=True, httpError=HTTPUnprocessableEntity,
paramName=key, msgOnFail=s.UnprocessableEntityResponseSchema.description)
return val
[docs]def get_user(request, user_name_or_token=None):
# type: (Request, Optional[Str]) -> models.User
logged_user_name = get_constant("MAGPIE_LOGGED_USER")
if user_name_or_token is None:
user_name_or_token = logged_user_name
if user_name_or_token == logged_user_name:
curr_user = request.user
if curr_user:
return curr_user
else:
anonymous_user = get_constant("MAGPIE_ANONYMOUS_USER")
anonymous = evaluate_call(lambda: zig.UserService.by_user_name(anonymous_user, db_session=request.db),
fallback=lambda: request.db.rollback(), httpError=HTTPForbidden,
msgOnFail=s.User_CheckAnonymous_ForbiddenResponseSchema.description)
verify_param(anonymous, notNone=True, httpError=HTTPNotFound,
msgOnFail=s.User_CheckAnonymous_NotFoundResponseSchema.description)
return anonymous
else:
authn_policy = request.registry.queryUtility(IAuthenticationPolicy)
principals = authn_policy.effective_principals(request)
admin_group = zig.GroupService.by_group_name(get_constant("MAGPIE_ADMIN_GROUP"), db_session=request.db)
admin_principal = "group:{}".format(admin_group.id)
if admin_principal not in principals:
raise HTTPForbidden()
user = evaluate_call(lambda: zig.UserService.by_user_name(user_name_or_token, db_session=request.db),
fallback=lambda: request.db.rollback(),
httpError=HTTPForbidden, msgOnFail=s.User_GET_ForbiddenResponseSchema.description)
verify_param(user, notNone=True, httpError=HTTPNotFound,
msgOnFail=s.User_GET_NotFoundResponseSchema.description)
return user
[docs]def get_user_matchdict_checked_or_logged(request, user_name_key="user_name"):
logged_user_name = get_constant("MAGPIE_LOGGED_USER")
logged_user_path = s.UserAPI.path.replace("{" + user_name_key + "}", logged_user_name)
if user_name_key not in request.matchdict and request.path_info.startswith(logged_user_path):
return get_user(request, logged_user_name)
return get_user_matchdict_checked(request, user_name_key)
[docs]def get_user_matchdict_checked(request, user_name_key="user_name"):
user_name = get_value_matchdict_checked(request, user_name_key)
return get_user(request, user_name)
[docs]def get_group_matchdict_checked(request, group_name_key="group_name"):
group_name = get_value_matchdict_checked(request, group_name_key)
group = evaluate_call(lambda: zig.GroupService.by_group_name(group_name, db_session=request.db),
fallback=lambda: request.db.rollback(), httpError=HTTPForbidden,
msgOnFail=s.Group_MatchDictCheck_ForbiddenResponseSchema.description)
verify_param(group, notNone=True, httpError=HTTPNotFound,
msgOnFail=s.Group_MatchDictCheck_NotFoundResponseSchema.description)
return group
[docs]def get_resource_matchdict_checked(request, resource_name_key="resource_id"):
# type: (Request, Str) -> models.Resource
resource_id = get_value_matchdict_checked(request, resource_name_key)
resource_id = evaluate_call(lambda: int(resource_id), httpError=HTTPBadRequest,
msgOnFail=s.Resource_MatchDictCheck_BadRequestResponseSchema.description)
resource = evaluate_call(lambda: zig.ResourceService.by_resource_id(resource_id, db_session=request.db),
fallback=lambda: request.db.rollback(), httpError=HTTPForbidden,
msgOnFail=s.Resource_MatchDictCheck_ForbiddenResponseSchema.description)
verify_param(resource, notNone=True, httpError=HTTPNotFound,
msgOnFail=s.Resource_MatchDictCheck_NotFoundResponseSchema.description)
return resource
[docs]def get_service_matchdict_checked(request, service_name_key="service_name"):
service_name = get_value_matchdict_checked(request, service_name_key)
service = evaluate_call(lambda: models.Service.by_service_name(service_name, db_session=request.db),
fallback=lambda: request.db.rollback(), httpError=HTTPForbidden,
msgOnFail=s.Service_MatchDictCheck_ForbiddenResponseSchema.description)
verify_param(service, notNone=True, httpError=HTTPNotFound, content={u'service_name': service_name},
msgOnFail=s.Service_MatchDictCheck_NotFoundResponseSchema.description)
return service
[docs]def get_permission_matchdict_checked(request, service_or_resource, permission_name_key="permission_name"):
# type: (Request, models.Resource, Str) -> Permission
"""
Obtains the `permission` specified in the ``request`` path and validates that it is allowed for the specified
``service_or_resource`` which can be a `service` or a children `resource`.
Allowed permissions correspond to the direct `service` permissions or restrained permissions of the `resource`
under its root `service`.
:returns: found permission name if valid for the service/resource
"""
# import here to avoid circular import error with undefined functions between (api_request, resource_utils)
from magpie.api.management.resource.resource_utils import check_valid_service_or_resource_permission
perm_name = get_value_matchdict_checked(request, permission_name_key)
return check_valid_service_or_resource_permission(perm_name, service_or_resource, request.db)
[docs]def get_value_matchdict_checked(request, key):
val = request.matchdict.get(key)
verify_param(val, notNone=True, notEmpty=True, httpError=HTTPUnprocessableEntity,
paramName=key, msgOnFail=s.UnprocessableEntityResponseSchema.description)
return val
[docs]def get_query_param(request, case_insensitive_key, default=None):
# type: (Request, Str, Optional[Any]) -> Any
"""
Retrieves a query string value by name (case insensitive), or returns the default if not present.
"""
for p in request.params:
if p.lower() == case_insensitive_key:
return request.params.get(p)
return default