# Source code for magpie.helpers.sync_services
from magpie.utils import CONTENT_TYPE_JSON
from collections import OrderedDict, defaultdict
from six import with_metaclass
from typing import TYPE_CHECKING
import abc
import requests
import threddsclient
if TYPE_CHECKING:
from magpie.definitions.typedefs import Dict, JSON, Str, Type # noqa: F401
def is_valid_resource_schema(resources):
    # type: (JSON) -> bool
    """
    Returns ``True`` if the structure of the input dictionary is a tree of the form::

        {
            "resource_name_1": {
                "children": {
                    "resource_name_3": {"children": {}},
                    "resource_name_4": {"children": {}}
                }
            },
            "resource_name_2": {"children": {}}
        }

    Every node, at every level and for every sibling, must be a mapping containing a ``"children"`` mapping.
    Other keys on a node (e.g. ``"resource_type"``) are allowed and not checked.

    :param resources: mapping of resource name to resource node.
    :return: ``True`` when the whole tree matches the schema, ``False`` otherwise.
    """
    for values in resources.values():
        # a node must itself be a mapping, otherwise ``"children" in values`` could match a substring
        if not isinstance(values, (OrderedDict, dict)) or "children" not in values:
            return False
        children = values["children"]
        if not isinstance(children, (OrderedDict, dict)):
            return False
        # validate this subtree but keep looping so that every sibling is checked too
        if not is_valid_resource_schema(children):
            return False
    return True
class SyncServiceInterface(with_metaclass(abc.ABCMeta)):
    """
    Base interface of remote-service synchronizers.

    Subclasses set ``sync_type`` and implement ``max_depth`` and ``get_resources``.
    """
    sync_type = None  # type: Str

    def __init__(self, service_name, url):
        # type: (Str, Str) -> None
        """
        :param service_name: name of the service being synchronized.
        :param url: base URL of the remote service; implementations build their request URLs from it.
        """
        self.service_name = service_name
        self.url = url

    @property
    @abc.abstractmethod
    def max_depth(self):
        # type: () -> int
        """
        The max depth at which remote resources are fetched.

        Implementations that do not crawl recursively return ``None``.
        """

    @abc.abstractmethod
    def get_resources(self):
        """
        This is the function actually fetching the data from the remote service. Implement this for every specific
        service.

        :return: The returned dictionary must be validated by 'is_valid_resource_schema'
        """
class SyncServiceGeoserver(SyncServiceInterface):
    """
    Synchronizes the workspaces listed by a GeoServer REST API.
    """
    sync_type = u"geoserver-api"

    @property
    def max_depth(self):
        # a single listing request is made, no recursive crawling
        return None

    def get_resources(self):
        """
        Fetches ``<url>/workspaces`` and returns a tree ``geoserver-api -> workspaces -> <workspace names>``.
        """
        # Only workspaces are fetched for now
        resource_type = "route"
        workspaces_url = "{}/{}".format(self.url, "workspaces")
        resp = requests.get(workspaces_url, headers={"Accept": CONTENT_TYPE_JSON})
        resp.raise_for_status()
        # The collection may be reported as a falsy non-object (e.g. "" for an empty collection); ``or`` falls
        # back to empty containers in that case as well as for missing keys instead of raising AttributeError.
        workspaces_list = (resp.json().get("workspaces") or {}).get("workspace") or []
        workspaces = {w["name"]: {"children": {}, "resource_type": resource_type} for w in workspaces_list}
        workspace_tree = {"workspaces": {"children": workspaces,
                                         "resource_type": resource_type}}
        resources = {"geoserver-api": {"children": workspace_tree,
                                       "resource_type": resource_type}}
        assert is_valid_resource_schema(resources), "Error in Interface implementation"
        return resources
class SyncServiceProjectAPI(SyncServiceInterface):
    """
    Synchronizes the projects listed by a Project API service as children of the service resource.
    """
    sync_type = u"project-api"

    @property
    def max_depth(self):
        # a single listing request is made, no recursive crawling
        return None

    def get_resources(self):
        """
        Fetches ``<url>/Projects`` and maps each project ``id`` to a leaf resource displayed with its ``name``.
        """
        # Only projects are fetched for now
        resource_type = "route"
        projects_url = "/".join([self.url, "Projects"])
        # the response is parsed with ``resp.json()``, so ask for JSON explicitly (as the GeoServer sync does)
        resp = requests.get(projects_url, headers={"Accept": CONTENT_TYPE_JSON})
        resp.raise_for_status()
        projects = {p["id"]: {"children": {},
                              "resource_type": resource_type,
                              "resource_display_name": p["name"]}
                    for p in resp.json()}
        resources = {self.service_name: {"children": projects, "resource_type": resource_type}}
        assert is_valid_resource_schema(resources), "Error in Interface implementation"
        return resources
class SyncServiceThredds(SyncServiceInterface):
    """
    Synchronizes a THREDDS catalog by recursively following its catalog references.
    """
    # Without an explicit identifier this class inherited ``None`` and was registered under that key.
    sync_type = u"thredds"

    @property
    def max_depth(self):
        # number of catalog-reference levels followed below the root catalog
        return 3

    @staticmethod
    def _resource_id(resource):
        """
        Name of a catalog: last path segment of its first dataset's ``ID`` if it has datasets, else its ``name``.
        """
        id_ = resource.name
        if len(resource.datasets) > 0:
            id_ = resource.datasets[0].ID.split("/")[-1]
        return id_

    def get_resources(self):
        """
        Crawls the catalog at ``self.url`` down to ``max_depth`` levels; the root node is named after the service.
        """
        def thredds_get_resources(url, depth):
            cat = threddsclient.read_url(url)
            name = self._resource_id(cat)
            # the root catalog is the service itself
            if depth == self.max_depth:
                name = self.service_name
            resource_type = 'directory'
            if cat.datasets and cat.datasets[0].content_type != "application/directory":
                resource_type = 'file'
            tree_item = {name: {'children': {}, 'resource_type': resource_type}}
            if depth > 0:
                for reference in cat.flat_references():
                    tree_item[name]['children'].update(thredds_get_resources(reference.url, depth - 1))
            return tree_item

        resources = thredds_get_resources(self.url, self.max_depth)
        assert is_valid_resource_schema(resources), 'Error in Interface implementation'
        return resources
class SyncServiceDefault(SyncServiceInterface):
    """
    Fallback synchronizer returned by ``SYNC_SERVICES_TYPES`` for unregistered sync types; it fetches nothing.
    """

    @property
    def max_depth(self):
        # nothing is crawled, so there is no depth to report
        return None

    def get_resources(self):
        no_resources = dict()
        return no_resources
# Registry of synchronizer classes keyed by ``sync_type``; unknown keys resolve to ``SyncServiceDefault``.
SYNC_SERVICES_TYPES = defaultdict(lambda: SyncServiceDefault)  # type: Dict[Str, Type[SyncServiceInterface]]
for sync_svc in (SyncServiceThredds, SyncServiceGeoserver, SyncServiceProjectAPI):
    # ``in`` does not trigger the defaultdict factory, so this only detects explicitly registered keys
    if sync_svc.sync_type not in SYNC_SERVICES_TYPES:
        SYNC_SERVICES_TYPES[sync_svc.sync_type] = sync_svc
        continue
    raise KeyError("Duplicate sync service type identifiers not allowed")