Source code for geonode.services.tests

#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

from unittest.mock import MagicMock
import mock
import logging

from flaky import flaky
from selenium import webdriver
from urllib.error import HTTPError
from collections import namedtuple
from arcrest import MapService as ArcMapService
from unittest import TestCase as StandardTestCase
from owslib.wms import WebMapService as OwsWebMapService

from django.test import Client, override_settings
from django.urls import reverse
from django.db.utils import IntegrityError
from django.contrib.auth import get_user_model
from django.template.defaultfilters import slugify
from django.contrib.staticfiles.testing import StaticLiveServerTestCase

from owslib.map.wms111 import ContentMetadata

from geonode.layers.models import Dataset
from geonode.tests.base import GeoNodeBaseTestSupport
from geonode.resource.manager import resource_manager
from geonode.base import enumerations as base_enumerations
from geonode.harvesting.harvesters.wms import WebMapService
from geonode.services.utils import parse_services_types, test_resource_table_status

from . import enumerations, forms
from .models import Service
from .serviceprocessors import base, wms, arcgis, get_service_handler, get_available_service_types
from .serviceprocessors.arcgis import ArcImageServiceHandler, ArcMapServiceHandler, MapLayer

[docs] logger = logging.getLogger(__name__)
[docs] class ModuleFunctionsTestCase(StandardTestCase): @mock.patch("geonode.services.serviceprocessors.base.catalog", autospec=True) @mock.patch("geonode.services.serviceprocessors.base.settings", autospec=True)
[docs] def test_get_cascading_workspace_returns_existing(self, mock_settings, mock_catalog): mock_settings.OGC_SERVER = { "default": { "LOCATION": "nowhere/", "USER": "nouser", "PASSWORD": "nopass", } } mock_settings.CASCADE_WORKSPACE = "something" phony_workspace = "fake" cat = mock_catalog cat.get_workspace.return_value = phony_workspace result = base.get_geoserver_cascading_workspace(create=False) self.assertEqual(result, phony_workspace) cat.get_workspace.assert_called_with(mock_settings.CASCADE_WORKSPACE)
@mock.patch("geonode.services.serviceprocessors.base.catalog", autospec=True) @mock.patch("geonode.services.serviceprocessors.base.settings", autospec=True)
[docs] def test_get_cascading_workspace_creates_new_workspace(self, mock_settings, mock_catalog): mock_settings.OGC_SERVER = { "default": { "LOCATION": "nowhere/", "USER": "nouser", "PASSWORD": "nopass", } } mock_settings.CASCADE_WORKSPACE = "something" phony_workspace = "fake" cat = mock_catalog cat.get_workspace.return_value = None cat.create_workspace.return_value = phony_workspace result = base.get_geoserver_cascading_workspace(create=True) self.assertEqual(result, phony_workspace) cat.get_workspace.assert_called_with(mock_settings.CASCADE_WORKSPACE) cat.create_workspace.assert_called_with( mock_settings.CASCADE_WORKSPACE, f"http://www.geonode.org/{mock_settings.CASCADE_WORKSPACE}" )
@mock.patch("geonode.services.serviceprocessors.get_available_service_types", autospec=True)
[docs] def test_get_service_handler_wms(self, mock_wms_handler): _handler = MagicMock() mock_wms_handler.return_value = { enumerations.WMS: {"OWS": True, "handler": _handler, "label": "Web Map Service"} } phony_url = "http://fake" get_service_handler(phony_url, service_type=enumerations.WMS) _handler.assert_called_with(phony_url, None)
@mock.patch("arcrest.MapService", autospec=True)
[docs] def test_get_service_handler_arcgis(self, mock_map_service): mock_arcgis_service_contents = { "currentVersion": 10.51, "serviceDescription": "Droits petroliers et gaziers / Oil and Gas Rights", "mapName": "Droits_petroliers_et_gaziers_Oil_and_Gas_Rights", "description": "", "copyrightText": "", "supportsDynamicLayers": False, "layers": [ { "id": 0, "name": "Droits pétroliers et gaziers / Oil and Gas Rights", "parentLayerId": -1, "defaultVisibility": True, "subLayerIds": None, "minScale": 0, "maxScale": 0, } ], "tables": [], "spatialReference": {"wkid": 4140, "latestWkid": 4617}, "singleFusedMapCache": False, "initialExtent": { "xmin": -144.97375000000002, "ymin": 58.90551066699999, "xmax": -57.55125000000002, "ymax": 91.84630866699999, "spatialReference": {"wkid": 4140, "latestWkid": 4617}, }, "fullExtent": { "xmin": -144.97375, "ymin": 34.637024667000006, "xmax": -57.55125, "ymax": 91.84630866699999, "spatialReference": {"wkid": 4140, "latestWkid": 4617}, }, "minScale": 0, "maxScale": 0, "units": "esriDecimalDegrees", "supportedImageFormatTypes": "PNG32,PNG24,PNG,JPG,DIB,TIFF,EMF,PS,PDF,GIF,SVG,SVGZ,BMP", "documentInfo": { "Title": "Droits petroliers et gaziers / Oil and Gas Rights", "Author": "", "Comments": "Droits petroliers et gaziers / Oil and Gas Rights", "Subject": "Droits petroliers et gaziers / Oil and Gas Rights", "Category": "", "AntialiasingMode": "None", "TextAntialiasingMode": "Force", "Keywords": "Droits petroliers et gaziers,Oil and Gas Rights", }, "capabilities": "Map,Query,Data", "supportedQueryFormats": "JSON, AMF, geoJSON", "exportTilesAllowed": False, "supportsDatumTransformation": True, "maxRecordCount": 1000, "maxImageHeight": 2048, "maxImageWidth": 2048, "supportedExtensions": "FeatureServer, KmlServer, WFSServer, WMSServer", } mock_arcgis_service_json_struct = { "supportsDynamicLayers": False, "initialExtent": { "xmin": -144.97375000000002, "ymin": 58.90551066699999, "ymax": 91.84630866699999, "xmax": -57.55125000000002, "spatialReference": {"wkid": 4140, "latestWkid": 4617}, }, "documentInfo": { "Category": "", "Author": "", "TextAntialiasingMode": "Force", "Title": "Droits petroliers et gaziers / Oil and Gas Rights", "Comments": "Droits petroliers et gaziers / Oil and Gas Rights", "AntialiasingMode": "None", "Keywords": "Droits petroliers et gaziers,Oil and Gas Rights", "Subject": "Droits petroliers et gaziers / Oil and Gas Rights", }, "spatialReference": {"wkid": 4140, "latestWkid": 4617}, "description": "", "layers": [ { "name": "Droits pétroliers et gaziers / Oil and Gas Rights", "maxScale": 0, "defaultVisibility": True, "parentLayerId": -1, "id": 0, "minScale": 0, "subLayerIds": None, } ], "tables": [], "supportedImageFormatTypes": "PNG32,PNG24,PNG,JPG,DIB,TIFF,EMF,PS,PDF,GIF,SVG,SVGZ,BMP", "capabilities": "Map,Query,Data", "mapName": "Droits_petroliers_et_gaziers_Oil_and_Gas_Rights", "currentVersion": 10.51, "units": "esriDecimalDegrees", "supportedQueryFormats": "JSON, AMF, geoJSON", "maxRecordCount": 1000, "exportTilesAllowed": False, "maxImageHeight": 2048, "supportedExtensions": "FeatureServer, KmlServer, WFSServer, WMSServer", "fullExtent": { "xmin": -144.97375, "ymin": 34.637024667000006, "ymax": 91.84630866699999, "xmax": -57.55125, "spatialReference": {"wkid": 4140, "latestWkid": 4617}, }, "singleFusedMapCache": False, "supportsDatumTransformation": True, "maxImageWidth": 2048, "maxScale": 0, "copyrightText": "", "minScale": 0, "serviceDescription": "Droits petroliers et gaziers / Oil and Gas Rights", } phony_url = "http://fake" mock_parsed_arcgis = mock.MagicMock(ArcMapService).return_value (url, mock_parsed_arcgis) = mock.MagicMock( ArcMapService, return_value=(phony_url, mock_parsed_arcgis) ).return_value mock_parsed_arcgis.url = phony_url mock_parsed_arcgis._contents = mock_arcgis_service_contents mock_parsed_arcgis._json_struct = mock_arcgis_service_json_struct mock_map_service.return_value = (phony_url, mock_parsed_arcgis) handler = arcgis.ArcImageServiceHandler(phony_url) self.assertEqual(handler.url, phony_url) LayerESRIExtent = namedtuple("LayerESRIExtent", "spatialReference xmin ymin ymax xmax") LayerESRIExtentSpatialReference = namedtuple("LayerESRIExtentSpatialReference", "wkid latestWkid") dataset_meta = MapLayer( id=0, title="Droits pétroliers et gaziers / Oil and Gas Rights", abstract="Droits pétroliers et gaziers / Oil and Gas Rights", type="Feature Dataset", geometryType="esriGeometryPolygon", copyrightText="", extent=LayerESRIExtent( LayerESRIExtentSpatialReference(4140, 4617), -144.97375, 34.637024667000006, 91.84630866699999, -57.55125, ), fields=[ {"alias": "OBJECTID", "domain": None, "type": "esriFieldTypeOID", "name": "OBJECTID"}, { "alias": "Numéro du titre / Title Number", "length": 16, "type": "esriFieldTypeString", "name": "LICENCE_NUMBER", "domain": None, }, { "alias": "Superficie actuelle (ha) / Current Area (ha)", "domain": None, "type": "esriFieldTypeDouble", "name": "CURRENT_AREA_HA", }, { "alias": "Code du type de permis / Licence Type Code", "length": 5, "type": "esriFieldTypeString", "name": "AGRMT_TYPE", "domain": None, }, {"alias": "Datum", "length": 8, "type": "esriFieldTypeString", "name": "DATUM", "domain": None}, { "alias": "Région (anglais) / Region (English)", "length": 64, "type": "esriFieldTypeString", "name": "REGION_E", "domain": None, }, { "alias": "Région (français) / Region (French)", "length": 64, "type": "esriFieldTypeString", "name": "REGION_F", "domain": None, }, { "alias": "Représentant / Representative", "length": 50, "type": "esriFieldTypeString", "name": "COMPANY_NAME", "domain": None, }, { "alias": "Date d'entrée en vigueur / Effective Date", "length": 8, "type": "esriFieldTypeDate", "name": "LICENCE_ISSUE_DATE", "domain": None, }, { "alias": "Date d'échéance / Expiry Date", "length": 8, "type": "esriFieldTypeDate", "name": "LICENCE_EXPIRY_DATE", "domain": None, }, { "alias": "Type d'accord (anglais) / Agreement Type (English)", "length": 50, "type": "esriFieldTypeString", "name": "AGRMT_TYPE_E", "domain": None, }, { "alias": "Type d'accord (français) / Agreement Type (French)", "length": 50, "type": "esriFieldTypeString", "name": "AGRMT_TYPE_F", "domain": None, }, {"alias": "Shape", "domain": None, "type": "esriFieldTypeGeometry", "name": "SHAPE"}, ], minScale=0, maxScale=0, ) resource_fields = handler._get_indexed_dataset_fields(dataset_meta) self.assertEqual(resource_fields["alternate"], f"{slugify(phony_url)}:{dataset_meta.id}")
@mock.patch("arcrest.MapService", autospec=True)
[docs] def test_get_arcgis_alternative_structure(self, mock_map_service): LayerESRIExtent = namedtuple("LayerESRIExtent", "spatialReference xmin ymin ymax xmax") LayerESRIExtentSpatialReference = namedtuple("LayerESRIExtentSpatialReference", "wkid latestWkid") mock_arcgis_service_contents = { "copyrightText": "", "description": "", "documentInfo": { "Author": "Administrator", "Category": "", "Comments": "", "Keywords": "", "Subject": "", "Title": "basemap_ortofoto_AGEA2011", }, "fullExtent": { "xmax": 579764.2319999984, "xmin": 386130.6820000001, "ymax": 4608909.064, "ymin": 4418016.7140000025, }, "initialExtent": { "xmax": 605420.5635976626, "xmin": 349091.7176066373, "ymax": 4608197.140968505, "ymin": 4418728.637031497, }, "layers": [ { "copyrightText": "", "definitionExpression": "", "description": "", "displayField": "", "extent": LayerESRIExtent( LayerESRIExtentSpatialReference(None, None), 570962.7069999985, 4600232.139, 394932.207, 4426693.639000002, ), "fields": [], "geometryType": "", "id": 1, "maxScale": 0.0, "minScale": 0.0, "name": "Regione_Campania.ecw", "title": "Regione_Campania.ecw", "parentLayer": {"id": -1, "name": "-1"}, "subLayers": [], "type": "Raster Dataset", } ], "mapName": "Layers", "serviceDescription": "", "singleFusedMapCache": True, "spatialReference": None, "tileInfo": { "cols": 512, "compressionQuality": 0, "dpi": 96, "format": "PNG8", "lods": [ {"level": 0, "resolution": 185.20870375074085, "scale": 700000.0}, {"level": 1, "resolution": 66.1459656252646, "scale": 250000.0}, {"level": 2, "resolution": 26.458386250105836, "scale": 100000.0}, {"level": 3, "resolution": 19.843789687579378, "scale": 75000.0}, {"level": 4, "resolution": 13.229193125052918, "scale": 50000.0}, {"level": 5, "resolution": 6.614596562526459, "scale": 25000.0}, {"level": 6, "resolution": 2.6458386250105836, "scale": 10000.0}, {"level": 7, "resolution": 1.3229193125052918, "scale": 5000.0}, {"level": 8, "resolution": 0.5291677250021167, "scale": 2000.0}, ], "origin": {"x": 289313.907000001, "y": 4704355.239}, "rows": 512, "spatialReference": None, }, "units": "esriMeters", } phony_url = "http://sit.cittametropolitana.na.it/arcgis/rest/services/basemap_ortofoto_AGEA2011/MapServer" mock_parsed_arcgis = mock.MagicMock(ArcMapService).return_value (url, mock_parsed_arcgis) = mock.MagicMock( ArcMapService, return_value=(phony_url, mock_parsed_arcgis) ).return_value mock_parsed_arcgis.url = phony_url mock_parsed_arcgis.layers = mock_arcgis_service_contents["layers"] mock_parsed_arcgis._contents = mock_arcgis_service_contents mock_parsed_arcgis._json_struct = mock_arcgis_service_contents mock_map_service.return_value = (phony_url, mock_parsed_arcgis) handler = arcgis.ArcImageServiceHandler(phony_url) self.assertEqual(handler.url, phony_url) dataset_meta = handler._dataset_meta(mock_parsed_arcgis.layers[0]) self.assertIsNotNone(dataset_meta) self.assertEqual(dataset_meta.id, 1) resource_fields = handler._get_indexed_dataset_fields(dataset_meta) self.assertEqual(resource_fields["alternate"], f"{slugify(phony_url)}:{dataset_meta.id}") test_user, created = get_user_model().objects.get_or_create(username="serviceowner") if created: test_user.set_password("somepassword") test_user.save() try: result = handler.create_geonode_service(test_user) geonode_service, created = Service.objects.get_or_create(base_url=result.base_url, owner=test_user) for _d in Dataset.objects.filter(remote_service=geonode_service): resource_manager.delete(_d.uuid, instance=_d) handler._harvest_resource(dataset_meta, geonode_service) geonode_dataset = Dataset.objects.filter(remote_service=geonode_service).get() self.assertIsNotNone(geonode_dataset) self.assertNotEqual(geonode_dataset.srid, "EPSG:4326") self.assertEqual(geonode_dataset.sourcetype, base_enumerations.SOURCE_TYPE_REMOTE) self.client.login(username="admin", password="admin") response = self.client.get(reverse("dataset_embed", args=(geonode_dataset.name,))) self.assertEqual(response.status_code, 200) for _d in Dataset.objects.filter(remote_service=geonode_service): resource_manager.delete(_d.uuid, instance=_d) except (Service.DoesNotExist, HTTPError) as e: # In the case the Service URL becomes inaccessible for some reason logger.error(e)
[docs] class WmsServiceHandlerTestCase(GeoNodeBaseTestSupport):
[docs] def setUp(self): super().setUp() self.phony_url = "http://a-really-long-and-fake-name-here-so-that-" "we-use-it-in-tests" self.phony_title = "a generic title" self.phony_version = "s.version" self.phony_dataset_name = "phony_name" self.phony_keywords = ["first", "second"] mock_parsed_wms = mock.MagicMock(OwsWebMapService).return_value (url, mock_parsed_wms) = mock.MagicMock( WebMapService, return_value=(self.phony_url, mock_parsed_wms) ).return_value mock_parsed_wms.provider.url = self.phony_url mock_parsed_wms.identification.abstract = None mock_parsed_wms.identification.title = self.phony_title mock_parsed_wms.identification.version = self.phony_version mock_parsed_wms.identification.keywords = self.phony_keywords mock_parsed_wms_getcapa_operation = { "name": "GetCapabilities", "methods": [{"type": "Get", "url": self.phony_url}], } mock_parsed_wms.operations = [ mock_parsed_wms_getcapa_operation, ] mock_dataset_meta = mock.MagicMock(ContentMetadata) mock_dataset_meta.name = self.phony_dataset_name mock_dataset_meta.title = self.phony_dataset_name mock_dataset_meta.abstract = "" mock_dataset_meta.keywords = [] mock_dataset_meta.children = [] mock_dataset_meta.crsOptions = ["EPSG:3857"] mock_dataset_meta.boundingBox = [-5000, -5000, 5000, 5000, "EPSG:3857"] mock_parsed_wms.contents = { mock_dataset_meta.name: mock_dataset_meta, } self.parsed_wms = mock_parsed_wms self.test_user, created = get_user_model().objects.get_or_create(username="serviceowner") if created: self.test_user.set_password("somepassword") self.test_user.save() self.local_user, created = get_user_model().objects.get_or_create(username="serviceuser") if created: self.local_user.set_password("somepassword") self.local_user.save()
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True)
[docs] def test_has_correct_url(self, mock_wms_parsed_service, mock_wms): mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) self.assertEqual(handler.url, self.phony_url)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True)
[docs] def test_has_valid_name_when_no_title_exists(self, mock_wms_parsed_service, mock_wms): mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms.return_value[1].identification.title = "" mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) self.assertEqual(handler.name, slugify(self.phony_url)[:255])
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True)
[docs] def test_has_valid_name_when_title_exists(self, mock_wms_parsed_service, mock_wms): mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) self.assertNotEqual(handler.name, slugify(self.phony_title)) self.assertEqual("a-generic-title", slugify(self.phony_title))
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True)
[docs] def test_has_correct_service_type(self, mock_wms_parsed_service, mock_wms): mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) self.assertEqual(handler.service_type, enumerations.WMS)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True) @mock.patch("geonode.services.serviceprocessors.wms.settings", autospec=True)
[docs] def test_detects_indexed_service(self, mock_settings, mock_wms_parsed_service, mock_wms): mock_settings.DEFAULT_MAP_CRS = "EPSG:3857" mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) self.assertEqual(handler.indexing_method, enumerations.INDEXED)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True) @mock.patch("geonode.services.serviceprocessors.wms.settings", autospec=True)
[docs] def test_detects_cascaded_service(self, mock_settings, mock_wms_parsed_service, mock_wms): mock_settings.DEFAULT_MAP_CRS = "EPSG:3857" mock_dataset_meta = mock.MagicMock(ContentMetadata) mock_dataset_meta.name = "phony_name" mock_dataset_meta.children = [] mock_dataset_meta.crsOptions = ["EPSG:4326"] self.parsed_wms.contents = { mock_dataset_meta.name: mock_dataset_meta, } mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) self.assertEqual(handler.indexing_method, enumerations.INDEXED)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch.object(wms.WmsServiceHandler, "parsed_service")
[docs] def test_create_geonode_service(self, mock_wms_parsed_service, mock_wms): mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms mock_wms_parsed_service.provider.url = self.phony_url mock_wms_parsed_service.identification.title = self.phony_title mock_wms_parsed_service.identification.version = self.phony_version handler = wms.WmsServiceHandler(self.phony_url) result = handler.create_geonode_service(self.test_user) self.assertEqual(result.base_url, self.phony_url) self.assertEqual(result.type, handler.service_type) self.assertEqual(result.method, handler.indexing_method) self.assertEqual(result.owner, self.test_user) self.assertEqual(result.version, self.phony_version) self.assertEqual(result.name, handler.name) self.assertEqual(result.title, self.phony_title) # mata_data_only is set to Try self.assertTrue(result.metadata_only)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True)
[docs] def test_geonode_service_uses_given_getmap_params(self, mock_wms_parsed_service, mock_wms): phony_url = ( "https://www.geoportal.hessen.de/mapbender/php/wms.php?" "layer_id=36995&PHPSESSID=27jb139lqk29rmul77beuji261&" "withChilds=1&" "version=1.1.1&" "REQUEST=GetCapabilities&" "SERVICE=WMS" ) mock_wms.return_value = (phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms mock_wms_parsed_service.provider.url = self.phony_url mock_wms_parsed_service.identification.title = self.phony_title mock_wms_parsed_service.identification.version = self.phony_version handler = wms.WmsServiceHandler(phony_url) result = handler.create_geonode_service(self.test_user) self.assertEqual(result.base_url, "https://www.geoportal.hessen.de/mapbender/php/wms.php") self.assertEqual( result.extra_queryparams, "layer_id=36995&PHPSESSID=27jb139lqk29rmul77beuji261&withChilds=1&REQUEST=GetCapabilities&SERVICE=WMS", ) self.assertEqual(result.service_url, f"{result.base_url}?{result.extra_queryparams}") self.assertEqual(result.type, handler.service_type) self.assertEqual(result.method, handler.indexing_method) self.assertEqual(result.owner, self.test_user) self.assertEqual(result.version, self.phony_version) self.assertEqual(result.name, handler.name) self.assertEqual(result.title, self.phony_title) # mata_data_only is set to Try self.assertTrue(result.metadata_only) self.assertDictEqual( result.operations, { "GetCapabilities": { "name": "GetCapabilities", "methods": [ {"type": "Get", "url": "http://a-really-long-and-fake-name-here-so-that-we-use-it-in-tests"} ], "formatOptions": [], } }, )
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch.object(wms.WmsServiceHandler, "parsed_service")
[docs] def test_get_keywords(self, mock_wms_parsed_service, mock_wms): mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms mock_wms_parsed_service.identification.keywords = self.phony_keywords mock_wms_parsed_service.identification.title = self.phony_title mock_wms_parsed_service.identification.version = self.phony_version handler = wms.WmsServiceHandler(self.phony_url) result = handler.get_keywords() self.assertEqual(result, self.phony_keywords)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True)
[docs] def test_get_resource(self, mock_wms_parsed_service, mock_wms): mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) result = handler.get_resource(self.phony_dataset_name) self.assertIsNone(result)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True) @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.get_resources", autospec=True) @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.get_resource", autospec=True)
[docs] def test_get_resources(self, mock_wms_get_resource, mock_wms_get_resources, mock_wms_parsed_service, mock_wms): mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms mock_wms_parsed_service.provider.url = self.phony_url mock_wms_parsed_service.identification.title = self.phony_title mock_wms_parsed_service.identification.version = self.phony_version mock_wms_get_resource.return_value = list(self.parsed_wms.contents.values())[0] mock_wms_get_resources.return_value = self.parsed_wms.contents.values() handler = wms.WmsServiceHandler(self.phony_url) result = list(handler.get_resources()) self.assertEqual(len(result), 1) test_user, created = get_user_model().objects.get_or_create(username="serviceowner") if created: test_user.set_password("somepassword") test_user.save() result = handler.create_geonode_service(test_user) try: geonode_service, created = Service.objects.get_or_create(base_url=result.base_url, owner=test_user) for _d in Dataset.objects.filter(remote_service=geonode_service): resource_manager.delete(_d.uuid, instance=_d) result = list(handler.get_resources()) dataset_meta = handler.get_resource(result[0].name) resource_fields = handler._get_indexed_dataset_fields(dataset_meta) keywords = resource_fields.pop("keywords") resource_fields["keywords"] = keywords resource_fields["is_approved"] = True resource_fields["is_published"] = True except Service.DoesNotExist as e: # In the case the Service URL becomes inaccessible for some reason logger.error(e)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True) @mock.patch("geonode.services.serviceprocessors.wms.settings", autospec=True)
[docs] def test_offers_geonode_projection(self, mock_settings, mock_wms_parsed_service, mock_wms): mock_settings.DEFAULT_MAP_CRS = "EPSG:3857" mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) result = handler._offers_geonode_projection() self.assertTrue(result)
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True) @mock.patch("geonode.services.serviceprocessors.wms.settings", autospec=True)
[docs] def test_does_not_offer_geonode_projection(self, mock_settings, mock_wms_parsed_service, mock_wms): mock_settings.DEFAULT_MAP_CRS = "EPSG:3857" mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms self.parsed_wms.contents[self.phony_dataset_name].crsOptions = ["EPSG:4326"] handler = wms.WmsServiceHandler(self.phony_url) result = handler._offers_geonode_projection() self.assertEqual(result, "EPSG:3857")
@mock.patch("geonode.harvesting.harvesters.wms.WebMapService") @mock.patch("geonode.services.serviceprocessors.wms.WmsServiceHandler.parsed_service", autospec=True) @mock.patch("geonode.services.serviceprocessors.base.get_geoserver_" "cascading_workspace", autospec=True)
[docs] def test_get_store(self, mock_get_gs_cascading_store, mock_wms_parsed_service, mock_wms): mock_workspace = mock_get_gs_cascading_store.return_value mock_catalog = mock_workspace.catalog mock_catalog.get_store.return_value = None mock_wms.return_value = (self.phony_url, self.parsed_wms) mock_wms_parsed_service.return_value = self.parsed_wms handler = wms.WmsServiceHandler(self.phony_url) handler._get_store(create=True) mock_catalog.create_wmsstore.assert_called_with( name=handler.name, workspace=mock_workspace, user=mock_catalog.username, password=mock_catalog.password )
@flaky(max_runs=3)
[docs] def test_local_user_cant_delete_service(self): self.client.logout() response = self.client.get(reverse("register_service")) self.assertEqual(response.status_code, 302) url = "https://maps.geosolutionsgroup.com/geoserver/ows?service=wms&version=1.3.0&request=GetCapabilities" # url = "http://fake" service_type = enumerations.WMS form_data = {"url": url, "type": service_type} form = forms.CreateServiceForm(form_data) # The service sometimes is not available, therefore the form won't be valid... if form.is_valid(): self.client.login(username="serviceowner", password="somepassword") response = self.client.post(reverse("register_service"), data=form_data) s = Service.objects.all().first() self.assertEqual(len(Service.objects.all()), 1) self.assertEqual(s.owner, self.test_user) self.client.login(username="serviceuser", password="somepassword") response = self.client.post(reverse("edit_service", args=(s.id,))) self.assertEqual(response.status_code, 401) response = self.client.post(reverse("remove_service", args=(s.id,))) self.assertEqual(response.status_code, 401) self.assertEqual(len(Service.objects.all()), 1) self.client.login(username="serviceowner", password="somepassword") form_data = { "service-title": "Foo Title", "service-description": "Foo Description", "service-abstract": "Foo Abstract", "service-keywords": "Foo, Service, OWS", } form = forms.ServiceForm(form_data, instance=s, prefix="service") self.assertTrue(form.is_valid()) response = self.client.post(reverse("edit_service", args=(s.id,)), data=form_data) self.assertEqual(s.title, "Foo Title") self.assertEqual(s.description, "Foo Description") self.assertEqual(s.abstract, "Foo Abstract") self.assertEqual(["Foo", "OWS", "Service"], list(s.keywords.values_list("name", flat=True))) response = self.client.post(reverse("remove_service", args=(s.id,))) self.assertEqual(len(Service.objects.all()), 0)
@flaky(max_runs=3)
[docs] def test_add_duplicate_remote_service_url(self): form_data = { "url": "https://demo.geosolutionsgroup.com/geoserver/ows?service=wms&version=1.3.0&request=GetCapabilities", "type": enumerations.WMS, } self.client.login(username="serviceowner", password="somepassword") # Add the first resource url = "https://demo.geosolutionsgroup.com/geoserver/ows?service=wms&version=1.3.0&request=GetCapabilities" # url = "http://fake" service_type = enumerations.WMS form_data = {"url": url, "type": service_type} form = forms.CreateServiceForm(form_data) # The service sometimes is not available, therefore the form won't be valid... if form.is_valid(): self.assertEqual(Service.objects.count(), 0) self.client.post(reverse("register_service"), data=form_data) self.assertEqual(Service.objects.count(), 1) # Try adding the same URL again form = forms.CreateServiceForm(form_data) self.assertEqual(Service.objects.count(), 1) with self.assertRaises(IntegrityError): self.client.post(reverse("register_service"), data=form_data) self.assertEqual(Service.objects.count(), 1)
[docs] class WmsServiceHarvestingTestCase(StaticLiveServerTestCase):
[docs] selenium = None
@classmethod
[docs] def setUpClass(cls): super().setUpClass() try: cls.client = Client() UserModel = get_user_model() cls.user = UserModel.objects.create_user( username="test", password="test@123", first_name="ather", last_name="ashraf", is_staff=True, is_active=True, is_superuser=False, ) cls.user.save() cls.client.login(username="test", password="test@123") cls.cookie = cls.client.cookies["sessionid"] cls.selenium = webdriver.Firefox() cls.selenium.implicitly_wait(10) cls.selenium.get(f"{cls.live_server_url}/") cls.selenium.add_cookie({"name": "sessionid", "value": cls.cookie.value, "secure": False, "path": "/"}) cls.selenium.refresh() reg_url = reverse("register_service") cls.client.get(reg_url) url = "https://demo.geosolutionsgroup.com/geoserver/ows?service=wms&version=1.3.0&request=GetCapabilities" service_type = enumerations.WMS form_data = {"url": url, "type": service_type} forms.CreateServiceForm(form_data) response = cls.client.post(reverse("register_service"), data=form_data) cls.selenium.get(cls.live_server_url + response.url) cls.selenium.refresh() except Exception as e: msg = str(e) print(msg)
@classmethod
[docs] def tearDownClass(cls): if cls.selenium: cls.selenium.quit() super().tearDownClass()
[docs] def test_harvest_resources(self): if self.selenium: table = self.selenium.find_element_by_id("resource_table") if table: test_resource_table_status(self, table, False) self.selenium.find_element_by_id("id-filter").send_keys("atlantis:roads") self.selenium.find_element_by_id("btn-id-filter").click() test_resource_table_status(self, table, True) self.selenium.find_element_by_id("name-filter").send_keys("landmarks") self.selenium.find_element_by_id("btn-name-filter").click() test_resource_table_status(self, table, True) self.selenium.find_element_by_id("desc-filter").send_keys("None") self.selenium.find_element_by_id("btn-desc-filter").click() test_resource_table_status(self, table, True) self.selenium.find_element_by_id("desc-filter").send_keys("") self.selenium.find_element_by_id("btn-desc-filter").click() test_resource_table_status(self, table, True) self.selenium.find_element_by_id("btnClearFilter").click() test_resource_table_status(self, table, False) self.selenium.find_element_by_id("id-filter").send_keys("atlantis:tiger_roads_tiger_roads") self.selenium.find_element_by_id("btn-id-filter").click()
# self.selenium.find_element_by_id('option_atlantis:tiger_roads_tiger_roads').click() # self.selenium.find_element_by_tag_name('form').submit()
[docs] SERVICES_TYPE_MODULES = [ "geonode.services.tests.dummy_services_type", "geonode.services.tests.dummy_services_type2", ]
[docs] class TestServiceViews(GeoNodeBaseTestSupport):
[docs] def setUp(self): self.user = "admin" self.passwd = "admin" self.admin = get_user_model().objects.get(username="admin") self.sut, _ = Service.objects.get_or_create( type=enumerations.WMS, name="Bogus", title="Pocus", owner=self.admin, method=enumerations.INDEXED, metadata_only=True, base_url="http://bogus.pocus.com/ows", ) self.sut.clear_dirty_state()
[docs] def test_user_admin_can_access_to_page(self): self.client.login(username="admin", password="admin") response = self.client.get(reverse("services")) self.assertEqual(response.status_code, 200)
[docs] def test_anonymous_user_can_see_the_services(self): response = self.client.get(reverse("services")) self.assertEqual(response.status_code, 200)
@override_settings(SERVICES_TYPE_MODULES=SERVICES_TYPE_MODULES)
[docs] def test_will_use_multiple_service_types_defined(self): elems = parse_services_types() expected = { "test": { "OWS": True, "handler": "TestHandler", "label": "Test Number 1", "management_view": "path.to.view1", }, "test2": { "OWS": False, "handler": "TestHandler2", "label": "Test Number 2", "management_view": "path.to.view2", }, "test3": { "OWS": True, "handler": "TestHandler3", "label": "Test Number 3", "management_view": "path.to.view3", }, "test4": { "OWS": False, "handler": "TestHandler4", "label": "Test Number 4", "management_view": "path.to.view4", }, } self.assertDictEqual(expected, elems)
@override_settings(SERVICES_TYPE_MODULES=SERVICES_TYPE_MODULES)
[docs] def test_will_use_multiple_service_types_defined_for_choices(self): elems = get_available_service_types() expected = { "WMS": {"OWS": True, "handler": wms.WmsServiceHandler, "label": "Web Map Service"}, "GN_WMS": {"OWS": True, "handler": wms.GeoNodeServiceHandler, "label": "GeoNode (Web Map Service)"}, "REST_MAP": {"OWS": False, "handler": ArcMapServiceHandler, "label": "ArcGIS REST MapServer"}, "REST_IMG": {"OWS": False, "handler": ArcImageServiceHandler, "label": "ArcGIS REST ImageServer"}, "test": { "OWS": True, "handler": "TestHandler", "label": "Test Number 1", "management_view": "path.to.view1", }, "test2": { "OWS": False, "handler": "TestHandler2", "label": "Test Number 2", "management_view": "path.to.view2", }, "test3": { "OWS": True, "handler": "TestHandler3", "label": "Test Number 3", "management_view": "path.to.view3", }, "test4": { "OWS": False, "handler": "TestHandler4", "label": "Test Number 4", "management_view": "path.to.view4", }, } self.assertDictEqual(expected, elems)
""" Just a dummy function required for the smoke test above """
[docs] class dummy_services_type:
[docs] services_type = { "test": {"OWS": True, "handler": "TestHandler", "label": "Test Number 1", "management_view": "path.to.view1"}, "test2": { "OWS": False, "handler": "TestHandler2", "label": "Test Number 2", "management_view": "path.to.view2", }, }
[docs] class dummy_services_type2:
[docs] services_type = { "test3": {"OWS": True, "handler": "TestHandler3", "label": "Test Number 3", "management_view": "path.to.view3"}, "test4": { "OWS": False, "handler": "TestHandler4", "label": "Test Number 4", "management_view": "path.to.view4", }, }