import logging
import os
import random
import string
import subprocess # nosec
import time
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, overload
import requests
import six
import transaction
import yaml
from pyramid.httpexceptions import HTTPException
from sqlalchemy.orm.session import Session
from ziggurat_foundations.models.services.group import GroupService
from ziggurat_foundations.models.services.resource import ResourceService
from ziggurat_foundations.models.services.user import UserService
from ziggurat_foundations.models.services.user_resource_permission import UserResourcePermissionService
from magpie import models
from magpie.api.schemas import (
GroupResourcePermissionsAPI,
GroupsAPI,
ServiceAPI,
ServiceResourcesAPI,
ServicesAPI,
SigninAPI,
SignoutAPI,
UserResourcePermissionsAPI,
UsersAPI
)
from magpie.config import validate_services_config
from magpie.constants import get_constant
from magpie.permissions import Permission, PermissionSet
from magpie.services import SERVICE_TYPE_DICT, ServiceWPS
from magpie.utils import (
bool2str,
get_admin_cookies,
get_json,
get_logger,
get_magpie_url,
get_phoenix_url,
get_twitcher_protected_service_url,
islambda,
print_log,
raise_log
)
if TYPE_CHECKING:
# pylint: disable=W0611,unused-import
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from magpie.typedefs import (
JSON,
AnyCookiesType,
AnyResolvedSettings,
AnyResponseType,
AnySettingsContainer,
CombinedConfig,
CookiesOrSessionType,
GroupsConfig,
GroupsSettings,
Literal,
MultiConfigs,
PermissionConfigItem,
PermissionsConfig,
ServicesConfig,
ServicesSettings,
Str,
UsersConfig,
UsersSettings,
WebhooksConfig
)
[docs]
LOGGER = get_logger(__name__)
[docs]
LOGIN_ATTEMPT = 5 # max attempts for login
[docs]
LOGIN_TIMEOUT = 2 # delay (s) between each login attempt
[docs]
CREATE_SERVICE_INTERVAL = 2 # delay (s) between creations to allow server to respond/process
[docs]
GETCAPABILITIES_INTERVAL = 10 # delay (s) between 'GetCapabilities' Phoenix calls to validate service registration
[docs]
GETCAPABILITIES_ATTEMPTS = 12 # max attempts for 'GetCapabilities' validations
# controls
[docs]
SERVICES_MAGPIE = "MAGPIE"
[docs]
SERVICES_PHOENIX = "PHOENIX"
[docs]
SERVICES_PHOENIX_ALLOWED = [ServiceWPS.service_type]
[docs]
class RegistrationError(RuntimeError):
"""
Generic error during registration operation.
"""
[docs]
class RegistrationValueError(RegistrationError, ValueError):
"""
Registration error caused by an invalid value precondition.
"""
[docs]
class RegistrationLoginError(RegistrationError):
"""
Registration error caused by a failure to complete required login operation.
"""
[docs]
class RegistrationConfigurationError(RegistrationValueError):
"""
Registration error caused by an invalid configuration entry or definition.
"""
[docs]
def _login_loop(login_url, cookies_file, data=None, message="Login response"):
cookies_dir = os.path.dirname(cookies_file)
if not os.path.isdir(cookies_dir):
os.makedirs(cookies_dir) # don't use "exist_ok" for backward compatibility (Python<3.5)
data_str = ""
if data is not None and isinstance(data, dict):
for key in data:
data_str = data_str + "&" + str(key) + "=" + str(data[key])
if isinstance(data, six.string_types):
data_str = data
attempt = 0
while True:
err, http = _request_curl(login_url, cookie_jar=cookies_file, form_params=data_str, msg=message)
if not err and http == 200:
break
attempt += 1
LOGGER.warning("Login failed, retrying in %ss (%s/%s)", LOGIN_TIMEOUT, attempt, LOGIN_ATTEMPT)
time.sleep(LOGIN_TIMEOUT)
if attempt >= LOGIN_ATTEMPT:
raise RegistrationLoginError("Cannot log in to {0}".format(login_url))
[docs]
def _request_curl(url, cookie_jar=None, cookies=None, form_params=None, msg="Response"):
# type: (Str, Optional[Str], Optional[Str], Optional[Str], Optional[Str]) -> Tuple[int, int]
"""
Executes a request using cURL.
:returns: tuple of the returned system command code and the response http code
"""
# arg -k allows to ignore insecure SSL errors, ie: access 'https' page not configured for it
# curl_cmd = 'curl -k -L -s -o /dev/null -w "{msg_out} : %{{http_code}}\\n" {params} {url}'
# curl_cmd = curl_cmd.format(msg_out=msg, params=params, url=url)
msg_sep = msg + ": "
params = ["curl", "-k", "-L", "-s", "-o", "/dev/null", "-w", msg_sep + "%{http_code}"]
if cookie_jar is not None and cookies is not None:
raise RegistrationValueError("CookiesType and Cookie_Jar cannot be both set simultaneously")
if cookie_jar is not None:
params.extend(["--cookie-jar", cookie_jar]) # save cookies
if cookies is not None:
params.extend(["--cookie", cookies]) # use cookies
if form_params is not None:
params.extend(["--data", form_params])
params.extend([url])
with subprocess.Popen(params, stdout=subprocess.PIPE) as curl_proc: # nosec
curl_msg = curl_proc.communicate()[0] # type: Str
curl_err = curl_proc.returncode # type: int
http_code = int(six.ensure_text(curl_msg).split(msg_sep)[1])
print_log("[{url}] {response}".format(response=curl_msg, url=url), logger=LOGGER)
return curl_err, http_code
[docs]
def _phoenix_update_services(services_dict):
# type: (JSON) -> bool
if not _phoenix_remove_services():
print_log("Could not remove services, aborting register sync services to Phoenix", logger=LOGGER)
return False
success, _ = _phoenix_register_services(services_dict)
if not success:
print_log("Failed services registration from Magpie to Phoenix\n"
"[warning: services could have been removed but could not be re-added]", logger=LOGGER)
return False
return True
[docs]
def _phoenix_login(cookies_file):
# type: (Str) -> bool
"""
Performs Phoenix login using provided cookies.
"""
phoenix_pwd = get_constant("PHOENIX_PASSWORD")
phoenix_url = get_phoenix_url()
login_url = phoenix_url + "/account/login/phoenix"
login_data = {"password": phoenix_pwd, "submit": "submit"}
_login_loop(login_url, cookies_file, login_data, "Phoenix login response")
return _phoenix_login_check(cookies_file)
[docs]
def _phoenix_login_check(cookies):
# type: (Str) -> bool
"""
Since Phoenix always return 200, even on invalid login, 'hack' check unauthorized access.
:param cookies: temporary cookies file storage used for login with :func:`_phoenix_login`.
:return: status indicating if login access was granted with defined credentials.
"""
no_access_error = "<ExceptionText>Unauthorized: Services failed permission check</ExceptionText>"
svc_url = get_phoenix_url() + "/services"
command = ["curl", "-s", "--cookie", cookies, svc_url]
with subprocess.Popen(command, stdout=subprocess.PIPE) as curl_process: # nosec
curl_http_resp = curl_process.communicate() # nosec
has_access = no_access_error not in curl_http_resp[0]
return has_access
[docs]
def _phoenix_remove_services():
# type: () -> bool
"""
Removes the Phoenix services using temporary cookies retrieved from login with defined `PHOENIX` constants.
:returns: success status of the procedure.
"""
error = 0
try:
with NamedTemporaryFile() as phoenix_cookies_file:
if not _phoenix_login(phoenix_cookies_file.name):
print_log("Login unsuccessful from post-login check, aborting...", logger=LOGGER)
return False
phoenix_url = get_phoenix_url()
remove_services_url = phoenix_url + "/clear_services"
error, _ = _request_curl(remove_services_url, cookies=phoenix_cookies_file.name,
msg="Phoenix remove services")
except Exception as exc:
print_log("Exception during phoenix remove services: [{!r}]".format(exc), logger=LOGGER, level=logging.ERROR)
return error == 0
[docs]
def _phoenix_register_services(services_dict, allowed_service_types=None):
# type: (Dict[Str, Dict[Str, Any]], Optional[List[Str]]) -> Tuple[bool, Dict[Str, int]]
success = False
statuses = {}
try:
with NamedTemporaryFile() as phoenix_cookies_file:
allowed_service_types = SERVICES_PHOENIX_ALLOWED if allowed_service_types is None else allowed_service_types
allowed_service_types = [svc.upper() for svc in allowed_service_types]
if not _phoenix_login(phoenix_cookies_file.name):
print_log("Login unsuccessful from post-login check, aborting...", logger=LOGGER, level=logging.WARN)
return False, {}
# Filter specific services to push
filtered_services_dict = {}
for svc in services_dict:
if str(services_dict[svc].get("type")).upper() in allowed_service_types:
filtered_services_dict[svc] = services_dict[svc]
filtered_services_dict[svc]["type"] = filtered_services_dict[svc]["type"].upper()
# Register services
success, statuses = _register_services(SERVICES_PHOENIX, filtered_services_dict,
phoenix_cookies_file.name, "Phoenix register service")
except Exception as exc:
print_log("Exception during phoenix register services: [{!r}]".format(exc), logger=LOGGER, level=logging.ERROR)
return success, statuses
[docs]
def _register_services(where, # type: Optional[Str]
services_dict, # type: Dict[Str, Dict[Str, Str]]
cookies, # type: Str
message="Register response", # type: Optional[Str]
): # type: (...) -> Tuple[bool, Dict[Str, int]]
"""
Registers services on desired location using provided configurations and access cookies.
:returns: tuple of overall success and individual http response of each service registration.
"""
success = True
svc_url = None
statuses = {}
if where == SERVICES_MAGPIE:
svc_url_tag = "service_url"
register_service_url = get_magpie_url() + ServicesAPI.path
elif where == SERVICES_PHOENIX:
svc_url_tag = "url"
register_service_url = get_phoenix_url() + "/services/register"
else:
raise RegistrationValueError("Unknown location for service registration", where)
for service_name in services_dict:
cfg = services_dict[service_name]
cfg["public"] = bool2str(cfg.get("public"))
cfg["c4i"] = bool2str(cfg.get("c4i"))
cfg["url"] = cfg.get("url")
if where == SERVICES_MAGPIE:
svc_url = cfg["url"]
elif where == SERVICES_PHOENIX:
svc_url = get_twitcher_protected_service_url(service_name)
params = "service_name={name}&" \
"{svc_url_tag}={svc_url}&" \
"service_title={cfg[title]}&" \
"public={cfg[public]}&" \
"c4i={cfg[c4i]}&" \
"service_type={cfg[type]}&" \
"register=register" \
.format(name=service_name, cfg=cfg, svc_url_tag=svc_url_tag, svc_url=svc_url)
service_msg = "{msg} ({svc}) [{url}]".format(msg=message, svc=service_name, url=svc_url)
error, http_code = _request_curl(register_service_url, cookies=cookies, form_params=params, msg=service_msg)
statuses[service_name] = http_code
success = success and not error and ((where == SERVICES_PHOENIX and http_code == 200) or
(where == SERVICES_MAGPIE and http_code == 201))
time.sleep(CREATE_SERVICE_INTERVAL)
return success, statuses
[docs]
def sync_services_phoenix(services, services_as_dicts=False):
# type: (Union[Iterable[models.Service], JSON], bool) -> bool
"""
Syncs Magpie services by pushing updates to Phoenix.
Services must be one of types specified in :py:data:`magpie.register.SERVICES_PHOENIX_ALLOWED`.
:param services:
An iterable of :class:`models.Service` by default, or a dictionary of ``{svc-name: {<service-info>}}`` JSON
objects containing each service's information if :paramref:`services_ad_dicts` is ``True``.
where ``<service-info>`` is defined as::
{"public_url": <url>, "service_name": <name>, "service_type": <type>}
:param services_as_dicts: indicate if services must be parsed as JSON definitions.
"""
services_dict = {}
for svc in services:
if services_as_dicts:
svc_dict = services[svc] # type: JSON
services_dict[svc] = {"url": svc_dict["public_url"], "title": svc_dict["service_name"],
"type": svc_dict["service_type"], "c4i": False, "public": True}
else:
services_dict[svc.resource_name] = {"url": svc.url, "title": svc.resource_name,
"type": svc.type, "c4i": False, "public": True}
return _phoenix_update_services(services_dict)
[docs]
def _magpie_add_register_services_perms(services, statuses, curl_cookies, request_cookies, disable_getcapabilities):
# type: (ServicesSettings, Dict[Str, int], str, AnyCookiesType, bool) -> None
magpie_url = get_magpie_url()
anon_group = get_constant("MAGPIE_ANONYMOUS_GROUP")
for service_name in services:
svc_available_perms_url = "{magpie}/services/{svc}/permissions" \
.format(magpie=magpie_url, svc=service_name)
resp_available_perms = requests.get(svc_available_perms_url, cookies=request_cookies, timeout=5)
if resp_available_perms.status_code == 401:
raise_log("Invalid credentials, cannot update service permissions",
exception=RegistrationLoginError, logger=LOGGER)
available_perms = get_json(resp_available_perms).get("permission_names", [])
# only applicable to services supporting "GetCapabilities" request
if resp_available_perms.status_code and Permission.GET_CAPABILITIES.value in available_perms:
# enforce 'getcapabilities' permission if available for service just updated (200) or created (201)
# update 'getcapabilities' permission when the service existed and it allowed
if ((not disable_getcapabilities and statuses[service_name] == 409)
or statuses[service_name] == 200 or statuses[service_name] == 201):
svc_anonym_add_perms_url = "{magpie}/groups/{grp}/services/{svc}/permissions" \
.format(magpie=magpie_url, grp=anon_group, svc=service_name)
svc_anonym_perm_data = {"permission_name": Permission.GET_CAPABILITIES.value}
requests.post(svc_anonym_add_perms_url, data=svc_anonym_perm_data, cookies=request_cookies, timeout=5)
# check service response so Phoenix doesn't refuse registration
# try with both the 'direct' URL and the 'GetCapabilities' URL
attempt = 0
service_info_url = "{magpie}/services/{svc}".format(magpie=magpie_url, svc=service_name)
service_info_resp = requests.get(service_info_url, cookies=request_cookies, timeout=5)
service_url = get_json(service_info_resp).get(service_name).get("service_url")
svc_getcap_url = "{svc_url}/wps?service=WPS&version=1.0.0&request=GetCapabilities" \
.format(svc_url=service_url)
while True:
service_msg_direct = "Service response ({svc})".format(svc=service_name)
service_msg_getcap = "Service response ({svc}, GetCapabilities)".format(svc=service_name)
err, http = _request_curl(service_url, cookies=curl_cookies, msg=service_msg_direct)
if not err and http == 200:
break
err, http = _request_curl(svc_getcap_url, cookies=curl_cookies, msg=service_msg_getcap)
if not err and http == 200:
break
print_log("[{url}] Bad response from service '{svc}' retrying after {sec}s..."
.format(svc=service_name, url=service_url, sec=GETCAPABILITIES_INTERVAL), logger=LOGGER)
time.sleep(GETCAPABILITIES_INTERVAL)
attempt += 1
if attempt >= GETCAPABILITIES_ATTEMPTS:
msg = "[{url}] No response from service '{svc}' after {tries} attempts. Skipping..." \
.format(svc=service_name, url=service_url, tries=attempt)
print_log(msg, logger=LOGGER)
break
[docs]
def _magpie_update_services_conflict(conflict_services, services_dict, request_cookies):
# type: (List[Str], ServicesSettings, AnyCookiesType) -> Dict[Str, int]
"""
Resolve conflicting services by name during registration by updating them only if pointing to different URL.
"""
magpie_url = get_magpie_url()
statuses = {}
for svc_name in conflict_services:
statuses[svc_name] = 409
svc_url_new = services_dict[svc_name]["url"]
svc_url_db = "{magpie}/services/{svc}".format(magpie=magpie_url, svc=svc_name)
svc_resp = requests.get(svc_url_db, cookies=request_cookies, timeout=5)
svc_info = get_json(svc_resp).get(svc_name)
svc_url_old = svc_info["service_url"]
if svc_url_old != svc_url_new:
svc_info["service_url"] = svc_url_new
res_svc_put = requests.patch(svc_url_db, data=svc_info, cookies=request_cookies, timeout=5)
statuses[svc_name] = res_svc_put.status_code
print_log("[{url_old}] => [{url_new}] Service URL update ({svc}): {resp}"
.format(svc=svc_name, url_old=svc_url_old, url_new=svc_url_new, resp=res_svc_put.status_code),
logger=LOGGER)
return statuses
[docs]
def _magpie_register_services_with_requests(services_dict, push_to_phoenix, username, password, provider,
force_update=False, disable_getcapabilities=False):
# type: (ServicesSettings, bool, Str, Str, Str, bool, bool) -> bool
"""
Registers :term:`Services` of loaded ``providers`` configuration using API requests.
.. seealso::
:func:`magpie_register_services_from_config`
:param services_dict: services configuration definition.
:param push_to_phoenix: push registered Magpie services to Phoenix for synced configurations.
:param username: login username to use to obtain permissions for services registration.
:param password: login password to use to obtain permissions for services registration.
:param provider: login provider to use to obtain permissions for services registration.
:param force_update: override existing services matched by name
:param disable_getcapabilities: do not execute 'GetCapabilities' validation for applicable services.
:return: successful operation status
"""
magpie_url = get_magpie_url()
session = requests.Session()
success = False
try:
with NamedTemporaryFile() as magpie_cookies_file:
# Need to login first as admin
login_url = magpie_url + SigninAPI.path
login_data = {"user_name": username, "password": password, "provider_name": provider}
_login_loop(login_url, magpie_cookies_file.name, login_data, "Magpie login response")
login_resp = session.post(login_url, data=login_data)
if login_resp.status_code != 200:
raise_log("Failed login with specified credentials", exception=RegistrationLoginError, logger=LOGGER)
request_cookies = login_resp.cookies
# Register services
# Magpie will not overwrite existing services by default, 409 Conflict instead of 201 Created
success, statuses_register = _register_services(SERVICES_MAGPIE, services_dict,
magpie_cookies_file.name, "Magpie register service")
# Service URL update if conflicting and requested
if force_update and not success:
conflict_services = [svc_name for svc_name, http_code in statuses_register.items() if http_code == 409]
statuses_update = _magpie_update_services_conflict(conflict_services, services_dict, request_cookies)
statuses_register.update(statuses_update) # update previous statuses with new ones
# Add 'GetCapabilities' permissions on newly created services to allow 'ping' from Phoenix
# Phoenix doesn't register the service if it cannot be checked with this request
_magpie_add_register_services_perms(services_dict, statuses_register,
magpie_cookies_file.name, request_cookies, disable_getcapabilities)
session.get(magpie_url + SignoutAPI.path)
# Push updated services to Phoenix
if push_to_phoenix:
success = _phoenix_update_services(services_dict)
except Exception as exc:
print_log("Exception during magpie register services: [{!r}]".format(exc), logger=LOGGER, level=logging.ERROR)
finally:
session.cookies.clear()
return success
[docs]
def _magpie_register_services_with_db_session(services_dict, db_session, push_to_phoenix=False,
force_update=False, update_getcapabilities_permissions=False):
# type: (ServicesSettings, Session, bool, bool, bool) -> bool
"""
Registration procedure of :term:`Services` from ``providers`` section using pre-established database session.
.. seealso::
:func:`magpie_register_services_from_config`
"""
db_session.begin(subtransactions=True)
existing_services_names = [n[0] for n in db_session.query(models.Service.resource_name)]
magpie_anonymous_user = get_constant("MAGPIE_ANONYMOUS_USER")
anonymous_user = UserService.by_user_name(magpie_anonymous_user, db_session=db_session)
for svc_name, svc_values in services_dict.items():
svc_new_url = svc_values["url"]
svc_type = svc_values["type"]
svc_config = svc_values.get("configuration")
svc_sync_type = svc_values.get("sync_type")
if force_update and svc_name in existing_services_names:
svc = models.Service.by_service_name(svc_name, db_session=db_session)
if svc.url == svc_new_url:
print_log("Service URL already properly set [{url}] ({svc})"
.format(url=svc.url, svc=svc_name), logger=LOGGER)
else:
print_log("Service URL update [{url_old}] => [{url_new}] ({svc})"
.format(url_old=svc.url, url_new=svc_new_url, svc=svc_name),
logger=LOGGER, level=logging.WARN)
svc.url = svc_new_url
if svc.type != svc_type:
print_log("Service type update [{type_old}] => [{type_new}] ({svc}). "
"If children resources/permissions are not compatible, this could break the instance."
.format(type_old=svc.type, type_new=svc_type, svc=svc_name),
logger=LOGGER, level=logging.WARN)
svc.type = svc_type
svc.sync_type = svc_sync_type
svc.configuration = svc_config
elif not force_update and svc_name in existing_services_names:
print_log("Skipping service [{svc}] (conflict)" .format(svc=svc_name), logger=LOGGER)
else:
print_log("Adding service [{svc}]".format(svc=svc_name), logger=LOGGER)
svc = models.Service(
resource_name=svc_name,
resource_type=models.Service.resource_type_name,
url=svc_new_url,
type=svc_type,
configuration=svc_config,
sync_type=svc_sync_type
)
db_session.add(svc)
getcap_perm = Permission.GET_CAPABILITIES
if update_getcapabilities_permissions and anonymous_user is None:
print_log("Cannot update 'getcapabilities' permission of non-existing anonymous user",
level=logging.WARN, logger=LOGGER)
elif update_getcapabilities_permissions and getcap_perm in SERVICE_TYPE_DICT[svc_type].permissions:
svc = db_session.query(models.Service.resource_id).filter_by(resource_name=svc_name).first()
svc_perm_getcapabilities = UserResourcePermissionService.by_resource_user_and_perm(
user_id=anonymous_user.id,
perm_name=getcap_perm.value,
resource_id=svc.resource_id,
db_session=db_session
)
if svc_perm_getcapabilities is None:
print_log("Adding '{}' permission to anonymous user.".format(getcap_perm.value), logger=LOGGER)
svc_perm_getcapabilities = models.UserResourcePermission(
user_id=anonymous_user.id,
perm_name=getcap_perm.value,
resource_id=svc.resource_id
)
db_session.add(svc_perm_getcapabilities)
transaction.commit()
if push_to_phoenix:
return _phoenix_update_services(services_dict)
return True
[docs]
def _load_config(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Str, bool) -> Union
"""
Loads a YAML/JSON file path or pre-loaded dictionary configuration.
"""
try:
if isinstance(path_or_dict, six.string_types):
with open(path_or_dict, mode="r", encoding="utf-8") as yml_file:
cfg = yaml.safe_load(yml_file)
else:
cfg = path_or_dict
return _expand_all(cfg[section])
except KeyError:
msg = "Config file section [{!s}] not found.".format(section)
if allow_missing:
print_log(msg, level=logging.WARNING, logger=LOGGER)
return {}
raise_log(msg, exception=RegistrationError, logger=LOGGER)
except Exception as exc:
raise_log("Invalid config file [{!r}]".format(exc), exception=RegistrationError, logger=LOGGER)
[docs]
CONFIG_KNOWN_EXTENSIONS = frozenset([".cfg", ".json", ".yml", ".yaml"])
@overload
[docs]
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["groups"], bool) -> GroupsConfig
...
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["users"], bool) -> UsersConfig
...
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["permissions"], bool) -> PermissionsConfig
...
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["services"], bool) -> ServicesConfig
...
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["webhooks"], bool) -> WebhooksConfig
...
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Str, bool) -> MultiConfigs
"""
Loads all matched configurations.
Configurations are considered a valid match if they have one of the :py:data:`CONFIG_KNOWN_EXTENSIONS` (if path)
and that loaded (or passed) configurations contain the specified :paramref:`section` name.
If the input is a directory path, loads any number of files contained in it that fulfill matching conditions.
If it is a path pointing to a single valid configuration file, loads it by itself.
If a dictionary is passed, returns it directly if it fulfills validation.
:param path_or_dict: directory path, file path or literal dictionary.
:param section: section name that must be inside every matched configuration file to be loaded.
:param allow_missing: allow to have no valid configuration after all are resolved, otherwise raises.
:raises RegistrationError: when no valid configuration can be found and empty one is not allowed.
:returns:
- list of configurations loaded if input was a directory path
- list of single configuration if input was a file path
- list of single configuration if input was a JSON dict
- empty list if none of the other cases where matched
.. note::
Order of file loading will be resolved by alphabetically sorted filename if specifying a directory path.
"""
if isinstance(path_or_dict, six.string_types):
if os.path.isdir(path_or_dict):
dir_path = os.path.abspath(path_or_dict)
cfg_names = list(sorted({fn for fn in os.listdir(dir_path)
if any([fn.endswith(ext) for ext in CONFIG_KNOWN_EXTENSIONS])}))
return [_load_config(os.path.join(dir_path, fn), section, allow_missing) for fn in cfg_names]
if os.path.isfile(path_or_dict):
return [_load_config(path_or_dict, section, allow_missing)]
elif isinstance(path_or_dict, dict):
return [_load_config(path_or_dict, section, allow_missing)]
return []
[docs]
def _expand_all(config):
# type: (JSON) -> JSON
"""
Applies environment variable expansion recursively to all applicable fields of a configuration definition.
"""
if isinstance(config, dict):
for cfg in list(config):
cfg_key = os.path.expandvars(cfg)
if cfg_key != cfg:
config[cfg_key] = config.pop(cfg)
config[cfg_key] = _expand_all(config[cfg_key])
elif isinstance(config, (list, set)):
for i, cfg in enumerate(config):
config[i] = _expand_all(cfg)
elif isinstance(config, six.string_types):
config = os.path.expandvars(str(config))
elif isinstance(config, (int, bool, float, type(None))):
pass
else:
raise NotImplementedError("unknown parsing of config of type: {}".format(type(config)))
return config
[docs]
def magpie_register_services_from_config(service_config_path, push_to_phoenix=False, skip_registration=False,
force_update=False, disable_getcapabilities=False, db_session=None):
# type: (Str, bool, bool, bool, bool, Optional[Session]) -> ServicesSettings
"""
Registers Magpie services from one or many `providers.cfg` file.
Uses the provided DB session to directly update service definitions, or uses API request routes as admin. Optionally
pushes updates to Phoenix.
:param service_config_path: where to look for `providers` configuration(s). Directory or file path.
:param push_to_phoenix: whether to push loaded service definitions to remote `Phoenix` service.
:param skip_registration: Load, validate and combine :term:`Service` configurations, but don't register them.
:param force_update: override service definitions that conflict by name with registered ones.
:param disable_getcapabilities:
Skip `GetCapabilities` request validation and permission update.
By default, any service with `type` that allows `GetCapabilities` permissions will be tested to ensure it can
be reached on the provided `url`. Once validated, this permission is applied to `anonymous` group to make its
entrypoint accessible by anyone.
Services that cannot have `GetCapabilities` permission are ignored regardless.
:param db_session: Use a pre-established database connection for registration. Otherwise, API requests are employed.
:returns: loaded service configurations.
"""
LOGGER.info("Starting services processing.")
services_configs = get_all_configs(service_config_path, "providers") # type: List[ServicesConfig]
services_config_count = len(services_configs)
LOGGER.log(logging.INFO if services_config_count else logging.WARNING,
"Found %s service configurations to process", services_config_count)
merged_service_configs = {}
for services in services_configs:
if not services:
LOGGER.warning("Services configuration are empty.")
continue
if force_update:
merged_service_configs.update(services)
else:
for svc, svc_cfg in services.items():
merged_service_configs.setdefault(svc, svc_cfg)
merged_service_configs = validate_services_config(merged_service_configs)
if not skip_registration:
# register services using API POSTs
if db_session is None:
admin_usr = get_constant("MAGPIE_ADMIN_USER")
admin_pwd = get_constant("MAGPIE_ADMIN_PASSWORD")
local_provider = get_constant("MAGPIE_DEFAULT_PROVIDER")
_magpie_register_services_with_requests(merged_service_configs, push_to_phoenix,
admin_usr, admin_pwd, local_provider,
force_update=force_update,
disable_getcapabilities=disable_getcapabilities)
# register services directly to db using session
else:
_magpie_register_services_with_db_session(merged_service_configs, db_session,
push_to_phoenix=push_to_phoenix, force_update=force_update,
update_getcapabilities_permissions=not disable_getcapabilities)
LOGGER.info("All services processed.")
return merged_service_configs
[docs]
def _handle_permission(message, permission_index, trail=", skipping...", detail=None, permission=None,
level=logging.WARN, raise_errors=False):
# type: (Str, int, Str, Optional[Str], Optional[Str], Union[Str, int], bool) -> None
"""
Logs a message related to a 'permission' entry and raises an error if required.
Log message format is as follows (detail portion omitted if none provided)::
{message} [permission: #{permission_index}] [{permission}]{trail}
Detail: [{detail}]
Such that the following logging entry is generated (omitting any additional logging formatters)::
>> log_permission("test", 1, " skip test...", "just a test", "fake")
test [permission: #1] [fake] skip test...
Detail: [just a test]
:param message: base message to log
:param permission_index: index of the permission in the configuration list for traceability
:param trail: trailing message appended after the base message
:param detail: additional details appended after the trailing message after moving to another line.
:param permission: permission name to log just before the trailing message.
:param level: logging level (default: ``logging.WARN``)
:param raise_errors: raises errors related to permissions, instead of just logging the info.
.. seealso::
`magpie/config/permissions.cfg`
"""
trail = "{}\nDetail: [{!s}]".format(trail, detail) if detail else (trail or "")
permission = " [{!s}]".format(permission) if permission else ""
msg = "{} [permission #{}]{}{}".format(message, permission_index, permission, trail)
LOGGER.log(level, msg)
if raise_errors:
raise RegistrationConfigurationError(msg)
[docs]
def _use_request(cookies_or_session):
return not isinstance(cookies_or_session, Session)
[docs]
def _parse_resource_path(permission_config_entry, # type: PermissionConfigItem
entry_index, # type: int
service_info, # type: JSON
cookies_or_session=None, # type: CookiesOrSessionType
magpie_url=None, # type: Optional[Str]
raise_errors=False # type: bool
): # type: (...) -> Tuple[Optional[int], bool]
"""
Parses the `resource` field of a permission config entry and retrieves the final resource id. Creates missing
resources as necessary if they can be automatically resolved.
If `cookies` are provided, uses requests to a running `Magpie` instance (with ``magpie_url``) to apply permission.
If `session` to db is provided, uses direct db connection instead to apply permission.
:returns: tuple of found id (if any, ``None`` otherwise), and success status of the parsing operation (error)
"""
# pylint: disable=C0415 # avoid circular imports
if not magpie_url and _use_request(cookies_or_session):
raise RegistrationValueError("cannot use cookies without corresponding request URL")
resource = None
resource_path = permission_config_entry.get("resource", "").strip("/")
resource_type_config = permission_config_entry.get("type")
if resource_path:
try:
svc_name = service_info["service_name"]
svc_type = service_info["service_type"]
# Prepare a list of types that fits with the list of resources
resource_type_list = resource_type_config.strip("/").split("/") if resource_type_config else [None]
resource_list = resource_path.split("/")
if len(resource_type_list) == 1:
# if only one type specified, assume every path of the resource uses the same resource type
resource_type_list = resource_type_list * len(resource_list)
if len(resource_list) != len(resource_type_list):
raise RegistrationConfigurationError("Invalid resource type found in configuration : " +
permission_config_entry.get("type"))
res_path = None
if _use_request(cookies_or_session):
res_path = magpie_url + ServiceResourcesAPI.path.format(service_name=svc_name)
res_resp = requests.get(res_path, cookies=cookies_or_session, timeout=5)
res_dict = get_json(res_resp)[svc_name] # type: JSON
else:
from magpie.api.management.service.service_formats import format_service_resources
svc = models.Service.by_service_name(svc_name, db_session=cookies_or_session)
res_dict = format_service_resources(svc, show_all_children=True, db_session=cookies_or_session)
parent = res_dict["resource_id"]
child_resources = res_dict["resources"] # type: Dict[Str, JSON]
for res, resource_type in zip(resource_list, resource_type_list):
# search in existing children resources
if len(child_resources):
res_id = list(filter(lambda r: res in [r, child_resources[r]["resource_name"]], child_resources))
if res_id:
res_info = child_resources[res_id[0]] # type: Dict[Str, JSON]
child_resources = res_info["children"] # update next sub-resource iteration
parent = res_info["resource_id"]
continue
# missing resource, attempt creation
svc_res_types = SERVICE_TYPE_DICT[svc_type].resource_type_names
type_count = len(svc_res_types)
if type_count == 0:
_handle_permission("Cannot generate resource", entry_index, raise_errors=True,
detail="Service [{!s}] of type [{!s}] doesn't allow any sub-resource types. "
.format(svc_name, svc_type))
if type_count != 1 and not (isinstance(resource_type, six.string_types) and resource_type):
_handle_permission("Cannot automatically generate resource", entry_index, raise_errors=True,
detail="Service [{!s}] of type [{!s}] allows more than 1 sub-resource "
"types ({}). Type must be explicitly specified for auto-creation. "
"Available choices are: {}"
.format(svc_name, svc_type, type_count, svc_res_types))
if type_count != 1 and resource_type not in svc_res_types:
_handle_permission("Cannot generate resource", entry_index, raise_errors=True,
detail="Service [{!s}] of type [{!s}] allows more than 1 sub-resource "
"types ({}). Specified type [{!s}] doesn't match any of the allowed "
"resource types. Available choices are: {}"
.format(svc_name, svc_type, type_count, resource_type, svc_res_types))
res_type = resource_type or svc_res_types[0]
if _use_request(cookies_or_session):
body = {"resource_name": res, "resource_type": res_type, "parent_id": parent}
resp = requests.post(res_path, json=body, cookies=cookies_or_session, timeout=5)
else:
from magpie.api.management.resource.resource_utils import create_resource
resp = create_resource(res, res, res_type, parent, db_session=cookies_or_session)
if resp.status_code != 201:
resp.raise_for_status()
child_resources = {}
parent = get_json(resp)["resource"]["resource_id"]
resource = parent
if not resource:
raise RegistrationConfigurationError("Could not extract child resource from resource path.")
except HTTPException as exc:
detail = "{} ({}), {!s}".format(type(exc).__name__, exc.code, exc)
_handle_permission("Failed resources parsing.", entry_index, detail=detail, raise_errors=raise_errors)
return None, False
except Exception as exc:
_handle_permission("Failed resources parsing.", entry_index, detail=repr(exc), raise_errors=raise_errors)
return None, False
return resource, True
[docs]
def _apply_permission_entry(permission_config_entry, # type: PermissionConfigItem
entry_index, # type: int
resource_id, # type: int
cookies_or_session, # type: CookiesOrSessionType
magpie_url, # type: Str
users, # type: UsersSettings
groups, # type: GroupsSettings
raise_errors=False, # type: bool
): # type: (...) -> None
"""
Applies the single permission entry retrieved from the permission configuration.
Assumes that permissions fields where pre-validated. Permission is applied for the user/group/resource using request
or db session accordingly to arguments.
"""
def _apply_request(_usr_name=None, _grp_name=None):
# type: (Optional[Str], Optional[Str]) -> Optional[AnyResponseType]
"""
Apply operation using HTTP request.
"""
action_oper = None
if _usr_name:
action_oper = UserResourcePermissionsAPI.path.format(user_name=_usr_name, resource_id=resource_id)
if _grp_name:
action_oper = GroupResourcePermissionsAPI.path.format(group_name=_grp_name, resource_id=resource_id)
if not action_oper:
return None
action_func = requests.post if create_perm else requests.delete
action_body = {"permission": perm.json()}
action_path = "{url}{path}".format(url=magpie_url, path=action_oper)
action_resp = action_func(action_path, json=action_body, cookies=cookies_or_session, timeout=5)
return action_resp
def _apply_session(_usr_name=None, _grp_name=None):
# type: (Optional[Str], Optional[Str]) -> AnyResponseType
"""
Apply operation using db session.
"""
# pylint: disable=C0415 # avoid circular imports
# pylint: disable=R1705 # aligned methods are easier to read
from magpie.api.management.group import group_utils as gt
from magpie.api.management.user import user_utils as ut
res = ResourceService.by_resource_id(resource_id, db_session=cookies_or_session)
if _usr_name:
usr = UserService.by_user_name(_usr_name, db_session=cookies_or_session)
if create_perm:
return ut.create_user_resource_permission_response(usr, res, perm, overwrite=True,
db_session=cookies_or_session)
else:
return ut.delete_user_resource_permission_response(usr, res, perm,
db_session=cookies_or_session)
if _grp_name:
grp = GroupService.by_group_name(_grp_name, db_session=cookies_or_session)
if create_perm:
return gt.create_group_resource_permission_response(grp, res, perm, overwrite=True,
db_session=cookies_or_session)
else:
return gt.delete_group_resource_permission_response(grp, res, perm,
db_session=cookies_or_session)
def _apply_profile(_usr_name=None, _grp_name=None):
# type: (Optional[Str], Optional[Str]) -> AnyResponseType
"""
Creates the user/group profile as required.
"""
password = pseudo_random_string(length=get_constant("MAGPIE_PASSWORD_MIN_LENGTH"))
usr_data = {
"user_name": _usr_name,
"password": users.get(_usr_name, {}).get("password", password),
"email": users.get(_usr_name, {}).get("email", "{}@mail.com".format(_usr_name)),
"group_name": users.get(_usr_name, {}).get("group", get_constant("MAGPIE_ANONYMOUS_GROUP"))
}
grp_data = {
"group_name": _grp_name,
"description": groups.get(_grp_name, {}).get("description", ""),
"discoverable": groups.get(_grp_name, {}).get("discoverable", False),
"terms": groups.get(_grp_name, {}).get("terms", "")
}
if _use_request(cookies_or_session):
if _usr_name:
path = "{url}{path}".format(url=magpie_url, path=UsersAPI.path)
return requests.post(path, json=usr_data, cookies=cookies_or_session, timeout=5)
if _grp_name:
path = "{url}{path}".format(url=magpie_url, path=GroupsAPI.path)
return requests.post(path, json=grp_data, cookies=cookies_or_session, timeout=5)
else:
if _usr_name:
from magpie.api.management.user.user_utils import create_user
usr_data["db_session"] = cookies_or_session # back-compatibility python 2 cannot have kw after **unpack
return create_user(**usr_data)
if _grp_name:
grp_data["db_session"] = cookies_or_session # back-compatibility python 2 cannot have kw after **unpack
from magpie.api.management.group.group_utils import create_group
return create_group(**grp_data)
def _validate_response(operation, is_create, item_type="Permission"):
# type: (Callable[[], Optional[AnyResponseType]], bool, str) -> None
"""
Validate action/operation applied and handles raised ``HTTPException`` as returned response.
"""
if not islambda(operation):
raise TypeError("invalid use of method")
try:
_resp = operation()
if _resp is None:
return
except HTTPException as exc:
_resp = exc
except Exception:
raise
# validation according to status code returned
if is_create:
if _resp.status_code in [200, 201]: # update/create
_handle_permission("{} successfully created.".format(item_type), entry_index,
level=logging.INFO, trail="")
elif _resp.status_code == 409:
_handle_permission("{} already exists.".format(item_type), entry_index, level=logging.INFO)
else:
_handle_permission("Unknown response [{}]".format(_resp.status_code), entry_index,
permission=permission_config_entry, level=logging.ERROR, raise_errors=raise_errors)
else:
if _resp.status_code == 200:
_handle_permission("{} successfully removed.".format(item_type), entry_index,
level=logging.INFO, trail="")
elif _resp.status_code == 404:
_handle_permission("{} already removed.".format(item_type), entry_index, level=logging.INFO)
else:
_handle_permission("Unknown response [{}]".format(_resp.status_code), entry_index,
permission=permission_config_entry, level=logging.ERROR, raise_errors=raise_errors)
create_perm = permission_config_entry["action"] == "create"
perm_def = permission_config_entry["permission"] # name or object
usr_name = permission_config_entry.get("user")
grp_name = permission_config_entry.get("group")
perm = PermissionSet(perm_def)
# process groups first as they can be referenced by user definitions
_validate_response(lambda: _apply_profile(None, grp_name), is_create=True)
_validate_response(lambda: _apply_profile(usr_name, None), is_create=True)
if _use_request(cookies_or_session):
_validate_response(lambda: _apply_request(None, grp_name), is_create=create_perm)
_validate_response(lambda: _apply_request(usr_name, None), is_create=create_perm)
else:
_validate_response(lambda: _apply_session(None, grp_name), is_create=create_perm)
_validate_response(lambda: _apply_session(usr_name, None), is_create=create_perm)
[docs]
def magpie_register_permissions_from_config(
permissions_config, # type: Union[Str, PermissionsConfig]
settings=None, # type: Optional[AnySettingsContainer]
db_session=None, # type: Optional[Session]
raise_errors=False, # type: bool
): # type: (...) -> None
"""
Applies `permissions` specified in configuration(s) defined as file, directory with files or literal configuration.
:param permissions_config: file/dir path to `permissions` config or JSON/YAML equivalent pre-loaded.
:param settings: Magpie settings to resolve an instance session when using requests instead of DB session.
Will look for ``magpie.url``, ``magpie.admin_user`` and ``magpie.admin_password`` by default, or any
corresponding environment variable resolution if omitted in the settings.
:param db_session: db session to use instead of requests to directly create/remove permissions with config.
:param raise_errors: raises errors related to permissions, instead of just logging the info.
.. seealso::
`magpie/config/permissions.cfg` for specific parameters and operational details.
"""
LOGGER.info("Starting permissions processing.")
magpie_url = None
if _use_request(db_session):
magpie_url = get_magpie_url(settings)
LOGGER.debug("Editing permissions using requests to [%s]...", magpie_url)
err_msg = "Invalid credentials to register Magpie permissions."
cookies_or_session = get_admin_cookies(settings, raise_message=err_msg)
else:
LOGGER.debug("Editing permissions using db session...")
cookies_or_session = db_session
LOGGER.debug("Loading configurations.")
if isinstance(permissions_config, list):
permissions = [permissions_config]
else:
permissions = get_all_configs(permissions_config, "permissions")
perms_cfg_count = len(permissions)
LOGGER.log(logging.INFO if perms_cfg_count else logging.WARNING,
"Found %s permissions configurations.", perms_cfg_count)
users_settings = groups_settings = None
if perms_cfg_count:
if isinstance(permissions_config, str):
users = get_all_configs(permissions_config, "users", allow_missing=True)
else:
users = []
if isinstance(permissions_config, str):
groups = get_all_configs(permissions_config, "groups", allow_missing=True)
else:
groups = []
users_settings = _resolve_config_registry(users, "username") or {}
groups_settings = _resolve_config_registry(groups, "name") or {}
for i, perms in enumerate(permissions):
LOGGER.info("Processing permissions from configuration (%s/%s).", i + 1, perms_cfg_count)
_process_permissions(
perms,
magpie_url,
cookies_or_session,
users_settings,
groups_settings,
settings,
raise_errors,
)
LOGGER.info("All permissions processed.")
[docs]
def _resolve_config_registry(config_files, key):
# type: (Optional[MultiConfigs], Str) -> AnyResolvedSettings
"""
Converts a list of configurations entries from multiple files into a single resolved mapping.
Resolution is accomplished against :paramref:`key` to generate the mapping of unique items.
First configuration entries have priority over later ones if keys are duplicated.
"""
config_map = {}
config_files = config_files or []
for cfg in config_files:
if not cfg:
continue
if isinstance(cfg, dict):
cfg_key = cfg.get(key, None)
if cfg_key:
config_map[cfg_key] = cfg
else:
for cfg_item in cfg:
cfg_key = cfg_item.get(key, None)
if cfg_key:
config_map[cfg_key] = cfg_item
return config_map
[docs]
def _process_permissions(
permissions, # type: PermissionsConfig
magpie_url, # type: Str
cookies_or_session, # type: Session
users=None, # type: Optional[UsersSettings]
groups=None, # type: Optional[GroupsSettings]
settings=None, # type: Optional[AnySettingsContainer]
raise_errors=False, # type: bool
): # type: (...) -> None
"""
Processes a single `permissions` configuration.
"""
if not permissions:
LOGGER.warning("Permissions configuration are empty.")
return
anon_user = get_constant("MAGPIE_ANONYMOUS_USER", settings)
perm_count = len(permissions)
LOGGER.log(logging.INFO if perm_count else logging.WARNING,
"Found %s permissions to evaluate from configuration.", perm_count)
for i, perm_cfg in enumerate(permissions):
# parameter validation
if not isinstance(perm_cfg, dict) or not all(f in perm_cfg for f in ["permission", "service"]):
_handle_permission("Invalid permission format for [{!s}]".format(perm_cfg), i, raise_errors=raise_errors)
continue
try:
perm = PermissionSet(perm_cfg["permission"])
except (ValueError, TypeError):
perm = None
if not perm:
_handle_permission("Unknown permission [{!s}]".format(perm_cfg["permission"]), i, raise_errors=raise_errors)
continue
usr_name = perm_cfg.get("user")
grp_name = perm_cfg.get("group")
if not any([usr_name, grp_name]):
_handle_permission("Missing required user and/or group field.", i, raise_errors=raise_errors)
continue
if usr_name == anon_user:
_handle_permission("Skipping forbidden user permission (reserved special user: {}).".format(anon_user), i)
continue
if "action" not in perm_cfg:
_handle_permission("Unspecified action", i, trail="using default (create)...", raise_errors=raise_errors)
perm_cfg["action"] = "create"
if perm_cfg["action"] not in ["create", "remove"]:
_handle_permission("Unknown action [{!s}]".format(perm_cfg["action"]), i, raise_errors=raise_errors)
continue
# retrieve service for permissions validation
svc_name = perm_cfg["service"]
if _use_request(cookies_or_session):
svc_path = magpie_url + ServiceAPI.path.format(service_name=svc_name)
svc_resp = requests.get(svc_path, cookies=cookies_or_session, timeout=5)
if svc_resp.status_code != 200:
_handle_permission("Unknown service [{!s}]".format(svc_name), i, raise_errors=raise_errors)
continue
service_json = get_json(svc_resp)
service_info = service_json.get(svc_name) or service_json.get("service") # format depends on magpie version
else:
transaction.commit() # force any pending transaction to be applied to find possible dependencies
svc = models.Service.by_service_name(svc_name, db_session=cookies_or_session)
if not svc:
_handle_permission("Unknown service [{!s}]. Can't edit permissions without service.".format(svc_name),
i, raise_errors=raise_errors)
continue
from magpie.api.management.service.service_formats import format_service
service_info = format_service(svc)
# apply permission config
resource_id, found = _parse_resource_path(perm_cfg, i, service_info, cookies_or_session, magpie_url,
raise_errors)
if found:
if not resource_id:
resource_id = service_info["resource_id"]
_apply_permission_entry(perm_cfg, i, resource_id, cookies_or_session, magpie_url, users, groups,
raise_errors)
if not _use_request(cookies_or_session):
transaction.commit()
LOGGER.info("Done processing permissions configuration.")
[docs]
def pseudo_random_string(length=8, allow_chars=string.ascii_letters + string.digits):
# type: (int, Str) -> Str
"""
Generate a string made of random characters.
"""
rnd = random.SystemRandom()
return "".join(rnd.choice(allow_chars) for _ in range(length))