Source code for magpie.cli.register_defaults

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Magpie helpers for user and group registration.
"""
import argparse
import logging
import time
from typing import TYPE_CHECKING

import transaction
from sqlalchemy.orm.session import Session
from ziggurat_foundations.models.services import BaseService
from ziggurat_foundations.models.services.group import GroupService
from ziggurat_foundations.models.services.user import UserService

from magpie import db, models
from magpie.api.management.user import user_utils as uu
from magpie.cli.utils import make_logging_options, setup_logger_from_options
from magpie.constants import get_constant
from magpie.register import pseudo_random_string
from magpie.utils import get_json, get_logger, print_log, raise_log

if TYPE_CHECKING:
    # pylint: disable=W0611,unused-import
    from typing import Any, Optional, Sequence

    from magpie.typedefs import AnySettingsContainer, Str

[docs] LOGGER = get_logger(__name__)
[docs] def register_user_with_group(user_name, group_name, email, password, db_session): # type: (Str, Str, Str, Optional[Str], Session) -> None """ Registers the user if missing and associate him to a group specified by name, also created if missing. :param user_name: name of the user to create (if missing) and to make part of the group (if specified) :param group_name: name of the group to create (if missing and specified) and to make the user join (if not already) :param email: email of the user to be created (if missing) :param password: password of the user to be created (if missing), auto-generate if not provided (None). :param db_session: database connexion to apply changes .. warning:: Should be employed only for **special** users/groups in this module as other expected API behaviour and operations will not be applied (ex: create additional permissions or user-group references). """ if not GroupService.by_group_name(group_name, db_session=db_session): new_group = models.Group(group_name=group_name) # noqa db_session.add(new_group) registered_group = GroupService.by_group_name(group_name=group_name, db_session=db_session) registered_user = UserService.by_user_name(user_name, db_session=db_session) if not registered_user: if password is None: LOGGER.debug("No password provided for user [%s], auto-generating one.", user_name) password = pseudo_random_string(length=get_constant("MAGPIE_PASSWORD_MIN_LENGTH")) uu.check_user_info(user_name=user_name, password=password, group_name=group_name, check_email=False, check_not_reserved=False) new_user = models.User(user_name=user_name, email=email) # noqa UserService.set_password(new_user, password) db_session.add(new_user) if group_name is not None: registered_user = UserService.by_user_name(user_name, db_session=db_session) else: print_log("User '{}' already exist".format(user_name), level=logging.DEBUG, logger=LOGGER) try: # ensure the reference between user/group exists (user joined the group) user_group_refs = BaseService.all(models.UserGroup, db_session=db_session) user_group_refs_tup = [(ref.group_id, ref.user_id) for ref in user_group_refs] if (registered_group.id, registered_user.id) not in user_group_refs_tup: group_entry = models.UserGroup(group_id=registered_group.id, user_id=registered_user.id) # noqa db_session.add(group_entry) except Exception: # noqa: W0703 # nosec: B110 # pragma: no cover # in case reference already exists, avoid duplicate error db_session.rollback()
[docs] def init_anonymous(db_session, settings=None): # type: (Session, Optional[AnySettingsContainer]) -> None """ Registers into the database the user and group matching configuration values of. :py:data:`magpie.constants.MAGPIE_ANONYMOUS_USER` and :py:data:`magpie.constants.MAGPIE_ANONYMOUS_GROUP` respectively if not defined. Afterwards, updates the group's parameters to ensure integrity with `Magpie` settings. """ anonymous_group = get_constant("MAGPIE_ANONYMOUS_GROUP", settings_container=settings) register_user_with_group(user_name=get_constant("MAGPIE_ANONYMOUS_USER", settings_container=settings), group_name=anonymous_group, email=get_constant("MAGPIE_ANONYMOUS_EMAIL", settings_container=settings), password=None, # autogen, value doesn't matter as no login applicable, just make it valid db_session=db_session) # enforce some admin group fields group = GroupService.by_group_name(anonymous_group, db_session=db_session) group.description = "Group that grants public access to its members for applicable resources." group.discoverable = False
[docs] def init_admin(db_session, settings=None): # type: (Session, Optional[AnySettingsContainer]) -> None """ Registers into the database the user and group matching configuration values of. :py:data:`magpie.constants.MAGPIE_ADMIN_USER` and :py:data:`magpie.constants.MAGPIE_ADMIN_GROUP` respectively if not defined. Also associates the created admin user with the admin group and give it admin permissions. Finally, updates the group's parameters to ensure integrity with `Magpie` settings. """ admin_usr_name = get_constant("MAGPIE_ADMIN_USER", settings_container=settings) admin_grp_name = get_constant("MAGPIE_ADMIN_GROUP", settings_container=settings) admin_password = get_constant("MAGPIE_ADMIN_PASSWORD", settings_container=settings) admin_usr = UserService.by_user_name(admin_usr_name, db_session=db_session) if not (admin_usr and GroupService.by_group_name(admin_grp_name, db_session=db_session)): register_user_with_group(user_name=admin_usr_name, group_name=admin_grp_name, email=get_constant("MAGPIE_ADMIN_EMAIL", settings_container=settings), password=admin_password, db_session=db_session) # allow old 'invalid' password to ease backward support of existing servers elif admin_usr and not UserService.check_password(admin_usr, admin_password): # admin user already exist, update modified password LOGGER.warning("Detected password change for 'MAGPIE_ADMIN_USER'. Attempting to update...") try: uu.check_user_info(password=admin_password, check_name=False, check_email=False, check_group=False, check_not_reserved=False) UserService.set_password(admin_usr, admin_password) UserService.regenerate_security_code(admin_usr) except Exception as http_exc: # noqa # re-raised as value error # pragma: no cover db_session.rollback() try: msg = "[{}]".format(get_json(http_exc)["detail"]) except Exception: # noqa msg = "(allowed characters, min-length, etc.)" # give generic detail as explicit could not be found raise_log("Failed to update 'MAGPIE_ADMIN_PASSWORD'. Provided configuration value does not conform to " "password format requirements {}.".format(msg), exception=ValueError, logger=LOGGER) elif len(admin_password) < get_constant("MAGPIE_PASSWORD_MIN_LENGTH", settings_container=settings): print_log("Weak password detected for 'MAGPIE_ADMIN_USER', consider changing it.", logger=LOGGER, level=logging.WARN) # Check if MAGPIE_ADMIN_GROUP has permission MAGPIE_ADMIN_PERMISSION magpie_admin_group = GroupService.by_group_name(admin_grp_name, db_session=db_session) # type: models.Group permission_names = [permission.perm_name for permission in magpie_admin_group.permissions] admin_perm = get_constant("MAGPIE_ADMIN_PERMISSION", settings_container=settings) if admin_perm not in permission_names: new_group_permission = models.GroupPermission(perm_name=admin_perm, group_id=magpie_admin_group.id) # noqa try: db_session.add(new_group_permission) except Exception as exc: # noqa: W0703 # nosec: B110 # pragma: no cover db_session.rollback() raise_log("Failed to create admin user-group permission", exception=type(exc), logger=LOGGER) # enforce some admin group fields magpie_admin_group.description = "Administrative group that grants full access management control to its members." magpie_admin_group.discoverable = False
[docs] def init_users_group(db_session, settings=None): # type: (Session, Optional[AnySettingsContainer]) -> None """ Registers into database the group matching :py:data:`magpie.constants.MAGPIE_USERS_GROUP` if not defined. """ usr_grp_name = get_constant("MAGPIE_USERS_GROUP", settings_container=settings) if not GroupService.by_group_name(usr_grp_name, db_session=db_session): user_group = models.Group(group_name=usr_grp_name) # noqa db_session.add(user_group) else: print_log("MAGPIE_USERS_GROUP already initialized", level=logging.DEBUG, logger=LOGGER)
[docs] def register_defaults(db_session=None, settings=None, ini_file_path=None): # type: (Optional[Session], Optional[AnySettingsContainer], Optional[Str]) -> None """ Registers into database every undefined default users and groups matching following variables: - :py:data:`magpie.constants.MAGPIE_ANONYMOUS_USER` - :py:data:`magpie.constants.MAGPIE_USERS_GROUP` - :py:data:`magpie.constants.MAGPIE_ADMIN_GROUP` - :py:data:`magpie.constants.MAGPIE_ADMIN_USER` """ if not isinstance(db_session, Session): if not ini_file_path: ini_file_path = get_constant("MAGPIE_INI_FILE_PATH", settings_container=settings) db_session = db.get_db_session_from_config_ini(ini_file_path) if not db.is_database_ready(db_session): time.sleep(2) raise_log("Database not ready", logger=LOGGER) init_admin(db_session, settings) init_anonymous(db_session, settings) init_users_group(db_session, settings) transaction.commit() db_session.close()
[docs] def make_parser(): # type: () -> argparse.ArgumentParser parser = argparse.ArgumentParser(description="Registers default users and groups in Magpie.") parser.add_argument("ini_file_path", help="Path of the configuration INI file to use to retrieve required settings") make_logging_options(parser) return parser
[docs] def main(args=None, parser=None, namespace=None): # type: (Optional[Sequence[Str]], Optional[argparse.ArgumentParser], Optional[argparse.Namespace]) -> Any if not parser: parser = make_parser() args = parser.parse_args(args=args, namespace=namespace) setup_logger_from_options(LOGGER, args) return register_defaults(ini_file_path=args.ini_file_path)
if __name__ == "__main__": main()