from typing import TYPE_CHECKING
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPConflict,
HTTPForbidden,
HTTPNotFound,
HTTPOk,
HTTPUnprocessableEntity
)
from pyramid.settings import asbool
from pyramid.view import view_config
from magpie import models
from magpie.api import exception as ax
from magpie.api import requests as ar
from magpie.api import schemas as s
from magpie.api.management.resource import resource_utils as ru
from magpie.api.management.resource import resource_views as rv
from magpie.api.management.service import service_formats as sf
from magpie.api.management.service import service_utils as su
from magpie.permissions import Permission, PermissionType, format_permissions
from magpie.register import SERVICES_PHOENIX_ALLOWED, sync_services_phoenix
from magpie.services import SERVICE_TYPE_DICT, invalidate_service, service_factory
from magpie.utils import CONTENT_TYPE_JSON
if TYPE_CHECKING:
from typing import List, Optional, Union
from pyramid.request import Request
from sqlalchemy.orm.session import Session
from magpie.typedefs import JSON, AnyResponseType, Str
@s.ServiceTypesAPI.get(tags=[s.ServicesTag], response_schemas=s.ServiceTypes_GET_responses)
@view_config(route_name=s.ServiceTypesAPI.name, request_method="GET")
[docs]
def get_service_types_view(request): # noqa: F811
# type: (Request) -> AnyResponseType
"""
List all available service types.
"""
return ax.valid_http(http_success=HTTPOk, content={"service_types": list(sorted(SERVICE_TYPE_DICT.keys()))},
detail=s.ServiceTypes_GET_OkResponseSchema.description)
@s.ServiceTypeAPI.get(schema=s.ServiceTypes_GET_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.ServiceType_GET_responses)
@view_config(route_name=s.ServiceTypeAPI.name, request_method="GET")
[docs]
def get_services_by_type_view(request):
# type: (Request) -> AnyResponseType
"""
List all registered services from a specific type.
"""
return get_services_runner(request)
@s.ServicesAPI.get(schema=s.Services_GET_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.Services_GET_responses)
@view_config(route_name=s.ServicesAPI.name, request_method="GET")
[docs]
def get_services_view(request):
# type: (Request) -> AnyResponseType
"""
List all registered services.
"""
return get_services_runner(request)
[docs]
def get_services_runner(request):
# type: (Request) -> AnyResponseType
"""
Generates services response format from request conditions.
Obtains the full or filtered list of services categorized by type, or listed as flat list according to request path
and query parameters.
"""
service_type_filter = request.matchdict.get("service_type") # no check because None/empty is for 'all services'
services_as_list = asbool(ar.get_query_param(request, ["flatten", "flattened", "list"], False))
known_service_types = list(SERVICE_TYPE_DICT)
# using '/services?type={}' query (allow many, no error if unknown)
if not service_type_filter:
service_types = ar.get_query_param(request, ["type", "types"], default="")
service_types = su.filter_service_types(service_types, default_services=True)
# using '/services/types/{}' path (error if unknown)
else:
ax.verify_param(service_type_filter, param_compare=known_service_types, is_in=True,
http_error=HTTPBadRequest, msg_on_fail=s.Services_GET_BadRequestResponseSchema.description,
content={"service_type": str(service_type_filter)}, content_type=CONTENT_TYPE_JSON)
service_types = [service_type_filter]
svc_content = [] if services_as_list else {} # type: Union[List[JSON], JSON]
for service_type in service_types:
if service_type not in known_service_types:
continue
services = su.get_services_by_type(service_type, db_session=request.db)
if not services_as_list:
svc_content[service_type] = {}
for service in services:
svc_fmt = sf.format_service(service, show_private_url=True)
if services_as_list:
svc_content.append(svc_fmt) # pylint: disable=E1101
else:
svc_content[service_type][service.resource_name] = svc_fmt
# sort result
if services_as_list:
svc_content = list(sorted(svc_content, key=lambda svc: svc["service_name"]))
else:
svc_content = {svc_type: dict(sorted(svc_items.items())) for svc_type, svc_items in sorted(svc_content.items())}
return ax.valid_http(http_success=HTTPOk, content={"services": svc_content},
detail=s.Services_GET_OkResponseSchema.description)
@s.ServicesAPI.post(schema=s.Services_POST_RequestBodySchema, tags=[s.ServicesTag],
response_schemas=s.Services_POST_responses)
@view_config(route_name=s.ServicesAPI.name, request_method="POST")
[docs]
def register_service_view(request):
# type: (Request) -> AnyResponseType
"""
Registers a new service.
"""
# accomplish basic validations here, create_service will do more field-specific checks
service_name = ar.get_value_multiformat_body_checked(request, "service_name", pattern=ax.SCOPE_REGEX)
service_url = ar.get_value_multiformat_body_checked(request, "service_url", pattern=ax.URL_REGEX)
service_type = ar.get_value_multiformat_body_checked(request, "service_type")
service_push = asbool(ar.get_multiformat_body(request, "service_push", default=False))
service_cfg = ar.get_multiformat_body(request, "configuration")
return su.create_service(service_name, service_type, service_url, service_push, service_cfg, db_session=request.db)
@s.ServiceAPI.patch(schema=s.Service_PATCH_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.Service_PATCH_responses)
@view_config(route_name=s.ServiceAPI.name, request_method="PATCH")
[docs]
def update_service_view(request):
# type: (Request) -> AnyResponseType
"""
Update service information.
"""
service = ar.get_service_matchdict_checked(request)
service_push = asbool(ar.get_multiformat_body(request, "service_push", default=False))
service_impl = service_factory(service, request)
def select_update(new_value, old_value):
# type: (Optional[Str], Optional[Str]) -> Optional[Str]
return new_value if new_value is not None and not new_value == "" else old_value
# None/Empty values are accepted in case of unspecified
cur_svc_name = service.resource_name
svc_name = select_update(ar.get_multiformat_body(request, "service_name"), cur_svc_name)
svc_url = select_update(ar.get_multiformat_body(request, "service_url"), service.url)
# config explicitly provided as None (null) means override (erase)
# to leave it as is, just don't specify the field
old_svc_config = service.configuration
new_svc_config = ar.get_multiformat_body(request, "configuration", {-1})
has_svc_config = not isinstance(new_svc_config, set) # use set() since not a possible JSON type (cannot use None)
ax.verify_param(svc_name, param_compare="types", not_equal=True,
param_name="service_name", http_error=HTTPForbidden,
msg_on_fail=s.Service_PATCH_ForbiddenResponseSchema_ReservedKeyword.description)
ax.verify_param(svc_name == cur_svc_name and svc_url == service.url and
(not service_impl.configurable or not has_svc_config or old_svc_config == new_svc_config),
not_equal=True, param_compare=True, param_name="service_name/service_url",
http_error=HTTPBadRequest, msg_on_fail=s.Service_PATCH_BadRequestResponseSchema.description)
if has_svc_config and old_svc_config != new_svc_config:
# configuration must be an explicit JSON object or None (erase)
if new_svc_config is not None:
ax.verify_param(new_svc_config, param_compare=dict, is_type=True, http_error=HTTPUnprocessableEntity,
msg_on_fail=s.Service_CheckConfig_UnprocessableEntityResponseSchema.description)
service.configuration = new_svc_config
if svc_name != cur_svc_name:
all_services = request.db.query(models.Service)
all_svc_names = [svc.resource_name for svc in all_services]
ax.verify_param(svc_name, not_in=True, param_compare=all_svc_names, with_param=False,
http_error=HTTPConflict, content={"service_name": str(svc_name)},
msg_on_fail=s.Service_PATCH_ConflictResponseSchema.description)
ax.verify_param(svc_name, not_none=True, not_empty=True, matches=True, param_compare=ax.SCOPE_REGEX,
http_error=HTTPBadRequest,
msg_on_fail=s.Service_PATCH_UnprocessableEntityResponseSchema.description)
def update_service_magpie_and_phoenix(_svc, new_name, new_url, svc_push, db_session):
# type: (models.Service, Str, Str, bool, Session) -> None
_svc.resource_name = new_name
_svc.url = new_url
has_getcap = Permission.GET_CAPABILITIES in SERVICE_TYPE_DICT[_svc.type].permissions
if svc_push and _svc.type in SERVICES_PHOENIX_ALLOWED and has_getcap:
# (re)apply getcapabilities to updated service to ensure updated push
su.add_service_getcapabilities_perms(_svc, db_session)
sync_services_phoenix(db_session.query(models.Service)) # push all services
old_svc_content = sf.format_service(service, show_private_url=True)
err_svc_content = {"service": old_svc_content, "new_service_name": svc_name, "new_service_url": svc_url}
ax.evaluate_call(lambda: update_service_magpie_and_phoenix(service, svc_name, svc_url, service_push, request.db),
fallback=lambda: request.db.rollback(),
http_error=HTTPForbidden, msg_on_fail=s.Service_PATCH_ForbiddenResponseSchema.description,
content=err_svc_content)
invalidate_service(cur_svc_name)
return ax.valid_http(
http_success=HTTPOk, detail=s.Service_PATCH_OkResponseSchema.description,
content={"service": sf.format_service(service, show_private_url=True, show_configuration=True)}
)
@s.ServiceAPI.get(tags=[s.ServicesTag], response_schemas=s.Service_GET_responses)
@view_config(route_name=s.ServiceAPI.name, request_method="GET")
[docs]
def get_service_view(request):
# type: (Request) -> AnyResponseType
"""
Get service information.
"""
service = ar.get_service_matchdict_checked(request)
service_info = sf.format_service(service, show_private_url=True,
show_resources_allowed=True, show_configuration=True)
return ax.valid_http(http_success=HTTPOk, detail=s.Service_GET_OkResponseSchema.description,
content={"service": service_info})
@s.ServiceAPI.delete(schema=s.Service_DELETE_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.Service_DELETE_responses)
@view_config(route_name=s.ServiceAPI.name, request_method="DELETE")
[docs]
def unregister_service_view(request):
# type: (Request) -> AnyResponseType
"""
Unregister a service.
"""
service = ar.get_service_matchdict_checked(request)
service_push = asbool(ar.get_multiformat_body(request, "service_push", default=False))
svc_content = sf.format_service(service, show_private_url=True)
svc_res_id = service.resource_id
svc_name = service.resource_name
ax.evaluate_call(lambda: models.RESOURCE_TREE_SERVICE.delete_branch(resource_id=svc_res_id, db_session=request.db),
fallback=lambda: request.db.rollback(), http_error=HTTPForbidden,
msg_on_fail="Delete service from resource tree failed.", content=svc_content)
def remove_service_magpie_and_phoenix(svc, svc_push, db_session):
# type: (models.Service, bool, Session) -> None
db_session.delete(svc)
if svc_push and svc.type in SERVICES_PHOENIX_ALLOWED:
sync_services_phoenix(db_session.query(models.Service))
ax.evaluate_call(lambda: remove_service_magpie_and_phoenix(service, service_push, request.db),
fallback=lambda: request.db.rollback(), http_error=HTTPForbidden,
msg_on_fail=s.Service_DELETE_ForbiddenResponseSchema.description, content=svc_content)
invalidate_service(svc_name)
return ax.valid_http(http_success=HTTPOk, detail=s.Service_DELETE_OkResponseSchema.description)
@s.ServicePermissionsAPI.get(tags=[s.ServicesTag], response_schemas=s.ServicePermissions_GET_responses)
@view_config(route_name=s.ServicePermissionsAPI.name, request_method="GET")
[docs]
def get_service_permissions_view(request):
# type: (Request) -> AnyResponseType
"""
List all applicable permissions for a service.
"""
service = ar.get_service_matchdict_checked(request)
svc_content = sf.format_service(service, show_private_url=True)
svc_perms = ax.evaluate_call(lambda: [p.value for p in SERVICE_TYPE_DICT[service.type].permissions],
fallback=request.db.rollback(), http_error=HTTPBadRequest, content=svc_content,
msg_on_fail=s.ServicePermissions_GET_BadRequestResponseSchema.description)
return ax.valid_http(http_success=HTTPOk, detail=s.ServicePermissions_GET_OkResponseSchema.description,
content=format_permissions(svc_perms, PermissionType.ALLOWED))
@s.ServiceResourceAPI.get(schema=s.ServiceResource_GET_RequestSchema, tags=[s.ServicesTag, s.ResourcesTag],
response_schemas=s.ServiceResource_GET_responses)
@view_config(route_name=s.ServiceResourceAPI.name, request_method="GET")
[docs]
def get_service_resource_view(request):
# type: (Request) -> AnyResponseType
"""
Get resource information under a service.
"""
return rv.get_resource_handler(request)
@s.ServiceResourceAPI.delete(schema=s.ServiceResource_DELETE_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.ServiceResource_DELETE_responses)
@view_config(route_name=s.ServiceResourceAPI.name, request_method="DELETE")
[docs]
def delete_service_resource_view(request):
# type: (Request) -> AnyResponseType
"""
Unregister a resource.
"""
return ru.delete_resource(request)
@s.ServiceResourcesAPI.get(schema=s.ServiceResources_GET_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.ServiceResources_GET_responses)
@view_config(route_name=s.ServiceResourcesAPI.name, request_method="GET")
[docs]
def get_service_resources_view(request):
# type: (Request) -> AnyResponseType
"""
List all resources registered under a service.
"""
service = ar.get_service_matchdict_checked(request)
svc_res_json = sf.format_service_resources(service, db_session=request.db,
show_all_children=True, show_private_url=True)
return ax.valid_http(http_success=HTTPOk, content={svc_res_json["service_name"]: svc_res_json},
detail=s.ServiceResources_GET_OkResponseSchema.description)
@s.ServiceResourcesAPI.post(schema=s.ServiceResources_POST_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.ServiceResources_POST_responses)
@view_config(route_name=s.ServiceResourcesAPI.name, request_method="POST")
[docs]
def create_service_resource_view(request):
# type: (Request) -> AnyResponseType
"""
Register a new resource directly under a service or under one of its children resources.
"""
service = ar.get_service_matchdict_checked(request)
resource_name = ar.get_multiformat_body(request, "resource_name")
resource_display_name = ar.get_multiformat_body(request, "resource_display_name", default=resource_name)
resource_type = ar.get_multiformat_body(request, "resource_type")
parent_id = ar.get_multiformat_body(request, "parent_id") # no check because None/empty is allowed
db_session = request.db
if parent_id is None:
parent_id = service.resource_id
else:
parent_id = ax.evaluate_call(lambda: int(parent_id),
http_error=HTTPUnprocessableEntity,
msg_on_fail=s.ServiceResources_POST_UnprocessableEntityResponseSchema.description)
# validate target service is actually the root service of the provided parent resource ID
root_service = ru.get_resource_root_service_by_id(parent_id, db_session=db_session)
ax.verify_param(root_service, not_none=True, param_name="parent_id",
msg_on_fail=s.ServiceResources_POST_NotFoundResponseSchema.description,
http_error=HTTPNotFound)
ax.verify_param(root_service.resource_id, is_equal=True,
param_compare=service.resource_id, param_name="parent_id",
msg_on_fail=s.ServiceResources_POST_ForbiddenResponseSchema.description,
http_error=HTTPForbidden)
return ru.create_resource(resource_name, resource_display_name, resource_type,
parent_id=parent_id, db_session=db_session)
@s.ServiceTypeResourcesAPI.get(schema=s.ServiceTypeResources_GET_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.ServiceTypeResources_GET_responses)
@view_config(route_name=s.ServiceTypeResourcesAPI.name, request_method="GET")
[docs]
def get_service_type_resources_view(request):
# type: (Request) -> AnyResponseType
"""
List details of resource types supported under a specific service type.
"""
def _get_resource_types_info(res_type_names):
res_type_classes = [rtc for rtn, rtc in models.RESOURCE_TYPE_DICT.items() if rtn in res_type_names]
return [sf.format_service_resource_type(rtc, SERVICE_TYPE_DICT[service_type]) for rtc in res_type_classes]
service_type = ar.get_value_matchdict_checked(request, "service_type")
ax.verify_param(service_type, param_compare=SERVICE_TYPE_DICT.keys(), is_in=True, http_error=HTTPNotFound,
msg_on_fail=s.ServiceTypeResources_GET_NotFoundResponseSchema.description)
resource_types_names = ax.evaluate_call(
lambda: SERVICE_TYPE_DICT[service_type].resource_type_names,
http_error=HTTPForbidden, content={"service_type": str(service_type)},
msg_on_fail=s.ServiceTypeResourceTypes_GET_ForbiddenResponseSchema.description)
return ax.valid_http(http_success=HTTPOk, detail=s.ServiceTypeResourceTypes_GET_OkResponseSchema.description,
content={"resource_types": _get_resource_types_info(resource_types_names)})
@s.ServiceTypeResourceTypesAPI.get(schema=s.ServiceTypeResourceTypes_GET_RequestSchema, tags=[s.ServicesTag],
response_schemas=s.ServiceTypeResourceTypes_GET_responses)
@view_config(route_name=s.ServiceTypeResourceTypesAPI.name, request_method="GET")
[docs]
def get_service_type_resource_types_view(request):
# type: (Request) -> AnyResponseType
"""
List all resource types supported under a specific service type.
"""
service_type = ar.get_value_matchdict_checked(request, "service_type")
ax.verify_param(service_type, param_compare=SERVICE_TYPE_DICT.keys(), is_in=True, http_error=HTTPNotFound,
msg_on_fail=s.ServiceTypeResourceTypes_GET_NotFoundResponseSchema.description)
resource_types = ax.evaluate_call(lambda: SERVICE_TYPE_DICT[service_type].resource_type_names,
http_error=HTTPForbidden, content={"service_type": str(service_type)},
msg_on_fail=s.ServiceTypeResourceTypes_GET_ForbiddenResponseSchema.description)
return ax.valid_http(http_success=HTTPOk, detail=s.ServiceTypeResourceTypes_GET_OkResponseSchema.description,
content={"resource_types": resource_types})