Source code for magpie.api.management.network.node.network_node_utils

import json
from typing import TYPE_CHECKING

import six
from pyramid.httpexceptions import HTTPBadRequest, HTTPConflict

from magpie import models
from magpie.api import exception as ax
from magpie.api import schemas as s
from magpie.api.exception import URL_REGEX
from magpie.cli.register_defaults import register_user_with_group
from magpie.constants import get_constant

if TYPE_CHECKING:
    from pyramid.request import Request

    from magpie.typedefs import JSON, AnyRequestType, List, Optional, Session, Str

# Pattern for network node names: one or more word characters (``\w``) or hyphens, nothing else.
# Passed as ``param_compare`` for the ``name`` check in ``check_network_node_info``.
[docs] NAME_REGEX = r"^[\w-]+$"
def create_associated_user_groups(new_node, request):
    # type: (models.NetworkNode, Request) -> None
    """
    Register the anonymous user and the groups that accompany the newly created ``new_node``.

    Two registrations are performed, in this order:

    1. The node's anonymous user together with a same-named group dedicated to this node.
    2. The same anonymous user with the network-wide group named by ``MAGPIE_NETWORK_GROUP_NAME``
       (this also creates that group if it does not yet exist).

    After each registration the relevant group is fetched by name so that its description can be set and
    it can be flagged as not discoverable.

    :param new_node: network node that was just created.
    :param request: current request; its ``db`` attribute is used as the database session.
    """
    anonymous_name = new_node.anonymous_user_name()
    anonymous_email = get_constant("MAGPIE_NETWORK_ANONYMOUS_EMAIL_FORMAT").format(new_node.name)

    # Stage 1: anonymous user + group specific to this network node.
    register_user_with_group(
        user_name=anonymous_name,
        group_name=anonymous_name,
        email=anonymous_email,
        password=None,  # auto-generated; the value is irrelevant since no login applies, it only has to be valid
        db_session=request.db,
    )
    node_group = models.GroupService.by_group_name(anonymous_name, db_session=request.db)
    node_group.description = (
        "Group for users who have accounts on the networked Magpie instance named '{}'.".format(new_node.name)
    )
    node_group.discoverable = False

    # Stage 2: add the anonymous user to the group shared by all users in the network
    # (i.e. users coming from nodes other than this one).
    network_group_name = get_constant("MAGPIE_NETWORK_GROUP_NAME")
    register_user_with_group(
        user_name=anonymous_name,
        group_name=network_group_name,
        email=anonymous_email,
        password=None,
        db_session=request.db,
    )
    network_group = models.GroupService.by_group_name(network_group_name, db_session=request.db)
    network_group.description = "Group for users who have accounts on a different Magpie instance on this network."
    network_group.discoverable = False
def update_associated_user_groups(node, old_node_name, request):
    # type: (models.NetworkNode, Str, Request) -> None
    """
    Rename the anonymous user and group tied to ``node`` after the node itself was renamed.

    Nothing happens when ``node.name`` still equals ``old_node_name``. Otherwise the user and the group
    that carry the anonymous name derived from ``old_node_name`` are fetched with ``.one()`` and updated:
    the user receives the new anonymous name and a matching email, the group receives the new name.

    :param node: network node carrying its (possibly new) name.
    :param old_node_name: name the node had before the update.
    :param request: current request; its ``db`` attribute is used as the database session.
    """
    if node.name == old_node_name:
        return  # name unchanged: associated user and group are already in sync

    previous_anonymous_name = models.NetworkNode.anonymous_user_name_formatter(old_node_name)
    user = request.db.query(models.User).filter(models.User.user_name == previous_anonymous_name).one()
    group = request.db.query(models.Group).filter(models.Group.group_name == previous_anonymous_name).one()

    current_anonymous_name = node.anonymous_user_name()
    user.user_name = current_anonymous_name
    user.email = get_constant("MAGPIE_NETWORK_ANONYMOUS_EMAIL_FORMAT").format(node.name)
    group.group_name = current_anonymous_name
def delete_network_node(request, node):
    # type: (Request, models.NetworkNode) -> None
    """
    Remove a ``NetworkNode`` along with its associated anonymous user and anonymous group.

    Deletions are issued on ``request.db`` in this order: anonymous user, anonymous group, then the node.

    :param request: current request; its ``db`` attribute is used as the database session.
    :param node: network node to delete (it must provide ``anonymous_user`` and ``anonymous_group``).
    """
    session = request.db
    # Each lookup is resolved immediately before its own deletion, keeping lookups and deletes interleaved.
    for find_associated in (node.anonymous_user, node.anonymous_group):
        session.delete(find_associated(session))
    session.delete(node)
def check_network_node_info(db_session=None,            # type: Optional[Session]
                            name=None,                  # type: Optional[Str]
                            base_url=None,              # type: Optional[Str]
                            jwks_url=None,              # type: Optional[Str]
                            token_url=None,             # type: Optional[Str]
                            authorization_url=None,     # type: Optional[Str]
                            redirect_uris=None          # type: Optional[List[Str]]
                            ):
    # type: (...) -> None
    """
    Validate the values used to create a new ``NetworkNode`` or to update an existing one.

    Only the parameters that are not ``None`` are checked, always in the following order:

    - ``name`` must match ``NAME_REGEX`` (``HTTPBadRequest``) and must not equal the name of any
      ``NetworkNode`` found through ``db_session`` (``HTTPConflict``).
    - ``base_url``, ``jwks_url``, ``token_url`` and ``authorization_url`` must each match ``URL_REGEX``
      (``HTTPBadRequest``).
    - every item of ``redirect_uris`` must match ``URL_REGEX`` (``HTTPBadRequest``).

    :param db_session: database session; it is queried whenever ``name`` is provided.
    :param name: candidate node name.
    :param base_url: candidate base URL of the node.
    :param jwks_url: candidate JWKS URL of the node.
    :param token_url: candidate token URL of the node.
    :param authorization_url: candidate authorization URL of the node.
    :param redirect_uris: candidate redirect URIs of the node.
    """
    if name is not None:
        ax.verify_param(name, matches=True, param_name="name", param_compare=NAME_REGEX,
                        http_error=HTTPBadRequest,
                        msg_on_fail=s.NetworkNodes_CheckInfo_NameValue_BadRequestResponseSchema.description)
        taken_names = [existing.name for existing in db_session.query(models.NetworkNode)]
        ax.verify_param(name, not_in=True, param_name="name", param_compare=taken_names,
                        http_error=HTTPConflict,
                        msg_on_fail=s.NetworkNodes_CheckInfo_NameValue_ConflictResponseSchema.description)

    # Single-valued URL fields share the same check; the tuple order fixes which failure is reported first.
    single_url_fields = (
        ("base_url", base_url, s.NetworkNodes_CheckInfo_BaseURLValue_BadRequestResponseSchema),
        ("jwks_url", jwks_url, s.NetworkNodes_CheckInfo_JWKSURLValue_BadRequestResponseSchema),
        ("token_url", token_url, s.NetworkNodes_CheckInfo_TokenURLValue_BadRequestResponseSchema),
        ("authorization_url", authorization_url,
         s.NetworkNodes_CheckInfo_AuthorizationURLValue_BadRequestResponseSchema),
    )
    for field_name, field_value, failure_schema in single_url_fields:
        if field_value is None:
            continue
        ax.verify_param(field_value, matches=True, param_name=field_name, param_compare=URL_REGEX,
                        http_error=HTTPBadRequest, msg_on_fail=failure_schema.description)

    if redirect_uris is not None:
        for redirect_uri in redirect_uris:
            ax.verify_param(redirect_uri, matches=True, param_name="redirect_uris", param_compare=URL_REGEX,
                            http_error=HTTPBadRequest,
                            msg_on_fail=s.NetworkNodes_CheckInfo_RedirectURIsValue_BadRequestResponseSchema.description)
def load_redirect_uris(uris, request):
    # type: (JSON, AnyRequestType) -> List[Str]
    """
    Return the redirect URIs, decoding them from JSON first when they are provided as a string.

    Values that are not strings are handed back unchanged. Strings are parsed with ``json.loads`` through
    ``ax.evaluate_call``, configured with ``HTTPBadRequest`` as the error and a fallback that rolls back
    ``request.db``.

    :param uris: redirect URIs, either already loaded or as a JSON-encoded string.
    :param request: current request; its ``db`` attribute is rolled back by the fallback.
    :returns: the loaded redirect URIs.
    """
    if not isinstance(uris, six.string_types):
        return uris

    failure_message = s.NetworkNodes_CheckInfo_RedirectURIsValue_BadRequestResponseSchema.description
    # Both callables are intentionally kept as lambda expressions, exactly as handed to ``evaluate_call``.
    return ax.evaluate_call(
        lambda: json.loads(uris),
        http_error=HTTPBadRequest,
        fallback=lambda: request.db.rollback(),
        msg_on_fail=failure_message,
    )