Source code for magpie.api.management.service.service_utils

from typing import TYPE_CHECKING

import six
from pyramid.httpexceptions import (
    HTTPBadRequest,
    HTTPConflict,
    HTTPCreated,
    HTTPForbidden,
    HTTPInternalServerError,
    HTTPUnprocessableEntity
)
from ziggurat_foundations.models.services.group import GroupService
from ziggurat_foundations.models.services.resource import ResourceService

from magpie import models
from magpie.api import exception as ax
from magpie.api import schemas as s
from magpie.api.management.group.group_utils import create_group_resource_permission_response
from magpie.api.management.service.service_formats import format_service
from magpie.constants import get_constant
from magpie.permissions import Permission
from magpie.register import SERVICES_PHOENIX_ALLOWED, sync_services_phoenix
from magpie.services import SERVICE_TYPE_DICT
from magpie.utils import get_logger

if TYPE_CHECKING:
    # pylint: disable=W0611,unused-import
    from typing import Iterable, List, Optional

    from pyramid.httpexceptions import HTTPException
    from sqlalchemy.orm.session import Session

    from magpie.typedefs import JSON, Str

[docs] LOGGER = get_logger(__name__)
[docs] def create_service(service_name, service_type, service_url, service_push, service_config, db_session): # type: (Str, Str, Str, bool, Optional[JSON], Session) -> HTTPException """ Generates an instance to register a new service. """ def _add_service_magpie_and_phoenix(svc, svc_push, db): db.add(svc) if svc_push and svc.type in SERVICES_PHOENIX_ALLOWED: sync_services_phoenix(db.query(models.Service)) # sometimes, resource ID is not updated, fetch the service to obtain it if not svc.resource_id: svc = ax.evaluate_call(lambda: models.Service.by_service_name(service_name, db_session=db_session), fallback=lambda: db_session.rollback(), http_error=HTTPInternalServerError, msg_on_fail=s.Services_POST_InternalServerErrorResponseSchema.description, content={"service_name": str(service_name), "resource_id": svc.resource_id}) ax.verify_param(svc.resource_id, not_none=True, param_compare=int, is_type=True, http_error=HTTPInternalServerError, msg_on_fail=s.Services_POST_InternalServerErrorResponseSchema.description, content={"service_name": str(service_name), "resource_id": svc.resource_id}, param_name="service_name") return svc ax.verify_param(service_type, not_none=True, not_empty=True, is_type=True, param_name="service_type", param_compare=six.string_types, http_error=HTTPBadRequest, msg_on_fail=s.Services_POST_BadRequestResponseSchema.description) ax.verify_param(service_type, is_in=True, param_compare=SERVICE_TYPE_DICT.keys(), param_name="service_type", http_error=HTTPBadRequest, msg_on_fail=s.Services_POST_BadRequestResponseSchema.description) ax.verify_param(service_url, matches=True, param_compare=ax.URL_REGEX, param_name="service_url", http_error=HTTPBadRequest, msg_on_fail=s.Services_POST_Params_BadRequestResponseSchema.description) ax.verify_param(service_name, not_empty=True, not_none=True, matches=True, param_name="service_name", param_compare=ax.SCOPE_REGEX, http_error=HTTPBadRequest, msg_on_fail=s.Services_POST_Params_BadRequestResponseSchema.description) ax.verify_param(models.Service.by_service_name(service_name, db_session=db_session), is_none=True, param_name="service_name", with_param=False, content={"service_name": str(service_name)}, http_error=HTTPConflict, msg_on_fail=s.Services_POST_ConflictResponseSchema.description) if service_config is not None: ax.verify_param(service_config, param_name="configuration", param_compare=dict, is_type=True, http_error=HTTPUnprocessableEntity, msg_on_fail=s.Service_CheckConfig_UnprocessableEntityResponseSchema.description) service = ax.evaluate_call( lambda: models.Service( resource_name=str(service_name), resource_type=models.Service.resource_type_name, configuration=service_config, url=str(service_url), type=str(service_type) ), fallback=lambda: db_session.rollback(), http_error=HTTPForbidden, msg_on_fail=s.Services_POST_UnprocessableEntityResponseSchema.description, content={"service_name": str(service_name), "resource_type": models.Service.resource_type_name, "service_url": str(service_url), "service_type": str(service_type)} ) service = ax.evaluate_call(lambda: _add_service_magpie_and_phoenix(service, service_push, db_session), fallback=lambda: db_session.rollback(), http_error=HTTPForbidden, msg_on_fail=s.Services_POST_ForbiddenResponseSchema.description, content=format_service(service, show_private_url=True)) return ax.valid_http(http_success=HTTPCreated, detail=s.Services_POST_CreatedResponseSchema.description, content={"service": format_service(service, show_private_url=True, show_configuration=True)})
[docs] def get_services_by_type(service_type, db_session): # type: (Str, Session) -> Iterable[models.Service] """ Obtains all services that correspond to requested service-type. """ ax.verify_param(service_type, not_none=True, not_empty=True, http_error=HTTPBadRequest, msg_on_fail="Invalid 'service_type' value '" + str(service_type) + "' specified") services = db_session.query(models.Service).filter(models.Service.type == service_type) return sorted(services, key=lambda svc: svc.resource_name)
[docs] def add_service_getcapabilities_perms(service, db_session, group_name=None): if service.type in SERVICES_PHOENIX_ALLOWED and \ Permission.GET_CAPABILITIES in SERVICE_TYPE_DICT[service.type].permissions: if group_name is None: group_name = get_constant("MAGPIE_ANONYMOUS_USER") group = GroupService.by_group_name(group_name, db_session=db_session) perm = ResourceService.perm_by_group_and_perm_name(service.resource_id, group.id, Permission.GET_CAPABILITIES.value, db_session) if perm is None: # not set, create it create_group_resource_permission_response(group, service, Permission.GET_CAPABILITIES.value, db_session)
[docs] def filter_service_types(service_query, default_services=False): # type: (Optional[Str], bool) -> Optional[List[Str]] """ Obtains all valid case-insensitive service-type names from a filtered comma-separated list. :param service_query: query string or service type(s) comma-separated to parse. :param default_services: specify if the complete list of known service-types must be returned if no query to parse. :returns: parsed service-types if query was provided, or None by default, or all known service-types if requested. """ if service_query: service_types = [ svc_type.lower() for svc_type in service_query.split(",") if svc_type.strip() and svc_type.lower() in SERVICE_TYPE_DICT ] return service_types if default_services: return list(SERVICE_TYPE_DICT) return None