Source code for geonode.geoserver.manager

#########################################################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################

import os
import typing
import logging
import tempfile
import dataclasses

from gsimporter.api import Session

from django.conf import settings
from django.db.models.query import QuerySet
from django.contrib.auth.models import Group
from django.contrib.auth import get_user_model

from geonode.maps.models import Map
from geonode.base import enumerations
from geonode.layers.models import Dataset
from geonode.upload.models import Upload
from geonode.base.models import ResourceBase
from geonode.utils import get_dataset_workspace
from geonode.services.enumerations import CASCADED
from geonode.security.utils import skip_registered_members_common_group
from geonode.security.permissions import (
    VIEW_PERMISSIONS,
    OWNER_PERMISSIONS,
    DOWNLOAD_PERMISSIONS,
    DATASET_ADMIN_PERMISSIONS,
)
from geonode.resource.manager import ResourceManager, ResourceManagerInterface
from geonode.geoserver.signals import acl_rule_assign
from .acl.acl_client import AutoPriorityBatch
from .tasks import geoserver_set_style, geoserver_delete_map, geoserver_create_style, geoserver_cascading_delete
from .helpers import (
    SpatialFilesLayerType,
    gs_catalog,
    gs_uploader,
    set_styles,
    set_time_info,
    ogc_server_settings,
    get_spatial_files_dataset_type,
    sync_instance_with_geoserver,
    set_attributes_from_geoserver,
    create_gs_thumbnail,
    create_geoserver_db_featurestore,
    acl,
    acl_utils,
)
from .security import (
    _get_gwc_filters_and_formats,
    toggle_dataset_cache,
    invalidate_acl_cache,
    has_geolimits,
    create_acl_rules,
)

logger = logging.getLogger(__name__)


@dataclasses.dataclass()
class GeoServerImporterSessionInfo:
    upload_session: Upload
    import_session: Session
    spatial_files_type: SpatialFilesLayerType
    dataset_name: typing.AnyStr
    workspace: typing.AnyStr
    target_store: typing.AnyStr
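
# Illustrative note (not part of the upstream module): GeoServerImporterSessionInfo is a plain
# data carrier. GeoServerResourceManager._execute_resource_import() builds one per importer run
# and import_dataset() reads it back. The values below are hypothetical examples only; the field
# types follow the annotations above.
#
#   session_info = GeoServerImporterSessionInfo(
#       upload_session=upload_session,          # geonode.upload.models.Upload row tracking the run
#       import_session=import_session,          # gsimporter.api.Session opened through gs_uploader
#       spatial_files_type=spatial_files_type,  # SpatialFilesLayerType detected from the files
#       dataset_name=None,                      # filled in once the importer task exists
#       workspace="geonode",                    # hypothetical target workspace
#       target_store="geonode_data",            # hypothetical target datastore
#   )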


class GeoServerResourceManager(ResourceManagerInterface):
    def search(self, filter: dict, /, resource_type: typing.Optional[object]) -> QuerySet:
        return resource_type.objects.none()

    def exists(self, uuid: str, /, instance: ResourceBase = None) -> bool:
        if instance:
            _real_instance = instance.get_real_instance()
            if hasattr(_real_instance, "subtype") and _real_instance.subtype not in ["tileStore", "remote"]:
                try:
                    logger.debug(f"Searching GeoServer for layer '{_real_instance.alternate}'")
                    # Let's reset the connections first
                    gs_catalog._cache.clear()
                    gs_catalog.reset()
                    if gs_catalog.get_layer(_real_instance.alternate):
                        return True
                except Exception as e:
                    logger.debug(e)
                return False
            return True
        return False

    def delete(self, uuid: str, /, instance: ResourceBase = None) -> int:
        """Removes the layer from GeoServer"""
        # cascading_delete should only be called if
        # ogc_server_settings.BACKEND_WRITE_ENABLED == True
        if instance and getattr(ogc_server_settings, "BACKEND_WRITE_ENABLED", True):
            try:
                _real_instance = instance.get_real_instance()
                if (
                    isinstance(_real_instance, Dataset)
                    and hasattr(_real_instance, "alternate")
                    and _real_instance.alternate
                ):
                    if (
                        not hasattr(_real_instance, "remote_service")
                        or _real_instance.remote_service is None
                        or _real_instance.remote_service.method == CASCADED
                    ):
                        geoserver_cascading_delete.apply_async(args=(_real_instance.alternate,), expiration=30)
                elif isinstance(_real_instance, Map):
                    geoserver_delete_map.apply_async(args=(_real_instance.id,), expiration=30)
            except Exception as e:
                logger.exception(e)

    def create(self, uuid: str, /, resource_type: typing.Optional[object] = None, defaults: dict = {}) -> ResourceBase:
        _resource = resource_type.objects.get(uuid=uuid)
        if resource_type == Dataset:
            _synced_resource = sync_instance_with_geoserver(_resource.id)
            _resource = _synced_resource or _resource
        return _resource

    def update(
        self,
        uuid: str,
        /,
        instance: ResourceBase = None,
        xml_file: str = None,
        metadata_uploaded: bool = False,
        vals: dict = {},
        regions: dict = {},
        keywords: dict = {},
        custom: dict = {},
        notify: bool = True,
        **kwargs,
    ) -> ResourceBase:
        if instance:
            if isinstance(instance.get_real_instance(), Dataset):
                _synced_resource = sync_instance_with_geoserver(instance.id)
                instance = _synced_resource or instance
        return instance

    def ingest(
        self,
        files: typing.List[str],
        /,
        uuid: str = None,
        resource_type: typing.Optional[object] = None,
        defaults: dict = {},
        **kwargs,
    ) -> ResourceBase:
        instance = ResourceManager._get_instance(uuid)
        if instance and isinstance(instance.get_real_instance(), Dataset):
            instance = self.import_dataset(
                "import_dataset",
                instance.uuid,
                instance=instance,
                files=files,
                user=defaults.get("user", instance.owner),
                defaults=defaults,
                action_type="create",
                **kwargs,
            )
        return instance

    def copy(
        self, instance: ResourceBase, /, uuid: str = None, owner: settings.AUTH_USER_MODEL = None, defaults: dict = {}
    ) -> ResourceBase:
        if uuid and instance:
            _resource = ResourceManager._get_instance(uuid)
            if _resource and isinstance(_resource.get_real_instance(), Dataset):
                importer_session_opts = defaults.get("importer_session_opts", {})
                if not importer_session_opts:
                    _src_upload_session = Upload.objects.filter(resource=instance.get_real_instance().resourcebase_ptr)
                    if _src_upload_session.exists():
                        _src_upload_session = _src_upload_session.get()
                        if _src_upload_session and _src_upload_session.get_session:
                            try:
                                _src_importer_session = _src_upload_session.get_session.import_session.reload()
                                importer_session_opts.update({"transforms": _src_importer_session.tasks[0].transforms})
                            except Exception as e:
                                logger.exception(e)
                return self.import_dataset(
                    "import_dataset",
                    uuid,
                    instance=_resource,
                    files=defaults.get("files", None),
                    user=defaults.get("user", _resource.owner),
                    defaults=defaults,
                    action_type="create",
                    importer_session_opts=importer_session_opts,
                )
            return _resource

    def append(self, instance: ResourceBase, vals: dict = {}, *args, **kwargs) -> ResourceBase:
        if instance and isinstance(instance.get_real_instance(), Dataset):
            return self.import_dataset(
                "import_dataset",
                instance.uuid,
                instance=instance,
                files=vals.get("files", None),
                user=vals.get("user", instance.owner),
                action_type="append",
                importer_session_opts=vals.get("importer_session_opts", None),
                **kwargs,
            )
        return instance

    def replace(self, instance: ResourceBase, vals: dict = {}, *args, **kwargs) -> ResourceBase:
        if instance and isinstance(instance.get_real_instance(), Dataset):
            return self.import_dataset(
                "import_dataset",
                instance.uuid,
                instance=instance,
                files=vals.get("files", None),
                user=vals.get("user", instance.owner),
                action_type="replace",
                importer_session_opts=vals.get("importer_session_opts", None),
                **kwargs,
            )
        return instance

    def import_dataset(self, method: str, uuid: str, /, instance: ResourceBase = None, **kwargs) -> ResourceBase:
        instance = instance or ResourceManager._get_instance(uuid)

        if instance and isinstance(instance.get_real_instance(), Dataset):
            try:
                _gs_import_session_info = self._execute_resource_import(
                    instance,
                    kwargs.get("files", None),
                    kwargs.get("user", instance.owner),
                    action_type=kwargs.get("action_type", "create"),
                    importer_session_opts=kwargs.get("importer_session_opts", None),
                )
                import_session = _gs_import_session_info.import_session
                if import_session:
                    if import_session.state == enumerations.STATE_PENDING:
                        task = None
                        native_crs = None
                        target_crs = "EPSG:4326"
                        for _task in import_session.tasks:
                            # CRS missing/unknown
                            if _task.state == "NO_CRS":
                                task = _task
                                native_crs = _task.layer.srs
                                break
                        if not native_crs:
                            native_crs = "EPSG:4326"

                        if task:
                            task.set_srs(native_crs)

                            transform = {
                                "type": "ReprojectTransform",
                                "source": native_crs,
                                "target": target_crs,
                            }
                            task.remove_transforms([transform], by_field="type", save=False)
                            task.add_transforms([transform], save=False)
                            task.save_transforms()

                        # Starting import process
                        import_session.commit()
                        import_session = import_session.reload()

                    _gs_import_session_info.import_session = import_session
                    _gs_import_session_info.dataset_name = import_session.tasks[0].layer.name

                    _name = (
                        _gs_import_session_info.dataset_name
                        if import_session.state == enumerations.STATE_COMPLETE
                        else ""
                    )
                    _alternate = (
                        f"{_gs_import_session_info.workspace}:{_gs_import_session_info.dataset_name}"
                        if import_session.state == enumerations.STATE_COMPLETE
                        else ""
                    )
                    _to_update = {
                        "name": _name,
                        "title": instance.title or _gs_import_session_info.dataset_name,
                        "files": kwargs.get("files", None),
                        "workspace": _gs_import_session_info.workspace,
                        "alternate": _alternate,
                        "typename": _alternate,
                        "store": _gs_import_session_info.target_store or _gs_import_session_info.dataset_name,
                        "subtype": _gs_import_session_info.spatial_files_type.dataset_type,
                    }
                    if "defaults" in kwargs:
                        kwargs["defaults"].update(_to_update)
                    Dataset.objects.filter(uuid=instance.uuid).update(**_to_update)
                    instance.get_real_instance_class().objects.filter(uuid=instance.uuid).update(**_to_update)

                    # Refresh from DB
                    instance.refresh_from_db()

                    if kwargs.get("action_type", "create") == "create":
                        set_styles(instance.get_real_instance(), gs_catalog)
                        set_attributes_from_geoserver(instance.get_real_instance(), overwrite=True)
                elif kwargs.get("action_type", "create") == "create":
                    logger.exception(Exception(f"Importer Session not valid - STATE: {import_session.state}"))

                if import_session.state == enumerations.STATE_COMPLETE:
                    instance.set_processing_state(enumerations.STATE_PROCESSED)
                else:
                    instance.set_processing_state(import_session.state)
                    instance.set_dirty_state()
                instance.save(notify=False)
            except Exception as e:
                logger.exception(e)
                if kwargs.get("action_type", "create") == "create":
                    instance.delete()
                    instance = None
        return instance

    def _execute_resource_import(
        self, instance, files: list, user, action_type: str, importer_session_opts: typing.Optional[typing.Dict] = None
    ):
        from geonode.utils import get_allowed_extensions

        ALLOWED_EXTENSIONS = get_allowed_extensions()

        session_opts = dict(importer_session_opts) if importer_session_opts is not None else {}

        spatial_files_type = get_spatial_files_dataset_type(ALLOWED_EXTENSIONS, files)
        if not spatial_files_type:
            raise Exception(f"No suitable Spatial Files available for 'ALLOWED_EXTENSIONS' = {ALLOWED_EXTENSIONS}.")

        upload_session, _ = Upload.objects.get_or_create(
            resource=instance.get_real_instance().resourcebase_ptr, user=user
        )
        upload_session.resource = instance.get_real_instance().resourcebase_ptr
        upload_session.save()

        _name = instance.get_real_instance().name
        if not _name:
            _name = (
                session_opts.get("name", None) or os.path.splitext(os.path.basename(spatial_files_type.base_file))[0]
            )
        instance.get_real_instance().name = _name

        gs_dataset = None
        try:
            gs_dataset = gs_catalog.get_layer(_name)
        except Exception as e:
            logger.debug(e)

        _workspace = None
        _target_store = None
        if gs_dataset:
            _target_store = (
                gs_dataset.resource.store.name if instance.get_real_instance().subtype == "vector" else None
            )
            _workspace = gs_dataset.resource.workspace.name if gs_dataset.resource.workspace else None

        if not _workspace:
            _workspace = session_opts.get("workspace", instance.get_real_instance().workspace)
            if not _workspace:
                _workspace = instance.get_real_instance().workspace or settings.DEFAULT_WORKSPACE

        if not _target_store:
            if instance.get_real_instance().subtype == "vector" or spatial_files_type.dataset_type == "vector":
                _dsname = ogc_server_settings.datastore_db["NAME"]
                _ds = create_geoserver_db_featurestore(store_name=_dsname, workspace=_workspace)
                if _ds:
                    _target_store = session_opts.get("target_store", None) or _dsname

        # opening Import session for the selected layer
        # Let's reset the connections first
        gs_catalog._cache.clear()
        gs_catalog.reset()

        # Let's now try the new ingestion
        import_session = gs_uploader.start_import(import_id=upload_session.id, name=_name, target_store=_target_store)

        upload_session.set_processing_state(enumerations.STATE_PROCESSED)
        upload_session.import_id = import_session.id
        upload_session.name = _name
        upload_session.complete = True
        upload_session.processed = True
        upload_session.save()

        _gs_import_session_info = GeoServerImporterSessionInfo(
            upload_session=upload_session,
            import_session=import_session,
            spatial_files_type=spatial_files_type,
            dataset_name=None,
            workspace=_workspace,
            target_store=_target_store,
        )

        import_session.upload_task(files)
        task = import_session.tasks[0]
        # Changing layer name, mode and target
        task.layer.set_target_layer_name(_name)
        task.set_update_mode(action_type.upper())
        task.set_target(store_name=_target_store, workspace=_workspace)
        transforms = session_opts.get("transforms", None)
        if transforms:
            task.set_transforms(transforms)

        # Starting import process
        import_session.commit()
        import_session = import_session.reload()
        _gs_import_session_info.import_session = import_session
        _gs_import_session_info.dataset_name = import_session.tasks[0].layer.name

        return _gs_import_session_info

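    # Sketch (assumption, derived from the session_opts.get(...) lookups above) of the options
    # that can be passed through ``importer_session_opts``; the concrete values are hypothetical:
    #
    #   importer_session_opts = {
    #       "name": "roads",                 # fallback layer name when the instance has none
    #       "workspace": "geonode",          # overrides the detected target workspace
    #       "target_store": "geonode_data",  # overrides the detected target datastore
    #       "transforms": [                  # gsimporter transforms applied to the import task
    #           {"type": "ReprojectTransform", "source": "EPSG:3857", "target": "EPSG:4326"},
    #       ],
    #   }
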
    def remove_permissions(self, uuid: str, /, instance: ResourceBase = None) -> bool:
        instance = instance or ResourceManager._get_instance(uuid)

        try:
            if instance and isinstance(instance.get_real_instance(), Dataset):
                if settings.OGC_SERVER["default"]["ACL_SECURITY_ENABLED"]:
                    if not getattr(settings, "DELAYED_SECURITY_SIGNALS", False):
                        workspace = get_dataset_workspace(instance)
                        removed = acl_utils.delete_layer_rules(workspace, instance.name)
                        if removed:
                            invalidate_acl_cache()
                    else:
                        instance.set_dirty_state()
        except Exception as e:
            logger.exception(e)
            return False
        return True

    def set_permissions(
        self,
        uuid: str,
        /,
        instance: ResourceBase = None,
        owner: settings.AUTH_USER_MODEL = None,
        permissions: dict = {},
        created: bool = False,
        approval_status_changed: bool = False,
        group_status_changed: bool = False,
    ) -> bool:
        _resource = instance or ResourceManager._get_instance(uuid)

        try:
            if _resource:
                _resource = _resource.get_real_instance()
                logger.info(f'Requesting ACL rules on resource "{_resource}" :: {type(_resource).__name__}')
                if isinstance(_resource, Dataset):
                    if settings.OGC_SERVER["default"].get("ACL_SECURITY_ENABLED", False) or getattr(
                        settings, "ACL_SECURITY_ENABLED", False
                    ):
                        if not getattr(settings, "DELAYED_SECURITY_SIGNALS", False):
                            batch = AutoPriorityBatch(
                                acl_utils.get_first_available_priority(), f"Set permission for resource {_resource}"
                            )

                            workspace = get_dataset_workspace(_resource)
                            if not created:
                                acl_utils.collect_delete_layer_rules(workspace, _resource.name, batch)

                            exist_geolimits = None

                            _owner = owner or _resource.owner
                            if permissions is not None and len(permissions):
                                # Owner
                                perms = (
                                    OWNER_PERMISSIONS.copy()
                                    + DATASET_ADMIN_PERMISSIONS.copy()
                                    + DOWNLOAD_PERMISSIONS.copy()
                                )
                                create_acl_rules(_resource, perms, _owner, None, batch)
                                exist_geolimits = exist_geolimits or has_geolimits(_resource, _owner, None)

                                deferred_anon_perms = []

                                # All the other users
                                if "users" in permissions and len(permissions["users"]) > 0:
                                    for user, user_perms in permissions["users"].items():
                                        _user = get_user_model().objects.get(username=user)
                                        if _user != _owner:
                                            if user == "AnonymousUser":
                                                _user = None
                                                deferred_anon_perms.append(user_perms)
                                            else:
                                                create_acl_rules(_resource, user_perms, _user, None, batch)
                                            exist_geolimits = exist_geolimits or has_geolimits(_resource, _user, None)

                                # All the other groups
                                if "groups" in permissions and len(permissions["groups"]) > 0:
                                    for group, perms in permissions["groups"].items():
                                        _group = Group.objects.get(name=group)
                                        if _group and _group.name and _group.name == "anonymous":
                                            _group = None
                                            deferred_anon_perms.append(perms)
                                        else:
                                            create_acl_rules(_resource, perms, None, _group, batch)
                                        exist_geolimits = exist_geolimits or has_geolimits(_resource, None, _group)

                                for perm in deferred_anon_perms:
                                    create_acl_rules(_resource, perm, None, None, batch)
                            else:
                                # Owner & Managers
                                perms = (
                                    OWNER_PERMISSIONS.copy()
                                    + DATASET_ADMIN_PERMISSIONS.copy()
                                    + DOWNLOAD_PERMISSIONS.copy()
                                )
                                create_acl_rules(_resource, perms, _owner, None, batch)
                                exist_geolimits = exist_geolimits or has_geolimits(_resource, _owner, None)

                                _resource_groups, _group_managers = _resource.get_group_managers(group=_resource.group)
                                for _group_manager in _group_managers:
                                    create_acl_rules(_resource, perms, _group_manager, None, batch)
                                    exist_geolimits = exist_geolimits or has_geolimits(_resource, _group_manager, None)

                                for user_group in _resource_groups:
                                    if not skip_registered_members_common_group(user_group):
                                        create_acl_rules(_resource, perms, None, user_group, batch)
                                        exist_geolimits = exist_geolimits or has_geolimits(_resource, None, user_group)

                                # Anonymous
                                if settings.DEFAULT_ANONYMOUS_VIEW_PERMISSION:
                                    create_acl_rules(_resource, VIEW_PERMISSIONS, None, None, batch)
                                    exist_geolimits = exist_geolimits or has_geolimits(_resource, None, None)

                                if settings.DEFAULT_ANONYMOUS_DOWNLOAD_PERMISSION:
                                    create_acl_rules(_resource, DOWNLOAD_PERMISSIONS, None, None, batch)
                                    exist_geolimits = exist_geolimits or has_geolimits(_resource, None, None)

                            if exist_geolimits is not None:
                                filters, formats = _get_gwc_filters_and_formats(exist_geolimits)
                                try:
                                    _dataset_workspace = get_dataset_workspace(_resource)
                                    toggle_dataset_cache(
                                        f"{_dataset_workspace}:{_resource.name}", filters=filters, formats=formats
                                    )
                                except Dataset.DoesNotExist:
                                    pass

                            try:
                                logger.info(f"Pushing {batch.length()} changes into ACL for resource {_resource.name}")
                                executed = acl.run_batch(batch)
                                if executed:
                                    acl.invalidate_cache()
                            except Exception as e:
                                logger.warning(f"Could not sync ACL for resource {_resource}: {e}. Retrying async.")
                                _resource.set_dirty_state()
                        else:
                            _resource.set_dirty_state()
        except Exception as e:
            logger.exception(e)
            return False

        acl_rule_assign.send_robust(sender=instance, instance=instance)
        return True

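    # Sketch (assumption, derived from the permissions["users"] / permissions["groups"] lookups
    # above) of the permission payload this method handles; usernames, group names and permission
    # codenames are hypothetical examples:
    #
    #   permissions = {
    #       "users": {
    #           "AnonymousUser": ["view_resourcebase"],
    #           "alice": ["view_resourcebase", "download_resourcebase", "change_dataset_style"],
    #       },
    #       "groups": {
    #           "registered-members": ["view_resourcebase", "download_resourcebase"],
    #       },
    #   }
    #
    # An empty payload falls back to the "Owner & Managers" branch, which grants the owner, the
    # group managers and (depending on settings) anonymous users their default permissions.
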
    def set_thumbnail(
        self, uuid: str, /, instance: ResourceBase = None, overwrite: bool = True, check_bbox: bool = True
    ) -> bool:
        if instance and (
            isinstance(instance.get_real_instance(), Dataset) or isinstance(instance.get_real_instance(), Map)
        ):
            if overwrite or not instance.thumbnail_url:
                create_gs_thumbnail(instance.get_real_instance(), overwrite=overwrite, check_bbox=check_bbox)
            return True
        return False

    def exec(self, method: str, uuid: str, /, instance: ResourceBase = None, **kwargs) -> ResourceBase:
        raise NotImplementedError

    def set_style(self, method: str, uuid: str, instance: ResourceBase = None, **kwargs) -> ResourceBase:
        instance = instance or ResourceManager._get_instance(uuid)

        if instance and isinstance(instance.get_real_instance(), Dataset):
            try:
                logger.info(f"Creating style for Dataset {instance.get_real_instance()} / {kwargs}")
                _sld_file = kwargs.get("sld_file", None)
                _tempdir = kwargs.get("tempdir", tempfile.gettempdir())
                if _sld_file and kwargs.get("sld_uploaded", False):
                    geoserver_set_style(instance.get_real_instance().id, _sld_file)
                else:
                    geoserver_create_style(
                        instance.get_real_instance().id, instance.get_real_instance().name, _sld_file, _tempdir
                    )
            except Exception as e:
                logger.exception(e)
                return None
        return instance

    def set_time_info(self, method: str, uuid: str, /, instance: ResourceBase = None, **kwargs) -> ResourceBase:
        instance = instance or ResourceManager._get_instance(uuid)

        if instance and isinstance(instance.get_real_instance(), Dataset):
            try:
                if kwargs.get("time_info", None):
                    set_time_info(instance.get_real_instance(), **kwargs["time_info"])
            except Exception as e:
                logger.exception(e)
                return None
        return instance
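
# Usage sketch (illustrative, not part of the module): this backend is normally reached through
# the generic ``resource_manager`` singleton rather than instantiated directly. The wiring below
# reflects a typical GeoNode setup and is an assumption, not something defined in this file:
#
#   # settings.py
#   RESOURCE_MANAGER_CONCRETE_CLASS = "geonode.geoserver.manager.GeoServerResourceManager"
#
#   # application code
#   from geonode.resource.manager import resource_manager
#   resource_manager.set_permissions(dataset.uuid, instance=dataset, permissions=perm_spec)
#
# The singleton performs the Django-side bookkeeping and then delegates to the matching method
# on this class (here, GeoServerResourceManager.set_permissions).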