Source code for magpie.adapter

import logging
import time
from typing import TYPE_CHECKING

import requests
import six
from pyramid.authentication import IAuthenticationPolicy
from pyramid.httpexceptions import HTTPForbidden, HTTPOk
from pyramid_beaker import set_cache_regions_from_settings
from import UserService

from magpie import __meta__
from magpie.adapter.magpieowssecurity import MagpieOWSSecurity
from magpie.adapter.magpieservice import MagpieServiceStore
from magpie.api.exception import raise_http, valid_http
from magpie.api.schemas import SigninAPI
from magpie.db import get_engine, get_session_factory, get_tm_session
from import get_auth_config
from magpie.utils import CONTENT_TYPE_JSON, SingletonMeta, get_logger, get_magpie_url, get_settings

# twitcher available only when this module is imported from it
from twitcher.adapter.base import AdapterInterface  # noqa
from twitcher.owsproxy import owsproxy_defaultconfig  # noqa

    # pylint: disable=W0611,unused-import
    from typing import Optional

    from pyramid.config import Configurator
    from pyramid.httpexceptions import HTTPException
    from pyramid.request import Request

    from magpie.models import User
    from magpie.typedefs import JSON, AnySettingsContainer, Str

    from import AccessTokenStoreInterface  # noqa

# module-level logger shared by the adapter functions and the ``MagpieAdapter`` class below
LOGGER = get_logger("TWITCHER")
def get_user(request):
    # type: (Request) -> Optional[User]
    """
    Obtains the authenticated user from the request (if any).

    The user identifier is read from ``request.unauthenticated_userid`` and resolved with
    ``UserService.by_id`` using the request database session (``request.db``).
    When no user identifier is available and debug logging is enabled,
    ``debug_cookie_identify`` is additionally called with the request.

    :param request: incoming HTTP request potentially containing authentication definitions.
    :return: the authenticated user if parameters were valid (good credentials, not expired, etc.) or ``None``.
    """
    user_id = request.unauthenticated_userid
    LOGGER.debug("Current user id is '%s'", user_id)
    if user_id is not None:
        user = UserService.by_id(user_id, db_session=request.db)
        LOGGER.debug("Current user has been resolved has '%s'", user)
        return user
    # only reached when no user id could be extracted from the request
    if LOGGER.isEnabledFor(logging.DEBUG):
        debug_cookie_identify(request)
    return None
def verify_user(request):
    # type: (Request) -> HTTPException
    """
    Verifies that a valid user authentication on the pointed ``Magpie`` instance (via configuration) also results
    into a valid user authentication with the current ``Twitcher`` instance to ensure settings match between them.

    The JSON body of the incoming request is forwarded to the ``Magpie`` sign-in route. If that request does not
    succeed, or if the authentication policy cookie helper registered for this application cannot identify the
    request, an HTTP Forbidden response is returned (not raised).

    :param request: an HTTP request with valid authentication token/cookie credentials.
    :return: appropriate HTTP success or error response with details about the result.
    """
    magpie_url = get_magpie_url(request)
    # NOTE(review): no 'timeout' is provided and 'requests' applies none by default,
    #   so this call blocks for as long as the remote Magpie instance takes to answer.
    resp = requests.post(magpie_url + SigninAPI.path, json=request.json,
                         headers={"Content-Type": CONTENT_TYPE_JSON, "Accept": CONTENT_TYPE_JSON})
    if resp.status_code != HTTPOk.code:
        # forward the Magpie response body to help diagnose the failed login
        content = {"response": resp.json()}
        return raise_http(HTTPForbidden, detail="Failed Magpie login.", content=content, nothrow=True)  # noqa
    authn_policy = request.registry.queryUtility(IAuthenticationPolicy)  # noqa
    result = authn_policy.cookie.identify(request)
    if result is None:
        return raise_http(HTTPForbidden, detail="Twitcher login incompatible with Magpie login.", nothrow=True)  # noqa
    return valid_http(HTTPOk, detail="Twitcher login verified successfully with Magpie login.")
@six.add_metaclass(SingletonMeta)
class MagpieAdapter(AdapterInterface):
    """
    ``Twitcher`` adapter that provides the ``Magpie`` implementations of the service store and OWS security.

    The class uses :class:`SingletonMeta` as metaclass. The service store and OWS security objects are created
    on first use and the same objects are returned by subsequent factory calls.
    """
    # pylint: disable=W0223,W0612

    def __init__(self, container):
        # created lazily by 'servicestore_factory' and 'owssecurity_factory'
        self._servicestore = None
        self._owssecurity = None
        super(MagpieAdapter, self).__init__(container)  # pylint: disable=E1101,no-member

    @property
    def name(self):
        # type: () -> Str
        """
        Name of the adapter, as defined by the base ``AdapterInterface.name`` property.
        """
        # pylint: disable=E1101,no-member
        # NOTE(review): delegates explicitly to the getter of the parent property - confirm against AdapterInterface
        return AdapterInterface.name.fget(self)  # noqa

    def describe_adapter(self):
        # type: () -> JSON
        """
        Provides the adapter name and the package version from ``__meta__``.

        :return: mapping with ``name`` and ``version`` entries.
        """
        return {"name": self.name, "version": __meta__.__version__}

    def servicestore_factory(self, request):
        # type: (Request) -> MagpieServiceStore
        """
        Obtains the service store, creating it from the request on the first call only.

        :param request: request passed to :class:`MagpieServiceStore` when the store does not exist yet.
        :return: the cached service store instance.
        """
        if self._servicestore is None:
            self._servicestore = MagpieServiceStore(request)
        return self._servicestore

    def tokenstore_factory(self, request):
        # type: (Request) -> AccessTokenStoreInterface
        """
        Not provided by this adapter.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def owssecurity_factory(self, request):
        # type: (Request) -> MagpieOWSSecurity
        """
        Obtains the OWS security handler, creating it from the request on the first call only.

        :param request: request passed to :class:`MagpieOWSSecurity` when the handler does not exist yet.
        :return: the cached OWS security instance.
        """
        if self._owssecurity is None:
            self._owssecurity = MagpieOWSSecurity(request)
        return self._owssecurity

    def owsproxy_config(self, container):
        # type: (AnySettingsContainer) -> None
        """
        Builds the adapter configurator and applies the default ``Twitcher`` OWS proxy configuration to it.

        :param container: settings container forwarded to :meth:`configurator_factory`.
        """
        LOGGER.info("Loading MagpieAdapter owsproxy config")
        config = self.configurator_factory(container)
        owsproxy_defaultconfig(config)  # let Twitcher configure the rest normally

    def configurator_factory(self, container):  # noqa: N805, R0201
        # type: (AnySettingsContainer) -> Configurator
        """
        Creates the configurator with the database session, user resolution and verification route.

        The following are set up on the returned configurator:

        - ``pyramid_beaker`` and ``pyramid_tm`` includes
        - ``request.db`` (reified) bound to the transaction manager of the request
        - ``request.user`` (reified) resolved by :func:`get_user`
        - route ``verify-user`` (``/verify``) handled by :func:`verify_user`

        :param container: settings container from which settings and the base configuration are obtained.
        :return: the prepared configurator.
        """
        settings = get_settings(container)
        set_cache_regions_from_settings(settings)

        # disable rpcinterface which is conflicting with postgres db
        settings["twitcher.rpcinterface"] = False

        LOGGER.info("Loading MagpieAdapter config")
        config = get_auth_config(container)
        config.include("pyramid_beaker")

        # use pyramid_tm to hook the transaction lifecycle to the request
        # make request.db available for use in Pyramid
        config.include("pyramid_tm")
        session_factory = get_session_factory(get_engine(settings))
        config.registry["dbsession_factory"] = session_factory
        config.add_request_method(
            # r.tm is the transaction manager used by pyramid_tm
            lambda r: get_tm_session(session_factory, r.tm),
            "db",
            reify=True
        )

        # use same 'get_user' method as ziggurat to access 'request.user' from
        # request with auth token with exactly the same behaviour in Twitcher
        config.add_request_method(get_user, "user", reify=True)

        # add route to verify user token matching between Magpie/Twitcher
        config.add_route("verify-user", "/verify")
        config.add_view(verify_user, route_name="verify-user")

        return config