Source code for magpie.cli.sync_services
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Sync registered services in Magpie with resources retrieved from actual service.
.. seealso::
- :py:mod:`magpie.cli.sync_resources`
"""
import abc
from collections import OrderedDict, defaultdict
from typing import TYPE_CHECKING
import requests
import six
import threddsclient
from magpie.utils import CONTENT_TYPE_JSON
if TYPE_CHECKING:
# pylint: disable=W0611,unused-import
from typing import Dict, Type
from magpie.typedefs import AnyNestedChildrenTree, ServiceResourceTypeTree, Str
[docs]
def is_valid_resource_schema(resources):
    # type: (AnyNestedChildrenTree) -> bool
    """
    Validates the resource structure.

    Expected dictionary is a tree of the following form:

    .. code-block:: json

        {
            "resource_name_1": {
                "children": {
                    "resource_name_3": {"children": {}},
                    "resource_name_4": {"children": {}}
                }
            },
            "resource_name_2": {"children": {}}
        }

    Every entry at every level of the tree is checked, not only the first one.

    :param resources: mapping of resource names to their definition to validate recursively.
    :return: ``True`` if every entry has a ``"children"`` mapping that is itself valid, ``False`` otherwise.
        An empty mapping is considered valid.
    """
    for values in resources.values():
        if "children" not in values:
            return False
        children = values["children"]
        if not isinstance(children, (OrderedDict, dict)):
            return False
        # Only bail out early on failure: returning the recursive result directly
        # would stop after the first entry and leave its siblings unchecked.
        if not is_valid_resource_schema(children):
            return False
    return True
@six.add_metaclass(abc.ABCMeta)
class SyncServiceInterface(object):
    """
    Abstract base of the service synchronizers.

    Concrete implementations must provide :attr:`max_depth` and :meth:`get_resources`.
    """
    # Identifier of the synchronizer; concrete classes override it with a string.
    sync_type = None    # type: Str

    def __init__(self, service_name, url):
        # type: (Str, Str) -> None
        """
        :param service_name: name of the service to synchronize.
        :param url: base URL of the remote service, read by implementations as ``self.url``.
        """
        self.service_name = service_name
        # Implementations build their request URLs from 'self.url', so it must be stored here.
        self.url = url

    @property
    @abc.abstractmethod
    def max_depth(self):
        # type: () -> int
        """
        The max depth at which remote resources are fetched.
        """

    @abc.abstractmethod
    def get_resources(self):
        # type: () -> ServiceResourceTypeTree
        """
        This is the function actually fetching the data from the remote service. Implement this for every specific
        service.

        :return: The returned dictionary must be validated by 'is_valid_resource_schema'
        """
[docs]
class SyncServiceGeoserver(SyncServiceInterface):
    """
    Synchronizer that lists the workspaces exposed under ``<url>/workspaces``.
    """
    sync_type = "geoserver-api"

    @property
    def max_depth(self):
        # type: () -> None
        """
        Always ``None`` for this synchronizer.
        """
        # NOTE(review): presumably 'None' means that no depth limit applies - confirm with callers.
        return None

    def get_resources(self):
        # type: () -> ServiceResourceTypeTree
        """
        Fetches the workspaces of the remote service and returns them as a resource tree.

        The tree has the form ``geoserver-api -> workspaces -> <workspace name>``, every node being of type ``route``.

        :raises requests.HTTPError: if the remote service answers with an error status.
        :raises ValueError: if the generated tree does not pass 'is_valid_resource_schema'.
        :return: tree of the retrieved workspaces.
        """
        # Only workspaces are fetched for now
        resource_type = "route"
        workspaces_url = "{}/{}".format(self.url, "workspaces")
        resp = requests.get(workspaces_url, headers={"Accept": CONTENT_TYPE_JSON}, timeout=5)
        resp.raise_for_status()
        # GeoServer is known to serialize an empty collection as an empty string (or null) instead of an object.
        # Fall back on empty containers so that 'no workspace' yields an empty tree rather than an AttributeError.
        workspaces_section = resp.json().get("workspaces") or {}
        workspaces_list = workspaces_section.get("workspace") or []
        workspaces = {w["name"]: {"children": {}, "resource_type": resource_type} for w in workspaces_list}
        workspace_tree = {"workspaces": {"children": workspaces,
                                         "resource_type": resource_type}}
        resources = {"geoserver-api": {"children": workspace_tree,
                                       "resource_type": resource_type}}
        if not is_valid_resource_schema(resources):
            raise ValueError("Error in SyncServiceInterface implementation")
        return resources
[docs]
class SyncServiceProjectAPI(SyncServiceInterface):
    """
    Synchronizer that lists the projects exposed under ``<url>/Projects``.
    """
    sync_type = "project-api"

    @property
    def max_depth(self):
        # type: () -> None
        """
        Always ``None`` for this synchronizer.
        """
        return None

    def get_resources(self):
        # type: () -> ServiceResourceTypeTree
        """
        Fetches the projects of the remote service and returns them as a resource tree.

        Each project becomes a ``route`` child of the service, keyed by the project ``id`` and displayed with
        the project ``name``.

        :raises requests.HTTPError: if the remote service answers with an error status.
        :raises ValueError: if the generated tree does not pass 'is_valid_resource_schema'.
        :return: tree of the retrieved projects under the service name.
        """
        kind = "route"
        response = requests.get("/".join((self.url, "Projects")), timeout=5)
        response.raise_for_status()

        # one childless node per project listed in the response body
        project_nodes = {}
        for project in response.json():
            project_id = project["id"]
            project_nodes[project_id] = {
                "children": {},
                "resource_type": kind,
                "resource_display_name": project["name"],
            }

        tree = {self.service_name: {"children": project_nodes, "resource_type": kind}}
        if is_valid_resource_schema(tree):
            return tree
        raise ValueError("Error in SyncServiceInterface implementation")
[docs]
class SyncServiceThredds(SyncServiceInterface):
    """
    Synchronizer that walks the catalogs of a THREDDS server using ``threddsclient``.
    """
    # Without its own identifier the class inherits 'None' from the interface and gets registered
    # under the 'None' key of 'SYNC_SERVICES_TYPES', where a lookup by name cannot find it.
    # NOTE(review): value follows the naming of the sibling synchronizers - confirm against the service definitions.
    sync_type = "thredds"

    @property
    def max_depth(self):
        # type: () -> int
        """
        Number of catalog reference levels followed below the root catalog.
        """
        return 3

    @staticmethod
    def _resource_id(resource):
        """
        Returns the name to use for a catalog.

        The last path segment of the first dataset ``ID`` is used when the catalog has datasets,
        otherwise the catalog ``name`` is used.
        """
        if resource.datasets:
            return resource.datasets[0].ID.split("/")[-1]
        return resource.name

    def get_resources(self):
        # type: () -> ServiceResourceTypeTree
        """
        Recursively reads the catalogs starting at the service URL and returns them as a resource tree.

        :raises ValueError: if the generated tree does not pass 'is_valid_resource_schema'.
        :return: tree of ``directory`` and ``file`` resources, rooted at the service name.
        """
        def thredds_get_resources(url, depth):
            cat = threddsclient.read_url(url, timeout=5)
            name = self._resource_id(cat)
            # the first call is made with 'depth == max_depth': that root catalog is named after the service
            if depth == self.max_depth:
                name = self.service_name
            resource_type = "directory"
            if cat.datasets and cat.datasets[0].content_type != "application/directory":
                resource_type = "file"
            tree_item = {name: {"children": {}, "resource_type": resource_type}}
            # stop following catalog references once the remaining depth is exhausted
            if depth > 0:
                for reference in cat.flat_references():
                    tree_item[name]["children"].update(thredds_get_resources(reference.url, depth - 1))
            return tree_item

        resources = thredds_get_resources(self.url, self.max_depth)
        if not is_valid_resource_schema(resources):
            raise ValueError("Error in SyncServiceInterface implementation")
        return resources
[docs]
class SyncServiceDefault(SyncServiceInterface):
    """
    Synchronizer that retrieves nothing.

    It is the class produced by the default factory of ``SYNC_SERVICES_TYPES``.
    """

    @property
    def max_depth(self):
        # type: () -> None
        """
        Always ``None`` for this synchronizer.
        """
        return None

    def get_resources(self):
        # type: () -> ServiceResourceTypeTree
        """
        :return: an empty resource tree, without contacting any remote service.
        """
        return dict()
[docs]
# Registry of the synchronizer classes by their 'sync_type' identifier.
# Looking up an identifier that was not registered yields 'SyncServiceDefault'.
SYNC_SERVICES_TYPES = defaultdict(lambda: SyncServiceDefault)  # type: Dict[Str, Type[SyncServiceInterface]]
for sync_svc in (SyncServiceThredds, SyncServiceGeoserver, SyncServiceProjectAPI):
    # membership test does not trigger the default factory, so only explicit registrations are seen here
    if sync_svc.sync_type not in SYNC_SERVICES_TYPES:
        SYNC_SERVICES_TYPES[sync_svc.sync_type] = sync_svc
        continue
    raise KeyError("Duplicate sync service type identifiers not allowed")