Source code for magpie.adapter.magpieowssecurity

from magpie.api.exception import evaluate_call, verify_param
from magpie.api.schemas import ProviderSigninAPI
from magpie.constants import get_constant
from magpie.definitions.pyramid_definitions import (
    HTTPOk,
    HTTPNotFound,
    HTTPForbidden,
    IAuthenticationPolicy,
    IAuthorizationPolicy,
    asbool,
)
from magpie.definitions.twitcher_definitions import (
    OWSSecurityInterface,
    OWSAccessForbidden,
    parse_service_name,
)
from magpie.models import Service
from magpie.permissions import Permission
from magpie.services import service_factory
from magpie.utils import get_magpie_url, get_settings, get_logger, CONTENT_TYPE_JSON
from requests.cookies import RequestsCookieJar
from six.moves.urllib.parse import urlparse
import requests
[docs]LOGGER = get_logger("TWITCHER")
[docs]class MagpieOWSSecurity(OWSSecurityInterface): def __init__(self, request): super(MagpieOWSSecurity, self).__init__() self.magpie_url = get_magpie_url(request) self.settings = get_settings(request) self.twitcher_ssl_verify = asbool(self.settings.get("twitcher.ows_proxy_ssl_verify", True)) self.twitcher_protected_path = self.settings.get("twitcher.ows_proxy_protected_path", "/ows")
[docs] def check_request(self, request): if request.path.startswith(self.twitcher_protected_path): service_name = parse_service_name(request.path, self.twitcher_protected_path) service = evaluate_call(lambda: Service.by_service_name(service_name, db_session=request.db), fallback=lambda: request.db.rollback(), httpError=HTTPForbidden, msgOnFail="Service query by name refused by db.") verify_param(service, notNone=True, httpError=HTTPNotFound, msgOnFail="Service name not found in db.") # return a specific type of service, ex: ServiceWPS with all the acl (loaded according to the service_type) service_specific = service_factory(service, request) # should contain all the acl, this the only thing important # parse request (GET/POST) to get the permission requested for that service permission_requested = service_specific.permission_requested() # convert permission enum to str for comparison permission_requested = Permission.get(permission_requested).value if permission_requested else None if permission_requested: LOGGER.info('"{0}" request "{1}" permission on "{2}"'.format(request.user, permission_requested, request.path)) self.update_request_cookies(request) authn_policy = request.registry.queryUtility(IAuthenticationPolicy) authz_policy = request.registry.queryUtility(IAuthorizationPolicy) principals = authn_policy.effective_principals(request) has_permission = authz_policy.permits(service_specific, principals, permission_requested) LOGGER.debug("{} - AUTHN policy configurations:".format(type(self).__name__)) base_attr = [attr for attr in dir(authn_policy.cookie) if not attr.startswith("_")] for attr_name in base_attr: LOGGER.debug(" {}: {}".format(attr_name, getattr(authn_policy.cookie, attr_name))) if not has_permission: raise OWSAccessForbidden("Not authorized to access this resource. " + "User does not meet required permissions.")
[docs] def update_request_cookies(self, request): """ Ensure login of the user and update the request cookies if Twitcher is in a special configuration. Only update if `MAGPIE_COOKIE_NAME` is missing and is retrievable from `access_token` in `Authorization` header. Counter-validate the login procedure by calling Magpie's `/session` which should indicated a logged user. """ token_name = get_constant("MAGPIE_COOKIE_NAME", settings_name=request.registry.settings) if "Authorization" in request.headers and token_name not in request.cookies: magpie_prov = request.params.get("provider", "WSO2") magpie_path = ProviderSigninAPI.path.format(provider_name=magpie_prov) magpie_auth = "{}{}".format(self.magpie_url, magpie_path) headers = dict(request.headers) headers.update({"Homepage-Route": "/session", "Accept": CONTENT_TYPE_JSON}) session_resp = requests.get(magpie_auth, headers=headers, verify=self.twitcher_ssl_verify) if session_resp.status_code != HTTPOk.code: raise OWSAccessForbidden("Not authorized to access this resource. " + "Provider login failed with following reason: [{}]." .format(session_resp.reason)) # use specific domain to differentiate between `.{hostname}` and `{hostname}` variations if applicable # noinspection PyProtectedMember request_cookies = session_resp.request._cookies magpie_cookies = list(filter(lambda cookie: cookie.name == token_name, request_cookies)) magpie_domain = urlparse(self.magpie_url).hostname if len(magpie_cookies) > 1 else None session_cookies = RequestsCookieJar.get(request_cookies, token_name, domain=magpie_domain) if not session_resp.json().get("authenticated") or not session_cookies: raise OWSAccessForbidden("Not authorized to access this resource. " + "Session authentication could not be verified.") request.cookies.update({token_name: session_cookies})