Source code for magpie.owsrequest

"""
The OWSRequest is based on pywps code:

* https://github.com/geopython/pywps/tree/pywps-3.2/pywps/Parser
* https://github.com/geopython/pywps/blob/master/pywps/app/WPSRequest.py
"""

from typing import TYPE_CHECKING

import lxml.etree  # nosec: B410 # module safe but bandit flags it : https://github.com/tiran/defusedxml/issues/38

from magpie.api.requests import get_multiformat_body
from magpie.utils import CONTENT_TYPE_FORM, CONTENT_TYPE_JSON, CONTENT_TYPE_PLAIN, get_header, get_logger, is_json_body

if TYPE_CHECKING:
    from pyramid.request import Request
[docs]LOGGER = get_logger(__name__)
[docs]def ows_parser_factory(request): # type: (Request) -> OWSParser """ Retrieve the appropriate ``OWSParser`` parser using the ``Content-Type`` header. If the ``Content-Type`` header is missing or 'text/plain', and the request has a body, try to parse the body as JSON and set the content-type to 'application/json'. 'application/x-www-form-urlencoded' ``Content-Type`` header is also handled correctly. Otherwise, use the GET/POST WPS parsers. """ content_type = get_header("Content-Type", request.headers, default=None, split=";,") if content_type is None or content_type == CONTENT_TYPE_PLAIN: if is_json_body(request.body): # default to json when can be parsed as json request.headers["Content-Type"] = request.content_type = content_type = CONTENT_TYPE_JSON if content_type in (CONTENT_TYPE_JSON, CONTENT_TYPE_FORM): return MultiFormatParser(request) if request.body: return WPSPost(request) return WPSGet(request)
[docs]class OWSParser(object): def __init__(self, request): self.request = request self.params = {}
[docs] def parse(self, param_list): for param_name in param_list: self.params[param_name] = self._get_param_value(param_name) return self.params
[docs] def _get_param_value(self, param): raise NotImplementedError
[docs]class WPSGet(OWSParser): """ Basically a case-insensitive query string parser. """ def __init__(self, request): super(WPSGet, self).__init__(request) self.all_params = self._request_params()
[docs] def _request_params(self): new_params = {} for param in self.request.params: new_params[param.lower()] = self.request.params[param].lower() return new_params
[docs] def _get_param_value(self, param): if param in self.all_params: return self.all_params[param] return None
[docs]def lxml_strip_ns(tree): for node in tree.iter(): try: has_namespace = node.tag.startswith("{") except AttributeError: continue # node.tag is not a string (node is a comment or similar) if has_namespace: node.tag = node.tag.split("}", 1)[1]
[docs]class WPSPost(OWSParser): def __init__(self, request): super(WPSPost, self).__init__(request) self.document = lxml.etree.fromstring(self.request.body) # nosec: B410 lxml_strip_ns(self.document)
[docs] def _get_param_value(self, param): if param in self.document.attrib: return self.document.attrib[param].lower() if param == "request": return self.document.tag.lower() return None
[docs]class MultiFormatParser(OWSParser):
[docs] def _get_param_value(self, param): return get_multiformat_body(self.request, param, None)