Source code for magpie.owsrequest

"""
The OWSRequest is based on pywps code:

* https://github.com/geopython/pywps/tree/pywps-3.2/pywps/Parser
* https://github.com/geopython/pywps/blob/master/pywps/app/WPSRequest.py
"""

import abc
from typing import TYPE_CHECKING

import lxml.etree  # nosec: B410 # module safe but bandit flags it : https://github.com/tiran/defusedxml/issues/38

from magpie.api.requests import get_multiformat_body
from magpie.utils import CONTENT_TYPE_FORM, CONTENT_TYPE_JSON, CONTENT_TYPE_PLAIN, get_header, get_logger, is_json_body

if TYPE_CHECKING:
    from pyramid.request import Request
# Logger shared by this module, requested with the module's own name.
LOGGER = get_logger(__name__)
def ows_parser_factory(request):
    # type: (Request) -> OWSParser
    """
    Select and instantiate the :class:`OWSParser` implementation suited to the request.

    The decision is driven by the ``Content-Type`` header of the request:

    1. When the header is absent or equal to ``CONTENT_TYPE_PLAIN`` and the body can be
       parsed as JSON, the request is re-labelled as ``CONTENT_TYPE_JSON`` (both the header
       and the ``content_type`` attribute of the request are updated in place).
    2. A content type of ``CONTENT_TYPE_JSON`` or ``CONTENT_TYPE_FORM`` yields a
       :class:`MultiFormatParser`.
    3. Otherwise, a non-empty body yields an :class:`OWSPostParser`, and an empty body
       yields an :class:`OWSGetParser`.

    :param request: incoming request to be parsed (modified in place only in case 1).
    :returns: parser instance bound to the request.
    """
    content_type = get_header("Content-Type", request.headers, default=None, split=";,")

    unspecified = content_type is None or content_type == CONTENT_TYPE_PLAIN
    if unspecified and is_json_body(request.body):
        # default to json when can be parsed as json
        # (assignments kept in the same order as a chained assignment would apply them)
        request.headers["Content-Type"] = CONTENT_TYPE_JSON
        request.content_type = CONTENT_TYPE_JSON
        content_type = CONTENT_TYPE_JSON

    if content_type in (CONTENT_TYPE_JSON, CONTENT_TYPE_FORM):
        return MultiFormatParser(request)

    parser_class = OWSPostParser if request.body else OWSGetParser
    return parser_class(request)
class OWSParser(object):
    """
    Common base of the request parsers.

    Holds the :attr:`request` being parsed and the :attr:`params` mapping that is filled by
    :meth:`parse`. Subclasses provide the actual lookup through :meth:`_get_param_value`.
    """

    def __init__(self, request):
        self.request = request
        self.params = {}

    def parse(self, param_list):
        """
        Populate :attr:`params` with the value of every parameter named in ``param_list``.

        After this call, each name of ``param_list`` exists as a key of :attr:`params`.
        The stored value is whatever :meth:`_get_param_value` returned for that name, which
        is ``None`` for the parsers of this module when the parameter cannot be found.

        :param param_list: all known query parameters to the service.
        :returns: the :attr:`params` mapping itself (not a copy).
        """
        for name in param_list:
            value = self._get_param_value(name)
            self.params[name] = value
        return self.params

    # NOTE: the class does not use an ABC metaclass, so this decorator is not enforced at
    #       instantiation; the explicit error below is what signals a missing override.
    @abc.abstractmethod
    def _get_param_value(self, param):
        """
        Retrieve the value of a single parameter (to be implemented by subclasses).
        """
        raise NotImplementedError
class OWSGetParser(OWSParser):
    """
    Basically a case-insensitive query string parser.

    The names and the values of the request parameters are both converted to lowercase
    once, when the parser is created. Lookups use the requested name as given, so a name
    containing uppercase characters never matches.
    """

    def __init__(self, request):
        super(OWSGetParser, self).__init__(request)
        self._params = self._request_params()

    def _request_params(self):
        """
        Build the lowercase ``{name: value}`` mapping from the parameters of the request.
        """
        return {
            name.lower(): self.request.params[name].lower()
            for name in self.request.params
        }

    def _get_param_value(self, param):
        """
        Return the lowercase value stored for ``param``, or ``None`` when it is unknown.
        """
        return self._params.get(param)
def lxml_strip_ns(tree):
    """
    Remove the ``{namespace}`` prefix from the tag of every node of ``tree``, in place.

    Nodes whose tag is not a string (comments or similar) and nodes without a namespace
    prefix are left untouched. Nothing is returned.

    :param tree: root node whose ``iter()`` yields the nodes to process.
    """
    for element in tree.iter():
        tag = element.tag
        try:
            if not tag.startswith("{"):
                continue
        except AttributeError:
            # tag is not a string (node is a comment or similar)
            continue
        element.tag = tag.split("}", 1)[1]
class OWSPostParser(OWSParser):
    """
    Parser of parameters carried by an XML document sent as the request body.

    The body is parsed once at creation and namespaces are stripped from every tag, so
    that parameters can be matched against plain tag names.
    """

    def __init__(self, request):
        super(OWSPostParser, self).__init__(request)
        self.document = lxml.etree.fromstring(self.request.body)  # nosec: B410
        lxml_strip_ns(self.document)

    def _get_param_value(self, param):
        """
        Look up ``param`` in the parsed XML document.

        The lookup is attempted in this order:

        1. attribute of the root element named ``param`` (value returned in lowercase);
        2. for ``param == "request"``, the tag of the root element (in lowercase);
        3. first direct child whose lowercase tag equals ``param`` (text returned stripped,
           with its original letter case).

        :param param: name of the parameter to retrieve.
        :returns: the matched value, an empty string for a matching child element that has
            no text, or ``None`` when nothing matches.
        """
        if param in self.document.attrib:
            return self.document.attrib[param].lower()

        if param == "request":
            return self.document.tag.lower()

        for section in self.document:
            try:
                section_name = section.tag.lower()
            except AttributeError:
                # tag is not a string (node is a comment or similar): not a parameter
                continue
            if param == section_name:
                # an empty element has no text at all (None) rather than an empty string
                return (section.text or "").strip()

        return None
class MultiFormatParser(OWSParser):
    """
    Parser that delegates every parameter lookup to :func:`get_multiformat_body`.
    """

    def _get_param_value(self, param):
        """
        Return what :func:`get_multiformat_body` provides for ``param`` of the request.
        """
        # third argument is presumably the fallback value when the parameter is missing
        value = get_multiformat_body(self.request, param, None)
        return value