Source code for geonode.harvesting.harvesters.arcgis
#########################################################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
"""Harvesters for ArcGIS based remote servers."""
import re
import abc
import enum
import json
import logging
import typing
import uuid
from urllib.error import (
HTTPError,
URLError,
)
import urllib.parse
import arcrest
import requests
from django.contrib.gis import geos
from django.template.defaultfilters import slugify
from geonode.layers.enumerations import GXP_PTYPES
from geonode.layers.models import Dataset
from .. import (
models,
resourcedescriptor,
)
from . import base
[docs]
def parse_remote_url(url: str) -> typing.Tuple[str, typing.Optional[str], typing.Optional[str]]:
"""Parse the input url into the ArcGIS REST catalog URL and any service name."""
url_fragments = url.partition("/rest/services")
catalog_url = "".join(url_fragments[:2])
service_type = None
possible_service_name = None
service_type_regex = re.match(r".*\/(.*Server).*", "".join(url_fragments[-1:]))
if service_type_regex:
for service_type_value in service_type_regex.groups():
if ArcgisServiceType.has_value(service_type_value):
service_type = service_type_value
possible_service_name = "".join(url_fragments[-1:]).strip("/").partition(service_type)[0].rstrip("/")
other = None
break
else:
possible_service_name, other = url_fragments[-1].strip("/").partition("/")[::2]
if possible_service_name is not None and possible_service_name != "":
service_name = possible_service_name
if not service_type and other:
service_type = other.partition("/")[0]
else:
service_name = None
return catalog_url, service_name, service_type
[docs]
class ArcgisServiceResourceExtractor(abc.ABC):
"""Abstract base class with the methods that must be reimplemented
in order to add support for additional ArcGIS REST services"""
def __init__(self, service, resource_name_filter: typing.Optional[str] = None):
self.service = service
self.resource_name_filter = resource_name_filter
@abc.abstractmethod
[docs]
def get_num_resources(self) -> int:
"""Return the number of resources that can be extracted from the service."""
@abc.abstractmethod
[docs]
def list_resources(self) -> typing.List[base.BriefRemoteResource]:
"""Return a list of BriefRemoteResource with the resources exposed by the service"""
@abc.abstractmethod
[docs]
def get_resource(self, harvestable_resource: models.HarvestableResource) -> base.HarvestedResourceInfo:
"""Parse the remote resource into a HarvestedResourceInfo"""
[docs]
def _is_relevant_layer(self, layer_name: str) -> bool:
result = False
if self.resource_name_filter is not None:
if self.resource_name_filter.lower() in layer_name.lower():
result = True
else:
result = True
return result
[docs]
class ArcgisMapServiceResourceExtractor(ArcgisServiceResourceExtractor):
def __init__(self, service: arcrest.MapService):
super().__init__(service)
self.http_session = requests.Session()
self._cached_resources = None
[docs]
def get_num_resources(self) -> int:
if self._cached_resources is None:
self._cached_resources = self._extract_resources()
return len(self._cached_resources)
[docs]
def list_resources(
self,
) -> typing.List[base.BriefRemoteResource]:
if self._cached_resources is None:
self._cached_resources = self._extract_resources()
return self._cached_resources
[docs]
def get_resource(self, harvestable_resource: models.HarvestableResource):
response = self.http_session.get(harvestable_resource.unique_identifier, params={"f": "json"})
result = None
if response.status_code == requests.codes.ok:
try:
response_payload = response.json()
except json.JSONDecodeError:
logger.exception("Could not decode response payload as valid JSON")
else:
resource_descriptor = self._get_resource_descriptor(response_payload, harvestable_resource)
result = base.HarvestedResourceInfo(
resource_descriptor=resource_descriptor, additional_information=None
)
else:
logger.error(
f"Could not retrieve remote resource with unique "
f"identifier {harvestable_resource.unique_identifier!r}"
)
return result
[docs]
def _extract_resources(self) -> typing.List[base.BriefRemoteResource]:
result = []
try:
for arc_layer in self.service.layers:
if (
self._is_relevant_layer(arc_layer.name)
and arc_layer.type != ArcgisRestApiLayerType.GROUP_LAYER.value
):
result.append(self._parse_brief_layer(arc_layer))
result.extend(self._list_sub_layers(arc_layer))
except HTTPError:
logger.exception(msg="Could not list resources")
return result
[docs]
def _list_sub_layers(self, arc_layer: arcrest.MapLayer) -> typing.List[base.BriefRemoteResource]:
result = []
for sub_layer in arc_layer.subLayers:
if self._is_relevant_layer(arc_layer.name) and arc_layer.type != ArcgisRestApiLayerType.GROUP_LAYER.value:
result.append(self._parse_brief_layer(sub_layer))
result.extend(self._list_sub_layers(sub_layer))
return result
[docs]
def _get_resource_descriptor(
self, layer_representation: typing.Dict, harvestable_resource: models.HarvestableResource
) -> resourcedescriptor.RecordDescription:
if harvestable_resource.geonode_resource is None:
resource_uuid = uuid.uuid4()
else:
resource_uuid = uuid.UUID(harvestable_resource.geonode_resource.uuid)
_, service_name, service_type = parse_remote_url(harvestable_resource.unique_identifier)
epsg_code, spatial_extent = _parse_spatial_extent(layer_representation["extent"])
ows_url = harvestable_resource.unique_identifier.rpartition("/")[0]
store = slugify(ows_url)
name = layer_representation.get("id", layer_representation.get("name", "Undefined"))
title = layer_representation.get("name", layer_representation.get("title", "Undefined"))
workspace = "remoteWorkspace"
alternate = f"{workspace}:{name}"
return resourcedescriptor.RecordDescription(
uuid=resource_uuid,
identification=resourcedescriptor.RecordIdentification(
name=name,
title=title,
abstract=layer_representation.get("description", ""),
other_constraints=layer_representation.get("copyrightTest", ""),
spatial_extent=spatial_extent,
other_keywords=[
"ESRI",
f"ArcGIS REST {self.service.__service_type__}",
],
),
distribution=resourcedescriptor.RecordDistribution(
link_url=harvestable_resource.unique_identifier,
thumbnail_url=None,
),
reference_systems=[epsg_code],
additional_parameters={
"store": store,
"workspace": workspace,
"alternate": alternate,
"ows_url": ows_url,
"ptype": GXP_PTYPES["REST_MAP"],
},
)
[docs]
def _parse_brief_layer(self, arc_layer: arcrest.MapLayer) -> base.BriefRemoteResource:
base_url = urllib.parse.urlparse(self.service.url)
layer_path = "/".join((base_url.path.rstrip("/"), str(arc_layer.id)))
layer_url = urllib.parse.urlunparse((base_url.scheme, base_url.netloc, layer_path, "", "", ""))
return base.BriefRemoteResource(
unique_identifier=layer_url,
title=arc_layer.name,
resource_type=arc_layer.type,
)
[docs]
class ArcgisImageServiceResourceExtractor(ArcgisServiceResourceExtractor):
def __init__(self, service: arcrest.ImageService):
super().__init__(service)
self.http_session = requests.Session()
[docs]
def list_resources(self) -> typing.List[base.BriefRemoteResource]:
name = self._get_resource_name()
if self._is_relevant_layer(name):
unique_id = self.service.url.rpartition("?")[0].rstrip("/")
result = [
base.BriefRemoteResource(
unique_identifier=unique_id,
title=name,
resource_type="raster",
)
]
else:
result = []
return result
[docs]
def get_resource(self, harvestable_resource: models.HarvestableResource) -> base.HarvestedResourceInfo:
response = self.http_session.get(harvestable_resource.unique_identifier, params={"f": "json"})
result = None
if response.status_code == requests.codes.ok:
try:
response_payload = response.json()
except json.JSONDecodeError:
logger.exception("Could not decode response payload as valid JSON")
else:
resource_descriptor = self._get_resource_descriptor(response_payload, harvestable_resource)
result = base.HarvestedResourceInfo(
resource_descriptor=resource_descriptor, additional_information=None
)
else:
logger.error(
f"Could not retrieve remote resource with unique "
f"identifier {harvestable_resource.unique_identifier!r}"
)
return result
[docs]
def _get_resource_name(self):
return self.service.url.rpartition("/rest/services/")[-1].partition("/ImageServer")[0]
[docs]
def _get_resource_descriptor(
self, layer_representation: typing.Dict, harvestable_resource: models.HarvestableResource
) -> resourcedescriptor.RecordDescription:
if harvestable_resource.geonode_resource is None:
resource_uuid = uuid.uuid4()
else:
resource_uuid = uuid.UUID(harvestable_resource.geonode_resource.uuid)
_, service_name, service_type = parse_remote_url(harvestable_resource.unique_identifier)
epsg_code, spatial_extent = _parse_spatial_extent(layer_representation["extent"])
ows_url = harvestable_resource.unique_identifier.rpartition("/")[0]
store = slugify(ows_url)
name = layer_representation.get("id", layer_representation.get("name", "Undefined"))
title = layer_representation.get("name", layer_representation.get("title", "Undefined"))
workspace = "remoteWorkspace"
alternate = f"{workspace}:{name}"
return resourcedescriptor.RecordDescription(
uuid=resource_uuid,
identification=resourcedescriptor.RecordIdentification(
name=name,
title=title,
abstract=layer_representation.get("description", ""),
other_constraints=layer_representation.get("copyrightTest", ""),
spatial_extent=spatial_extent,
other_keywords=[
"ESRI",
f"ArcGIS REST {self.service.__service_type__}",
],
),
distribution=resourcedescriptor.RecordDistribution(
link_url=harvestable_resource.unique_identifier,
thumbnail_url=None,
),
reference_systems=[epsg_code],
additional_parameters={
"store": store,
"workspace": workspace,
"alternate": alternate,
"ows_url": ows_url,
"ptype": GXP_PTYPES["REST_IMG"],
},
)
[docs]
def get_resource_extractor(resource_unique_identifier: str) -> typing.Optional[ArcgisServiceResourceExtractor]:
"""A factory for instantiating the correct extractor for the resource"""
service_type_name = parse_remote_url(resource_unique_identifier)[-1]
service_type = ArcgisServiceType(service_type_name)
if service_type == ArcgisServiceType.MAP_SERVICE:
service = arcrest.MapService(resource_unique_identifier)
result = ArcgisMapServiceResourceExtractor(service)
elif service_type == ArcgisServiceType.IMAGE_SERVICE:
service = arcrest.ImageService(resource_unique_identifier)
result = ArcgisImageServiceResourceExtractor(service)
else:
logger.error(f"Unsupported ArcGIS REST service {service_type!r}")
result = None
return result
[docs]
class ArcgisHarvesterWorker(base.BaseHarvesterWorker):
[docs]
_relevant_service_extractors: typing.Optional[
typing.List[typing.Union[ArcgisMapServiceResourceExtractor, ArcgisImageServiceResourceExtractor]]
]
[docs]
_supported_service_types = {
ArcgisServiceType.MAP_SERVICE: ArcgisMapServiceResourceExtractor,
ArcgisServiceType.IMAGE_SERVICE: ArcgisImageServiceResourceExtractor,
}
def __init__(
self,
remote_url: str,
harvester_id: int,
harvest_map_services: bool = True,
harvest_image_services: bool = True,
resource_name_filter: typing.Optional[str] = True,
service_names_filter: typing.Optional[typing.List[str]] = None,
) -> None:
catalog_url, service_name, service_type_name = parse_remote_url(remote_url)
if service_name is not None:
names_filter = [service_name] + (service_names_filter or [])
service_type = ArcgisServiceType(service_type_name)
harvest_maps = (service_type == ArcgisServiceType.MAP_SERVICE) or harvest_map_services
harvest_images = (service_type == ArcgisServiceType.IMAGE_SERVICE) or harvest_image_services
else:
names_filter = service_names_filter or []
harvest_maps = harvest_map_services
harvest_images = harvest_image_services
super().__init__(catalog_url, harvester_id)
self.http_session = requests.Session()
self.harvest_map_services = harvest_maps
self.harvest_image_services = harvest_images
self.resource_name_filter = resource_name_filter
self.service_names_filter = names_filter
self._arc_catalog = None
self._relevant_service_extractors = None
@property
@property
[docs]
def arc_catalog(self):
if self._arc_catalog is None:
try:
self._arc_catalog = arcrest.Catalog(self.remote_url)
except (json.JSONDecodeError, URLError, HTTPError):
logger.exception(f"Could not connect to ArcGIS REST server at {self.remote_url!r}")
return self._arc_catalog
@classmethod
[docs]
def from_django_record(cls, harvester: "Harvester"): # noqa
return cls(
remote_url=harvester.remote_url,
harvester_id=harvester.pk,
harvest_map_services=harvester.harvester_type_specific_configuration.get("harvest_map_services", True),
harvest_image_services=harvester.harvester_type_specific_configuration.get("harvest_image_services", True),
resource_name_filter=harvester.harvester_type_specific_configuration.get("resource_name_filter"),
service_names_filter=harvester.harvester_type_specific_configuration.get("service_names_filter"),
)
@classmethod
[docs]
def get_extra_config_schema(cls) -> typing.Optional[typing.Dict]:
return {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": ("https://geonode.org/harvesting/geonode-arcgis-rest-harvester.schema.json"),
"title": "ArcGIS REST harvester config",
"description": (
"A jsonschema for validating configuration option for GeoNode's "
"remote ArcGIS REST services harvester"
),
"type": "object",
"properties": {
"harvest_map_services": {"type": "boolean", "default": True},
"harvest_image_services": {"type": "boolean", "default": True},
"resource_name_filter": {
"type": "string",
},
"service_names_filter": {
"type": "array",
"items": {
"type": "string",
},
},
},
"additionalProperties": False,
}
[docs]
def get_num_available_resources(self) -> int:
result = 0
for service_extractor in self._get_relevant_services():
result += service_extractor.get_num_resources()
return result
[docs]
def list_resources(self, offset: typing.Optional[int] = 0) -> typing.List[base.BriefRemoteResource]:
result = []
# NOTE: Since ArcGIS REST services work in a nested fashion we are
# not able to paginate the underlying results. As such, we resort to
# processing all resources sequentially. This means we only care about
# `offset=0` and explicitly return an empty list when the supplied
# offset is different.
if offset == 0:
for service_extractor in self._get_relevant_services():
result.extend(service_extractor.list_resources())
return result
[docs]
def check_availability(self, timeout_seconds: typing.Optional[int] = 5) -> bool:
return self.arc_catalog is not None
[docs]
def get_geonode_resource_type(self, remote_resource_type: str) -> typing.Type:
return Dataset
[docs]
def get_geonode_resource_defaults(
self,
harvested_info: base.HarvestedResourceInfo,
harvestable_resource: models.HarvestableResource,
) -> typing.Dict:
defaults = super().get_geonode_resource_defaults(harvested_info, harvestable_resource)
defaults["name"] = harvested_info.resource_descriptor.identification.name
defaults.update(harvested_info.resource_descriptor.additional_parameters)
return defaults
[docs]
def get_resource(
self,
harvestable_resource: models.HarvestableResource,
) -> typing.Optional[base.HarvestedResourceInfo]:
extractor = get_resource_extractor(harvestable_resource.unique_identifier)
extractor.resource_name_filter = self.resource_name_filter
return extractor.get_resource(harvestable_resource)
[docs]
def _get_extractor_class(self, service_type: ArcgisServiceType) -> typing.Optional[typing.Type]:
if service_type == ArcgisServiceType.MAP_SERVICE and self.harvest_map_services:
result = ArcgisMapServiceResourceExtractor
elif service_type == ArcgisServiceType.IMAGE_SERVICE and self.harvest_image_services:
result = ArcgisImageServiceResourceExtractor
else:
result = None
return result
[docs]
def _get_service_extractors(self, service) -> typing.List:
# This method is fugly. Unfortunately, when multiple services share the
# same name, arcrest just instantiates an `AmbiguousService` instance and
# shoves the concrete services as attributes of this instance.
# To make matters more unpleasant, the arcrest `AmbiguousService` class is
# defined inside the `__getitem__` method of another class, so it cannot be
# imported outside of it. Thus we resort to checking if there is a
# `__service_type__` attribute on the service in order to deduct whether this is a
# legit service or an ambiguous one and then deal with it
result = []
if not hasattr(service, "__service_type__"):
# this is an arcrest AmbiguousService instance
for sub_service_type in service.__dict__.keys():
try:
type_ = ArcgisServiceType(sub_service_type)
except ValueError:
logger.debug(f"Unrecognized service type: {sub_service_type!r}")
continue
else:
extractor_class = self._get_extractor_class(type_)
if extractor_class is not None:
sub_service = getattr(service, sub_service_type)
extractor = extractor_class(sub_service)
extractor.resource_name_filter = self.resource_name_filter
result.append(extractor)
else:
try:
type_ = ArcgisServiceType(service.__service_type__)
except ValueError:
logger.debug(f"Unrecognized service type: {service.__service_type__!r}")
else:
extractor_class = self._get_extractor_class(type_)
if extractor_class is not None:
extractor = extractor_class(service)
extractor.resource_name_filter = self.resource_name_filter
result.append(extractor)
return result
[docs]
def _get_relevant_services(
self,
) -> typing.List[typing.Union[ArcgisMapServiceResourceExtractor, ArcgisImageServiceResourceExtractor]]:
if self._relevant_service_extractors is None:
result = []
relevant_service_names = self.service_names_filter or self.arc_catalog.servicenames
for service_name in relevant_service_names:
service = None
for _folder in service_name.split("/"):
if not service:
service = self.arc_catalog[_folder]
else:
service = service[_folder]
extractors = self._get_service_extractors(service)
result.extend(extractors)
self._relevant_service_extractors = result
return self._relevant_service_extractors
[docs]
def _parse_spatial_extent(raw_extent: typing.Dict) -> typing.Tuple[str, geos.Polygon]:
spatial_reference = raw_extent.get("spatialReference", {})
epsg_code = f"EPSG:{spatial_reference.get('latestWkid', spatial_reference.get('wkid'))}"
extent = geos.Polygon.from_bbox((raw_extent["xmin"], raw_extent["ymin"], raw_extent["xmax"], raw_extent["ymax"]))
return epsg_code, extent