#########################################################################
#
# Copyright (C) 2021 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import logging
import requests
import traceback
from packaging import version
import re
import typing
import xml.etree.ElementTree as ET
from lxml import etree
from defusedxml import lxml as dlxml
from requests.auth import HTTPBasicAuth
from guardian.shortcuts import get_anonymous_user
from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.auth import get_user_model
from .acl.acl_client import Batch, Rule, AutoPriorityBatch
from geonode.geoserver.helpers import acl, acl_utils, gs_catalog
from geonode.groups.models import GroupProfile
from geonode.utils import get_dataset_workspace
[docs]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]
def delete_all_acl_rules():
    """Purge all existing ACL rules.

    Not implemented: every call raises ``NotImplementedError``. A disabled
    implementation sketch is kept in the comment below for reference.
    """
    # if settings.OGC_SERVER["default"]["ACL_SECURITY_ENABLED"] or getattr(
    #     settings, "ACL_SECURITY_ENABLED", False
    # ):
    #     acl_utils.delete_all_rules()
    raise NotImplementedError("This method is not implemented")
[docs]
def delete_acl_rules_for_layer(instance):
    """Delete every ACL rule attached to the layer behind ``instance``.

    Wraps ``acl_utils.delete_layer_rules``, which works on workspace/layer
    names and does not know about the Dataset model.

    Nothing happens unless ACL security is enabled, either through
    ``OGC_SERVER["default"]["ACL_SECURITY_ENABLED"]`` or through a top-level
    ``ACL_SECURITY_ENABLED`` setting.
    """
    acl_enabled = settings.OGC_SERVER["default"]["ACL_SECURITY_ENABLED"] or getattr(
        settings, "ACL_SECURITY_ENABLED", False
    )
    if not acl_enabled:
        return

    resource = instance.get_self_resource()
    workspace_name = get_dataset_workspace(resource.dataset)

    # Prefer the dataset ``name``; fall back to its ``alternate``.
    if resource.dataset and hasattr(resource.dataset, "name"):
        layer_name = resource.dataset.name
    else:
        layer_name = resource.dataset.alternate

    logger.debug(f"Removing rules for layer {workspace_name}:{layer_name}")
    acl_utils.delete_layer_rules(workspace_name, layer_name)
[docs]
def invalidate_acl_cache():
    """Invalidate the ACL rules cache.

    Returns ``True`` when the invalidation call completed and ``False`` when
    it raised (the traceback is logged at debug level). When
    ``OGC_SERVER["default"]["ACL_SECURITY_ENABLED"]`` is falsy nothing is
    done and ``None`` is returned.
    """
    if not settings.OGC_SERVER["default"]["ACL_SECURITY_ENABLED"]:
        return None

    try:
        acl.invalidate_cache()
    except Exception:
        logger.debug(traceback.format_exc())
        return False
    return True
[docs]
def toggle_dataset_cache(dataset_name, enable=True, filters=None, formats=None):
    """Disable/enable a GeoServer Tiled Dataset Configuration.

    Fetches the GeoWebCache layer definition for ``dataset_name``, rewrites
    it and posts it back to the same REST endpoint.

    Args:
        dataset_name: layer name used in the GWC REST URL
            (e.g. ``geonode:tasmania_roads``).
        enable: value written, lower-cased, into the ``<enabled>`` element.
        filters: optional iterable of ``{element_name: {key: default_value}}``
            mappings; each one becomes a child of ``<parameterFilters>``.
            When ``None`` the ``<parameterFilters>`` element is removed.
        formats: optional iterable of strings appended as ``<string>``
            children of ``<mimeFormats>``.

    Returns:
        ``True`` when the update was accepted, ``False`` on any failure
        (HTTP status outside 200-201 or an exception, logged at debug level),
        ``None`` when ``OGC_SERVER["default"]["ACL_SECURITY_ENABLED"]`` is
        falsy.
    """
    if not settings.OGC_SERVER["default"]["ACL_SECURITY_ENABLED"]:
        return None

    try:
        url = settings.OGC_SERVER["default"]["LOCATION"]
        user = settings.OGC_SERVER["default"]["USER"]
        passwd = settings.OGC_SERVER["default"]["PASSWORD"]
        layer_url = f"{url}gwc/rest/layers/{dataset_name}.xml"

        # curl -v -u admin:geoserver -XGET \
        #   "http://<host>:<port>/geoserver/gwc/rest/layers/geonode:tasmania_roads.xml"
        r = requests.get(layer_url, auth=HTTPBasicAuth(user, passwd))
        if r.status_code < 200 or r.status_code > 201:
            logger.debug(f"Could not Retrieve {dataset_name} Cache.")
            return False

        try:
            tree = dlxml.fromstring(r.content)

            # The id is stripped before posting back; tolerate its absence.
            gwc_id = tree.find("id")
            if gwc_id is not None:
                tree.remove(gwc_id)

            gwc_enabled = tree.find("enabled")
            if gwc_enabled is None:
                gwc_enabled = etree.Element("enabled")
                tree.append(gwc_enabled)
            gwc_enabled.text = str(enable).lower()

            # ``find`` returns an element instance or None.
            gwc_mime_formats = tree.find("mimeFormats")
            if gwc_mime_formats is not None and len(gwc_mime_formats):
                tree.remove(gwc_mime_formats)
            if formats is not None:
                if gwc_mime_formats is None:
                    # No element to extend: create one instead of failing.
                    gwc_mime_formats = etree.Element("mimeFormats")
                for mime_format in formats:
                    gwc_format = etree.Element("string")
                    gwc_format.text = mime_format
                    gwc_mime_formats.append(gwc_format)
                tree.append(gwc_mime_formats)

            gwc_parameter_filters = tree.find("parameterFilters")
            if filters is None:
                if gwc_parameter_filters is not None:
                    tree.remove(gwc_parameter_filters)
            else:
                if gwc_parameter_filters is None:
                    # No element to extend: create one instead of failing.
                    gwc_parameter_filters = etree.Element("parameterFilters")
                    tree.append(gwc_parameter_filters)
                for parameter_filter in filters:
                    for element_name, parameters in parameter_filter.items():
                        # <parameterFilters>
                        #   <styleParameterFilter>
                        #     <key>STYLES</key>
                        #     <defaultValue/>
                        #   </styleParameterFilter>
                        # </parameterFilters>
                        gwc_parameter = etree.Element(element_name)
                        for parameter_key, parameter_value in parameters.items():
                            gwc_parameter_key = etree.Element("key")
                            gwc_parameter_key.text = parameter_key
                            gwc_parameter_value = etree.Element("defaultValue")
                            gwc_parameter_value.text = parameter_value
                            gwc_parameter.append(gwc_parameter_key)
                            gwc_parameter.append(gwc_parameter_value)
                        gwc_parameter_filters.append(gwc_parameter)

            # curl -v -u admin:geoserver -XPOST \
            #   -H "Content-type: text/xml" -d @poi.xml \
            #   "http://localhost:8080/geoserver/gwc/rest/layers/tiger:poi.xml"
            headers = {"Content-type": "text/xml"}
            # NOTE(review): the tree is parsed through lxml but serialized with
            # the stdlib ElementTree serializer; kept unchanged on purpose.
            payload = ET.tostring(tree)
            r = requests.post(
                layer_url,
                headers=headers,
                data=payload,
                auth=HTTPBasicAuth(user, passwd),
            )
            if r.status_code < 200 or r.status_code > 201:
                logger.debug(f"Could not Update {dataset_name} Cache.")
                return False
        except Exception:
            logger.debug(traceback.format_exc())
            return False
        return True
    except Exception:
        logger.debug(traceback.format_exc())
        return False
[docs]
def delete_dataset_cache(dataset_name):
    """Delete a GeoServer Tiled Dataset Configuration and all the cache.

    Returns ``True`` when the DELETE request answered with status 200 or 201,
    ``False`` on any other status or on an exception (logged at debug level),
    and ``None`` when ``OGC_SERVER["default"]["ACL_SECURITY_ENABLED"]`` is
    falsy.
    """
    if not settings.OGC_SERVER["default"]["ACL_SECURITY_ENABLED"]:
        return None

    try:
        url = settings.OGC_SERVER["default"]["LOCATION"]
        user = settings.OGC_SERVER["default"]["USER"]
        passwd = settings.OGC_SERVER["default"]["PASSWORD"]

        # curl -v -u admin:geoserver -XDELETE \
        #   "http://<host>:<port>/geoserver/gwc/rest/layers/geonode:tasmania_roads.xml"
        response = requests.delete(
            f"{url}gwc/rest/layers/{dataset_name}.xml",
            auth=HTTPBasicAuth(user, passwd),
        )
        if 200 <= response.status_code <= 201:
            return True
        logger.debug(f"Could not Delete {dataset_name} Cache.")
        return False
    except Exception:
        logger.debug(traceback.format_exc())
        return False
[docs]
def set_geowebcache_invalidate_cache(dataset_alternate, cat=None):
    """Invalidate GeoWebCache cache rules for one dataset.

    Posts a ``truncateLayer`` request to the GWC ``masstruncate`` endpoint.
    The call is skipped when ``dataset_alternate`` is ``None``, empty or
    contains the text ``"None"``, and also when a catalog ``cat`` is given
    that does not know the layer. Failures are only logged at debug level;
    the function always returns ``None``.
    """
    if dataset_alternate is None or not len(dataset_alternate) or "None" in dataset_alternate:
        return

    try:
        # With a catalog at hand, only truncate layers it actually publishes.
        if cat is not None and cat.get_layer(dataset_alternate) is None:
            return

        url = settings.OGC_SERVER["default"]["LOCATION"]
        user = settings.OGC_SERVER["default"]["USER"]
        passwd = settings.OGC_SERVER["default"]["PASSWORD"]

        # curl -v -u admin:geoserver \
        #   -H "Content-type: text/xml" \
        #   -d "<truncateLayer><layerName>{dataset_alternate}</layerName></truncateLayer>" \
        #   http://localhost:8080/geoserver/gwc/rest/masstruncate
        headers = {"Content-type": "text/xml"}
        payload = f"<truncateLayer><layerName>{dataset_alternate}</layerName></truncateLayer>"
        response = requests.post(
            f"{url.rstrip('/')}/gwc/rest/masstruncate",
            headers=headers,
            data=payload,
            auth=HTTPBasicAuth(user, passwd),
        )
        if not 200 <= response.status_code <= 201:
            logger.debug(f"Could not Truncate GWC Cache for Dataset '{dataset_alternate}'.")
    except Exception:
        logger.debug(traceback.format_exc())
[docs]
def allow_layer_to_all(instance):
    """Assign access permissions to all users.

    This method is only relevant to Dataset instances that have their
    underlying data managed by geoserver, meaning:

    * layers that are not associated with a Service
    * layers that are associated with a Service that is being CASCADED
      through geoserver

    Inserts an ALLOW rule for the dataset's workspace/layer at the first
    available priority. Raises ``RuntimeError`` when the rule cannot be
    inserted. Whether or not the insert succeeds, the ACL cache is
    invalidated afterwards, or, when ``DELAYED_SECURITY_SIGNALS`` is set,
    the resource is flagged dirty instead.
    """
    resource = instance.get_self_resource()
    logger.debug(f"Inside allow_layer_to_all for instance {instance}")

    workspace = get_dataset_workspace(resource.dataset)
    if resource.dataset and hasattr(resource.dataset, "name"):
        dataset_name = resource.dataset.name
    else:
        dataset_name = resource.dataset.alternate
    logger.debug(f"Allowing {workspace}:{dataset_name} access to everybody")

    try:
        allow_rule = Rule(
            Rule.ALLOW,
            priority=acl_utils.get_first_available_priority(),
            workspace=workspace,
            layer=dataset_name,
        )
        acl.insert_rule(allow_rule)
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise RuntimeError(f"Could not ADD GeoServer ANONYMOUS Rule for Dataset {dataset_name}: {e}")
    finally:
        if getattr(settings, "DELAYED_SECURITY_SIGNALS", False):
            resource.set_dirty_state()
        else:
            invalidate_acl_cache()
[docs]
def create_acl_rules(layer, perms, user=None, group=None, batch: Batch = None):
    """
    Collect ACL rules related to passed perms into a Batch.
    If the batch does not exist, it is created and returned.

    Args:
        layer: dataset the rules apply to; its ``name`` (or ``alternate``)
            and workspace identify the layer in the rules.
        perms: permission codenames used to derive the service rules.
        user: optional username or user object (``username`` is read).
        group: optional group name or group object (``name`` is read).
        batch: optional Batch to add the rules to; a new
            ``AutoPriorityBatch`` is created only when this is ``None``.

    Returns:
        The batch holding the collected insert operations.

    Raises:
        ValueError: when both ``user`` and ``group`` are given.
    """
    if user and group:
        raise ValueError("Both user and group given")

    layer_name = layer.name if layer and hasattr(layer, "name") else layer.alternate
    workspace_name = get_dataset_workspace(layer)

    # Create new rule-set
    acl_services = _get_acl_services(layer, perms)

    username = (user if isinstance(user, str) else user.username) if user else None
    groupname = (group if isinstance(group, str) else group.name) if group else None

    users_geolimits, groups_geolimits, anonymous_geolimits = get_geolimits(layer, username, groupname)

    # Compare against None explicitly: a caller-supplied batch must be reused
    # even if it happens to evaluate as falsy.
    if batch is None:
        batch = AutoPriorityBatch(acl_utils.get_first_available_priority(), f"Sync {workspace_name}:{layer_name}")

    # Set global geolimits
    # Anon limits should go at the end, but it's responsibility of the caller to create first user/group rules
    for limits, scope, u, g in (
        (users_geolimits, "USER", username, None),
        (groups_geolimits, "GROUP", None, groupname),
        (anonymous_geolimits, "ANON", None, None),
    ):
        if limits and limits.exists():
            logger.debug(f"Adding ACL {scope} GeoLimit rule: U:{u} G:{g} L:{layer} ")
            wkt = limits.last().wkt
            # Set priority to 0 just for compatibility with the old code
            batch.add_insert_rule(
                Rule(
                    Rule.LIMIT,
                    priority=0,
                    workspace=workspace_name,
                    layer=layer_name,
                    user=u,
                    group=g,
                    geo_limit=wkt,
                    catalog_mode=Rule.CM_MIXED,
                )
            )

    # At most one of username/groupname is set (see the ValueError above).
    if groupname:
        msg = "Adding ACL GROUP rules: G:{groupname} S:{service} L:{layer} "
    elif username:
        msg = "Adding ACL USER rules: U:{username} S:{service} L:{layer} "
    else:
        msg = "Adding ACL ANON rules: S:{service} L:{layer} "

    # Set services rules
    if layer and layer_name:
        for rule_fields in acl_services:
            logger.debug(msg.format(username=username, groupname=groupname, layer=layer_name, **rule_fields))
            # Set priority to 0 just for compatibility with the old code
            batch.add_insert_rule(
                Rule(
                    priority=0,
                    user=username,
                    group=groupname,
                    workspace=workspace_name,
                    layer=layer_name,
                    **rule_fields,  # access, service, request, subfield
                )
            )

    return batch
[docs]
def sync_resources_with_guardian(resource=None, force=False):
    """
    Sync resources with Guardian and clear their dirty state.

    Args:
        resource: when given, only the dataset with this resource's id is
            synced.
        force: when no ``resource`` is given, sync every dataset instead of
            only those flagged with ``dirty_state=True``.

    For each dataset the existing layer rules are scheduled for deletion and
    new rules are collected from its user/group permissions, all in a single
    batch. A failure on one dataset is logged and does not stop the others.
    The ACL cache is invalidated once at the end if at least one batch
    reported that rules were committed.
    """
    from geonode.layers.models import Dataset

    if resource:
        datasets = Dataset.objects.filter(id=resource.id)
    elif force:
        datasets = Dataset.objects.all()
    else:
        datasets = Dataset.objects.filter(dirty_state=True)

    if not (datasets and datasets.exists()):
        return

    logger.debug(" --------------------------- synching with guardian!")
    # Accumulated over all datasets so a failing or no-op batch on the last
    # dataset cannot hide rules committed for earlier ones.
    rules_committed = False
    for dataset in datasets:
        try:
            batch = AutoPriorityBatch(acl_utils.get_first_available_priority(), f"Sync resources {dataset}")
            acl_utils.collect_delete_layer_rules(get_dataset_workspace(dataset), dataset.name, batch)

            perm_spec = dataset.get_all_level_info()

            # All the other users
            if "users" in perm_spec:
                for user, perms in perm_spec["users"].items():
                    user = get_user_model().objects.get(username=user)
                    # Set the ACL User Rules; the anonymous user maps to no user at all
                    acl_user = str(user)
                    if "AnonymousUser" in acl_user or str(get_anonymous_user()) in acl_user:
                        acl_user = None
                    create_acl_rules(dataset, perms, user=acl_user, batch=batch)

            # All the other groups
            if "groups" in perm_spec:
                for group, perms in perm_spec["groups"].items():
                    group = Group.objects.get(name=group)
                    # The "anonymous" group maps to no group at all
                    if group and group.name and group.name == "anonymous":
                        group = None
                    # Set the ACL Group Rules
                    create_acl_rules(dataset, perms, group=group, batch=batch)

            logger.info(f"Going to synch permissions in ACL for resource {dataset}")
            if acl.run_batch(batch):
                rules_committed = True
            dataset.clear_dirty_state()
        except Exception as e:
            logger.exception(e)
            logger.warning(f"!WARNING! - Failure Synching-up Security Rules for Resource [{dataset}]")

    if rules_committed:
        invalidate_acl_cache()
[docs]
def get_geolimits(layer, username, groupname):
    """Look up the geo-limit querysets of ``layer`` for a user, group or anonymous.

    Returns a ``(users, groups, anonymous)`` tuple. Each entry is the
    filtered geo-limits queryset for its scope, or ``None`` when the scope
    does not apply:

    * ``users`` is set when ``username`` is given;
    * ``groups`` is set when ``groupname`` is given and a GroupProfile
      exists for it;
    * ``anonymous`` is set when neither name is given.
    """
    for_user = None
    for_group = None
    for_anonymous = None

    if username:
        account = get_user_model().objects.get(username=username)
        for_user = layer.users_geolimits.filter(user=account)

    if groupname and GroupProfile.objects.filter(group__name=groupname).exists():
        profile = GroupProfile.objects.get(group__name=groupname)
        for_group = layer.groups_geolimits.filter(group=profile)

    if not (username or groupname):
        for_anonymous = layer.users_geolimits.filter(user=get_anonymous_user())

    return for_user, for_group, for_anonymous
[docs]
def has_geolimits(layer, user, group):
    """Tell whether ``layer`` has geo-limits for a user, a group or anonymous.

    Args:
        layer: object exposing ``users_geolimits`` / ``groups_geolimits``.
        user: username or user object (``username`` is read); takes
            precedence over ``group`` when both are given.
        group: group name or group object (``name`` is read).

    Returns:
        ``True`` when a matching geo-limit exists, ``False`` otherwise. This
        includes the case where ``group`` does not match exactly one
        GroupProfile. With neither ``user`` nor ``group``, the anonymous
        user's geo-limits are checked.
    """
    if user:
        _user = user if isinstance(user, str) else user.username
        account = get_user_model().objects.get(username=_user)
        return layer.users_geolimits.filter(user=account).exists()

    if group:
        _group = group if isinstance(group, str) else group.name
        if GroupProfile.objects.filter(group__name=_group).count() != 1:
            # No single matching profile: answer explicitly instead of
            # falling off the end of the function with None.
            return False
        profile = GroupProfile.objects.get(group__name=_group)
        return layer.groups_geolimits.filter(group=profile).exists()

    return layer.users_geolimits.filter(user=get_anonymous_user()).exists()
[docs]
def _get_acl_services(layer, perms):
    """Translate permission codenames into a list of ACL service rules.

    Each entry is a dict with ``service`` and ``access`` keys and, for some
    rules, ``request`` or ``subfield``. The order of the list is significant
    to the caller, which inserts the rules in sequence.

    ``layer.is_vector()`` selects WFS (vector) or WCS (otherwise) for the
    data-access rule.
    """
    can_edit = "change_dataset_data" in perms
    can_download = "download_resourcebase" in perms
    can_view = "view_resourcebase" in perms

    rules = []

    # view services
    if can_view or "change_dataset_style" in perms:
        rules.extend(
            [
                {"service": "WMS", "access": True},
                {"service": "GWC", "access": True},
            ]
        )

    # WPS: allowed as a whole, with the download process denied first when
    # the user may not download and GeoServer supports such rules.
    if can_view or can_download or can_edit:
        if not can_download and geoserver_allows_wps_rules(gs_catalog.get_version()):
            rules.append({"service": "WPS", "subfield": "GS:DOWNLOAD", "access": False})
        rules.append({"service": "WPS", "access": True})

    # Download without edit on vector data: deny the WFS write requests.
    if can_download and not can_edit and layer.is_vector():
        rules.extend(
            {"service": "WFS", "request": request, "access": False}
            for request in ("TRANSACTION", "LOCKFEATURE", "GETFEATUREWITHLOCK")
        )

    if can_download or can_edit:
        rules.append({"service": "WFS" if layer.is_vector() else "WCS", "access": True})

    # TODO: check if this rule is really needed
    if can_download and (can_view or can_edit):
        rules.append({"service": "*", "access": True})

    return rules
[docs]
def geoserver_allows_wps_rules(ver: str) -> bool:
    """Return whether GeoServer version ``ver`` is 2.23 or newer.

    Only the leading ``major.minor`` numbers of ``ver`` are considered, so
    suffixes such as ``.1`` or ``-SNAPSHOT`` are ignored. Major and minor may
    each have several digits.

    Returns ``False``, after logging a warning, when ``ver`` does not start
    with ``<digits>.<digits>`` or is not a string.
    """
    try:
        found = re.match(r"([0-9]+)\.([0-9]+)", ver)
    except TypeError as e:
        # e.g. ``ver`` is None because the version could not be retrieved
        logger.warning(f'Error evaluating GeoServer version "{ver}": {e}')
        return False

    if found is None:
        logger.warning(f'Unparsable GeoServer version string "{ver}"')
        return False

    # Compare numerically: as text, "2.9" would sort after "2.23".
    major, minor = (int(part) for part in found.groups())
    return (major, minor) >= (2, 23)