Source code for magpie.adapter.magpieowssecurity
from magpie.api.exception import evaluate_call, verify_param
from magpie.api.schemas import ProviderSigninAPI
from magpie.constants import get_constant
from magpie.definitions.pyramid_definitions import (
HTTPOk,
HTTPNotFound,
HTTPForbidden,
IAuthenticationPolicy,
IAuthorizationPolicy,
asbool,
)
from magpie.definitions.twitcher_definitions import (
OWSSecurityInterface,
OWSAccessForbidden,
parse_service_name,
)
from magpie.models import Service
from magpie.permissions import Permission
from magpie.services import service_factory
from magpie.utils import get_magpie_url, get_settings, get_logger, CONTENT_TYPE_JSON
from requests.cookies import RequestsCookieJar
from six.moves.urllib.parse import urlparse
import requests
[docs]LOGGER = get_logger("TWITCHER")
[docs]class MagpieOWSSecurity(OWSSecurityInterface):
def __init__(self, request):
super(MagpieOWSSecurity, self).__init__()
self.magpie_url = get_magpie_url(request)
self.settings = get_settings(request)
self.twitcher_ssl_verify = asbool(self.settings.get("twitcher.ows_proxy_ssl_verify", True))
self.twitcher_protected_path = self.settings.get("twitcher.ows_proxy_protected_path", "/ows")
[docs] def check_request(self, request):
if request.path.startswith(self.twitcher_protected_path):
service_name = parse_service_name(request.path, self.twitcher_protected_path)
service = evaluate_call(lambda: Service.by_service_name(service_name, db_session=request.db),
fallback=lambda: request.db.rollback(),
httpError=HTTPForbidden, msgOnFail="Service query by name refused by db.")
verify_param(service, notNone=True, httpError=HTTPNotFound, msgOnFail="Service name not found in db.")
# return a specific type of service, ex: ServiceWPS with all the acl (loaded according to the service_type)
service_specific = service_factory(service, request)
# should contain all the acl, this the only thing important
# parse request (GET/POST) to get the permission requested for that service
permission_requested = service_specific.permission_requested()
# convert permission enum to str for comparison
permission_requested = Permission.get(permission_requested).value if permission_requested else None
if permission_requested:
LOGGER.info('"{0}" request "{1}" permission on "{2}"'.format(request.user, permission_requested, request.path))
self.update_request_cookies(request)
authn_policy = request.registry.queryUtility(IAuthenticationPolicy)
authz_policy = request.registry.queryUtility(IAuthorizationPolicy)
principals = authn_policy.effective_principals(request)
has_permission = authz_policy.permits(service_specific, principals, permission_requested)
LOGGER.debug("{} - AUTHN policy configurations:".format(type(self).__name__))
base_attr = [attr for attr in dir(authn_policy.cookie) if not attr.startswith("_")]
for attr_name in base_attr:
LOGGER.debug(" {}: {}".format(attr_name, getattr(authn_policy.cookie, attr_name)))
if not has_permission:
raise OWSAccessForbidden("Not authorized to access this resource. " +
"User does not meet required permissions.")
[docs] def update_request_cookies(self, request):
"""
Ensure login of the user and update the request cookies if Twitcher is in a special configuration.
Only update if `MAGPIE_COOKIE_NAME` is missing and is retrievable from `access_token` in `Authorization` header.
Counter-validate the login procedure by calling Magpie's `/session` which should indicated a logged user.
"""
token_name = get_constant("MAGPIE_COOKIE_NAME", settings_name=request.registry.settings)
if "Authorization" in request.headers and token_name not in request.cookies:
magpie_prov = request.params.get("provider", "WSO2")
magpie_path = ProviderSigninAPI.path.format(provider_name=magpie_prov)
magpie_auth = "{}{}".format(self.magpie_url, magpie_path)
headers = dict(request.headers)
headers.update({"Homepage-Route": "/session", "Accept": CONTENT_TYPE_JSON})
session_resp = requests.get(magpie_auth, headers=headers, verify=self.twitcher_ssl_verify)
if session_resp.status_code != HTTPOk.code:
raise OWSAccessForbidden("Not authorized to access this resource. " +
"Provider login failed with following reason: [{}]."
.format(session_resp.reason))
# use specific domain to differentiate between `.{hostname}` and `{hostname}` variations if applicable
# noinspection PyProtectedMember
request_cookies = session_resp.request._cookies
magpie_cookies = list(filter(lambda cookie: cookie.name == token_name, request_cookies))
magpie_domain = urlparse(self.magpie_url).hostname if len(magpie_cookies) > 1 else None
session_cookies = RequestsCookieJar.get(request_cookies, token_name, domain=magpie_domain)
if not session_resp.json().get("authenticated") or not session_cookies:
raise OWSAccessForbidden("Not authorized to access this resource. " +
"Session authentication could not be verified.")
request.cookies.update({token_name: session_cookies})