#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from magpie.constants import get_constant
from magpie.definitions.pyramid_definitions import (
HTTPOk, HTTPClientError, HTTPException, ConfigurationError, Configurator, Registry, Request, Response, truthy
)
from six.moves.urllib.parse import urlparse
# noinspection PyProtectedMember
from enum import EnumMeta
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from webob.headers import ResponseHeaders, EnvironHeaders
from distutils.dir_util import mkpath
from six.moves import configparser
from typing import TYPE_CHECKING
import requests
import logging
import types
import six
import sys
import os
if TYPE_CHECKING:
from magpie.definitions.typedefs import ( # noqa: F401
Any, AnyKey, Str, List, Optional, Type, Union,
AnyResponseType, AnyHeadersType, LoggerType, CookiesType, SettingsType, AnySettingsContainer,
)
# noinspection PyProtectedMember
from typing import _TC # noqa: F401
# Media type strings (e.g.: values for 'Accept' / 'Content-Type' headers).
CONTENT_TYPE_ANY = "*/*"
CONTENT_TYPE_JSON = "application/json"
CONTENT_TYPE_FORM = "application/x-www-form-urlencoded"
CONTENT_TYPE_HTML = "text/html"
CONTENT_TYPE_PLAIN = "text/plain"
# Note that neither the wildcard nor the form-encoded type is part of this list.
SUPPORTED_CONTENT_TYPES = [
    CONTENT_TYPE_JSON,
    CONTENT_TYPE_HTML,
    CONTENT_TYPE_PLAIN,
]
def get_logger(name, level=None):
    """
    Retrieves the logger called ``name``, assigning it a level when it does not have one yet.

    A logger still at ``NOTSET`` is immediately given an explicit level to avoid duplicate log outputs from the
    `root logger` and `this logger`. A logger that already has a level is returned untouched.

    :param name: name of the logger to retrieve.
    :param level: level applied when the logger has none (``MAGPIE_LOG_LEVEL`` is used if omitted).
    :return: the logger instance.
    """
    named_logger = logging.getLogger(name)
    if named_logger.level != logging.NOTSET:
        return named_logger
    if level is None:
        # imported lazily, only when the fallback level is actually required
        from magpie.constants import MAGPIE_LOG_LEVEL
        level = MAGPIE_LOG_LEVEL
    named_logger.setLevel(level)
    return named_logger
# module-level logger used by the helpers of this module
LOGGER = get_logger(__name__)
def print_log(msg, logger=None, level=logging.INFO):
    # type: (Str, Optional[LoggerType], int) -> None
    """
    Logs ``msg`` at ``level`` after making sure the logger is able to emit it.

    When ``MAGPIE_LOG_PRINT`` is enabled and neither the root logger nor the employed logger has a stream handler,
    a handler writing to ``sys.stdout`` is added. A disabled logger is re-enabled before logging.

    :param msg: message to log.
    :param logger: logger to employ (the logger of this module is used if omitted).
    :param level: logging level of the message.
    """
    from magpie.constants import MAGPIE_LOG_PRINT  # cannot use 'get_constant', recursive call
    target = logger if logger else get_logger(__name__)
    if MAGPIE_LOG_PRINT:
        has_stream_handler = False
        for handler in logging.root.handlers + target.handlers:
            if isinstance(handler, logging.StreamHandler):
                has_stream_handler = True
                break
        if not has_stream_handler:
            target.addHandler(logging.StreamHandler(sys.stdout))
    if target.disabled:
        target.disabled = False
    target.log(level, msg)
def raise_log(msg, exception=Exception, logger=None, level=logging.ERROR):
    # type: (Str, Optional[Type[Exception]], Optional[LoggerType], int) -> None
    """
    Logs ``msg`` at ``level``, then raises it as an exception.

    :param msg: message to log and to pass to the raised exception.
    :param exception: class of the exception to raise; anything that is not an ``Exception`` subclass is replaced
        by ``Exception``.
    :param logger: logger to employ (the logger of this module is used if omitted).
    :param level: logging level of the message.
    :raises Exception: always, using the resolved exception class.
    """
    active_logger = logger or get_logger(__name__)
    active_logger.log(level, msg)
    is_exception_class = isclass(exception) and issubclass(exception, Exception)
    error_class = exception if is_exception_class else Exception
    raise error_class(msg)
def bool2str(value):
    # type: (Any) -> Str
    """
    Converts ``value`` to the literal string ``"true"`` or ``"false"``.

    :param value: any object; its lowercase string representation is matched against ``truthy``.
    :return: ``"true"`` when the representation is found in ``truthy``, ``"false"`` otherwise.
    """
    normalized = str(value).lower()
    if normalized in truthy:
        return "true"
    return "false"
def islambda(func):
    """
    Evaluate an object for lambda type.

    :param func: object to evaluate.
    :return: (bool) indicating if `func` is a function carrying the name given to lambda expressions.
    """
    if not isinstance(func, types.LambdaType):
        return False
    # 'LambdaType' is also the type of regular functions, only the name tells them apart
    reference = lambda: None  # noqa: E731
    return func.__name__ == reference.__name__
def isclass(obj):
    """
    Tells whether an object is a class definition, as opposed to an instance or any other kind of object.

    :param obj: object to evaluate for class type
    :return: (bool) indicating if `object` is a class
    """
    class_kinds = (type, six.class_types)
    return isinstance(obj, class_kinds)
# equivalent of 'os.makedirs(..., exist_ok=True)', whose 'exist_ok' parameter is not available on python 2
def make_dirs(path):
    """
    Creates the parent directory of ``path``, along with any missing intermediate directory.

    Only the directory portion of ``path`` is created; ``path`` itself is never created. Nothing happens when
    ``path`` has no directory portion or when the directory already exists.

    :param path: location whose containing directory must exist.
    :raises OSError: if the directory cannot be created.
    """
    dir_path = os.path.dirname(path)
    if not dir_path or os.path.isdir(dir_path):
        return
    try:
        # the filesystem is always consulted, no per-process memory of previously created paths is kept
        os.makedirs(dir_path)
    except OSError:
        # same tolerance as 'exist_ok': fine if the directory now exists, otherwise report the failure
        if not os.path.isdir(dir_path):
            raise
def get_settings_from_config_ini(config_ini_path, ini_main_section_name="app:magpie_app"):
    """
    Reads one section of an INI configuration file as a dictionary of settings.

    :param config_ini_path: path of the INI file to read.
    :param ini_main_section_name: name of the section to extract.
    :return: option names (case preserved) mapped to their values for the requested section.
    """
    def _keep_case(option):
        # preserve case of config (ziggurat requires it for 'User' model)
        return option

    ini_parser = configparser.ConfigParser()
    ini_parser.optionxform = _keep_case
    ini_parser.read([config_ini_path])
    return {name: value for name, value in ini_parser.items(ini_main_section_name)}
def get_json(response):
    """
    Retrieves the 'JSON' body of a response using the property/callable according to the response's implementation.

    The ``json`` attribute is read only once. It is called when it is a method, and returned as is when it is
    already the decoded content, whichever JSON type that content is (object, array, etc.).

    :param response: response object exposing ``json`` either as a method or as an attribute/property.
    :return: decoded JSON content of the response.
    """
    json_body = response.json
    if callable(json_body):
        return json_body()
    return json_body
def convert_response(response):
    # type: (AnyResponseType) -> Response
    """
    Converts a ``response`` implementation (e.g.: ``requests.Response``) to an equivalent ``pyramid.response.Response``
    version.

    A ``response`` that already is a ``Response`` is returned as is. Otherwise, a new ``Response`` is built from the
    JSON body and the headers of ``response``, and its cookies (when it has any) are copied over.

    NOTE(review): the status code of ``response`` is not transferred to the new object, and the decoded JSON is
    passed through the ``body`` argument -- confirm both are intended.

    :param response: response object to convert.
    :return: the converted (or original) response.
    """
    if isinstance(response, Response):
        return response

    converted = Response(body=get_json(response), headers=response.headers)

    if hasattr(response, "cookies"):
        for item in response.cookies:
            converted.set_cookie(name=item.name, value=item.value, overwrite=True)

    if isinstance(response, HTTPException):
        # noinspection PyProtectedMember
        raw_headers = response.headers._items
        for key, val in raw_headers:
            if key.lower() != "set-cookie":
                continue
            # NOTE(review): the header name itself is used as cookie name here -- confirm this is intended
            converted.set_cookie(name=key, value=val, overwrite=True)
    return converted
def get_admin_cookies(container, verify=True, raise_message=None):
    # type: (AnySettingsContainer, bool, Optional[Str]) -> CookiesType
    """
    Signs in with the configured administrator credentials and returns the resulting session cookie.

    :param container: object from which the Magpie URL, credentials and cookie name are resolved.
    :param verify: forwarded to ``requests.post`` as its ``verify`` argument.
    :param raise_message: when provided, a failed sign-in is logged and raised with this message.
    :return: single-item mapping of the Magpie cookie name to its value.
    :raises requests.HTTPError: if the sign-in response status is not OK and ``raise_message`` was not provided.
    """
    from magpie.api.schemas import SigninAPI
    magpie_url = get_magpie_url(container)
    magpie_login_url = "{}{}".format(magpie_url, SigninAPI.path)
    cred = {"user_name": get_constant("MAGPIE_ADMIN_USER", container),
            "password": get_constant("MAGPIE_ADMIN_PASSWORD", container)}
    resp = requests.post(magpie_login_url, data=cred, headers={"Accept": CONTENT_TYPE_JSON}, verify=verify)
    if resp.status_code != HTTPOk.code:
        if raise_message:
            raise_log(raise_message, logger=LOGGER)
        # raises by itself for 4xx/5xx responses, but simply returns for any other status code
        resp.raise_for_status()
        # remaining statuses (not OK, but not an error either) must still be reported as a failed sign-in
        raise requests.HTTPError(
            "Unexpected status code [{}] returned by sign-in request.".format(resp.status_code), response=resp
        )
    token_name = get_constant("MAGPIE_COOKIE_NAME", container)
    response_cookies = resp.cookies
    magpie_cookies = [cookie for cookie in response_cookies if cookie.name == token_name]
    # use specific domain to differentiate between `.{hostname}` and `{hostname}` variations if applicable
    magpie_domain = urlparse(magpie_url).hostname if len(magpie_cookies) > 1 else None
    session_cookies = RequestsCookieJar.get(response_cookies, token_name, domain=magpie_domain)
    return {token_name: session_cookies}
def get_settings(container):
    # type: (AnySettingsContainer) -> SettingsType
    """
    Retrieves the application settings from any of the supported container types.

    :param container: a ``Configurator``, ``Request``, ``Registry`` or ``dict``.
    :return: settings found through the container, or the container itself when it is a plain ``dict``.
    :raises TypeError: if the container is of none of the supported types.
    """
    registry_holders = (Configurator, Request)
    # order of the checks is kept as is, since a given container could match more than one of them
    if isinstance(container, registry_holders):
        settings = container.registry.settings
    elif isinstance(container, Registry):
        settings = container.settings
    elif isinstance(container, dict):
        settings = container
    else:
        raise TypeError("Could not retrieve settings from container object [{}]".format(type(container)))
    return settings
def patch_magpie_url(container):
    # type: (AnySettingsContainer) -> SettingsType
    """
    Updates potentially missing configuration settings for normal application execution.

    When the Magpie URL cannot be resolved from the settings, ``magpie.url`` is generated from the scheme, the
    ``HOSTNAME`` value and the port. The port portion is omitted from the generated URL when no port is available.

    :param container: object from which the settings are retrieved.
    :return: the settings, updated in place when applicable.
    """
    settings = get_settings(container)
    try:
        get_magpie_url(settings)
    except ConfigurationError:
        port = get_constant("MAGPIE_PORT", settings, raise_not_set=False)
        scheme = get_constant("MAGPIE_SCHEME", settings, raise_missing=False, raise_not_set=False, default_value="http")
        if port:
            settings["magpie.port"] = port
        hostname = get_constant("HOSTNAME")
        if hostname:
            # 'magpie.port' is only guaranteed to be set when a port was resolved above, so it is looked up
            # without indexing, and an unavailable port is left out instead of producing a dangling ':'
            port = port or settings.get("magpie.port")
            port_suffix = ":{}".format(port) if port else ""
            magpie_url = "{scheme}://{hostname}{port}".format(scheme=scheme, hostname=hostname, port=port_suffix)
            print_log("Updating 'magpie.url' value: {}".format(magpie_url), LOGGER, logging.WARNING)
            settings["magpie.url"] = magpie_url
    return settings
def get_magpie_url(container=None):
    # type: (Optional[AnySettingsContainer]) -> Str
    """
    Resolves the URL of Magpie, either from constants alone or from the settings of ``container``.

    Without ``container``, the ``MAGPIE_URL`` constant is returned when set; otherwise the URL is assembled from
    the scheme, the host (``HOSTNAME`` or ``MAGPIE_HOST``) and the optional port constants.
    With ``container``, the configured URL is returned, prefixed by ``http://`` if it has no HTTP(S) scheme.

    :param container: object from which the settings are retrieved, if any.
    :return: the resolved URL.
    :raises ConfigurationError: if no host value or no URL configuration can be found.
    """
    if container is None:
        LOGGER.warning("Registry not specified, trying to find Magpie URL from environment")
        env_url = get_constant("MAGPIE_URL", raise_missing=False, raise_not_set=False, print_missing=False)
        if env_url:
            return env_url
        env_host = get_constant("HOSTNAME", raise_not_set=False, raise_missing=False)
        if not env_host:
            env_host = get_constant("MAGPIE_HOST", raise_not_set=False, raise_missing=False)  # noqa
        if not env_host:
            raise ConfigurationError("Missing or unset MAGPIE_HOST or HOSTNAME value.")
        env_port = get_constant("MAGPIE_PORT", raise_not_set=False)
        env_scheme = get_constant("MAGPIE_SCHEME", raise_not_set=False, raise_missing=False, default_value="http")
        port_suffix = ":{}".format(env_port) if env_port else ""
        return "{}://{}{}".format(env_scheme, env_host, port_suffix)
    try:
        # add "http" scheme to url if omitted from config since further 'requests' calls fail without it
        # mostly for testing when only "localhost" is specified
        # otherwise config should explicitly define it with 'MAGPIE_URL' env or 'magpie.url' config
        settings = get_settings(container)
        configured_url = get_constant("MAGPIE_URL", settings, "magpie.url")
        parsed = urlparse(configured_url.strip("/"))
        if parsed.scheme not in ["http", "https"]:
            magpie_url = "http://{}".format(parsed.geturl())
            print_log("Missing scheme from settings URL, new value: '{}'".format(magpie_url), LOGGER, logging.WARNING)
            return magpie_url
        return parsed.geturl()
    except AttributeError:
        # If magpie.url does not exist, calling strip fct over None will raise this issue
        raise ConfigurationError("MAGPIE_URL or magpie.url config cannot be found")
def get_phoenix_url(container=None):
    # type: (Optional[AnySettingsContainer]) -> Str
    """
    Builds the HTTPS URL of Phoenix from the host and optional port constants.

    NOTE(review): only the ``PHOENIX_HOST`` lookup receives ``container``, the ``HOSTNAME`` and ``PHOENIX_PORT``
    lookups do not -- confirm this is intended.

    :param container: object passed to the ``PHOENIX_HOST`` lookup, if any.
    :return: URL as ``https://<host>[:<port>]``.
    :raises ConfigurationError: if neither ``PHOENIX_HOST`` nor ``HOSTNAME`` provides a value.
    """
    phoenix_host = get_constant("PHOENIX_HOST", container, raise_missing=False, raise_not_set=False)
    if not phoenix_host:
        phoenix_host = get_constant("HOSTNAME", raise_missing=False, raise_not_set=False)
    if not phoenix_host:
        raise ConfigurationError("Missing or unset PHOENIX_HOST or HOSTNAME value.")
    phoenix_port = get_constant("PHOENIX_PORT", raise_not_set=False)
    port_suffix = ":{}".format(phoenix_port) if phoenix_port else ""
    return "https://{0}{1}".format(phoenix_host, port_suffix)
def get_twitcher_protected_service_url(magpie_service_name, hostname=None):
    """
    Builds the URL of a service as exposed through the Twitcher protected proxy.

    ``TWITCHER_PROTECTED_URL`` is used as base when set. Otherwise, the base is assembled from the host and
    ``TWITCHER_PROTECTED_PATH``, normalized to start with ``/twitcher``.

    NOTE(review): assumes ``TWITCHER_PROTECTED_PATH`` resolves to a string whenever the URL is not set -- confirm.

    :param magpie_service_name: name of the service, appended as last segment of the URL.
    :param hostname: host to employ when the base URL must be assembled (``HOSTNAME`` is used if omitted).
    :return: URL as ``<base URL without trailing slash>/<service name>``.
    """
    base_url = get_constant("TWITCHER_PROTECTED_URL", raise_not_set=False)
    if not base_url:
        proxy_path = get_constant("TWITCHER_PROTECTED_PATH", raise_not_set=False)
        if not proxy_path.endswith("/"):
            proxy_path = "{}/".format(proxy_path)
        if not proxy_path.startswith("/"):
            proxy_path = "/{}".format(proxy_path)
        if not proxy_path.startswith("/twitcher"):
            proxy_path = "/twitcher{}".format(proxy_path)
        hostname = hostname or get_constant("HOSTNAME")
        base_url = "https://{0}{1}".format(hostname, proxy_path)
    return "{0}/{1}".format(base_url.rstrip("/"), magpie_service_name)
def log_request(event):
    """
    Subscriber event that logs basic details about the incoming requests.

    :param event: event object providing the incoming request through its ``request`` attribute.
    """
    request_details = log_request_format(event.request)
    LOGGER.info("Request: [{}]".format(request_details))
# noinspection PyUnusedLocal
def log_exception_tween(handler, registry):
    """
    Tween factory that logs any exception before re-raising it.

    Application errors are marked as ``ERROR`` while non critical HTTP errors are marked as ``WARNING``.
    The traceback is only included in the log entry of application errors.

    :param handler: next request handler of the chain, called with the request.
    :param registry: unused.
    :return: the tween, which returns the result of ``handler`` or re-raises the exception it raised.
    """
    def log_exc(request):
        try:
            return handler(request)
        except Exception as err:
            is_client_error = isinstance(err, HTTPClientError)
            lvl = logging.WARNING if is_client_error else logging.ERROR
            LOGGER.log(lvl, "Exception during request: [{}]".format(log_request_format(request)),
                       exc_info=not is_client_error)
            # bare 'raise' propagates the active exception with its original traceback on every python version
            raise
    return log_exc
def is_json_body(body):
    """
    Tells whether ``body`` holds content that can be parsed as JSON.

    :param body: content to evaluate.
    :return: (bool) ``False`` for an empty/missing body or content that fails to parse, ``True`` otherwise.
    """
    if body:
        try:
            json.loads(body)
        except (ValueError, TypeError):
            pass
        else:
            return True
    return False