Source code for magpie.cli.purge_expired_network_tokens
#!/usr/bin/env python3
"""
Removes expired network tokens from the database.
This ensures that the network_tokens table doesn't fill up with expired tokens.
Both an expired token and a non-existent token behave the same from an access perspective (user is denied) so
it is safe to automatically remove all expired tokens.
"""
import argparse
from typing import TYPE_CHECKING
import requests
import transaction
from magpie import models
from magpie.cli.utils import make_logging_options, setup_logger_from_options
from magpie.constants import get_constant
from magpie.db import get_db_session_from_config_ini
from magpie.utils import get_logger, print_log, raise_log
if TYPE_CHECKING:
from typing import Optional, Sequence
from magpie.typedefs import Str
[docs]
# Module-level logger shared by every function of this script; it is later
# configured from the command line options in ``main``.
LOGGER = get_logger(
    __name__,
    message_format="%(asctime)s - %(levelname)s - %(message)s",
    datetime_format="%d-%b-%y %H:%M:%S",
    force_stdout=False,
)
[docs]
def make_parser():
    # type: () -> argparse.ArgumentParser
    """
    Build the command line parser of the expired network tokens purge script.

    The parser accepts the configuration INI path, an optional ``api`` or ``db`` sub-command
    (stored as ``api_or_db``), the positional ``url``, ``username`` and ``password`` arguments
    of the ``api`` sub-command, and the shared logging options.

    :returns: the configured argument parser.
    """
    cli = argparse.ArgumentParser(description="Delete all expired network tokens.")
    cli.add_argument(
        "--config", "--ini",
        metavar="CONFIG",
        dest="ini_config",
        default=get_constant("MAGPIE_INI_FILE_PATH"),
        help="Configuration INI file to retrieve database connection settings (default: %(default)s).",
    )

    modes = cli.add_subparsers(help="run with API or directly access the database", dest="api_or_db")
    api_mode = modes.add_parser("api")
    modes.add_parser("db")  # the "db" mode takes no argument of its own

    # positional arguments of the "api" mode, in the order they must be given
    api_positionals = (
        ("url", "URL used to access the magpie service."),
        ("username", "Admin username for magpie login."),
        ("password", "Admin password for magpie login."),
    )
    for arg_name, arg_help in api_positionals:
        api_mode.add_argument(arg_name, help=arg_help)

    make_logging_options(cli)
    return cli
[docs]
def get_login_session(magpie_url, username, password):
    # type: (Str, Str, Str) -> Optional[requests.Session]
    """
    Open an HTTP session and sign in to Magpie with the provided credentials.

    :param magpie_url: base URL of the Magpie service (a trailing slash is tolerated).
    :param username: user name sent as ``user_name`` in the sign-in request body.
    :param password: password sent in the sign-in request body.
    :returns:
        the session that performed the sign-in when the response status is 200, otherwise ``None``
        (the response body is then logged as an error). The caller owns the returned session and
        is responsible for closing it.
    """
    session = requests.Session()
    # avoid producing "//signin" when the base URL already ends with a slash
    signin_url = magpie_url.rstrip("/") + "/signin"
    credentials = {"user_name": username, "password": password}
    try:
        response = session.post(signin_url, json=credentials)
    except Exception:
        # do not leak the connection pool when the request itself fails
        session.close()
        raise
    if response.status_code != 200:
        LOGGER.error(response.content)
        # the session is useless without a successful login: release it before reporting failure
        session.close()
        return None
    return session
[docs]
def _purge_with_api(url, username, password):
    # type: (Str, Str, Str) -> int
    """
    Delete expired network tokens through the Magpie API and return the reported amount.

    :param url: base URL of the Magpie service (a trailing slash is tolerated).
    :param username: user name employed to sign in.
    :param password: password employed to sign in.
    :returns: value of the ``deleted`` field of the JSON response, converted to an integer.
    """
    session = get_login_session(url, username, password)
    if session is None:
        raise_log("Failed to login, invalid username or password", logger=LOGGER)
    try:
        response = session.delete("{}/network/tokens?expired_only=true".format(url.rstrip("/")))
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            raise_log("Failed to delete expired network tokens: {}".format(exc), exception=type(exc), logger=LOGGER)
        return int(response.json()["deleted"])
    finally:
        # always release the HTTP session, whether the request succeeded or not
        session.close()


def _purge_with_db(ini_config):
    # type: (Str) -> int
    """
    Delete expired network tokens directly in the database and return the deleted amount.

    :param ini_config: path of the configuration INI file providing the database connection settings.
    :returns: value returned by ``models.NetworkToken.delete_expired``.
    """
    db_session = get_db_session_from_config_ini(ini_config)
    try:
        deleted = models.NetworkToken.delete_expired(db_session)
        # clean up unused records in the database (no need to keep records associated with anonymous network users)
        (db_session.query(models.NetworkRemoteUser)
         .filter(models.NetworkRemoteUser.user_id == None)  # noqa: E711 # pylint: disable=singleton-comparison
         .filter(models.NetworkRemoteUser.network_token_id == None)  # noqa: E711 # pylint: disable=singleton-comparison
         .delete())
        try:
            transaction.commit()
        except Exception as exc:  # noqa: W0703 # nosec: B110 # pragma: no cover
            db_session.rollback()
            # NOTE(review): assumes 'type(exc)' can be built from a single message argument - confirm in 'raise_log'
            raise_log("Failed to delete expired network tokens: {}".format(exc), exception=type(exc), logger=LOGGER)
    finally:
        # close the session on every path, including failed deletes or a failed commit
        db_session.close()
    return deleted


def main(args=None, parser=None, namespace=None):
    # type: (Optional[Sequence[Str]], Optional[argparse.ArgumentParser], Optional[argparse.Namespace]) -> int
    """
    Purge expired network tokens, either through the Magpie API or directly in the database.

    The ``api`` sub-command signs in to the Magpie service and requests the deletion over HTTP.
    Otherwise, the database referenced by the configuration INI file is accessed directly, and
    remote user records with neither a user nor a network token are removed as well.

    :param args: command line arguments to parse (handed to the parser as is).
    :param parser: parser to employ instead of the one created by :func:`make_parser`.
    :param namespace: namespace handed to the parser to store the parsed arguments.
    :returns: ``0`` once the operation completed.
    """
    if not parser:
        parser = make_parser()
    args = parser.parse_args(args=args, namespace=namespace)
    setup_logger_from_options(LOGGER, args)
    if args.api_or_db == "api":
        deleted = _purge_with_api(args.url, args.username, args.password)
    else:
        deleted = _purge_with_db(args.ini_config)
    if deleted:
        print_log("{} expired network tokens deleted".format(deleted), logger=LOGGER)
    else:
        print_log("No expired network tokens found", logger=LOGGER)
    return 0
if __name__ == "__main__":
main()