Source code for geonode.services.serviceprocessors.wms

#########################################################################
#
# Copyright (C) 2018 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
"""Utilities for enabling OGC WMS remote services in geonode."""
import json
import logging
import requests

from uuid import uuid4
from urllib.parse import (
    unquote,
    urlparse,
    urlsplit,
    urlencode,
    parse_qsl,
    ParseResult,
)

from django.conf import settings
from django.db import transaction
from django.template.defaultfilters import slugify
from django.utils.translation import ugettext as _

from geonode import GeoNodeException
from geonode.base.bbox_utils import BBOXHelper
from geonode.harvesting.models import Harvester
from geonode.harvesting.harvesters.wms import OgcWmsHarvester, WebMapService

from .. import enumerations
from ..enumerations import CASCADED
from ..enumerations import INDEXED
from .. import models
from .. import utils
from . import base

[docs] logger = logging.getLogger(__name__)
[docs] class WmsServiceHandler(base.ServiceHandlerBase, base.CascadableServiceHandlerMixin): """Remote service handler for OGC WMS services"""
[docs] service_type = enumerations.WMS
def __init__(self, url, geonode_service_id=None): base.ServiceHandlerBase.__init__(self, url, geonode_service_id)
[docs] self._parsed_service = None
[docs] self.indexing_method = INDEXED if self._offers_geonode_projection() else CASCADED
[docs] self.name = slugify(self.url)[:255]
@staticmethod
[docs] def get_cleaned_url_params(url): # Unquoting URL first so we don't loose existing args url = unquote(url) # Extracting url info parsed_url = urlparse(url) # Extracting URL arguments from parsed URL get_args = parsed_url.query # Converting URL arguments to dict parsed_get_args = dict(parse_qsl(get_args)) # Strip out redoundant args _version = parsed_get_args.pop("version", "1.3.0") if "version" in parsed_get_args else "1.3.0" _service = parsed_get_args.pop("service") if "service" in parsed_get_args else None _request = parsed_get_args.pop("request") if "request" in parsed_get_args else None # Converting URL argument to proper query string encoded_get_args = urlencode(parsed_get_args, doseq=True) # Creating new parsed result object based on provided with new # URL arguments. Same thing happens inside of urlparse. new_url = ParseResult( parsed_url.scheme, parsed_url.netloc, parsed_url.path, parsed_url.params, encoded_get_args, parsed_url.fragment, ) return (new_url, _service, _version, _request)
@property
[docs] def parsed_service(self): cleaned_url, service, version, request = WmsServiceHandler.get_cleaned_url_params(self.url) _url, _parsed_service = WebMapService(cleaned_url.geturl(), version=version) return _parsed_service
[docs] def probe(self): try: return True if len(self.parsed_service.contents) > 0 else False except Exception: return False
[docs] def create_cascaded_store(self, service): ogc_wms_url = service.service_url ogc_wms_get_capabilities = service.operations.get("GetCapabilities", None) if ogc_wms_get_capabilities and ogc_wms_get_capabilities.get("methods", None): for _op_method in ogc_wms_get_capabilities.get("methods"): if _op_method.get("type", None).upper() == "GET" and _op_method.get("url", None): ogc_wms_url = _op_method.get("url") store = self._get_store(create=True) store.capabilitiesURL = ogc_wms_url cat = store.catalog cat.save(store) return store
[docs] def create_geonode_service(self, owner, parent=None): """Create a new geonode.service.models.Service instance :arg owner: The user who will own the service instance :type owner: geonode.people.models.Profile """ cleaned_url, service, version, request = WmsServiceHandler.get_cleaned_url_params(self.url) with transaction.atomic(): instance = models.Service.objects.create( uuid=str(uuid4()), base_url=f"{cleaned_url.scheme}://{cleaned_url.netloc}{cleaned_url.path}".encode( "utf-8", "ignore" ).decode("utf-8"), extra_queryparams=cleaned_url.query, type=self.service_type, method=self.indexing_method, owner=owner, metadata_only=True, version=str(self.parsed_service.identification.version).encode("utf-8", "ignore").decode("utf-8"), name=self.name, title=str(self.parsed_service.identification.title).encode("utf-8", "ignore").decode("utf-8") or self.name, abstract=str(self.parsed_service.identification.abstract).encode("utf-8", "ignore").decode("utf-8") or _("Not provided"), operations=OgcWmsHarvester.get_wms_operations(self.parsed_service.url, version=version), ) service_harvester = Harvester.objects.create( name=self.name, default_owner=owner, scheduling_enabled=False, remote_url=instance.service_url, delete_orphan_resources_automatically=True, harvester_type=enumerations.HARVESTER_TYPES[self.service_type], harvester_type_specific_configuration=self.get_harvester_configuration_options(), ) if service_harvester.update_availability(): service_harvester.initiate_update_harvestable_resources() else: logger.exception(GeoNodeException("Could not reach remote endpoint.")) instance.harvester = service_harvester self.geonode_service_id = instance.id return instance
[docs] def get_keywords(self): return self.parsed_service.identification.keywords
[docs] def _get_cascaded_dataset_fields(self, geoserver_resource): name = geoserver_resource.name workspace = geoserver_resource.workspace.name if hasattr(geoserver_resource, "workspace") else None store = geoserver_resource.store if hasattr(geoserver_resource, "store") else None bbox = ( utils.decimal_encode(geoserver_resource.native_bbox) if hasattr(geoserver_resource, "native_bbox") else utils.decimal_encode(geoserver_resource.boundingBox) ) return { "name": name, "workspace": workspace or "remoteWorkspace", "store": store.name if store and hasattr(store, "name") else self.name, "typename": f"{workspace}:{name}" if workspace not in name else name, "alternate": f"{workspace}:{name}" if workspace not in name else name, "subtype": "remote", "title": geoserver_resource.title, "abstract": geoserver_resource.abstract, "bbox_polygon": BBOXHelper.from_xy([bbox[0], bbox[2], bbox[1], bbox[3]]).as_polygon(), "srid": bbox[4] if len(bbox) > 4 else "EPSG:4326", }
[docs] def _get_indexed_dataset_fields(self, dataset_meta): bbox = utils.decimal_encode(dataset_meta.boundingBox) if len(bbox) < 4: raise RuntimeError(f"Resource BBOX is not valid: {bbox}") return { "name": dataset_meta.name, "store": self.name, "subtype": "remote", "workspace": "remoteWorkspace", "typename": dataset_meta.name, "alternate": dataset_meta.name, "title": dataset_meta.title, "abstract": dataset_meta.abstract, "bbox_polygon": BBOXHelper.from_xy([bbox[0], bbox[2], bbox[1], bbox[3]]).as_polygon(), "srid": bbox[4] if len(bbox) > 4 else "EPSG:4326", "keywords": [keyword[:100] for keyword in dataset_meta.keywords], }
[docs] def _get_store(self, create=True): """Return the geoserver store associated with this service. The store may optionally be created if it doesn't exist already. Store is assumed to be (and created) named after the instance's name and belongs to the default geonode workspace for cascaded layers. """ workspace = base.get_geoserver_cascading_workspace(create=create) cat = workspace.catalog store = cat.get_store(self.name, workspace=workspace) if store is None and create: # store did not exist. Create it logger.debug(f"name: {self.name} - store: {store}") store = cat.create_wmsstore(name=self.name, workspace=workspace, user=cat.username, password=cat.password) return store
[docs] def _import_cascaded_resource(self, service, dataset_meta): """Import a layer into geoserver in order to enable cascading.""" store = self._get_store(create=False) if not store: store = self.create_cascaded_store(service) if not store: raise RuntimeError("Could not create WMS CASCADE store.") cat = store.catalog workspace = store.workspace dataset_resource = cat.get_resource(name=dataset_meta.id, store=store, workspace=workspace) if dataset_resource is None: dataset_resource = cat.create_wmslayer(workspace, store, dataset_meta.id) dataset_resource.projection = getattr(settings, "DEFAULT_MAP_CRS", "EPSG:3857") # Do not use the geoserver.support.REPROJECT enumeration until # https://github.com/boundlessgeo/gsconfig/issues/174 # has been fixed dataset_resource.projection_policy = "REPROJECT_TO_DECLARED" cat.save(dataset_resource) if dataset_resource is None: raise RuntimeError(f"Could not cascade resource {dataset_meta} through " "geoserver") dataset_resource = dataset_resource.resource else: logger.debug(f"Dataset {dataset_meta.id} is already present. Skipping...") dataset_resource.refresh() return dataset_resource
[docs] def _offers_geonode_projection(self): geonode_projection = getattr(settings, "DEFAULT_MAP_CRS", "EPSG:3857") contents_gen = self.parsed_service.contents.values() geonode_resources = list(r for r in contents_gen if not any(r.children)) if len(geonode_resources) > 0: first_dataset = geonode_resources[0] return geonode_projection in first_dataset.crsOptions else: return geonode_projection
[docs] class GeoNodeServiceHandler(WmsServiceHandler): """Remote service handler for OGC WMS services"""
[docs] service_type = enumerations.GN_WMS
def __init__(self, url, geonode_service_id=None): base.ServiceHandlerBase.__init__(self, url, geonode_service_id)
[docs] self.indexing_method = INDEXED
[docs] self.name = slugify(self.url)[:255]
@property
[docs] def parsed_service(self): cleaned_url, service, version, request = WmsServiceHandler.get_cleaned_url_params(self.ows_endpoint()) _url, _parsed_service = WebMapService(cleaned_url.geturl(), version=version) return _parsed_service
[docs] def probe(self): return base.ServiceHandlerBase.probe(self)
[docs] def get_harvester_configuration_options(self): return {"harvest_datasets": True, "harvest_documents": True, "copy_datasets": False, "copy_documents": False}
[docs] def ows_endpoint(self): url = urlsplit(self.url) base_url = f"{url.scheme}://{url.netloc}/" response = requests.get(f"{base_url}api/ows_endpoints/", {}, timeout=30, verify=False) content = response.content status = response.status_code content_type = response.headers["Content-Type"] # NEW-style OWS Enabled GeoNode if int(status) == 200 and "application/json" == content_type: try: _json_obj = json.loads(content) if "data" in _json_obj: data = _json_obj["data"] for ows_endpoint in data: if ows_endpoint["type"] in ("OGC:OWS", "OGC:WMS"): _params = url.query if url.query else "" _query_separator = "?" if "?" not in ows_endpoint["url"] else "" _url = f"{ows_endpoint['url']}{_query_separator}{_params}" return _url except Exception as e: logger.exception(e) return False # OLD-style not OWS Enabled GeoNode _url = f"{url.scheme}://{url.netloc}/geoserver/ows" return _url