import json
from collections import OrderedDict
from datetime import datetime
from typing import TYPE_CHECKING
import humanize
import transaction
import yaml
from pyramid.httpexceptions import (
HTTPBadRequest,
HTTPConflict,
HTTPException,
HTTPFound,
HTTPMovedPermanently,
HTTPNotFound,
HTTPUnprocessableEntity
)
from pyramid.settings import asbool
from pyramid.view import view_config
from magpie import register
from magpie.api import schemas
from magpie.cli.sync_resources import OUT_OF_SYNC, fetch_single_service, get_last_sync, merge_local_and_remote_resources
from magpie.cli.sync_services import SYNC_SERVICES_TYPES
from magpie.constants import get_constant
# FIXME: remove (REMOTE_RESOURCE_TREE_SERVICE, RESOURCE_TYPE_DICT), implement getters via API
from magpie.models import REMOTE_RESOURCE_TREE_SERVICE, RESOURCE_TYPE_DICT, UserGroupStatus, UserStatuses
from magpie.permissions import Permission, PermissionSet
# FIXME: remove (SERVICE_TYPE_DICT), implement getters via API
from magpie.services import SERVICE_TYPE_DICT
from magpie.ui.utils import AdminRequests, BaseViews, check_response, handle_errors, request_api
from magpie.utils import CONTENT_TYPE_JSON, get_json, get_logger, is_json_body
if TYPE_CHECKING:
from typing import Any, Dict, List, Optional, Tuple
from sqlalchemy.orm.session import Session
from magpie.typedefs import JSON, Str
[docs]
LOGGER = get_logger(__name__)
[docs]
class ManagementViews(AdminRequests, BaseViews):
@handle_errors
[docs]
def goto_service(self, resource_id):
path = schemas.ResourceAPI.path.format(resource_id=resource_id)
resp = request_api(self.request, path, "GET")
check_response(resp)
body = get_json(resp)
svc_name = body["resource"]["resource_name"]
# get service type instead of 'cur_svc_type' in case of 'default' ('cur_svc_type' not set yet)
path = schemas.ServiceAPI.path.format(service_name=svc_name)
resp = request_api(self.request, path, "GET")
check_response(resp)
body = get_json(resp)
svc_type = body["service"]["service_type"]
return HTTPFound(self.request.route_url("edit_service", service_name=svc_name, cur_svc_type=svc_type))
@view_config(route_name="view_users", renderer="templates/view_users.mako")
[docs]
def view_users(self):
user_name = self.request.POST.get("user_name")
if "delete" in self.request.POST:
path = schemas.UserAPI.path.format(user_name=user_name)
resp = request_api(self.request, path, "DELETE")
check_response(resp)
if "edit" in self.request.POST:
return HTTPFound(self.request.route_url("edit_user", user_name=user_name, cur_svc_type="default"))
if "delete-pending" in self.request.POST:
path = schemas.RegisterUserAPI.path.format(user_name=user_name)
resp = request_api(self.request, path, "DELETE")
check_response(resp)
if "view-pending" in self.request.POST:
return HTTPFound(self.request.route_url("view_pending_user", user_name=user_name))
users = self.get_user_details(status="all")
non_error = UserStatuses.OK | UserStatuses.Pending # use combine in case more error types gets added later on
user_info = [{"name": user["user_name"], "email": user["email"]} for user in users]
user_error = [user["user_name"] for user in users if UserStatuses.get(user["status"]) not in non_error]
pending = [user["user_name"] for user in users if UserStatuses.get(user["status"]) == UserStatuses.Pending]
return self.add_template_data({"users": user_info, "users_with_error": user_error, "users_pending": pending})
@view_config(route_name="add_user", renderer="templates/add_user.mako")
[docs]
def add_user(self):
"""
User creation by a logged administrator.
.. note::
The template employed for this form is reused for user self-registration as the fields and validation
of inputs are essentially the same. Their actual processing is different though, as the administrator
user is already logged in this case, and nobody is logged in the other.
.. seealso::
:meth:`magpie.ui.login.views.LoginViews.register_user`
"""
groups = self.get_all_groups(first_default_group=self.MAGPIE_ANONYMOUS_GROUP)
return_data = {"user_groups": groups, "is_registration": False}
return_data = self.create_user_default_template_data(return_data)
if "create" in self.request.POST:
# delegate form submission to validation and creation
return_data = self.create_user(return_data)
if return_data["is_error"]:
return self.add_template_data(return_data)
# successful user creation, redirect to list of users since logged administrator
# initiated this process from there by clicking the 'add user' button
return HTTPFound(self.request.route_url("view_users"))
# first page load or refresh
return self.add_template_data(return_data)
@view_config(route_name="edit_user", renderer="templates/edit_user.mako")
[docs]
def edit_user(self):
"""
Edit the fields of any referenced user profile by an administrator.
.. seealso::
- :meth:`magpie.ui.user.views.UserViews.edit_current_user` for corresponding operation by user self-update
"""
user_name = self.request.matchdict["user_name"] # keep reference to original name in case of update request
cur_svc_type = self.request.matchdict["cur_svc_type"]
inherit_grp_perms = self.request.matchdict.get("inherit_groups_permissions", False)
own_groups = self.get_user_groups(user_name)
pending_groups = self.get_user_groups(user_name, user_group_status=UserGroupStatus.PENDING)
all_groups = self.get_all_groups(first_default_group=get_constant("MAGPIE_USERS_GROUP", self.request))
# TODO:
# Until the api is modified to make it possible to request from the RemoteResource table,
# we have to access the database directly here
session = self.request.db
svc_types, cur_svc_type, services = self.get_services(cur_svc_type)
user_path = schemas.UserAPI.path.format(user_name=user_name)
user_resp = request_api(self.request, user_path, "GET")
check_response(user_resp)
# set default values needed by the page in case of early return due to error
user_info = get_json(user_resp)["user"]
user_info["user_edit_email"] = True # always allowed by administrators
user_info["user_with_error"] = UserStatuses.get(user_info["status"]) != UserStatuses.OK
user_info["edit_mode"] = "no_edit"
user_info["own_groups"] = own_groups
user_info["pending_groups"] = pending_groups
user_info["groups"] = all_groups
user_info["cur_svc_type"] = cur_svc_type
user_info["svc_types"] = svc_types
user_info["inherit_groups_permissions"] = inherit_grp_perms
user_info["error_message"] = ""
user_info["last_sync"] = "Never"
user_info["ids_to_clean"] = []
user_info["out_of_sync"] = []
user_info["sync_implemented"] = False
param_fields = ["password", "user_name", "user_email"]
for field in param_fields:
user_info["invalid_{}".format(field)] = False
user_info["reason_{}".format(field)] = ""
if self.request.method == "POST":
res_id = self.request.POST.get("resource_id")
is_edit_group_membership = False
is_save_user_info = False
requires_update_name = False
if "inherit_groups_permissions" in self.request.POST:
inherit_grp_perms = asbool(self.request.POST["inherit_groups_permissions"])
user_info["inherit_groups_permissions"] = inherit_grp_perms
if "delete" in self.request.POST:
resp = request_api(self.request, user_path, "DELETE")
check_response(resp)
return HTTPFound(self.request.route_url("view_users"))
if "goto_service" in self.request.POST:
return self.goto_service(res_id)
if "clean_resource" in self.request.POST:
# "clean_resource" must be above "edit_permissions" because they're in the same form.
self.delete_resource(res_id)
elif "edit_permissions" in self.request.POST and not inherit_grp_perms:
# FIXME:
# Add remote does not make sense anymore because we batch update resources (instead of one-by-one).
# Also not necessary because recursive permission don't require to actually have the sub-resources.
# If resources are needed to apply permissions on them, they are either added manually or via sync.
# if not res_id or res_id == "None":
# remote_id = int(self.request.POST.get("remote_id"))
# services_names = [s["service_name"] for s in services.values()]
# res_id = self.add_remote_resource(cur_svc_type, services_names, user_name,
# remote_id, is_user=True)
self.edit_user_or_group_resource_permissions(user_name, is_user=True)
elif "edit_group_membership" in self.request.POST:
is_edit_group_membership = True
elif "edit_username" in self.request.POST:
user_info["edit_mode"] = "edit_username"
elif "edit_password" in self.request.POST:
user_info["edit_mode"] = "edit_password"
elif "edit_email" in self.request.POST:
user_info["edit_mode"] = "edit_email"
elif "save_username" in self.request.POST:
user_info["user_name"] = self.request.POST.get("new_user_name")
is_save_user_info = True
requires_update_name = True
elif "save_password" in self.request.POST:
user_info["password"] = self.request.POST.get("new_user_password")
is_save_user_info = True
elif "save_email" in self.request.POST:
user_info["email"] = self.request.POST.get("new_user_email")
is_save_user_info = True
elif "force_sync" in self.request.POST:
_, errmsg = self.sync_services(services)
user_info["error_message"] += errmsg or ""
elif "clean_all" in self.request.POST:
ids_to_clean = self.request.POST.get("ids_to_clean").split(";")
for id_ in ids_to_clean:
self.delete_resource(id_)
if is_save_user_info:
resp = request_api(self.request, user_path, "PATCH", data=user_info)
if resp.status_code in (HTTPBadRequest.code, HTTPUnprocessableEntity.code):
requires_update_name = False # revoke fetch new name because failure occurred
# attempt to retrieve the API more-specific reason why the operation is invalid
body = get_json(resp)
param_name = body.get("param", {}).get("name")
reason = body.get("detail", "Invalid")
for field in param_fields:
if param_name == field:
user_info["invalid_{}".format(field)] = True
user_info["reason_{}".format(field)] = reason
break # cannot return early because we are still missing other resources/permissions info
else:
check_response(resp)
# FIXME: need to commit updates since we are using the same session
# otherwise, updated user doesn't exist yet in the db for next calls
self.request.tm.commit()
# ensure remove password from output (just in case)
user_info.pop("password", None)
if requires_update_name:
# re-fetch user groups as current user-group will have changed on new user_name
user_name = user_info["user_name"]
user_info["own_groups"] = self.get_user_groups(user_name)
# return immediately with updated URL to user with new name
users_url = self.request.route_url("edit_user", user_name=user_name, cur_svc_type=cur_svc_type)
return HTTPMovedPermanently(location=users_url)
# edits to group's checkboxes
if is_edit_group_membership:
selected_groups = self.request.POST.getall("member")
removed_groups = list(set(own_groups) - set(selected_groups) - {self.MAGPIE_ANONYMOUS_GROUP})
new_groups = list(set(selected_groups) - set(own_groups))
for group in removed_groups:
path = schemas.UserGroupAPI.path.format(user_name=user_name, group_name=group)
resp = request_api(self.request, path, "DELETE")
check_response(resp)
user_info["edit_new_membership_error"] = set()
successful_new_groups = set()
for group in new_groups:
try:
path = schemas.UserGroupsAPI.path.format(user_name=user_name)
data = {"group_name": group}
resp = request_api(self.request, path, "POST", data=data)
check_response(resp)
except HTTPException as exc:
detail = "{} ({}), {!s}".format(type(exc).__name__, exc.code, exc)
LOGGER.error("Unexpected API error under UI operation. [%s]", detail)
user_info["edit_new_membership_error"].add(group)
else:
successful_new_groups.add(group)
user_info["own_groups"] = self.get_user_groups(user_name)
user_info["pending_groups"] = self.get_user_groups(user_name, user_group_status=UserGroupStatus.PENDING)
user_info["edit_membership_pending_success"] = successful_new_groups & set(user_info["pending_groups"])
# display resources permissions per service type tab
try:
res_perm_names, res_perms = self.get_user_or_group_resources_permissions_dict(
user_name, services, cur_svc_type, is_user=True, is_inherit_groups_permissions=inherit_grp_perms
)
except Exception as exc:
raise HTTPBadRequest(detail=repr(exc))
sync_types = [s["service_sync_type"] for s in services.values()]
sync_implemented = any(s in SYNC_SERVICES_TYPES for s in sync_types)
info = self.get_remote_resources_info(res_perms, services, session)
res_perms, ids_to_clean, last_sync_humanized, out_of_sync = info
if out_of_sync:
user_info["error_message"] = self.make_sync_error_message(out_of_sync)
user_info["ids_to_clean"] = ";".join(ids_to_clean)
user_info["last_sync"] = last_sync_humanized
user_info["sync_implemented"] = sync_implemented
user_info["out_of_sync"] = out_of_sync
user_info["cur_svc_type"] = cur_svc_type
user_info["svc_types"] = svc_types
user_info["resources"] = res_perms
user_info["permissions"] = res_perm_names
user_info["permission_titles"] = [Permission(perm).title for perm in res_perm_names]
return self.add_template_data(data=user_info)
[docs]
def view_pending_user(self):
"""
Displays a pending user registration profile details.
.. note::
View configuration is added dynamically because this page it should be available only when the
corresponding feature is activated with configuration settings.
"""
user_name = self.request.matchdict["user_name"]
path = schemas.RegisterUserAPI.path.format(user_name=user_name)
# process removal of pending user registration the same way with either button
if "delete" in self.request.POST or "decline" in self.request.POST:
resp = request_api(self.request, path, "DELETE")
check_response(resp)
return HTTPFound(self.request.route_url("view_users"))
resp = request_api(self.request, path)
check_response(resp)
data = get_json(resp)["registration"]
# approval must be done with the explicit URL, user should exist afterwards
if "approve" in self.request.POST and data["approve_url"]:
path = data["approve_url"]
resp = request_api(self.request, path, "GET")
check_response(resp)
return HTTPFound(self.request.route_url("edit_user", user_name=user_name, cur_svc_type="default"))
return self.add_template_data(data=data)
@view_config(route_name="view_groups", renderer="templates/view_groups.mako")
[docs]
def view_groups(self):
if "delete" in self.request.POST:
group_name = self.request.POST.get("group_name")
path = schemas.GroupAPI.path.format(group_name=group_name)
resp = request_api(self.request, path, "DELETE")
check_response(resp)
if "edit" in self.request.POST:
group_name = self.request.POST.get("group_name")
return HTTPFound(self.request.route_url("edit_group", group_name=group_name, cur_svc_type="default"))
groups_info = {}
groups = sorted(self.get_all_groups())
for grp in groups:
if grp != "":
groups_info.setdefault(grp, {"members": len(self.get_group_users(grp))})
return self.add_template_data({"group_names": groups_info})
@view_config(route_name="add_group", renderer="templates/add_group.mako")
[docs]
def add_group(self):
return_data = {"invalid_group_name": False, "invalid_description": False, "invalid_terms": False,
"reason_group_name": "Invalid", "reason_description": "Invalid", "reason_terms": "Invalid",
"form_group_name": "", "form_discoverable": False, "form_description": "", "form_terms": ""}
if "create" in self.request.POST:
group_name = self.request.POST.get("group_name")
description = self.request.POST.get("description")
discoverable = asbool(self.request.POST.get("discoverable"))
terms = self.request.POST.get("terms")
return_data["form_group_name"] = group_name
return_data["form_description"] = description
return_data["form_discoverable"] = discoverable
return_data["form_terms"] = terms
if not group_name:
return_data["invalid_group_name"] = True
return self.add_template_data(return_data)
data = {
"group_name": group_name,
"description": return_data["form_description"],
"discoverable": return_data["form_discoverable"],
"terms": return_data["form_terms"],
}
resp = request_api(self.request, schemas.GroupsAPI.path, "POST", data=data)
if resp.status_code == HTTPConflict.code:
return_data["invalid_group_name"] = True
return_data["reason_group_name"] = "Conflict"
return self.add_template_data(return_data)
if resp.status_code == HTTPBadRequest.code:
# attempt to retrieve the API more-specific reason why the operation is invalid
body = get_json(resp)
param_name = body.get("param", {}).get("name")
reason = body.get("detail", "Invalid")
if param_name == "group_name":
return_data["invalid_group_name"] = True
return_data["reason_group_name"] = reason
return self.add_template_data(return_data)
if param_name == "description":
return_data["invalid_description"] = True
return_data["reason_description"] = reason
return self.add_template_data(return_data)
if param_name == "terms":
return_data["invalid_terms"] = True
return_data["reason_terms"] = reason
return self.add_template_data(return_data)
check_response(resp) # check for any other exception than checked use-cases
return HTTPFound(self.request.route_url("view_groups"))
return self.add_template_data(return_data)
[docs]
def resource_tree_parser(self, raw_resources_tree, permission):
resources_tree = {}
for r_id, resource in raw_resources_tree.items():
perms = permission.get(r_id, [])
perm_names = [PermissionSet(perm_json).explicit_permission for perm_json in perms]
children = self.resource_tree_parser(resource["children"], permission)
children = OrderedDict(sorted(children.items()))
resources_tree[resource["resource_name"]] = dict(
id=r_id,
permissions=perms,
permission_names=perm_names,
resource_type=resource["resource_type"],
resource_display_name=resource["resource_display_name"],
children=children
)
return resources_tree
[docs]
def perm_tree_parser(self, raw_perm_tree):
permission = {}
for r_id, resource in raw_perm_tree.items():
permission[r_id] = resource["permissions"]
permission.update(self.perm_tree_parser(resource["children"]))
return permission
[docs]
def edit_group_users(self, group_name):
current_members = self.get_group_users(group_name)
selected_members = self.request.POST.getall("member")
removed_members = list(set(current_members) - set(selected_members))
new_members = list(set(selected_members) - set(current_members))
for user_name in removed_members:
path = schemas.UserGroupAPI.path.format(user_name=user_name, group_name=group_name)
resp = request_api(self.request, path, "DELETE")
check_response(resp)
report_info = {"edit_new_membership_success": set(),
"edit_new_membership_error": set()}
for user_name in new_members:
try:
path = schemas.UserGroupsAPI.path.format(user_name=user_name)
data = {"group_name": group_name}
resp = request_api(self.request, path, "POST", data=data)
check_response(resp)
except HTTPException as exc:
detail = "{} ({}), {!s}".format(type(exc).__name__, exc.code, exc)
LOGGER.error("Unexpected API error under UI operation. [%s]", detail)
report_info["edit_new_membership_error"].add(user_name)
else:
report_info["edit_new_membership_success"].add(user_name)
return report_info
[docs]
def edit_user_or_group_resource_permissions(self, user_or_group_name, is_user=False):
posted = self.request.POST.dict_of_lists().items()
# retrieve all selectors that have a value during apply (either added, same or modified)
# (note: could have N times the resource ID per available permissions for it)
res_applied_perms = {perm_res_id.replace("permission_resource_", ""): set(permissions) - {""}
for perm_res_id, permissions in posted if perm_res_id.startswith("permission_resource")}
# retrieve all resources that previously had permissions (last apply or when generated page)
res_with_perms = {res_id.replace("resource_", ""): set(permissions) - {""}
for res_id, permissions in posted if res_id.startswith("resource_")}
res_with_perms.pop("id") # remove invalid entry used for redirects
updated_perms = {}
for res_id, applied in res_applied_perms.items():
prev_perms = res_with_perms.get(res_id, set())
removed = prev_perms - applied
updated = applied - prev_perms
if not (removed or updated):
continue
updated_perms[res_id] = applied
if is_user:
res_perms_path = schemas.UserResourcePermissionsAPI.path \
.format(user_name=user_or_group_name, resource_id=res_id)
else:
res_perms_path = schemas.GroupResourcePermissionsAPI.path \
.format(group_name=user_or_group_name, resource_id=res_id)
for perm in removed:
data = {"permission": perm}
resp = request_api(self.request, res_perms_path, "DELETE", data=data)
check_response(resp)
for perm in updated:
data = {"permission": perm}
resp = request_api(self.request, res_perms_path, "PUT", data=data)
check_response(resp)
[docs]
def get_user_or_group_resources_permissions_dict(self, user_or_group_name, services, service_type,
is_user=False, is_inherit_groups_permissions=False):
"""
Get the user or group applied permissions as well as applicable permissions for corresponding services.
Result is a :class:`tuple` of:
- combined :term:`Allowed Permissions <Applied Permission>` (*names only*) for services and their children
:term:`Resources <Resource>`.
- dictionary of key-service-name, each with recursive map value of children resource details including
the :term:`Applied Permissions <Applied Permission>` or :term:`Inherited Resources` for the corresponding
:term:`User` or :term:`Group` accordingly to specified arguments.
"""
if is_user:
# because page can only show a single permission (per name/resource) at a time, apply resolution
# on top of inheritance in order to display the highest priority permission in the tree hierarchy
query = "inherited=true&resolve=true" if is_inherit_groups_permissions else ""
path = schemas.UserResourcesAPI.path.format(user_name=user_or_group_name)
else:
query = ""
path = schemas.GroupResourcesAPI.path.format(group_name=user_or_group_name)
query_type = "type={}".format(service_type) # try to limit results for faster processing time
query_sep = "&" if query else ""
path += "?{}{}{}".format(query, query_sep, query_type)
resp = request_api(self.request, path, "GET")
check_response(resp)
body = get_json(resp)
path = schemas.ServiceTypeAPI.path.format(service_type=service_type)
resp = request_api(self.request, path, "GET")
check_response(resp)
resp_available_svc_types = get_json(resp)["services"][service_type]
# remove possible duplicate permissions from different services
resources_permission_names = set()
for svc in resp_available_svc_types:
perm_names = {perm["name"] for perm in resp_available_svc_types[svc]["permissions"]}
resources_permission_names.update(perm_names)
resources_permission_names = sorted(resources_permission_names)
resources = OrderedDict()
for service in sorted(services):
if not service:
continue
permission = OrderedDict()
try:
raw_perms = body["resources"][service_type][service]
permission[raw_perms["resource_id"]] = raw_perms["permissions"]
permission.update(self.perm_tree_parser(raw_perms["resources"]))
except KeyError:
pass
path = schemas.ServiceResourcesAPI.path.format(service_name=service)
resp = request_api(self.request, path, "GET")
check_response(resp)
raw_resources = get_json(resp)[service] # type: Dict[Str, JSON]
perms = permission.get(raw_resources["resource_id"], [])
perm_names = [PermissionSet(perm_json).explicit_permission for perm_json in perms]
resources[service] = OrderedDict(
id=raw_resources["resource_id"],
resource_type="service",
permissions=perms,
permission_names=perm_names,
children=self.resource_tree_parser(raw_resources["resources"], permission))
return resources_permission_names, resources
@view_config(route_name="edit_group", renderer="templates/edit_group.mako")
[docs]
def edit_group(self):
group_name = self.request.matchdict["group_name"]
cur_svc_type = self.request.matchdict["cur_svc_type"]
group_info = {
"edit_mode": "no_edit",
"group_name": group_name,
"cur_svc_type": cur_svc_type,
# user not used, but avoid error when checking against
# MAGPIE_FIXED_USERS_REFS since using same tree-view script
"user_name": None,
}
error_message = ""
edit_grp_users_info = {}
# TODO:
# Until the api is modified to make it possible to request from the RemoteResource table,
# we have to access the database directly here
session = self.request.db
# when service type is 'default', this function replaces 'cur_svc_type' with the first one available
svc_types, cur_svc_type, services = self.get_services(cur_svc_type)
# move to service or edit requested group/permission changes
if self.request.method == "POST":
is_edit_group_members = False
res_id = self.request.POST.get("resource_id")
if "delete" in self.request.POST:
self.delete_group(group_name)
return HTTPFound(self.request.route_url("view_groups"))
if "goto_service" in self.request.POST:
return self.goto_service(res_id)
if "edit_group_name" in self.request.POST:
group_info["edit_mode"] = "edit_group_name"
elif "save_group_name" in self.request.POST:
group_info["group_name"] = self.request.POST.get("new_group_name")
group_info = self.update_group_info(group_name, group_info)
# return immediately with updated URL to group with new name (reprocess this template from scratch)
return HTTPFound(self.request.route_url("edit_group", **group_info))
if "edit_description" in self.request.POST:
group_info["edit_mode"] = "edit_description"
elif "save_description" in self.request.POST:
group_info["description"] = self.request.POST.get("new_description")
group_info.update(self.update_group_info(group_name, group_info))
elif "clean_resource" in self.request.POST:
# "clean_resource" must be above "edit_permissions" because they're in the same form.
self.delete_resource(res_id)
elif "is_discoverable" in self.request.POST:
group_info["discoverable"] = not asbool(self.request.POST.get("is_discoverable"))
group_info.update(self.update_group_info(group_name, group_info))
elif "edit_permissions" in self.request.POST:
# FIXME:
# Add remote does not make sense anymore because we batch update resources (instead of one-by-one).
# Also not necessary because recursive permission don't require to actually have the sub-resources.
# If resources are needed to apply permissions on them, they are either added manually or via sync.
# if not res_id or res_id == "None":
# remote_id = int(self.request.POST.get("remote_id"))
# services_names = [s["service_name"] for s in services.values()]
# res_id = self.add_remote_resource(cur_svc_type, services_names, group_name,
# remote_id, is_user=False)
self.edit_user_or_group_resource_permissions(group_name, is_user=False)
elif "edit_group_members" in self.request.POST:
is_edit_group_members = True
elif "force_sync" in self.request.POST:
_, errmsg = self.sync_services(services)
error_message += errmsg or ""
elif "clean_all" in self.request.POST:
ids_to_clean = self.request.POST.get("ids_to_clean").split(";")
for id_ in ids_to_clean:
self.delete_resource(id_)
elif "no_edit" not in self.request.POST:
raise HTTPBadRequest(detail="Invalid POST request for group edit.")
# edits to group members checkboxes
if is_edit_group_members:
edit_grp_users_info = self.edit_group_users(group_name)
# display resources permissions per service type tab
try:
res_perm_names, res_perms = self.get_user_or_group_resources_permissions_dict(
group_name, services, cur_svc_type, is_user=False
)
except Exception as exc:
raise HTTPBadRequest(detail=repr(exc))
sync_types = [s["service_sync_type"] for s in services.values()]
sync_implemented = any(s in SYNC_SERVICES_TYPES for s in sync_types)
info = self.get_remote_resources_info(res_perms, services, session)
res_perms, ids_to_clean, last_sync_humanized, out_of_sync = info
if out_of_sync:
error_message = self.make_sync_error_message(out_of_sync)
group_info.update(self.get_group_info(group_name))
group_info["members"] = group_info.pop("user_names")
group_info["pending_users"] = self.get_group_users(group_name, user_group_status=UserGroupStatus.PENDING)
group_info["error_message"] = error_message
group_info["ids_to_clean"] = ";".join(ids_to_clean)
group_info["last_sync"] = last_sync_humanized
group_info["sync_implemented"] = sync_implemented
group_info["out_of_sync"] = out_of_sync
group_info["users"] = self.get_user_names()
group_info["svc_types"] = svc_types
group_info["cur_svc_type"] = cur_svc_type
group_info["resources"] = res_perms
group_info["permissions"] = res_perm_names
group_info["permission_titles"] = [Permission(perm).title for perm in res_perm_names]
if edit_grp_users_info:
group_info["edit_membership_pending_success"] = (
edit_grp_users_info["edit_new_membership_success"] & set(group_info["pending_users"])
)
group_info["edit_new_membership_error"] = edit_grp_users_info["edit_new_membership_error"]
return self.add_template_data(data=group_info)
@staticmethod
[docs]
def make_sync_error_message(service_names):
this = "this service" if len(service_names) == 1 else "these services"
error_message = ("There seems to be an issue synchronizing resources from "
"{}: {}".format(this, ", ".join(service_names)))
return error_message
[docs]
def sync_services(self, services):
# type: (Dict[Str, JSON]) -> Tuple[List[Str], Optional[Str]]
"""
Syncs specified services.
:returns: names of services that produced a sync error and corresponding sync message (if any).
"""
errors = []
session = self.request.db
for service_info in services.values():
try:
fetch_single_service(service_info["resource_id"], session)
transaction.commit()
except Exception: # noqa: W0703 # nosec: B110
errors.append(service_info["service_name"])
if errors:
return errors, self.make_sync_error_message(errors)
return errors, None
[docs]
def get_remote_resources_info(self, res_perms, services, session):
last_sync_humanized = "Never"
ids_to_clean, out_of_sync = [], []
now = datetime.now()
service_ids = [s["resource_id"] for s in services.values()]
last_sync_datetimes = list(filter(bool, self.get_last_sync_datetimes(service_ids, session)))
if any(last_sync_datetimes):
last_sync_datetime = min(last_sync_datetimes)
last_sync_humanized = humanize.naturaltime(now - last_sync_datetime)
res_perms = self.merge_remote_resources(res_perms, services, session)
for last_sync, service_name in zip(last_sync_datetimes, services):
if last_sync:
ids_to_clean += self.get_ids_to_clean(res_perms[service_name]["children"])
if now - last_sync > OUT_OF_SYNC:
out_of_sync.append(service_name)
return res_perms, ids_to_clean, last_sync_humanized, out_of_sync
@staticmethod
[docs]
def merge_remote_resources(res_perms, services, session):
merged_resources = {}
for service_name, service_values in services.items():
service_id = service_values["resource_id"]
merge = merge_local_and_remote_resources
# create a subset for the current local service resources tree
# avoids over-copying/looping the multi-service tree by merge function that works on the full set each time
local_svc_res = {service_name: res_perms[service_name]}
resources_for_service = merge(local_svc_res, service_values["service_sync_type"], service_id, session)
merged_resources[service_name] = resources_for_service[service_name]
return merged_resources
@staticmethod
[docs]
def get_last_sync_datetimes(service_ids, session):
# type: (List[int], Session) -> List[Optional[datetime]]
return [get_last_sync(s, session) for s in service_ids]
[docs]
def delete_resource(self, res_id):
try:
path = schemas.ResourceAPI.path.format(resource_id=res_id)
resp = request_api(self.request, path, "DELETE")
check_response(resp)
except HTTPNotFound:
# Some resource ids are already deleted because they were a child
# of another just deleted parent resource.
# We just skip them.
pass
[docs]
def get_ids_to_clean(self, resources):
ids = []
for _, values in resources.items():
if "matches_remote" in values and not values["matches_remote"]:
ids.append(values["id"])
ids += self.get_ids_to_clean(values["children"])
return ids
[docs]
def add_remote_resource(self, service_type, services_names, user_or_group, remote_id, is_user=False):
try:
_, res_perms = self.get_user_or_group_resources_permissions_dict(
user_or_group, services=services_names, service_type=service_type, is_user=is_user
)
except Exception as exc:
raise HTTPBadRequest(detail=repr(exc))
# get the parent resources for this remote_id
# TODO:
# Until the api is modified to make it possible to request from the RemoteResource table,
# we have to access the database directly here
session = self.request.db
parents = REMOTE_RESOURCE_TREE_SERVICE.path_upper(remote_id, db_session=session)
parents = list(reversed(list(parents)))
parent_id = None
current_resources = res_perms
for remote_resource in parents:
name = remote_resource.resource_name
if name in current_resources:
parent_id = int(current_resources[name]["id"])
current_resources = current_resources[name]["children"]
else:
data = {
"resource_name": name,
"resource_display_name": remote_resource.resource_display_name,
"resource_type": remote_resource.resource_type,
"parent_id": parent_id,
}
resp = request_api(self.request, schemas.ResourcesAPI.path, "POST", data=data)
check_response(resp)
parent_id = get_json(resp)["resource"]["resource_id"]
return parent_id
@handle_errors
[docs]
def get_service_resources(self, service_name):
resources = {}
path = schemas.ServiceResourcesAPI.path.format(service_name=service_name)
resp = request_api(self.request, path, "GET")
check_response(resp)
raw_resources = get_json(resp)[service_name] # type: Dict[Str, JSON]
resources[service_name] = dict(
id=raw_resources["resource_id"],
permissions=[],
resource_type="service",
children=self.resource_tree_parser(raw_resources["resources"], {}))
resources_id_type = self.get_resource_types()
return resources, resources_id_type
@view_config(route_name="view_services", renderer="templates/view_services.mako")
[docs]
def view_services(self):
if "delete" in self.request.POST:
service_name = self.request.POST.get("service_name")
service_data = {"service_push": self.request.POST.get("service_push")}
path = schemas.ServiceAPI.path.format(service_name=service_name)
data = json.dumps(service_data)
resp = request_api(self.request, path, "DELETE", data=data)
check_response(resp)
cur_svc_type = self.request.matchdict["cur_svc_type"]
svc_types, cur_svc_type, services = self.get_services(cur_svc_type)
service_names = services.keys()
success_sync = None
if "phoenix_push" in self.request.POST:
if cur_svc_type in register.SERVICES_PHOENIX_ALLOWED:
success_sync = register.sync_services_phoenix(services, services_as_dicts=True)
if "edit" in self.request.POST:
service_name = self.request.POST.get("service_name")
return HTTPFound(self.request.route_url("edit_service",
service_name=service_name, cur_svc_type=cur_svc_type))
data = {
"cur_svc_type": cur_svc_type,
"svc_types": svc_types,
"service_names": service_names,
"service_push_show": cur_svc_type in register.SERVICES_PHOENIX_ALLOWED,
"service_push_success": success_sync
}
return self.add_template_data(data)
@view_config(route_name="add_service", renderer="templates/add_service.mako")
[docs]
def add_service(self):
cur_svc_type = self.request.matchdict["cur_svc_type"]
svc_types, cur_svc_type, _ = self.get_services(cur_svc_type)
services_keys_sorted = self.get_service_types()
services_phoenix_enabled = [
(1 if services_keys_sorted[i] in register.SERVICES_PHOENIX_ALLOWED else 0)
for i in range(len(services_keys_sorted))
]
# FIXME: retrieve from API
services_config_enabled = [
int(SERVICE_TYPE_DICT[svc_type].configurable)
for svc_type in services_keys_sorted
]
data = {
"service_name": "",
"service_url": "",
"service_config": "",
"invalid_config": False,
"cur_svc_type": cur_svc_type,
"service_types": svc_types,
"services_phoenix": register.SERVICES_PHOENIX_ALLOWED,
"services_phoenix_enabled": services_phoenix_enabled,
"services_config_enabled": services_config_enabled,
}
if "register" in self.request.POST:
service_name = self.request.POST.get("service_name")
service_url = self.request.POST.get("service_url")
service_type = self.request.POST.get("service_type")
service_push = self.request.POST.get("service_push")
service_config = self.request.POST.get("service_config")
json_config = None
if service_type in svc_types and SERVICE_TYPE_DICT[service_type].configurable:
json_config = None
if service_config:
json_config = is_json_body(service_config, return_body=True)
if json_config is None:
data.update({
# forward any fields to avoid dropping values filled by user
"service_name": service_name,
"service_type": service_type,
"service_url": service_url,
"service_config": service_config,
"service_push": service_push,
"invalid_config": True,
})
return self.add_template_data(data)
body = {
"service_name": service_name,
"service_url": service_url,
"service_type": service_type,
"service_push": service_push,
"configuration": json_config,
}
resp = request_api(self.request, schemas.ServicesAPI.path, "POST", data=body)
check_response(resp)
return HTTPFound(self.request.route_url("view_services", cur_svc_type=service_type))
return self.add_template_data(data)
@view_config(route_name="edit_service", renderer="templates/edit_service.mako")
[docs]
def edit_service(self):
cur_svc_type = self.request.matchdict["cur_svc_type"]
service_name = self.request.matchdict["service_name"]
service_data = self.get_service_data(service_name)
service_url = service_data["service_url"]
service_perm = {perm["name"] for perm in service_data["permissions"]}
service_id = service_data["resource_id"]
# apply default state if arriving on the page for the first time
# future editions on the page will transfer the last saved state
service_push_show = cur_svc_type in register.SERVICES_PHOENIX_ALLOWED
service_push = asbool(self.request.POST.get("service_push", False))
service_info = {
"edit_mode": "no_edit",
"public_url": register.get_twitcher_protected_service_url(service_name),
"service_name": service_name,
"service_url": service_url,
"service_perm": service_perm,
"service_id": service_id,
"service_push": service_push,
"service_push_show": service_push_show,
"service_sync_type": service_data.get("service_sync_type"),
"cur_svc_type": cur_svc_type,
} # type: Dict[str, Any]
svc_config = service_data["configuration"]
if not svc_config:
service_info["service_configuration"] = None
service_info["service_config_json"] = None
service_info["service_config_yaml"] = None
else:
svc_cfg_json = json.dumps(svc_config, ensure_ascii=False, indent=4).strip()
svc_cfg_yaml = yaml.safe_dump(svc_config, allow_unicode=True, indent=4, sort_keys=False).strip()
service_info["service_configuration"] = True
service_info["service_config_json"] = svc_cfg_json
service_info["service_config_yaml"] = svc_cfg_yaml
if "edit_name" in self.request.POST:
service_info["edit_mode"] = "edit_name"
if "save_name" in self.request.POST:
new_svc_name = self.request.POST.get("new_svc_name")
if service_name not in (new_svc_name, ""):
self.update_service_name(service_name, new_svc_name, service_push)
service_info["service_name"] = new_svc_name
service_info["public_url"] = register.get_twitcher_protected_service_url(new_svc_name)
service_info["edit_mode"] = "no_edit"
# return directly to "regenerate" the URL with the modified name
return HTTPFound(self.request.route_url("edit_service", **service_info))
if "edit_url" in self.request.POST:
service_info["edit_mode"] = "edit_url"
if "save_url" in self.request.POST:
new_svc_url = self.request.POST.get("new_svc_url")
if service_url not in (new_svc_url, ""):
self.update_service_url(service_name, new_svc_url, service_push)
service_info["service_url"] = new_svc_url
service_info["edit_mode"] = "no_edit"
if "delete" in self.request.POST:
service_data = json.dumps({"service_push": service_push})
path = schemas.ServiceAPI.path.format(service_name=service_name)
resp = request_api(self.request, path, "DELETE", data=service_data)
check_response(resp)
return HTTPFound(self.request.route_url("view_services", **service_info))
if "delete_child" in self.request.POST:
resource_id = self.request.POST.get("resource_id")
path = schemas.ResourceAPI.path.format(resource_id=resource_id)
resp = request_api(self.request, path, "DELETE")
check_response(resp)
if "add_child" in self.request.POST:
service_info["resource_id"] = self.request.POST.get("resource_id")
return HTTPFound(self.request.route_url("add_resource", **service_info))
resources, resources_id_type = self.get_service_resources(service_name)
path = schemas.ServiceAPI.path.format(service_name=service_name)
resp = request_api(self.request, path, "GET")
check_response(resp)
svc_body = get_json(resp)["service"]
# TODO: use an API request instead of direct access to `RESOURCE_TYPE_DICT`
service_info["resources"] = resources
service_info["resources_id_type"] = resources_id_type
service_info["resources_no_child"] = [name for name, res in RESOURCE_TYPE_DICT.items()
if not res.child_resource_allowed]
service_info["service_no_child"] = not svc_body["resource_child_allowed"]
return self.add_template_data(service_info)
@view_config(route_name="add_resource", renderer="templates/add_resource.mako")
[docs]
def add_resource(self):
cur_svc_type = self.request.matchdict["cur_svc_type"]
service_name = self.request.matchdict["service_name"]
resource_id = self.request.matchdict["resource_id"]
if "add_child" in self.request.POST:
resource_name = self.request.POST.get("resource_name")
resource_type = self.request.POST.get("resource_type")
data = {"resource_name": resource_name,
"resource_type": resource_type,
"parent_id": int(resource_id) if resource_id else None}
resp = request_api(self.request, schemas.ResourcesAPI.path, "POST", data=data,
headers={"Content-Type": CONTENT_TYPE_JSON})
check_response(resp)
return HTTPFound(self.request.route_url("edit_service",
service_name=service_name,
cur_svc_type=cur_svc_type))
path = schemas.ResourceTypesAPI.path.format(resource_id=resource_id)
resp = request_api(self.request, path, "GET")
check_response(resp)
svc_res_types = get_json(resp)["children_resource_types"]
data = {
"service_name": service_name,
"cur_svc_type": cur_svc_type,
"resource_id": resource_id,
"cur_svc_res": svc_res_types,
}
return self.add_template_data(data)