Source code for magpie.api.management.network.node.network_node_views

import jwt
import requests
from pyramid.httpexceptions import (
    HTTPBadRequest,
    HTTPCreated,
    HTTPForbidden,
    HTTPFound,
    HTTPInternalServerError,
    HTTPNotFound,
    HTTPOk
)
from pyramid.security import Authenticated
from pyramid.view import view_config
from six.moves.urllib import parse as up

from magpie import models
from magpie.api import exception as ax
from magpie.api import requests as ar
from magpie.api import schemas as s
from magpie.api.management.network.network_utils import decode_jwt, encode_jwt
from magpie.api.management.network.node.network_node_utils import (
    check_network_node_info,
    create_associated_user_groups,
    delete_network_node,
    load_redirect_uris,
    update_associated_user_groups
)
from magpie.api.requests import check_network_mode_enabled
from magpie.constants import get_constant


@s.NetworkNodesAPI.get(tags=[s.NetworkTag], response_schemas=s.NetworkNodes_GET_responses)
@view_config(route_name=s.NetworkNodesAPI.name, request_method="GET",
             decorator=check_network_mode_enabled, permission=Authenticated)
[docs] def get_network_nodes_view(request): is_admin = False if request.user is not None: admin_group = get_constant("MAGPIE_ADMIN_GROUP", settings_container=request) is_admin = admin_group in [group.group_name for group in request.user.groups] nodes = [n.as_dict() for n in request.db.query(models.NetworkNode).all()] if not is_admin: nodes = [{"name": n["name"]} for n in nodes] return ax.valid_http(http_success=HTTPOk, detail=s.NetworkNodes_GET_OkResponseSchema.description, content={"nodes": nodes})
@s.NetworkNodeAPI.get(tags=[s.NetworkTag], response_schemas=s.NetworkNode_GET_responses) @view_config(route_name=s.NetworkNodeAPI.name, request_method="GET", decorator=check_network_mode_enabled)
[docs] def get_network_node_view(request): node_name = ar.get_value_matchdict_checked(request, "node_name") node = ax.evaluate_call( lambda: request.db.query(models.NetworkNode).filter(models.NetworkNode.name == node_name).one(), http_error=HTTPNotFound, msg_on_fail=s.NetworkNode_NotFoundResponseSchema.description) return ax.valid_http(http_success=HTTPOk, detail=s.NetworkNode_GET_OkResponseSchema.description, content=node.as_dict())
@s.NetworkNodesAPI.post(schema=s.NetworkNode_POST_RequestSchema, tags=[s.NetworkTag], response_schemas=s.NetworkNodes_POST_responses) @view_config(route_name=s.NetworkNodesAPI.name, request_method="POST", decorator=check_network_mode_enabled)
[docs] def post_network_nodes_view(request): required_params = ("name", "base_url", "jwks_url", "token_url", "authorization_url") kwargs = {} for param in required_params: value = ar.get_multiformat_body(request, param, default=None) if value is None: ax.raise_http(http_error=HTTPBadRequest, detail=s.BadRequestResponseSchema.description) kwargs[param] = value redirect_uris = ar.get_multiformat_body(request, "redirect_uris", default=None) if redirect_uris is not None: kwargs["redirect_uris"] = load_redirect_uris(redirect_uris, request) check_network_node_info(request.db, **kwargs) node = models.NetworkNode(**kwargs) request.db.add(node) create_associated_user_groups(node, request) return ax.valid_http(http_success=HTTPCreated, detail=s.NetworkNodes_POST_CreatedResponseSchema.description)
@s.NetworkNodeAPI.patch(schema=s.NetworkNode_PATCH_RequestSchema, tags=[s.NetworkTag], response_schemas=s.NetworkNode_PATCH_responses) @view_config(route_name=s.NetworkNodeAPI.name, request_method="PATCH", decorator=check_network_mode_enabled)
[docs] def patch_network_node_view(request): node_name = ar.get_value_matchdict_checked(request, "node_name") node = ax.evaluate_call( lambda: request.db.query(models.NetworkNode).filter(models.NetworkNode.name == node_name).one(), http_error=HTTPNotFound, msg_on_fail=s.NetworkNode_NotFoundResponseSchema.description) params = ("name", "base_url", "jwks_url", "token_url", "authorization_url", "redirect_uris") kwargs = {} for param in params: param_value = ar.get_multiformat_body(request, param, default=None) if param_value is not None: if param == "redirect_uris": kwargs[param] = load_redirect_uris(param_value, request) else: kwargs[param] = param_value if not kwargs: ax.raise_http(http_error=HTTPBadRequest, detail=s.NetworkNodes_PATCH_BadRequestResponseSchema.description) check_network_node_info(request.db, **kwargs) for attr, value in kwargs.items(): setattr(node, attr, value) update_associated_user_groups(node, node_name, request) return ax.valid_http(http_success=HTTPOk, detail=s.NetworkNode_PATCH_OkResponseSchema.description)
@s.NetworkNodeAPI.delete(tags=[s.NetworkTag], response_schemas=s.NetworkNode_DELETE_responses) @view_config(route_name=s.NetworkNodeAPI.name, request_method="DELETE", decorator=check_network_mode_enabled)
[docs] def delete_network_node_view(request): node_name = ar.get_value_matchdict_checked(request, "node_name") node = ax.evaluate_call( lambda: request.db.query(models.NetworkNode).filter(models.NetworkNode.name == node_name).one(), http_error=HTTPNotFound, msg_on_fail=s.NetworkNode_NotFoundResponseSchema.description) ax.evaluate_call(lambda: delete_network_node(request, node), http_error=HTTPInternalServerError, fallback=lambda: request.db.rollback(), msg_on_fail=s.InternalServerErrorResponseSchema.description) return ax.valid_http(http_success=HTTPOk, detail=s.NetworkNode_DELETE_OkResponseSchema.description)
@s.NetworkNodeTokenAPI.get(tags=[s.NetworkTag], response_schemas=s.NetworkNodeToken_GET_responses) @view_config(route_name=s.NetworkNodeTokenAPI.name, request_method="GET", decorator=check_network_mode_enabled, permission=Authenticated)
[docs] def get_network_node_token_view(request): node_name = ar.get_value_matchdict_checked(request, "node_name") node = ax.evaluate_call( lambda: request.db.query(models.NetworkNode).filter(models.NetworkNode.name == node_name).one(), http_error=HTTPNotFound, msg_on_fail=s.NetworkNode_NotFoundResponseSchema.description) token = encode_jwt({"user_name": request.user.user_name}, node_name, request) access_token = ax.evaluate_call(lambda: requests.post(node.token_url, json={"token": token}, timeout=5).json()["token"], http_error=HTTPInternalServerError, msg_on_fail=s.NetworkNodeToken_GET_InternalServerErrorResponseSchema.description) return ax.valid_http(http_success=HTTPOk, content={"token": access_token}, detail=s.NetworkNodeToken_GET_OkResponseSchema)
@s.NetworkNodeTokenAPI.delete(tags=[s.NetworkTag], response_schemas=s.NetworkNodeToken_DELETE_responses) @view_config(route_name=s.NetworkNodeTokenAPI.name, request_method="DELETE", decorator=check_network_mode_enabled, permission=Authenticated)
[docs] def delete_network_node_token_view(request): node_name = ar.get_value_matchdict_checked(request, "node_name") node = ax.evaluate_call( lambda: request.db.query(models.NetworkNode).filter(models.NetworkNode.name == node_name).one(), http_error=HTTPNotFound, msg_on_fail=s.NetworkNode_NotFoundResponseSchema.description) token = encode_jwt({"user_name": request.user.user_name}, node_name, request) ax.evaluate_call(lambda: requests.delete(node.token_url, json={"token": token}, timeout=5).raise_for_status(), http_error=HTTPInternalServerError, msg_on_fail=s.NetworkNodeToken_DELETE_InternalServerErrorResponseSchema.description) return ax.valid_http(http_success=HTTPOk, detail=s.NetworkNodeToken_DELETE_OkResponseSchema)
@s.NetworkLinkAPI.get(schema=s.NetworkLink_GET_RequestSchema, tags=[s.NetworkTag], response_schemas=s.NetworkLink_GET_responses) @view_config(route_name=s.NetworkLinkAPI.name, request_method="GET", decorator=check_network_mode_enabled, permission=Authenticated) @s.NetworkNodeLinkAPI.post(tags=[s.NetworkTag], response_schemas=s.NetworkNodeLink_POST_responses) @view_config(route_name=s.NetworkNodeLinkAPI.name, request_method="POST", decorator=check_network_mode_enabled, permission=Authenticated)
[docs] def post_network_node_link_view(request): node_name = ar.get_value_matchdict_checked(request, "node_name") node = ax.evaluate_call( lambda: request.db.query(models.NetworkNode).filter(models.NetworkNode.name == node_name).one(), http_error=HTTPNotFound, msg_on_fail=s.NetworkNode_NotFoundResponseSchema.description ) location_tuple = up.urlparse(node.authorization_url) location_query_list = up.parse_qsl(location_tuple.query) location_query_list.extend(( ("token", encode_jwt({"user_name": request.user.user_name}, node.name, request)), ("response_type", "id_token"), ("redirect_uri", request.route_url(s.NetworkLinkAPI.name)) )) location = up.urlunparse(location_tuple._replace(query=up.urlencode(location_query_list, doseq=True))) return ax.valid_http(http_success=HTTPFound, detail=s.NetworkNodeLink_POST_FoundResponseSchema.description, http_kwargs={"location": location})