Source code for magpie.db

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from magpie.constants import get_constant
from magpie.definitions.alembic_definitions import alembic
from magpie.definitions.sqlalchemy_definitions import (
    register, sessionmaker, engine_from_config,
    configure_mappers, select, Inspector, Session, sa_exc
)
from magpie.definitions.pyramid_definitions import asbool
from magpie.utils import get_settings_from_config_ini, get_settings, print_log, raise_log, get_logger
from typing import TYPE_CHECKING
import transaction
import inspect
import warnings
import logging
import time

# import or define all models here to ensure they are attached to the
# Base.metadata prior to any initialization routines
from magpie import models

if TYPE_CHECKING:
    from magpie.definitions.typedefs import Any, AnySettingsContainer, SettingsType, Str, Optional, Union  # noqa: F401
    from magpie.definitions.sqlalchemy_definitions import Engine  # noqa: F401


[docs]LOGGER = get_logger(__name__)
# run configure_mappers after defining all of the models to ensure # all relationships can be setup configure_mappers()
[docs]def get_db_url(username=None, password=None, db_host=None, db_port=None, db_name=None, settings=None): return "postgresql://%s:%s@%s:%s/%s" % ( username if username is not None else get_constant("MAGPIE_POSTGRES_USER", settings, "postgres.user"), password if password is not None else get_constant("MAGPIE_POSTGRES_PASSWORD", settings, "postgres.password"), db_host if db_host is not None else get_constant("MAGPIE_POSTGRES_HOST", settings, "postgres.host"), db_port if db_port is not None else get_constant("MAGPIE_POSTGRES_PORT", settings, "postgres.port"), db_name if db_name is not None else get_constant("MAGPIE_POSTGRES_DB", settings, "postgres.db"),
)
[docs]def get_engine(container=None, prefix="sqlalchemy.", **kwargs): # type: (Optional[AnySettingsContainer], Str, Any) -> Engine settings = get_settings(container or {}) settings[prefix + "url"] = get_db_url() settings.setdefault(prefix + "pool_pre_ping", True) kwargs = kwargs or {} kwargs["convert_unicode"] = True return engine_from_config(settings, prefix, **kwargs)
[docs]def get_session_factory(engine): return sessionmaker(bind=engine)
[docs]def get_tm_session(session_factory, transaction_manager): """ Get a ``sqlalchemy.orm.Session`` instance backed by a transaction. This function will hook the session to the transaction manager which will take care of committing any changes. - When using pyramid_tm it will automatically be committed or aborted depending on whether an exception is raised. - When using scripts you should wrap the session in a manager yourself. For example:: import transaction engine = get_engine(settings) session_factory = get_session_factory(engine) with transaction.manager: db_session = get_tm_session(session_factory, transaction.manager) """ db_session = session_factory() register(db_session, transaction_manager=transaction_manager) return db_session
[docs]def get_db_session_from_settings(settings=None, **kwargs): # type: (Optional[AnySettingsContainer], Any) -> Session session_factory = get_session_factory(get_engine(settings, **kwargs)) db_session = get_tm_session(session_factory, transaction.manager) return db_session
[docs]def get_db_session_from_config_ini(config_ini_path, ini_main_section_name="app:magpie_app", settings_override=None): settings = get_settings_from_config_ini(config_ini_path, ini_main_section_name) if isinstance(settings_override, dict): settings.update(settings_override) return get_db_session_from_settings(settings)
[docs]def run_database_migration(db_session=None): # type: (Optional[Session]) -> None """ Runs db migration operations with alembic, using db session or a new engine connection. """ ini_file = get_constant("MAGPIE_ALEMBIC_INI_FILE_PATH") LOGGER.info("Using file '{}' for migration.".format(ini_file)) alembic_args = ["-c", ini_file, "upgrade", "heads"] if not isinstance(db_session, Session): alembic.config.main(argv=alembic_args) else: engine = db_session.bind with engine.begin() as connection: alembic_cfg = alembic.config.Config(file_=ini_file) alembic_cfg.attributes['connection'] = connection alembic.command.upgrade(alembic_cfg, "head")
[docs]def get_database_revision(db_session): # type: (Session) -> Str s = "SELECT version_num FROM alembic_version" result = db_session.execute(s).fetchone() return result["version_num"]
[docs]def is_database_ready(db_session=None): # type: (Optional[Session]) -> bool if isinstance(db_session, Session): engine = db_session.bind else: engine = get_engine(dict()) inspector = Inspector.from_engine(engine) table_names = inspector.get_table_names() for _, obj in inspect.getmembers(models): if inspect.isclass(obj): # noinspection PyBroadException try: curr_table_name = obj.__tablename__ if curr_table_name not in table_names: return False except Exception: continue return True
[docs]def run_database_migration_when_ready(settings, db_session=None): # type: (SettingsType, Optional[Session]) -> None """ Runs db migration if requested by config and need from revisions. """ db_ready = False if asbool(get_constant("MAGPIE_DB_MIGRATION", settings, "magpie.db_migration", default_value=True, raise_missing=False, raise_not_set=False, print_missing=True)): attempts = int(get_constant("MAGPIE_DB_MIGRATION_ATTEMPTS", settings, "magpie.db_migration_attempts", default_value=5, raise_missing=False, raise_not_set=False, print_missing=True)) print_log("Running database migration (as required)...") attempts = max(attempts, 2) # enforce at least 2 attempts, 1 for db creation and one for actual migration for i in range(1, attempts + 1): try: with warnings.catch_warnings(): warnings.simplefilter("ignore", category=sa_exc.SAWarning) run_database_migration(db_session) except ImportError as e: print_log("Database migration produced [{!r}] (ignored).".format(e), level=logging.WARNING) pass except Exception as e: if i <= attempts: print_log("Database migration failed [{!r}]. Retrying... ({}/{})".format(e, i, attempts)) time.sleep(2) continue else: raise_log("Database migration failed [{!r}]".format(e), exception=RuntimeError) db_ready = is_database_ready(db_session) if not db_ready: print_log("Database not ready. Retrying... ({}/{})".format(i, attempts)) time.sleep(2) continue break else: db_ready = is_database_ready(db_session) if not db_ready: time.sleep(2) raise_log("Database not ready", exception=RuntimeError)
[docs]def set_sqlalchemy_log_level(magpie_log_level): # type: (Union[Str, int]) -> SettingsType """ Suppresses sqlalchemy logging if not in debug for magpie. """ log_lvl = logging.getLevelName(magpie_log_level) if isinstance(magpie_log_level, int) else magpie_log_level sa_settings = {"sqlalchemy.echo": True} if log_lvl.upper() != "DEBUG": sa_settings["sqlalchemy.echo"] = False sa_loggers = "sqlalchemy.engine.base.Engine".split(".") sa_log = logging.getLogger(sa_loggers[0]) sa_log.setLevel(logging.WARN) # WARN to avoid INFO logs for h in sa_log.handlers: sa_log.removeHandler(h) for sa_mod in sa_loggers[1:]: sa_log = sa_log.getChild(sa_mod) sa_log.setLevel(logging.WARN) return sa_settings
[docs]def includeme(config): # use pyramid_tm to hook the transaction lifecycle to the request config.include("pyramid_tm") session_factory = get_session_factory(get_engine(config)) config.registry["db_session_factory"] = session_factory # make `request.db` available for use in Pyramid config.add_request_method( # r.tm is the transaction manager used by pyramid_tm lambda r: get_tm_session(session_factory, r.tm), "db", reify=True
)