from magpie.api.login import esgfopenid, wso2
from magpie.constants import get_constant
from magpie.definitions.pyramid_definitions import (
AuthTktAuthenticationPolicy, ACLAuthorizationPolicy, Configurator, asbool
)
from magpie.definitions.ziggurat_definitions import groupfinder
from magpie.utils import get_logger, get_settings
from authomatic import Authomatic, provider_id
from authomatic.providers import oauth2, openid
from typing import TYPE_CHECKING
import logging
if TYPE_CHECKING:
from magpie.definitions.typedefs import JSON # noqa: F401
[docs]AUTHOMATIC_LOGGER = get_logger('magpie.authomatic', level=logging.DEBUG)
[docs]LOGGER = get_logger('magpie.security')
[docs]def get_auth_config(container):
settings = get_settings(container)
magpie_secret = get_constant('MAGPIE_SECRET', settings, settings_name='magpie.secret')
magpie_cookie_expire = get_constant('MAGPIE_COOKIE_EXPIRE', settings,
settings_name='magpie.cookie_expire', default_value=None,
raise_missing=False, raise_not_set=False, print_missing=True)
magpie_cookie_name = get_constant('MAGPIE_COOKIE_NAME', settings,
settings_name='magpie.cookie_name', default_value='auth_tkt',
raise_missing=False, raise_not_set=False, print_missing=True)
LOGGER.debug('************************************************************')
LOGGER.debug('Secret : {0}, Cookie name : {1}, Timeout : {2}'.format(
magpie_secret,
magpie_cookie_name,
magpie_cookie_expire
))
LOGGER.debug('************************************************************')
authn_policy = AuthTktAuthenticationPolicy(
magpie_secret,
cookie_name=magpie_cookie_name,
callback=groupfinder,
# Protect against JavaScript CSRF attacks attempting cookies retrieval
http_only=True,
# Automatically refresh the cookie unless inactivity reached 'timeout'
timeout=magpie_cookie_expire,
reissue_time=int(magpie_cookie_expire) / 10 if magpie_cookie_expire else None,
)
authz_policy = ACLAuthorizationPolicy()
# create configurator or use one defined as input to preserve previous setup/include/etc.
config = Configurator() if not isinstance(container, Configurator) else container
from magpie import models
config.setup_registry(
settings=settings,
root_factory=models.RootFactory,
authentication_policy=authn_policy,
authorization_policy=authz_policy
)
return config
[docs]def authomatic_setup(request):
magpie_secret = get_constant('MAGPIE_SECRET', request, settings_name='magpie.secret')
return Authomatic(
config=authomatic_config(request),
secret=magpie_secret,
logger=AUTHOMATIC_LOGGER,
report_errors=True,
logging_level=AUTHOMATIC_LOGGER.level
)
[docs]def authomatic_config(request=None):
defaults_config = {
'popup': True,
}
openid_config = {
'openid': {
'class_': openid.OpenID,
'display_name': 'OpenID',
},
}
esgf_config = {
'dkrz': {
'class_': esgfopenid.ESGFOpenID,
'hostname': 'esgf-data.dkrz.de',
'provider_url': 'https://{hostname}/esgf-idp/openid/{username}',
'display_name': 'DKRZ',
},
'ipsl': {
'class_': esgfopenid.ESGFOpenID,
'hostname': 'esgf-node.ipsl.upmc.fr',
'display_name': 'IPSL',
},
# former 'badc'
'ceda': {
'class_': esgfopenid.ESGFOpenID,
'hostname': 'esgf-index1.ceda.ac.uk',
'provider_url': 'https://{hostname}/openid/{username}',
'display_name': 'CEDA',
},
# former 'pcmdi'
'llnl': {
'class_': esgfopenid.ESGFOpenID,
'hostname': 'esgf-node.llnl.gov',
'display_name': 'LLNL',
},
'smhi': {
'class_': esgfopenid.ESGFOpenID,
'hostname': 'esg-dn1.nsc.liu.se',
'display_name': 'SMHI',
},
}
_get_const_info = dict(raise_missing=False, raise_not_set=False, print_missing=True)
oauth2_config = {
'github': {
'class_': oauth2.GitHub,
'display_name': 'GitHub',
'consumer_key': get_constant('GITHUB_CLIENT_ID', **_get_const_info),
'consumer_secret': get_constant('GITHUB_CLIENT_SECRET', **_get_const_info),
'redirect_uri': request.application_url if request else None,
# 'redirect_uri': '{}/providers/github/signin'.format(request.application_url) if request else None,
'access_headers': {'User-Agent': 'Magpie'},
'id': provider_id(),
'_apis': {
'Get your events': ('GET', 'https://api.github.com/users/{user.username}/events'),
'Get your watched repos': ('GET', 'https://api.github.com/user/subscriptions'),
},
},
'wso2': {
'class_': wso2.WSO2,
'display_name': 'WSO2',
'hostname': get_constant('WSO2_HOSTNAME', **_get_const_info),
'consumer_key': get_constant('WSO2_CLIENT_ID', **_get_const_info),
'consumer_secret': get_constant('WSO2_CLIENT_SECRET', **_get_const_info),
'certificate_file': get_constant('WSO2_CERTIFICATE_FILE', **_get_const_info) or None, # replace if == ''
'ssl_verify': asbool(get_constant('WSO2_SSL_VERIFY', default_value=True, **_get_const_info)),
'redirect_uri': '{}/providers/wso2/signin'.format(request.application_url) if request else None,
'id': provider_id(),
}
}
# Concatenate the configs.
config = {} # type: JSON
config.update(oauth2_config)
config.update(openid_config)
config.update(esgf_config)
config['__defaults__'] = defaults_config
return config
[docs]def get_provider_names():
provider_names = {}
config = authomatic_config()
for provider in config.keys():
if provider != '__defaults__':
provider_names[provider.lower()] = config[provider].get('display_name', provider)
return provider_names