Source code for magpie.db

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import inspect
import logging
import time
import warnings
from typing import TYPE_CHECKING

import alembic
import alembic.command
import alembic.config
import six
import transaction
from pyramid.settings import asbool
from sqlalchemy import engine_from_config
from sqlalchemy import exc as sa_exc
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm.session import Session, sessionmaker
from zope.sqlalchemy import register

from magpie.constants import get_constant
from magpie.utils import get_logger, get_settings, get_settings_from_config_ini, print_log, raise_log

# import or define all models here to ensure they are attached to the
# Base.metadata prior to any initialization routines
from magpie import models  # isort:skip # noqa: E402

if TYPE_CHECKING:
    # pylint: disable=W0611,unused-import
    from typing import Any, Optional, Union

    from sqlalchemy.engine.base import Engine

    from magpie.typedefs import AnySettingsContainer, SettingsType, Str


# module-level logger shared by every helper in this file
LOGGER = get_logger(__name__)

# Mappers are configured only once all models have been imported above,
# which ensures every relationship between them can be set up.
configure_mappers()
def get_db_url(username=None,   # type: Optional[Str]
               password=None,   # type: Optional[Str]
               db_host=None,    # type: Optional[Str]
               db_port=None,    # type: Optional[Union[Str,int]]
               db_name=None,    # type: Optional[Str]
               settings=None,   # type: AnySettingsContainer
               ):               # type: (...) -> Str
    """
    Retrieve the database connection URL with provided settings.

    When ``MAGPIE_DB_URL`` resolves to a non-empty value, it is returned as is.
    Otherwise, a ``postgresql://`` URL is composed from each connection part, where every part is taken from
    (in order): the explicit function argument, a ``MAGPIE_POSTGRES_<>``/``magpie.postgres_<>`` value, a
    ``POSTGRES_<>``/``postgres.<>`` value, and finally the default of the corresponding ``MAGPIE_POSTGRES_<>``.
    """
    url = get_constant("MAGPIE_DB_URL", settings, raise_missing=False, print_missing=True, raise_not_set=False)
    if not url:
        def _resolve(explicit, keys):
            # an explicitly provided argument always wins
            if explicit is not None:
                return explicit
            if isinstance(keys, six.string_types):
                keys = [keys]
            # default is looked up against empty settings, using the first (canonical) key name
            fallback = get_constant("MAGPIE_POSTGRES_{}".format(keys[0].upper()), {}, raise_not_set=False)
            lookups = [("MAGPIE_POSTGRES_", "magpie.postgres_"), ("POSTGRES_", "postgres.")]
            for env_prefix, cfg_prefix in lookups:
                for key in keys:
                    env_name = "{}{}".format(env_prefix, key.upper())
                    cfg_name = "{}{}".format(cfg_prefix, key.lower())
                    found = get_constant(env_name, settings, cfg_name, raise_missing=False, raise_not_set=False)
                    # only a value that differs from the default counts as an override
                    if found not in (None, fallback):
                        return found
            return fallback

        user_part = _resolve(username, ["username", "user"])
        pass_part = _resolve(password, "password")
        host_part = _resolve(db_host, "host")
        port_part = _resolve(db_port, "port")
        name_part = _resolve(db_name, ["db", "database"])
        url = "postgresql://%s:%s@%s:%s/%s" % (user_part, pass_part, host_part, port_part, name_part)
        LOGGER.info("Using composed settings 'MAGPIE_POSTGRES_<>' for database connection.")
    else:
        LOGGER.info("Using setting 'MAGPIE_DB_URL' for database connection.")
    LOGGER.debug("Resolved database connection URL: [%s]", url)
    return url
def get_engine(container=None, prefix="sqlalchemy.", **kwargs):
    # type: (Optional[AnySettingsContainer], Str, Any) -> Engine
    """
    Create the database engine from the settings found in the container.

    The ``<prefix>url`` setting is always overwritten with the result of :func:`get_db_url`, and
    ``<prefix>pool_pre_ping`` is enabled unless the settings already define it.
    Any extra keyword arguments are forwarded to the engine creation, with ``convert_unicode`` forced to ``True``.
    """
    settings = get_settings(container or {})
    url_key = "{}url".format(prefix)
    ping_key = "{}pool_pre_ping".format(prefix)
    settings[url_key] = get_db_url(settings=settings)
    settings.setdefault(ping_key, True)
    engine_options = dict(kwargs or {}, convert_unicode=True)
    return engine_from_config(settings, prefix, **engine_options)
def get_session_factory(engine):
    """
    Create a session factory whose sessions are bound to the given engine.
    """
    factory = sessionmaker(bind=engine)
    return factory
def get_tm_session(session_factory, transaction_manager):
    """
    Create a ``sqlalchemy.orm.Session`` whose lifecycle is driven by a transaction manager.

    The new session is registered with the transaction manager, which then takes care
    of committing any changes.

    - With pyramid_tm, the session is automatically committed or aborted depending on
      whether an exception is raised.
    - In scripts, wrap the session in a manager yourself.

    For example::

        import transaction

        engine = get_engine(settings)
        session_factory = get_session_factory(engine)
        with transaction.manager:
            db_session = get_tm_session(session_factory, transaction.manager)
    """
    session = session_factory()
    register(session, transaction_manager=transaction_manager)
    return session
def get_db_session_from_settings(settings=None, **kwargs):
    # type: (Optional[AnySettingsContainer], Any) -> Session
    """
    Create a database session hooked to the global ``transaction.manager`` from the given settings.

    Extra keyword arguments are forwarded to :func:`get_engine`.
    """
    engine = get_engine(settings, **kwargs)
    factory = get_session_factory(engine)
    return get_tm_session(factory, transaction.manager)
def get_db_session_from_config_ini(config_ini_path, ini_main_section_name="app:magpie_app", settings_override=None):
    """
    Create a database session from the settings of an INI configuration file.

    Settings are read from the ``ini_main_section_name`` section of ``config_ini_path``.
    When ``settings_override`` is a dictionary, its entries replace the ones loaded from the file.
    """
    ini_settings = get_settings_from_config_ini(config_ini_path, ini_main_section_name)
    has_override = isinstance(settings_override, dict)
    if has_override:
        ini_settings.update(settings_override)
    return get_db_session_from_settings(ini_settings)
def run_database_migration(settings=None, db_session=None):
    # type: (Optional[SettingsType], Optional[Session]) -> None
    """
    Runs database migration operations with :mod:`alembic`, using the provided session or a new engine connection.

    :param settings: container from which ``MAGPIE_INI_FILE_PATH`` (the alembic configuration file) is resolved.
    :param db_session:
        when it is a :class:`sqlalchemy.orm.session.Session`, the migration runs over a connection obtained from
        the engine bound to that session. Otherwise, the migration is executed through the alembic command line
        entry point, which establishes its own connection.
    """
    ini_file = get_constant("MAGPIE_INI_FILE_PATH", settings)
    LOGGER.info("Using file '%s' for migration.", ini_file)
    # Both execution paths must target the same revision. "heads" upgrades every branch head, and is
    # equivalent to "head" when the history is linear, whereas "head" fails if multiple heads exist.
    target_revision = "heads"
    with warnings.catch_warnings():
        # SQLAlchemy warnings raised during migration are silenced
        warnings.simplefilter("ignore", category=sa_exc.SAWarning)
        if not isinstance(db_session, Session):
            alembic_args = ["-c", ini_file, "upgrade", target_revision]
            alembic.config.main(argv=alembic_args)
        else:
            engine = db_session.bind
            with engine.begin() as connection:
                alembic_cfg = alembic.config.Config(file_=ini_file)
                # share the connection through the config attributes for the migration environment to pick up
                alembic_cfg.attributes["connection"] = connection  # pylint: disable=E1137
                alembic.command.upgrade(alembic_cfg, target_revision)
def get_database_revision(db_session):
    # type: (Session) -> Str
    """
    Obtains the database revision number employed by :mod:`alembic` for schema migration.

    The value is read from the ``version_num`` column of the first row of the ``alembic_version`` table.
    """
    revision_row = db_session.execute("SELECT version_num FROM alembic_version").fetchone()
    revision = revision_row["version_num"]
    return revision
def is_database_ready(db_session=None, container=None):
    # type: (Optional[Session], Optional[AnySettingsContainer]) -> bool
    """
    Obtains the database status against expected table names to ensure it is ready for use.

    The engine bound to ``db_session`` is employed when a session is provided, otherwise a new engine is
    created from ``container``. Every class of :mod:`magpie.models` that defines ``__tablename__`` must have
    its table present in the database, or the check fails (and the first missing one is logged as error).
    """
    engine = db_session.bind if isinstance(db_session, Session) else get_engine(container=container)
    existing_tables = Inspector.from_engine(engine).get_table_names()
    for _, member in inspect.getmembers(models):
        # only model classes mapped to a table are relevant
        if not inspect.isclass(member) or not hasattr(member, "__tablename__"):
            continue
        if member.__tablename__ in existing_tables:
            continue
        print_log("Database table (or its associated parent) is missing for '{}' object".format(member),
                  logger=LOGGER, level=logging.ERROR)
        return False
    return True
def run_database_migration_when_ready(settings, db_session=None):
    # type: (SettingsType, Optional[Session]) -> None
    """
    Runs db migration if requested by config and need from revisions.

    The migration is attempted when ``MAGPIE_DB_MIGRATION`` (``magpie.db_migration``, default ``True``) is enabled,
    up to ``MAGPIE_DB_MIGRATION_ATTEMPTS`` (``magpie.db_migration_attempts``, default ``5``, minimum ``1``) times,
    waiting 2 seconds between attempts. When migration is disabled, only the database readiness is verified.

    :param settings: settings container from which migration options are resolved.
    :param db_session: session to employ for the migration and the readiness check, if any.
    :raises RuntimeError:
        if the migration still fails on the last attempt, or if the database is not ready once done.
    """
    db_ready = False
    if asbool(get_constant("MAGPIE_DB_MIGRATION", settings, "magpie.db_migration", default_value=True,
                           raise_missing=False, raise_not_set=False, print_missing=True)):
        conf_attempts = int(get_constant("MAGPIE_DB_MIGRATION_ATTEMPTS", settings, "magpie.db_migration_attempts",
                                         default_value=5, raise_missing=False, raise_not_set=False,
                                         print_missing=True))
        print_log("Running database migration (as required)...", logger=LOGGER)
        attempts = max(conf_attempts, 1)
        if attempts != conf_attempts:
            print_log("Database migration attempts updated to {}".format(attempts),
                      logger=LOGGER, level=logging.WARNING)
        for i in range(1, attempts + 1):
            try:
                run_database_migration(db_session=db_session, settings=settings)
            except ImportError as exc:
                # import errors are deliberately tolerated, readiness is still checked below
                print_log("Database migration produced [{!r}] (ignored).".format(exc),
                          logger=LOGGER, level=logging.WARNING, exc_info=exc)
            except Exception as exc:
                # retry only while attempts remain ('i' ranges from 1 to 'attempts' inclusively),
                # otherwise the failure of the last attempt must be reported
                if i < attempts:
                    print_log("Database migration failed [{!r}]. Retrying... ({}/{})".format(exc, i, attempts),
                              logger=LOGGER, level=logging.WARNING, exc_info=exc)
                    time.sleep(2)
                    continue
                raise_log("Database migration failed [{!r}]".format(exc), exception=RuntimeError, logger=LOGGER)

            db_ready = is_database_ready(db_session)
            if not db_ready:
                if i < attempts:
                    print_log("Database not ready. Retrying... ({}/{})".format(i, attempts),
                              logger=LOGGER, level=logging.WARNING)
                    time.sleep(2)
                    continue
                print_log("Database not ready. Maximum attempts reached ({})".format(attempts),
                          logger=LOGGER, level=logging.WARNING)
            break
    else:
        print_log("Database migration skipped as per 'MAGPIE_DB_MIGRATION' requirement...", logger=LOGGER)
        db_ready = is_database_ready(db_session)
    if not db_ready:
        raise_log("Database not ready", exception=RuntimeError, logger=LOGGER)
def set_sqlalchemy_log_level(magpie_log_level):
    # type: (Union[Str, int]) -> SettingsType
    """
    Suppresses :py:mod:`sqlalchemy` verbose logging if not in ``logging.DEBUG`` for Magpie.

    :param magpie_log_level: logging level of Magpie, as level name (case insensitive) or numeric value.
    :returns:
        settings with ``sqlalchemy.echo`` enabled only when the level is ``logging.DEBUG`` or lower.
    """
    if isinstance(magpie_log_level, six.string_types):
        # level names registered in 'logging' are upper case
        magpie_log_level = logging.getLevelName(magpie_log_level.upper())
    sa_settings = {"sqlalchemy.echo": True}
    if magpie_log_level > logging.DEBUG:
        sa_settings["sqlalchemy.echo"] = False
        sa_loggers = "sqlalchemy.engine.base.Engine".split(".")
        sa_log = logging.getLogger(sa_loggers[0])
        sa_log.setLevel(logging.WARN)  # WARN to avoid INFO logs which are too verbose
        # iterate over a copy, since 'removeHandler' mutates 'sa_log.handlers' and iterating the
        # list directly would skip every other handler
        for handler in list(sa_log.handlers):
            sa_log.removeHandler(handler)
        # apply the same level to each nested logger down to 'sqlalchemy.engine.base.Engine'
        for sa_mod in sa_loggers[1:]:
            sa_log = sa_log.getChild(sa_mod)
            sa_log.setLevel(logging.WARN)
    return sa_settings
def includeme(config):
    """
    Pyramid include hook that sets up database sessions for the application.

    Includes ``pyramid_tm``, stores the session factory in the registry under ``db_session_factory``,
    and exposes a transaction-managed session as the reified ``request.db`` property.
    """
    # pyramid_tm hooks the transaction lifecycle to the request
    config.include("pyramid_tm")

    engine = get_engine(config)
    session_factory = get_session_factory(engine)
    config.registry["db_session_factory"] = session_factory

    def _request_db_session(request):
        # 'request.tm' is the transaction manager used by pyramid_tm
        return get_tm_session(session_factory, request.tm)

    # make `request.db` available for use in Pyramid
    config.add_request_method(_request_db_session, "db", reify=True)