# Source code for magpie.owsrequest
"""
The OWSRequest is based on pywps code:
* https://github.com/geopython/pywps/tree/pywps-3.2/pywps/Parser
* https://github.com/geopython/pywps/blob/master/pywps/app/WPSRequest.py
"""
from typing import TYPE_CHECKING
import lxml.etree # nosec: B410 # module safe but bandit flags it : https://github.com/tiran/defusedxml/issues/38
from magpie.api.requests import get_multiformat_body
from magpie.utils import CONTENT_TYPE_FORM, CONTENT_TYPE_JSON, CONTENT_TYPE_PLAIN, get_header, get_logger, is_json_body
if TYPE_CHECKING:
from pyramid.request import Request
# Module-level logger obtained from the project's ``get_logger`` helper, keyed by this module's name.
LOGGER = get_logger(__name__)
def ows_parser_factory(request):
    # type: (Request) -> OWSParser
    """
    Select the ``OWSParser`` implementation matching the request's ``Content-Type`` header.

    When the header is absent or equal to 'text/plain' and the body can be parsed as JSON,
    the request is re-labelled as 'application/json' (both the header and ``request.content_type``
    are updated in place).

    JSON and 'application/x-www-form-urlencoded' requests are handled by ``MultiFormatParser``.
    Any other request falls back to the WPS parsers: ``WPSPost`` when a body is present,
    ``WPSGet`` otherwise.

    :param request: incoming request whose headers and body drive the parser selection.
    :returns: parser instance wrapping ``request``.
    """
    content_type = get_header("Content-Type", request.headers, default=None, split=";,")

    # only sniff the body when the client did not announce a meaningful content type
    type_unspecified = content_type is None or content_type == CONTENT_TYPE_PLAIN
    if type_unspecified and is_json_body(request.body):
        # default to json when can be parsed as json
        request.headers["Content-Type"] = CONTENT_TYPE_JSON
        request.content_type = CONTENT_TYPE_JSON
        content_type = CONTENT_TYPE_JSON

    if content_type in (CONTENT_TYPE_JSON, CONTENT_TYPE_FORM):
        return MultiFormatParser(request)

    return WPSPost(request) if request.body else WPSGet(request)
class OWSParser(object):
    """
    Base class of the request parsers.

    Subclasses implement :meth:`_get_param_value` to extract a single named parameter
    from the wrapped request; :meth:`parse` collects the requested ones into ``params``.
    """

    def __init__(self, request):
        # parsed values accumulate here across successive calls to ``parse``
        self.params = {}
        self.request = request

    def parse(self, param_list):
        """
        Resolve every name of ``param_list`` and store the result in ``self.params``.

        :param param_list: iterable of parameter names to look up.
        :returns: the ``self.params`` mapping itself (not a copy), including values
            stored by earlier calls.
        """
        store = self.params
        for name in param_list:
            store[name] = self._get_param_value(name)
        return store

    def _get_param_value(self, param):
        """
        Return the value of ``param`` for the wrapped request.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class WPSGet(OWSParser):
    """
    Basically a case-insensitive query string parser.

    Both the names and the values found in ``request.params`` are lower-cased once,
    at construction time, and kept in ``all_params``.
    """

    def __init__(self, request):
        super(WPSGet, self).__init__(request)
        self.all_params = self._request_params()

    def _request_params(self):
        """
        Build a plain ``dict`` of the request parameters with lower-cased keys and values.
        """
        raw_params = self.request.params
        return {name.lower(): raw_params[name].lower() for name in raw_params}

    def _get_param_value(self, param):
        """
        Return the lower-cased value stored for ``param``, or ``None`` when it is absent.

        ``param`` is looked up as given, without any case conversion.
        """
        return self.all_params.get(param)
def lxml_strip_ns(tree):
    """
    Remove the ``{namespace}`` prefix from the tag of every node of ``tree``, in place.

    Nodes whose ``tag`` is not a string (comments or similar) are left untouched.

    :param tree: element whose ``iter()`` yields the nodes to process.
    :returns: ``None``; the nodes are modified directly.
    """
    for element in tree.iter():
        try:
            qualified_name = element.tag
            if not qualified_name.startswith("{"):
                continue  # tag carries no namespace
        except AttributeError:
            continue  # tag is not a string (node is a comment or similar)
        # keep only what follows the first closing brace, i.e. the local name
        element.tag = qualified_name.split("}", 1)[1]
class WPSPost(OWSParser):
    """
    Parser for requests that carry their parameters as an XML document in the body.

    The body is parsed at construction time and namespaces are stripped from every tag
    (see ``lxml_strip_ns``). Parameters are then read from the attributes of the root
    element, with the special ``request`` parameter falling back to the root tag name.

    Errors raised by ``lxml`` while parsing the body are not caught here.
    """

    def __init__(self, request):
        super(WPSPost, self).__init__(request)
        # The body is untrusted client input: parse it with entity resolution and
        # network access explicitly disabled instead of relying on lxml's defaults.
        parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)  # nosec: B410
        self.document = lxml.etree.fromstring(self.request.body, parser=parser)  # nosec: B410
        lxml_strip_ns(self.document)

    def _get_param_value(self, param):
        """
        Return the lower-cased value of ``param``, or ``None`` when it cannot be found.

        An attribute of the root element named ``param`` takes precedence; otherwise
        ``"request"`` resolves to the lower-cased root tag.
        """
        if param in self.document.attrib:
            return self.document.attrib[param].lower()
        if param == "request":
            return self.document.tag.lower()
        return None