Source code for magpie.models

from typing import TYPE_CHECKING

import sqlalchemy as sa
from pyramid.httpexceptions import HTTPInternalServerError
from pyramid.security import ALL_PERMISSIONS, Allow, Authenticated, Everyone
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import relationship
from ziggurat_foundations import ziggurat_model_init
from ziggurat_foundations.models.base import BaseModel, get_db_session
from ziggurat_foundations.models.external_identity import ExternalIdentityMixin
from ziggurat_foundations.models.group import GroupMixin
from ziggurat_foundations.models.group_permission import GroupPermissionMixin
from ziggurat_foundations.models.group_resource_permission import GroupResourcePermissionMixin
from ziggurat_foundations.models.resource import ResourceMixin
from ziggurat_foundations.models.services import BaseService
from ziggurat_foundations.models.services.group import GroupService
from ziggurat_foundations.models.services.resource_tree import ResourceTreeService
from ziggurat_foundations.models.services.resource_tree_postgres import ResourceTreeServicePostgreSQL
from ziggurat_foundations.models.services.user import UserService
from ziggurat_foundations.models.user import UserMixin
from ziggurat_foundations.models.user_group import UserGroupMixin
from ziggurat_foundations.models.user_permission import UserPermissionMixin
from ziggurat_foundations.models.user_resource_permission import UserResourcePermissionMixin
from ziggurat_foundations.permissions import permission_to_pyramid_acls

from magpie.api.exception import evaluate_call
from magpie.constants import get_constant
from magpie.permissions import Permission

if TYPE_CHECKING:
    # pylint: disable=W0611,unused-import
    from typing import Dict, Type

    from magpie.typedefs import Str

[docs]Base = declarative_base() # pylint: disable=C0103,invalid-name
[docs]def get_session_callable(request): return request.db
[docs]class Group(GroupMixin, Base):
[docs] def get_member_count(self, db_session=None): return BaseService.all(UserGroup, db_session=db_session).filter(UserGroup.group_id == self.id).count()
@declared_attr
[docs] def discoverable(self): """ Indicates if the group is discoverable for users to self-register to it. """ return sa.Column(sa.Boolean(), default=False)
[docs]class GroupPermission(GroupPermissionMixin, Base): pass
[docs]class UserGroup(UserGroupMixin, Base): pass
[docs]class GroupResourcePermission(GroupResourcePermissionMixin, Base): pass
[docs]class Resource(ResourceMixin, Base): # required resource type identifier (unique)
[docs] resource_type_name = None # type: Str
[docs] child_resource_allowed = True
[docs] resource_display_name = sa.Column(sa.Unicode(100), nullable=True)
# reference to top-most service under which the resource is nested # if the resource is the service, id is None (NULL) @declared_attr
[docs] def root_service_id(self): return sa.Column(sa.Integer, sa.ForeignKey("services.resource_id", onupdate="CASCADE", ondelete="SET NULL"), index=True)
@property
[docs] def __acl__(self): """ User or group that owns a resource are granted full access to it. """ acl = [] if self.owner_user_id: acl.extend([(Allow, self.owner_user_id, ALL_PERMISSIONS,), ]) if self.owner_group_id: acl.extend([(Allow, "group:%s" % self.owner_group_id, ALL_PERMISSIONS,), ]) return acl
[docs]class UserPermission(UserPermissionMixin, Base): pass
[docs]class UserResourcePermission(UserResourcePermissionMixin, Base): pass
[docs]class User(UserMixin, Base):
[docs] def __str__(self): return "<User: %s, %s>" % (self.id, self.user_name)
[docs]class ExternalIdentity(ExternalIdentityMixin, Base): pass
[docs]class RootFactory(object): """ Used to build base Access Control List (ACL) of the request user. All API and UI routes will employ this set of effective principals to determine if the user is authorized to access the pyramid view according to the ``permission`` value it was configured with. .. note:: Keep in mind that `Magpie` is configured with default permission :py:data:`magpie.constants.MAGPIE_ADMIN_PERMISSION`. Views that require more permissive authorization must be overridden with ``permission`` argument. .. seealso:: - ``set_default_permission`` within :func:`magpie.includeme` initialization steps """
[docs] __name__ = None
[docs] __parent__ = ""
def __init__(self, request): self.request = request @property
[docs] def __acl__(self): """ Administrators have all permissions, user/group-specific permissions added if user is logged in. """ user = self.request.user # allow if role MAGPIE_ADMIN_PERMISSION is somehow directly set instead of inferred via members of admin-group acl = [(Allow, get_constant("MAGPIE_ADMIN_PERMISSION"), ALL_PERMISSIONS)] admins = GroupService.by_group_name(get_constant("MAGPIE_ADMIN_GROUP"), db_session=self.request.db) if admins: # need to add explicit admin-group ALL_PERMISSIONS otherwise views with other permissions than the # default MAGPIE_ADMIN_PERMISSION will be refused access (e.g.: views with MAGPIE_LOGGED_PERMISSION) acl += [(Allow, "group:{}".format(admins.id), ALL_PERMISSIONS)] if user: # user-specific permissions (including group memberships) permissions = UserService.permissions(user, self.request.db) user_acl = permission_to_pyramid_acls(permissions) # allow views that require minimally to be logged in (regardless of who is the user) auth_acl = [(Allow, user.id, Authenticated)] acl += user_acl + auth_acl return acl
[docs]class UserFactory(RootFactory): def __init__(self, request): super(UserFactory, self).__init__(request) self.path_user = None
[docs] def __getitem__(self, user_name): context = UserFactory(self.request) if user_name == get_constant("MAGPIE_LOGGED_USER", self.request): self.path_user = self.request.user else: self.path_user = UserService.by_user_name(user_name, self.request.db) if self.path_user is not None: self.path_user.__parent__ = self self.path_user.__name__ = user_name context.path_user = self.path_user return context
@property
[docs] def __acl__(self): """ Grant access to :term:`Request User` according to its relationship to :term:`Context User`. If it is the same user (either from explicit name or by :py:data:`magpie.constants.MAGPIE_LOGGED_USER` reserved keyword), allow :py:data:`magpie.constants.MAGPIE_LOGGED_PERMISSION` for itself to access corresponding views. If request user is unauthenticated (``None``), :py:data:`magpie.constants.MAGPIE_LOGGED_USER` or itself, also grant :py:data:`magpie.constants.MAGPIE_CONTEXT_PERMISSION` to allow access to contextually-available details (e.g.: user can view his own information and public ones). All ACL permissions from :class:`RootFactory` are applied on top of user-specific permissions added here. """ user = self.request.user acl = super(UserFactory, self).__acl__ # inherit default permissions for non user-scoped routes # when user is authenticated and refers to itself, simultaneously fulfill both logged/context conditions if user and self.path_user and user.id == self.path_user.id: acl += [(Allow, user.id, get_constant("MAGPIE_LOGGED_PERMISSION")), (Allow, user.id, get_constant("MAGPIE_CONTEXT_PERMISSION"))] # unauthenticated context is allowed if and only if referring also to the unauthenticated user elif user is None: if self.path_user is None or self.path_user.user_name == get_constant("MAGPIE_ANONYMOUS_USER"): acl += [(Allow, Everyone, get_constant("MAGPIE_CONTEXT_PERMISSION"))] return acl
[docs]class Service(Resource): """ Resource of `service` type. """
[docs] __tablename__ = "services"
[docs] resource_id = sa.Column(sa.Integer(), sa.ForeignKey("resources.resource_id", onupdate="CASCADE", ondelete="CASCADE", ), primary_key=True, )
[docs] resource_type_name = "service"
[docs] __mapper_args__ = { "polymorphic_identity": resource_type_name, "inherit_condition": resource_id == Resource.resource_id
} @property
[docs] def permissions(self): # pragma: no cover raise TypeError("Service permissions must be accessed by 'magpie.services.ServiceInterface' "
"instead of 'magpie.models.Service'.") @declared_attr
[docs] def url(self): # http://localhost:8083 return sa.Column(sa.UnicodeText(), unique=True)
@declared_attr
[docs] def type(self): """ Identifier matching ``magpie.services.ServiceInterface.service_type``. """ # wps, wms, thredds,... return sa.Column(sa.UnicodeText())
@declared_attr
[docs] def sync_type(self): """ Identifier matching ``magpie.cli.SyncServiceInterface.sync_type``. """ # project-api, geoserver-api,... return sa.Column(sa.UnicodeText(), nullable=True)
@staticmethod
[docs] def by_service_name(service_name, db_session): db = get_db_session(db_session) service = db.query(Service).filter(Resource.resource_name == service_name).first() return service
[docs]class PathBase(object):
[docs] permissions = [ Permission.READ, Permission.WRITE, Permission.GET_CAPABILITIES, Permission.GET_MAP, Permission.GET_FEATURE_INFO, Permission.GET_LEGEND_GRAPHIC, Permission.GET_METADATA,
]
[docs]class File(Resource, PathBase):
[docs] child_resource_allowed = False
[docs] resource_type_name = "file"
[docs] __mapper_args__ = {"polymorphic_identity": resource_type_name}
[docs]class Directory(Resource, PathBase):
[docs] resource_type_name = "directory"
[docs] __mapper_args__ = {"polymorphic_identity": resource_type_name}
[docs]class Workspace(Resource):
[docs] resource_type_name = "workspace"
[docs] __mapper_args__ = {"polymorphic_identity": resource_type_name}
[docs] permissions = [ Permission.GET_CAPABILITIES, Permission.GET_MAP, Permission.GET_FEATURE_INFO, Permission.GET_LEGEND_GRAPHIC, Permission.GET_METADATA, Permission.GET_FEATURE, Permission.DESCRIBE_FEATURE_TYPE, Permission.LOCK_FEATURE, Permission.TRANSACTION,
]
[docs]class Route(Resource):
[docs] resource_type_name = "route"
[docs] __mapper_args__ = {"polymorphic_identity": resource_type_name}
[docs] permissions = [ Permission.READ, # access with inheritance (this route and all under it) Permission.WRITE, # access with inheritance (this route and all under it) Permission.READ_MATCH, # access without inheritance (only on this specific route) Permission.WRITE_MATCH, # access without inheritance (only on this specific route)
]
[docs]class RemoteResource(BaseModel, Base):
[docs] __tablename__ = "remote_resources"
[docs] __possible_permissions__ = ()
[docs] _ziggurat_services = [ResourceTreeService]
[docs] resource_id = sa.Column(sa.Integer(), primary_key=True, nullable=False, autoincrement=True)
[docs] service_id = sa.Column(sa.Integer(), sa.ForeignKey("services.resource_id", onupdate="CASCADE", ondelete="CASCADE"), index=True, nullable=False)
[docs] parent_id = sa.Column(sa.Integer(), sa.ForeignKey("remote_resources.resource_id", onupdate="CASCADE", ondelete="SET NULL"), nullable=True)
[docs] ordering = sa.Column(sa.Integer(), default=0, nullable=False)
[docs] resource_name = sa.Column(sa.Unicode(100), nullable=False)
[docs] resource_display_name = sa.Column(sa.Unicode(100), nullable=True)
[docs] resource_type = sa.Column(sa.Unicode(30), nullable=False)
[docs] def __repr__(self): info = self.resource_type, self.resource_name, self.resource_id, self.ordering, self.parent_id return "<RemoteResource: %s, %s, id: %s position: %s, parent_id: %s>" % info
[docs]class RemoteResourcesSyncInfo(BaseModel, Base):
[docs] __tablename__ = "remote_resources_sync_info"
[docs] id = sa.Column(sa.Integer(), primary_key=True, nullable=False, autoincrement=True)
[docs] service_id = sa.Column(sa.Integer(), sa.ForeignKey("services.resource_id", onupdate="CASCADE", ondelete="CASCADE"), index=True, nullable=False)
[docs] service = relationship("Service", foreign_keys=[service_id])
[docs] remote_resource_id = sa.Column(sa.Integer(), sa.ForeignKey("remote_resources.resource_id", onupdate="CASCADE", ondelete="CASCADE"))
[docs] last_sync = sa.Column(sa.DateTime(), nullable=True)
@staticmethod
[docs] def by_service_id(service_id, session): condition = RemoteResourcesSyncInfo.service_id == service_id service_info = session.query(RemoteResourcesSyncInfo).filter(condition).first() return service_info
[docs] def __repr__(self): last_modified = self.last_sync.strftime("%Y-%m-%dT%H:%M:%S") if self.last_sync else None info = self.service_id, last_modified, self.id return "<RemoteResourcesSyncInfo service_id: %s, last_sync: %s, id: %s>" % info
[docs]class RemoteResourceTreeService(ResourceTreeService): def __init__(self, service_cls): self.model = RemoteResource super(RemoteResourceTreeService, self).__init__(service_cls)
[docs]class RemoteResourceTreeServicePostgresSQL(ResourceTreeServicePostgreSQL):
""" This is necessary, because ResourceTreeServicePostgresSQL.model is the Resource class. If we want to change it for a RemoteResource, we need this class. The ResourceTreeService.__init__ call sets the model. """ ziggurat_model_init(User, Group, UserGroup, GroupPermission, UserPermission, UserResourcePermission, GroupResourcePermission, Resource, ExternalIdentity, passwordmanager=None)
[docs]RESOURCE_TREE_SERVICE = ResourceTreeService(ResourceTreeServicePostgreSQL)
[docs]REMOTE_RESOURCE_TREE_SERVICE = RemoteResourceTreeService(RemoteResourceTreeServicePostgresSQL)
[docs]RESOURCE_TYPE_DICT = dict() # type: Dict[Str, Type[Resource]]
for res in [Service, Directory, File, Workspace, Route]: if res.resource_type_name in RESOURCE_TYPE_DICT: # pragma: no cover raise KeyError("Duplicate resource type identifiers not allowed") RESOURCE_TYPE_DICT[res.resource_type_name] = res
[docs]def resource_factory(**kwargs): resource_type = evaluate_call(lambda: kwargs["resource_type"], http_error=HTTPInternalServerError, msg_on_fail="kwargs do not contain required 'resource_type'", content={"kwargs": repr(kwargs)}) return evaluate_call(lambda: RESOURCE_TYPE_DICT[resource_type](**kwargs), # noqa http_error=HTTPInternalServerError, msg_on_fail="kwargs unpacking failed from specified 'resource_type' and 'RESOURCE_TYPE_DICT'", content={"kwargs": repr(kwargs), "RESOURCE_TYPE_DICT": repr(RESOURCE_TYPE_DICT)})
[docs]def find_children_by_name(child_name, parent_id, db_session): tree_struct = RESOURCE_TREE_SERVICE.from_parent_deeper(parent_id=parent_id, limit_depth=1, db_session=db_session) tree_level_filtered = [node.Resource for node in list(tree_struct) if node.Resource.resource_name.lower() == child_name.lower()] return tree_level_filtered.pop() if len(tree_level_filtered) else None