#########################################################################
#
# Copyright (C) 2016 OSGeo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import json
import base64
import logging
import requests
import importlib
import mock
from requests.auth import HTTPBasicAuth
from tastypie.test import ResourceTestCaseMixin
from django.db.models import Q
from django.urls import reverse
from django.conf import settings
from django.http import HttpRequest
from django.test.testcases import TestCase
from django.contrib.auth import get_user_model
from django.test.utils import override_settings
from django.contrib.auth.models import AnonymousUser
from guardian.shortcuts import assign_perm, get_anonymous_user
from geonode import geoserver
from geonode.geoserver.helpers import geofence, gf_utils
from geonode.maps.models import Map
from geonode.layers.models import Dataset
from geonode.documents.models import Document
from geonode.compat import ensure_string
from geonode.utils import check_ogc_backend
from geonode.tests.utils import check_dataset
from geonode.decorators import on_ogc_backend
from geonode.resource.manager import resource_manager
from geonode.tests.base import GeoNodeBaseTestSupport
from geonode.groups.models import Group, GroupMember, GroupProfile
from geonode.layers.populate_datasets_data import create_dataset_data
from geonode.base.auth import create_auth_token, get_or_create_token
from geonode.base.models import Configuration, UserGeoLimit, GroupGeoLimit
from geonode.base.populate_test_data import (
all_public,
create_models,
create_single_doc,
create_single_map,
remove_models,
create_single_dataset,
)
from geonode.geoserver.security import (
_get_gf_services,
allow_layer_to_all,
delete_all_acl_rules,
sync_resources_with_guardian,
_get_gwc_filters_and_formats,
has_geolimits,
create_geofence_rules,
delete_acl_rules_for_layer,
)
from .utils import (
get_users_with_perms,
get_visible_resources,
)
from .permissions import PermSpec, PermSpecCompact
[docs]
logger = logging.getLogger(__name__)
[docs]
def _log(msg, *args):
logger.debug(msg, *args)
[docs]
class StreamToLogger:
"""
Fake file-like stream object that redirects writes to a logger instance.
"""
def __init__(self, logger, log_level=logging.INFO):
[docs]
self.log_level = log_level
[docs]
def write(self, buf):
for line in buf.rstrip().splitlines():
self.logger.log(self.log_level, line.rstrip())
[docs]
class SecurityTests(ResourceTestCaseMixin, GeoNodeBaseTestSupport):
"""
Tests for the Geonode security app.
"""
@classmethod
[docs]
def setUpClass(cls):
super().setUpClass()
create_models(type=cls.get_type, integration=cls.get_integration)
all_public()
@classmethod
[docs]
def tearDownClass(cls):
super().tearDownClass()
remove_models(cls.get_obj_ids, type=cls.get_type, integration=cls.get_integration)
[docs]
def setUp(self):
super().setUp()
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
settings.OGC_SERVER["default"]["ACL_SECURITY_ENABLED"] = True
self.maxDiff = None
self.user = "admin"
self.passwd = "admin"
create_dataset_data()
self.anonymous_user = get_anonymous_user()
self.config = Configuration.load()
self.list_url = reverse("api_dispatch_list", kwargs={"api_name": "api", "resource_name": "datasets"})
self.bulk_perms_url = reverse("bulk_permissions")
self.perm_spec = {"users": {"admin": ["view_resourcebase"]}, "groups": []}
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_login_middleware(self):
"""
Tests the Geonode login required authentication middleware.
"""
from geonode.security.middleware import LoginRequiredMiddleware
middleware = LoginRequiredMiddleware(None)
white_list = [
reverse("account_ajax_login"),
reverse("account_confirm_email", kwargs=dict(key="test")),
reverse("account_login"),
reverse("account_reset_password"),
reverse("forgot_username"),
reverse("dataset_acls"),
reverse("dataset_resolve_user"),
]
black_list = [
reverse("account_signup"),
reverse("profile_browse"),
]
request = HttpRequest()
request.user = get_anonymous_user()
# Requests should be redirected to the the `redirected_to` path when un-authenticated user attempts to visit
# a black-listed url.
for path in black_list:
request.path = path
response = middleware.process_request(request)
if response:
self.assertEqual(response.status_code, 302)
self.assertTrue(response.get("Location").startswith(middleware.redirect_to))
# The middleware should return None when an un-authenticated user
# attempts to visit a white-listed url.
for path in white_list:
request.path = path
response = middleware.process_request(request)
self.assertIsNone(response, msg=f"Middleware activated for white listed path: {path}")
self.client.login(username="admin", password="admin")
admin = get_user_model().objects.get(username="admin")
self.assertTrue(admin.is_authenticated)
request.user = admin
# The middleware should return None when an authenticated user attempts
# to visit a black-listed url.
for path in black_list:
request.path = path
response = middleware.process_request(request)
self.assertIsNone(response)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_login_middleware_with_basic_auth(self):
"""
Tests the Geonode login required authentication middleware with Basic authenticated queries
"""
from geonode.security.middleware import LoginRequiredMiddleware
middleware = LoginRequiredMiddleware(None)
black_listed_url = reverse("dataset_upload")
white_listed_url = reverse("account_login")
# unauthorized request to black listed URL should be redirected to `redirect_to` URL
request = HttpRequest()
request.user = get_anonymous_user()
request.path = black_listed_url
response = middleware.process_request(request)
if response:
self.assertEqual(response.status_code, 302)
self.assertTrue(response.get("Location").startswith(middleware.redirect_to))
# unauthorized request to white listed URL should be allowed
request.path = white_listed_url
response = middleware.process_request(request)
self.assertIsNone(response, msg=f"Middleware activated for white listed path: {black_listed_url}")
# Basic authorized request to black listed URL should be allowed
request.path = black_listed_url
request.META["HTTP_AUTHORIZATION"] = f'Basic {base64.b64encode(b"bobby:bob").decode("utf-8")}'
response = middleware.process_request(request)
self.assertIsNone(response, msg=f"Middleware activated for white listed path: {black_listed_url}")
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_login_middleware_with_custom_login_url(self):
"""
Tests the Geonode login required authentication middleware with Basic authenticated queries
"""
site_url_settings = [f"{settings.SITEURL}login/custom", "/login/custom", "login/custom"]
black_listed_url = reverse("dataset_upload")
for setting in site_url_settings:
with override_settings(LOGIN_URL=setting):
from geonode.security import middleware as mw
# reload the middleware module to fetch overridden settings
importlib.reload(mw)
middleware = mw.LoginRequiredMiddleware(None)
# unauthorized request to black listed URL should be redirected to `redirect_to` URL
request = HttpRequest()
request.user = AnonymousUser()
request.path = black_listed_url
response = middleware.process_request(request)
self.assertIsNotNone(response, "Middleware didn't activate for blacklisted URL.")
self.assertEqual(response.status_code, 302)
self.assertTrue(
response.get("Location").startswith("/"),
msg=f"Returned redirection should be a valid path starting '/'. "
f"Instead got: {response.get('Location')}",
)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_session_ctrl_middleware(self):
"""
Tests the Geonode session control authentication middleware.
"""
from geonode.security.middleware import SessionControlMiddleware
from importlib import import_module
engine = import_module(settings.SESSION_ENGINE)
middleware = SessionControlMiddleware(None)
admin = get_user_model().objects.filter(is_superuser=True).first()
request = HttpRequest()
request.user = admin
request.session = engine.SessionStore()
request.session["access_token"] = get_or_create_token(admin)
request.session.save()
middleware.process_request(request)
self.assertFalse(request.session.is_empty())
request.session["access_token"] = None
request.session.save()
middleware.process_request(request)
self.assertTrue(request.session.is_empty())
# Test the full cycle through the client
path = reverse("account_email")
self.client.login(username="admin", password="admin")
response = self.client.get(path)
self.assertEqual(response.status_code, 200)
# Simulating Token expired (or not set)
session_id = self.client.cookies.get(settings.SESSION_COOKIE_NAME)
session = engine.SessionStore(session_id.value)
session["access_token"] = None
session.save()
response = self.client.get(path)
self.assertEqual(response.status_code, 302)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_attributes_sats_refresh(self):
layers = Dataset.objects.all()[:2].values_list("id", flat=True)
test_dataset = Dataset.objects.get(id=layers[0])
self.client.login(username="admin", password="admin")
dataset_attributes = test_dataset.attributes
self.assertIsNotNone(dataset_attributes)
test_dataset.attribute_set.all().delete()
test_dataset.save()
data = {"uuid": test_dataset.uuid}
resp = self.client.post(reverse("attributes_sats_refresh"), data)
if resp.status_code == 200:
self.assertHttpOK(resp)
self.assertEqual(dataset_attributes.count(), test_dataset.attributes.count())
from geonode.geoserver.helpers import set_attributes_from_geoserver
test_dataset.attribute_set.all().delete()
test_dataset.save()
set_attributes_from_geoserver(test_dataset, overwrite=True)
self.assertEqual(dataset_attributes.count(), test_dataset.attributes.count())
# Remove permissions to anonymous users and try to refresh attributes again
test_dataset.set_permissions({"users": {"AnonymousUser": []}, "groups": []})
test_dataset.attribute_set.all().delete()
test_dataset.save()
set_attributes_from_geoserver(test_dataset, overwrite=True)
self.assertEqual(dataset_attributes.count(), test_dataset.attributes.count())
else:
# If GeoServer is unreachable, this view now returns a 302 error
self.assertEqual(resp.status_code, 302)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_invalidate_tileddataset_cache(self):
layers = Dataset.objects.all()[:2].values_list("id", flat=True)
test_dataset = Dataset.objects.get(id=layers[0])
self.client.login(username="admin", password="admin")
data = {"uuid": test_dataset.uuid}
resp = self.client.post(reverse("invalidate_tileddataset_cache"), data)
self.assertHttpOK(resp)
[docs]
def test_set_bulk_permissions(self):
"""Test that after restrict view permissions on two layers
bobby is unable to see them"""
rules_count = 0
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
delete_all_acl_rules()
# Reset GeoFence Rules
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 0)
layers = Dataset.objects.all()[:2].values_list("id", flat=True)
layers_id = [str(x) for x in layers]
test_perm_dataset = Dataset.objects.get(id=layers[0])
self.client.login(username="admin", password="admin")
resp = self.client.get(self.list_url)
self.assertEqual(len(self.deserialize(resp)["objects"]), 8)
data = {"permissions": json.dumps(self.perm_spec), "resources": layers_id}
resp = self.client.post(self.bulk_perms_url, data)
self.assertHttpOK(resp)
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# Check GeoFence Rules have been correctly created
rules_count = geofence.get_rules_count()
_log(f"1. rules_count: {rules_count} ")
self.assertGreaterEqual(rules_count, 10)
allow_layer_to_all(test_perm_dataset)
rules_count = geofence.get_rules_count()
_log(f"2. rules_count: {rules_count} ")
self.assertGreaterEqual(rules_count, 11)
self.client.logout()
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
self.client.login(username="bobby", password="bob")
resp = self.client.get(self.list_url)
self.assertGreaterEqual(len(self.deserialize(resp)["objects"]), 6)
# perms = get_users_with_perms(test_perm_dataset)
# _log(f"3. perms: {perms} ")
# batch = AutoPriorityBatch(get_first_available_priority(), f'test batch for {test_perm_dataset}')
# for u, p in perms.items():
# create_geofence_rules(test_perm_dataset, p, user=u, batch=batch)
# geofence.run_batch(batch)
# Check GeoFence Rules have been correctly created
rules_count = geofence.get_rules_count()
_log(f"4. rules_count: {rules_count} ")
self.assertGreaterEqual(rules_count, 13)
# Validate maximum priority
available_priority = gf_utils.get_first_available_priority()
_log(f"5. available_priority: {available_priority} ")
self.assertTrue(available_priority > 0)
url = settings.OGC_SERVER["default"]["LOCATION"]
user = settings.OGC_SERVER["default"]["USER"]
passwd = settings.OGC_SERVER["default"]["PASSWORD"]
test_url = f"{url}gwc/rest/seed/{test_perm_dataset.alternate}.json"
r = requests.get(test_url, auth=HTTPBasicAuth(user, passwd))
self.assertEqual(r.status_code, 400, f"GWC error for user: {user} URL: {test_url}\n{r.text}")
rules_count = 0
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
delete_all_acl_rules()
# Reset GeoFence Rules
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 0)
[docs]
def test_bobby_cannot_set_all(self):
"""Test that Bobby can set the permissions only on the ones
for which he has the right"""
bobby = get_user_model().objects.get(username="bobby")
layer = Dataset.objects.all().exclude(owner=bobby)[0]
self.client.login(username="admin", password="admin")
# give bobby the right to change the layer permissions
assign_perm("change_resourcebase_permissions", bobby, layer.get_self_resource())
self.client.logout()
self.client.login(username="bobby", password="bob")
layer2 = Dataset.objects.all().exclude(owner=bobby)[1]
data = {
"permissions": json.dumps({"users": {"bobby": ["view_resourcebase"]}, "groups": []}),
"resources": [layer.id, layer2.id],
}
resp = self.client.post(self.bulk_perms_url, data)
content = resp.content
if isinstance(content, bytes):
content = content.decode("UTF-8")
self.assertNotIn(layer.title, json.loads(content)["not_changed"])
self.assertIn(layer2.title, json.loads(content)["not_changed"])
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_user_can(self):
bobby = get_user_model().objects.get(username="bobby")
perm_spec = {
"users": {
"bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_data", "change_dataset_style"]
},
"groups": [],
}
dataset = Dataset.objects.filter(subtype="vector").first()
dataset.set_permissions(perm_spec)
# Test user has permission with read_only=False
self.assertTrue(dataset.user_can(bobby, "change_dataset_style"))
# Test with edit permission and read_only=True
self.config.read_only = True
self.config.save()
self.assertFalse(dataset.user_can(bobby, "change_dataset_style"))
# Test with view permission and read_only=True
self.assertTrue(dataset.user_can(bobby, "view_resourcebase"))
# Test on a 'raster' subtype
self.config.read_only = False
self.config.save()
dataset = Dataset.objects.filter(subtype="raster").first()
dataset.set_permissions(perm_spec)
# Test user has permission with read_only=False
self.assertFalse(dataset.user_can(bobby, "change_dataset_data"))
self.assertTrue(dataset.user_can(bobby, "change_dataset_style"))
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_perm_specs_synchronization(self):
"""Test that Dataset is correctly synchronized with guardian:
1. Set permissions to all users
2. Set permissions to a single user
3. Set permissions to a group of users
4. Try to sync a layer from GeoServer
"""
bobby = get_user_model().objects.get(username="bobby")
layer = Dataset.objects.filter(subtype="vector").exclude(owner=bobby).first()
self.client.login(username="admin", password="admin")
# Reset GeoFence Rules
delete_all_acl_rules()
self.assertEqual(geofence.get_rules_count(), 0)
perm_spec = {"users": {"AnonymousUser": []}, "groups": []}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
_log(f"1. rules_count: {rules_count} ")
self.assertEqual(rules_count, 5)
perm_spec = {"users": {"admin": ["view_resourcebase"]}, "groups": []}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
_log(f"2. rules_count: {rules_count} ")
self.assertEqual(rules_count, 9, f"Bad rules count. Got rules: {geofence.get_rules()}")
perm_spec = {"users": {"admin": ["change_dataset_data"]}, "groups": []}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
_log(f"3. rules_count: {rules_count} ")
self.assertEqual(rules_count, 8, f"Bad rules count. Got rules: {geofence.get_rules()}")
rules_objs = geofence.get_rules()
wps_subfield_found = False
for rule in rules_objs["rules"]:
if rule["service"] == "WPS" and rule["subfield"] == "GS:DOWNLOAD":
wps_subfield_found = rule["access"] == "DENY"
break
self.assertTrue(wps_subfield_found, f"WPS download not blocked. Got rules: {geofence.get_rules()}")
# FULL WFS-T
perm_spec = {
"users": {
"bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style", "change_dataset_data"]
},
"groups": [],
}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 10)
rules_objs = geofence.get_rules()
_deny_wfst_rule_exists = False
for rule in rules_objs["rules"]:
if rule["service"] == "WFS" and rule["userName"] == "bobby" and rule["request"] == "TRANSACTION":
_deny_wfst_rule_exists = rule["access"] == "DENY"
break
self.assertFalse(_deny_wfst_rule_exists)
# NO WFS-T
# - order is important
perm_spec = {
"users": {
"bobby": [
"view_resourcebase",
"download_resourcebase",
]
},
"groups": [],
}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 13)
rules_objs = geofence.get_rules()
_deny_wfst_rule_exists = False
_deny_wfst_rule_position = -1
_allow_wfs_rule_position = -1
for cnt, rule in enumerate(rules_objs["rules"]):
if rule["service"] == "WFS" and rule["userName"] == "bobby" and rule["request"] == "TRANSACTION":
_deny_wfst_rule_exists = rule["access"] == "DENY"
_deny_wfst_rule_position = cnt
elif (
rule["service"] == "WFS"
and rule["userName"] == "bobby"
and (rule["request"] is None or rule["request"] == "*")
):
_allow_wfs_rule_position = cnt
self.assertTrue(_deny_wfst_rule_exists)
self.assertTrue(_allow_wfs_rule_position > _deny_wfst_rule_position)
# NO WFS
perm_spec = {
"users": {
"bobby": [
"view_resourcebase",
]
},
"groups": [],
}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 9)
rules_objs = geofence.get_rules()
_deny_wfst_rule_exists = False
for rule in rules_objs["rules"]:
if rule["service"] == "WFS" and rule["userName"] == "bobby" and rule["request"] == "TRANSACTION":
_deny_wfst_rule_exists = rule["access"] == "DENY"
break
self.assertFalse(_deny_wfst_rule_exists)
perm_spec = {"users": {}, "groups": {"bar": ["view_resourcebase"]}}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
_log(f"4. rules_count: {rules_count} ")
self.assertEqual(rules_count, 9, f"Bad rule count, got rules {geofence.get_rules()}")
perm_spec = {"users": {}, "groups": {"bar": ["change_resourcebase"]}}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
_log(f"5. rules_count: {rules_count} ")
self.assertEqual(rules_count, 5)
# Testing GeoLimits
# Reset GeoFence Rules
delete_all_acl_rules()
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 0)
layer = Dataset.objects.first()
# grab bobby
bobby = get_user_model().objects.get(username="bobby")
_disable_dataset_cache = has_geolimits(layer, None, None)
filters, formats = _get_gwc_filters_and_formats(_disable_dataset_cache)
self.assertListEqual(filters, [{"styleParameterFilter": {"STYLES": ""}}])
self.assertListEqual(
formats,
[
"application/json;type=utfgrid",
"image/gif",
"image/jpeg",
"image/png",
"image/png8",
"image/vnd.jpeg-png",
"image/vnd.jpeg-png8",
],
)
geo_limit, _ = UserGeoLimit.objects.get_or_create(user=bobby, resource=layer.get_self_resource())
geo_limit.wkt = "SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, \
146.7000276171853 -42.53655428642583, 146.7110139453067 -43.07256577359489, \
145.9804231249952 -43.05651288026286, 145.8046418749977 -42.49606500060302)))"
geo_limit.save()
layer.users_geolimits.add(geo_limit)
self.assertEqual(layer.users_geolimits.all().count(), 1)
_disable_dataset_cache = has_geolimits(layer, bobby, None)
filters, formats = _get_gwc_filters_and_formats([_disable_dataset_cache])
self.assertIsNone(filters)
self.assertIsNone(formats)
perm_spec = {"users": {"bobby": ["view_resourcebase"]}, "groups": []}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 10)
rules_objs = geofence.get_rules()
self.assertEqual(len(rules_objs["rules"]), 10)
# Order is important
_limit_rule_position = -1
for cnt, rule in enumerate(rules_objs["rules"]):
if rule["service"] is None and rule["userName"] == "bobby":
self.assertEqual(rule["userName"], "bobby")
self.assertEqual(rule["workspace"], "geonode")
self.assertEqual(rule["layer"], "CA")
self.assertEqual(rule["access"], "LIMIT")
self.assertTrue("limits" in rule)
rule_limits = rule["limits"]
self.assertEqual(
rule_limits["allowedArea"],
"SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, \
146.7000276171853 -42.53655428642583, 146.7110139453067 -43.07256577359489, \
145.9804231249952 -43.05651288026286, 145.8046418749977 -42.49606500060302)))",
)
self.assertEqual(rule_limits["catalogMode"], "MIXED")
_limit_rule_position = cnt
elif rule["userName"] == "bobby":
# When there's a limit rule, "*" must be the first one
self.assertTrue(_limit_rule_position < cnt)
geo_limit, _ = GroupGeoLimit.objects.get_or_create(
group=GroupProfile.objects.get(group__name="bar"), resource=layer.get_self_resource()
)
geo_limit.wkt = "SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, \
146.7000276171853 -42.53655428642583, 146.7110139453067 -43.07256577359489, \
145.9804231249952 -43.05651288026286, 145.8046418749977 -42.49606500060302)))"
geo_limit.save()
layer.groups_geolimits.add(geo_limit)
self.assertEqual(layer.groups_geolimits.all().count(), 1)
perm_spec = {"users": {}, "groups": {"bar": ["change_resourcebase"]}}
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 6)
rules_objs = geofence.get_rules()
self.assertEqual(len(rules_objs["rules"]), 6)
# Order is important
_limit_rule_position = -1
for cnt, rule in enumerate(rules_objs["rules"]):
if rule["roleName"] == "ROLE_BAR":
if rule["service"] is None:
self.assertEqual(rule["userName"], None)
self.assertEqual(rule["workspace"], "geonode")
self.assertEqual(rule["layer"], "CA")
self.assertEqual(rule["access"], "LIMIT")
self.assertTrue("limits" in rule)
rule_limits = rule["limits"]
self.assertEqual(
rule_limits["allowedArea"],
"SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, \
146.7000276171853 -42.53655428642583, 146.7110139453067 -43.07256577359489, \
145.9804231249952 -43.05651288026286, 145.8046418749977 -42.49606500060302)))",
)
self.assertEqual(rule_limits["catalogMode"], "MIXED")
_limit_rule_position = cnt
else:
# When there's a limit rule, "*" must be the first one
self.assertTrue(_limit_rule_position < cnt)
# Change Dataset Type and SRID in order to force GeoFence allowed-area reprojection
_original_subtype = layer.subtype
_original_srid = layer.srid
layer.subtype = "raster"
layer.srid = "EPSG:3857"
layer.save()
layer.set_permissions(perm_spec)
rules_count = geofence.get_rules_count()
rules_objs = geofence.get_rules()
# Order is important
_limit_rule_position = -1
for cnt, rule in enumerate(rules_objs["rules"]):
if rule["roleName"] == "ROLE_BAR":
if rule["service"] is None:
self.assertEqual(rule["service"], None)
self.assertEqual(rule["userName"], None)
self.assertEqual(rule["workspace"], "geonode")
self.assertEqual(rule["layer"], "CA")
self.assertEqual(rule["access"], "LIMIT")
self.assertTrue("limits" in rule)
rule_limits = rule["limits"]
self.assertEqual(
rule_limits["allowedArea"],
"SRID=4326;MULTIPOLYGON (((145.8046418749977 -42.49606500060302, 146.7000276171853 \
-42.53655428642583, 146.7110139453067 -43.07256577359489, 145.9804231249952 \
-43.05651288026286, 145.8046418749977 -42.49606500060302)))",
)
self.assertEqual(rule_limits["catalogMode"], "MIXED")
_limit_rule_position = cnt
else:
# When there's a limit rule, "*" must be the first one
self.assertTrue(_limit_rule_position < cnt)
layer.subtype = _original_subtype
layer.srid = _original_srid
layer.save()
# Reset GeoFence Rules
delete_all_acl_rules()
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 0)
@on_ogc_backend(geoserver.BACKEND_PACKAGE)
[docs]
def test_dataset_permissions(self):
# Test permissions on a layer
bobby = get_user_model().objects.get(username="bobby")
layer = create_single_dataset("san_andres_y_providencia_poi")
layer = resource_manager.update(
layer.uuid, instance=layer, notify=False, vals=dict(owner=bobby, workspace=settings.DEFAULT_WORKSPACE)
)
self.assertIsNotNone(layer)
self.assertIsNotNone(layer.ows_url)
self.assertIsNotNone(layer.ptype)
self.assertIsNotNone(layer.sourcetype)
self.assertEqual(layer.alternate, "geonode:san_andres_y_providencia_poi")
# Reset GeoFence Rules
delete_all_acl_rules()
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 0)
layer = Dataset.objects.get(name="san_andres_y_providencia_poi")
# removing duplicates
while Dataset.objects.filter(alternate=layer.alternate).count() > 1:
Dataset.objects.filter(alternate=layer.alternate).last().delete()
layer = Dataset.objects.get(alternate=layer.alternate)
layer.set_default_permissions(owner=bobby)
check_dataset(layer)
rules_count = geofence.get_rules_count()
_log(f"0. rules_count: {rules_count} ")
self.assertGreaterEqual(rules_count, 4)
# Set the layer private for not authenticated users
perm_spec = {"users": {"AnonymousUser": []}, "groups": []}
layer.set_permissions(perm_spec)
url = (
f"{settings.GEOSERVER_LOCATION}ows?"
"LAYERS=geonode%3Asan_andres_y_providencia_poi&STYLES="
"&FORMAT=image%2Fpng&SERVICE=WMS&VERSION=1.1.1&REQUEST=GetMap"
"&SRS=EPSG%3A4326"
"&BBOX=-81.394599749999,13.316009005566,"
"-81.370560451855,13.372728455566"
"&WIDTH=217&HEIGHT=512"
)
# test view_resourcebase permission on anonymous user
response = requests.get(url)
self.assertTrue(response.status_code, 404)
self.assertEqual(response.headers.get("Content-Type"), "application/vnd.ogc.se_xml;charset=UTF-8")
# test WMS with authenticated user that has access to the Dataset
response = requests.get(
url,
auth=HTTPBasicAuth(
username=settings.OGC_SERVER["default"]["USER"], password=settings.OGC_SERVER["default"]["PASSWORD"]
),
)
self.assertTrue(response.status_code, 200)
self.assertEqual(response.headers.get("Content-Type"), "image/png")
# test WMS with authenticated user that has no view_resourcebase:
# the layer should be not accessible
response = requests.get(url, auth=HTTPBasicAuth(username="norman", password="norman"))
self.assertTrue(response.status_code, 404)
self.assertEqual(response.headers.get("Content-Type").strip().replace(" ", ""), "text/html;charset=utf-8")
# test change_dataset_style
url = f"{settings.GEOSERVER_LOCATION}rest/workspaces/geonode/styles/san_andres_y_providencia_poi.xml"
sld = """<?xml version="1.0" encoding="UTF-8"?>
<sld:StyledLayerDescriptor xmlns:sld="http://www.opengis.net/sld"
xmlns:gml="http://www.opengis.net/gml" xmlns:ogc="http://www.opengis.net/ogc"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="1.0.0"
xsi:schemaLocation="http://www.opengis.net/sld http://schemas.opengis.net/sld/1.0.0/StyledLayerDescriptor.xsd">
<sld:NamedLayer>
<sld:Name>geonode:san_andres_y_providencia_poi</sld:Name>
<sld:UserStyle>
<sld:Name>san_andres_y_providencia_poi</sld:Name>
<sld:Title>san_andres_y_providencia_poi</sld:Title>
<sld:IsDefault>1</sld:IsDefault>
<sld:FeatureTypeStyle>
<sld:Rule>
<sld:PointSymbolizer>
<sld:Graphic>
<sld:Mark>
<sld:Fill>
<sld:CssParameter name="fill">#8A7700
</sld:CssParameter>
</sld:Fill>
<sld:Stroke>
<sld:CssParameter name="stroke">#bbffff
</sld:CssParameter>
</sld:Stroke>
</sld:Mark>
<sld:Size>10</sld:Size>
</sld:Graphic>
</sld:PointSymbolizer>
</sld:Rule>
</sld:FeatureTypeStyle>
</sld:UserStyle>
</sld:NamedLayer>
</sld:StyledLayerDescriptor>"""
# user without change_dataset_style cannot edit it
self.assertTrue(self.client.login(username="bobby", password="bob"))
response = self.client.put(url, sld, content_type="application/vnd.ogc.sld+xml")
self.assertEqual(response.status_code, 404)
# user with change_dataset_style can edit it
perm_spec = {"users": {"bobby": ["view_resourcebase", "change_resourcebase"]}, "groups": []}
layer.set_permissions(perm_spec)
response = self.client.put(url, sld, content_type="application/vnd.ogc.sld+xml")
# _content_type = response.getheader('Content-Type')
# self.assertEqual(_content_type, 'image/png')
# Reset GeoFence Rules
delete_all_acl_rules()
rules_count = geofence.get_rules_count()
self.assertTrue(rules_count == 0)
[docs]
def test_maplayers_default_permissions(self):
"""Verify that Dataset.set_default_permissions is behaving as expected"""
# Get a Dataset object to work with
layer = Dataset.objects.first()
# removing duplicates
while Dataset.objects.filter(alternate=layer.alternate).count() > 1:
Dataset.objects.filter(alternate=layer.alternate).last().delete()
layer = Dataset.objects.get(alternate=layer.alternate)
# Set the default permissions
layer.set_default_permissions()
# Test that the anonymous user can read
self.assertTrue(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource()))
# Test that the owner user can read
self.assertTrue(layer.owner.has_perm("view_resourcebase", layer.get_self_resource()))
# Test that the owner user can download it
self.assertTrue(layer.owner.has_perm("download_resourcebase", layer.get_self_resource()))
# Test that the owner user can edit metadata
self.assertTrue(layer.owner.has_perm("change_resourcebase_metadata", layer.get_self_resource()))
# Test that the owner user can edit data if is vector type
if layer.subtype == "vector":
self.assertTrue(layer.owner.has_perm("change_dataset_data", layer))
# Test that the owner user can edit styles
self.assertTrue(layer.owner.has_perm("change_dataset_style", layer))
# Test that the owner can manage the layer
self.assertTrue(layer.owner.has_perm("change_resourcebase", layer.get_self_resource()))
self.assertTrue(layer.owner.has_perm("delete_resourcebase", layer.get_self_resource()))
self.assertTrue(layer.owner.has_perm("change_resourcebase_permissions", layer.get_self_resource()))
self.assertTrue(layer.owner.has_perm("publish_resourcebase", layer.get_self_resource()))
[docs]
def test_set_dataset_permissions(self):
"""Verify that the set_dataset_permissions view is behaving as expected"""
# Get a layer to work with
layer = Dataset.objects.first()
# removing duplicates
while Dataset.objects.filter(alternate=layer.alternate).count() > 1:
Dataset.objects.filter(alternate=layer.alternate).last().delete()
layer = Dataset.objects.get(alternate=layer.alternate)
# FIXME Test a comprehensive set of permissions specifications
# Set the Default Permissions
layer.set_default_permissions()
# Test that the Permissions for anonymous user are set
self.assertTrue(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource()))
# Set the Permissions
layer.set_permissions(self.perm_spec)
# Test that the Permissions for anonymous user are un-set
self.assertFalse(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource()))
# Test that previous permissions for users other than ones specified in
# the perm_spec (and the layers owner) were removed
current_perms = layer.get_all_level_info()
self.assertGreaterEqual(len(current_perms["users"]), 1)
# Test that there are no duplicates on returned permissions
for _k, _v in current_perms.items():
for _kk, _vv in current_perms[_k].items():
if _vv and isinstance(_vv, list):
_vv_1 = _vv.copy()
_vv_2 = list(set(_vv.copy()))
_vv_1.sort()
_vv_2.sort()
self.assertListEqual(_vv_1, _vv_2)
# Test that the User permissions specified in the perm_spec were
# applied properly
for username, perm in self.perm_spec["users"].items():
user = get_user_model().objects.get(username=username)
self.assertTrue(user.has_perm(perm, layer.get_self_resource()))
[docs]
def test_ajax_dataset_permissions(self):
"""Verify that the ajax_dataset_permissions view is behaving as expected"""
# Setup some layer names to work with
valid_dataset_typename = Dataset.objects.all().first().id
invalid_dataset_id = 9999999
# Test that an invalid layer.alternate is handled for properly
response = self.client.post(
reverse("resource_permissions", args=(invalid_dataset_id,)),
data=json.dumps(self.perm_spec),
content_type="application/json",
)
self.assertEqual(response.status_code, 404)
# Test that GET returns permissions
response = self.client.get(reverse("resource_permissions", args=(valid_dataset_typename,)))
assert "permissions" in ensure_string(response.content)
# Test that a user is required to have maps.change_dataset_permissions
# First test un-authenticated
response = self.client.post(
reverse("resource_permissions", args=(valid_dataset_typename,)),
data=json.dumps(self.perm_spec),
content_type="application/json",
)
self.assertEqual(response.status_code, 401)
# Next Test with a user that does NOT have the proper perms
self.assertTrue(self.client.login(username="norman", password="norman"))
response = self.client.post(
reverse("resource_permissions", args=(valid_dataset_typename,)),
data=json.dumps(self.perm_spec),
content_type="application/json",
)
self.assertEqual(response.status_code, 401)
# Login as a user with the proper permission and test the endpoint
self.assertTrue(self.client.login(username="admin", password="admin"))
response = self.client.post(
reverse("resource_permissions", args=(valid_dataset_typename,)),
data=json.dumps(self.perm_spec),
content_type="application/json",
)
# Test that the method returns 200
self.assertEqual(response.status_code, 200)
# Test that the permissions specification is applied
# Should we do this here, or assume the tests in
# test_set_dataset_permissions will handle for that?
[docs]
def test_perms_info(self):
"""Verify that the perms_info view is behaving as expected"""
# Test with a Dataset object
layer = Dataset.objects.first()
# removing duplicates
while Dataset.objects.filter(alternate=layer.alternate).count() > 1:
Dataset.objects.filter(alternate=layer.alternate).last().delete()
layer = Dataset.objects.get(alternate=layer.alternate)
layer.set_default_permissions()
# Test that the anonymous user can read
self.assertTrue(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource()))
# Test that layer owner can edit layer
self.assertTrue(layer.owner.has_perm("change_resourcebase", layer.get_self_resource()))
# Test with a Map object
a_map = Map.objects.first()
a_map.set_default_permissions()
perms = get_users_with_perms(a_map)
self.assertIsNotNone(perms)
self.assertGreaterEqual(len(perms), 1)
# now we test permissions, first on an authenticated user and then on the
# anonymous user
# 1. view_resourcebase
# 2. change_resourcebase
# 3. delete_resourcebase
# 4. change_resourcebase_metadata
# 5. change_resourcebase_permissions
# 6. change_dataset_data
# 7. change_dataset_style
[docs]
def test_not_superuser_permissions(self):
rules_count = 0
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
delete_all_acl_rules()
# Reset GeoFence Rules
rules_count = geofence.get_rules_count()
self.assertTrue(rules_count == 0)
# grab bobby
bob = get_user_model().objects.get(username="bobby")
# grab a layer
layer = Dataset.objects.exclude(owner=bob).first()
# removing duplicates
while Dataset.objects.filter(alternate=layer.alternate).count() > 1:
Dataset.objects.filter(alternate=layer.alternate).last().delete()
layer = Dataset.objects.get(alternate=layer.alternate)
layer.set_default_permissions()
# verify bobby has view permissions on it
self.assertTrue(bob.has_perm("view_resourcebase", layer.get_self_resource()))
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# Check GeoFence Rules have been correctly created
rules_count = geofence.get_rules_count()
_log(f"1. rules_count: {rules_count} ")
self.assertTrue(self.client.login(username="bobby", password="bob"))
# 1. view_resourcebase
# 1.1 has view_resourcebase: verify that bobby can access the layer
# detail page
self.assertTrue(bob.has_perm("view_resourcebase", layer.get_self_resource()))
response = self.client.get(reverse("dataset_embed", args=(layer.alternate,)))
self.assertEqual(response.status_code, 200, response.status_code)
# 1.2 has not view_resourcebase: verify that bobby can not access the
# layer detail page
layer.set_permissions({"users": {"AnonymousUser": []}, "groups": []})
Group.objects.get(name="anonymous")
response = self.client.get(reverse("dataset_embed", args=(layer.alternate,)))
self.assertTrue(response.status_code in (401, 403), response.status_code)
# 2. change_resourcebase
# 2.1 has not change_resourcebase: verify that bobby cannot access the
# layer replace page
response = self.client.get(reverse("dataset_replace", args=(layer.alternate,)))
self.assertTrue(response.status_code in (401, 403), response.status_code)
# 2.2 has change_resourcebase: verify that bobby can access the layer
# replace page
layer.set_permissions({"users": {"bobby": ["change_resourcebase"]}, "groups": []})
self.assertTrue(bob.has_perm("change_resourcebase", layer.get_self_resource()))
response = self.client.get(reverse("dataset_replace", args=(layer.alternate,)))
self.assertEqual(response.status_code, 200, response.status_code)
# 3. change_resourcebase_metadata
# 3.1 has not change_resourcebase_metadata: verify that bobby cannot
# access the layer metadata page
response = self.client.get(reverse("dataset_metadata", args=(layer.alternate,)))
self.assertTrue(response.status_code in (401, 403), response.status_code)
# 3.2 has delete_resourcebase: verify that bobby can access the layer
# delete page
layer.set_permissions(
{
"users": {"bobby": ["change_resourcebase", "change_resourcebase_metadata", "delete_resourcebase"]},
"groups": [],
}
)
self.assertTrue(bob.has_perm("change_resourcebase_metadata", layer.get_self_resource()))
response = self.client.get(reverse("dataset_metadata", args=(layer.alternate,)))
self.assertEqual(response.status_code, 200, response.status_code)
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
perms = get_users_with_perms(layer)
_log(f"2. perms: {perms} ")
batch = create_geofence_rules(layer, perms, user=bob)
geofence.run_batch(batch)
# Check GeoFence Rules have been correctly created
rules_count = geofence.get_rules_count()
_log(f"3. rules_count: {rules_count} ")
# 4. change_resourcebase_permissions
# should be impossible for the user without change_resourcebase_permissions
# to change permissions as the permission form is not available in the
# layer detail page?
# 5. change_dataset_data
# must be done in integration test sending a WFS-T request with CURL
# 6. change_dataset_style
# 6.1 has not change_dataset_style: verify that bobby cannot access
# the layer style page
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# Only for geoserver backend
response = self.client.get(reverse("dataset_style_manage", args=(layer.alternate,)))
self.assertTrue(response.status_code in (401, 403), response.status_code)
# 7.2 has change_dataset_style: verify that bobby can access the
# change layer style page
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# Only for geoserver backend
layer.set_permissions(
{
"users": {
"bobby": [
"change_resourcebase",
"change_resourcebase_metadata",
"delete_resourcebase",
"change_dataset_style",
]
},
"groups": [],
}
)
self.assertTrue(bob.has_perm("change_dataset_style", layer))
response = self.client.get(reverse("dataset_style_manage", args=(layer.alternate,)))
self.assertEqual(response.status_code, 200, response.status_code)
rules_count = 0
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
delete_all_acl_rules()
# Reset GeoFence Rules
rules_count = geofence.get_rules_count()
self.assertEqual(rules_count, 0, rules_count)
[docs]
def test_anonymus_permissions(self):
# grab a layer
layer = Dataset.objects.first()
# removing duplicates
while Dataset.objects.filter(alternate=layer.alternate).count() > 1:
Dataset.objects.filter(alternate=layer.alternate).last().delete()
layer = Dataset.objects.get(alternate=layer.alternate)
layer.set_default_permissions()
# 1. view_resourcebase
# 1.1 has view_resourcebase: verify that anonymous user can access
# the layer detail page
self.assertTrue(self.anonymous_user.has_perm("view_resourcebase", layer.get_self_resource()))
response = self.client.get(reverse("dataset_embed", args=(layer.alternate,)))
self.assertEqual(response.status_code, 200)
# 1.2 has not view_resourcebase: verify that anonymous user can not
# access the layer detail page
layer.set_permissions({"users": {"AnonymousUser": []}, "groups": []})
response = self.client.get(reverse("dataset_embed", args=(layer.alternate,)))
self.assertTrue(response.status_code in (302, 403))
# 2. change_resourcebase
# 2.1 has not change_resourcebase: verify that anonymous user cannot
# access the layer replace page but redirected to login
response = self.client.get(reverse("dataset_replace", args=(layer.alternate,)))
self.assertTrue(response.status_code in (302, 403))
# 3. change_resourcebase_metadata
# 3.1 has not change_resourcebase_metadata: verify that anonymous user
# cannot access the layer metadata page but redirected to login
response = self.client.get(reverse("dataset_metadata", args=(layer.alternate,)))
self.assertTrue(response.status_code in (302, 403))
# 4. change_dataset_style
# 4.1 has not change_dataset_style: verify that anonymous user cannot access
# the layer style page but redirected to login
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# Only for geoserver backend
response = self.client.get(reverse("dataset_style_manage", args=(layer.alternate,)))
self.assertTrue(response.status_code in (302, 403))
@override_settings(ADMIN_MODERATE_UPLOADS=True, RESOURCE_PUBLISHING=True, GROUP_PRIVATE_RESOURCES=True)
[docs]
def test_get_visible_resources_advanced_workflow(self):
admin_user = get_user_model().objects.get(username="admin")
standard_user = get_user_model().objects.get(username="bobby")
self.assertIsNotNone(admin_user)
self.assertIsNotNone(standard_user)
admin_user.is_superuser = True
admin_user.save()
layers = Dataset.objects.all()
actual = get_visible_resources(
queryset=Dataset.objects.all(),
user=admin_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count(), actual.count())
actual = get_visible_resources(
queryset=Dataset.objects.all(),
user=standard_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count(), actual.count())
# Test 'is_approved=False' 'is_published=False'
Dataset.objects.filter(~Q(owner=standard_user)).update(is_approved=False, is_published=False)
actual = get_visible_resources(
queryset=Dataset.objects.all(),
user=admin_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count(), actual.count())
actual = get_visible_resources(
queryset=Dataset.objects.all(),
user=standard_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count(), actual.count())
actual = get_visible_resources(
queryset=Dataset.objects.all(),
user=None,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
# The method returns only 'metadata_only=False' resources
self.assertEqual(1, actual.count())
# Test private groups
private_groups = GroupProfile.objects.filter(access="private")
if private_groups.first():
private_groups.first().leave(standard_user)
Dataset.objects.filter(~Q(owner=standard_user)).update(group=private_groups.first().group)
actual = get_visible_resources(
queryset=Dataset.objects.all(),
user=admin_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
# The method returns only 'metadata_only=False' resources
self.assertEqual(layers.count(), actual.count())
actual = get_visible_resources(
queryset=Dataset.objects.all(),
user=standard_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
# The method returns only 'metadata_only=False' resources
self.assertEqual(8, actual.count())
actual = get_visible_resources(
queryset=Dataset.objects.all(),
user=None,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
# The method returns only 'metadata_only=False' resources
self.assertEqual(1, actual.count())
[docs]
def test_get_visible_resources(self):
standard_user = get_user_model().objects.get(username="bobby")
layers = Dataset.objects.all()
# update user's perm on a layer,
# this should not return the layer since it will not be in user's allowed resources
_title = "common bar"
for x in Dataset.objects.filter(title=_title):
x.set_permissions({"users": {"bobby": []}, "groups": []})
actual = get_visible_resources(
queryset=layers,
user=standard_user,
admin_approval_required=True,
unpublished_not_visible=True,
private_groups_not_visibile=True,
)
self.assertNotIn(_title, list(actual.values_list("title", flat=True)))
# get layers as admin, this should return all layers with metadata_only = True
actual = get_visible_resources(queryset=layers, user=get_user_model().objects.get(username=self.user))
self.assertIn(_title, list(actual.values_list("title", flat=True)))
[docs]
def test_perm_spec_conversion(self):
"""
Perm Spec from extended to cmpact and viceversa
"""
standard_user = get_user_model().objects.get(username="bobby")
dataset = Dataset.objects.filter(owner=standard_user).first()
perm_spec = {
"users": {"bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style"]},
"groups": {},
}
_p = PermSpec(perm_spec, dataset)
self.assertDictEqual(
json.loads(str(_p)),
{"users": {"bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style"]}, "groups": {}},
)
self.assertDictEqual(
_p.compact,
{
"users": [
{
"id": standard_user.id,
"username": standard_user.username,
"first_name": standard_user.first_name,
"last_name": standard_user.last_name,
"avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240",
"permissions": "owner",
"is_staff": False,
"is_superuser": False,
},
{
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"first_name": "admin",
"id": 1,
"last_name": "",
"permissions": "manage",
"username": "admin",
"is_staff": True,
"is_superuser": True,
},
],
"organizations": [],
"groups": [
{"id": 3, "title": "anonymous", "name": "anonymous", "permissions": "none"},
{"id": 2, "name": "registered-members", "permissions": "none", "title": "Registered Members"},
],
},
)
perm_spec = {
"users": {
"AnonymousUser": ["view_resourcebase"],
"bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style"],
},
"groups": {},
}
_p = PermSpec(perm_spec, dataset)
self.assertDictEqual(
json.loads(str(_p)),
{
"users": {
"AnonymousUser": ["view_resourcebase"],
"bobby": ["view_resourcebase", "download_resourcebase", "change_dataset_style"],
},
"groups": {},
},
)
self.assertDictEqual(
_p.compact,
{
"users": [
{
"id": standard_user.id,
"username": standard_user.username,
"first_name": standard_user.first_name,
"last_name": standard_user.last_name,
"avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240",
"permissions": "owner",
"is_staff": False,
"is_superuser": False,
},
{
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"first_name": "admin",
"id": 1,
"last_name": "",
"permissions": "manage",
"username": "admin",
"is_staff": True,
"is_superuser": True,
},
],
"organizations": [],
"groups": [
{"id": 3, "title": "anonymous", "name": "anonymous", "permissions": "view"},
{"id": 2, "name": "registered-members", "permissions": "none", "title": "Registered Members"},
],
},
)
self.assertTrue(PermSpecCompact.validate(_p.compact))
_pp = PermSpecCompact(_p.compact, dataset)
_pp_e = {
"users": {
"bobby": [
"change_dataset_style",
"publish_resourcebase",
"delete_resourcebase",
"change_resourcebase_metadata",
"download_resourcebase",
"change_resourcebase",
"change_resourcebase_permissions",
"view_resourcebase",
"change_dataset_data",
],
"admin": [
"change_dataset_style",
"publish_resourcebase",
"delete_resourcebase",
"change_resourcebase_metadata",
"download_resourcebase",
"change_resourcebase",
"change_resourcebase_permissions",
"view_resourcebase",
"change_dataset_data",
],
"AnonymousUser": ["view_resourcebase"],
},
"groups": {"anonymous": ["view_resourcebase"], "registered-members": []},
}
self.assertListEqual(list(_pp.extended.keys()), list(_pp_e.keys()))
for _key in _pp.extended.keys():
self.assertListEqual(list(_pp.extended.get(_key).keys()), list(_pp_e.get(_key).keys()))
for __key in _pp.extended.get(_key).keys():
self.assertListEqual(
sorted(list(set(_pp.extended.get(_key).get(__key)))), sorted(list(set(_pp_e.get(_key).get(__key))))
)
_pp2 = PermSpecCompact(
{
"users": [
{
"id": standard_user.id,
"username": standard_user.username,
"first_name": standard_user.first_name,
"last_name": standard_user.last_name,
"avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240",
"permissions": "view",
}
]
},
dataset,
)
_pp.merge(_pp2)
_pp_e = {
"users": {
"bobby": [
"change_resourcebase_permissions",
"change_resourcebase_metadata",
"change_dataset_data",
"change_resourcebase",
"delete_resourcebase",
"publish_resourcebase",
"download_resourcebase",
"change_dataset_style",
"view_resourcebase",
],
"admin": [
"change_resourcebase_permissions",
"change_resourcebase_metadata",
"change_dataset_data",
"change_resourcebase",
"delete_resourcebase",
"publish_resourcebase",
"download_resourcebase",
"change_dataset_style",
"view_resourcebase",
],
"AnonymousUser": ["view_resourcebase"],
},
"groups": {"anonymous": ["view_resourcebase"], "registered-members": []},
}
self.assertListEqual(list(_pp.extended.keys()), list(_pp_e.keys()))
for _key in _pp.extended.keys():
self.assertListEqual(list(_pp.extended.get(_key).keys()), list(_pp_e.get(_key).keys()))
for __key in _pp.extended.get(_key).keys():
self.assertListEqual(
sorted(list(set(_pp.extended.get(_key).get(__key)))), sorted(list(set(_pp_e.get(_key).get(__key))))
)
# Test "download" permissions retention policy
# 1. "download" permissions are allowed on "Documents"
test_document = Document.objects.first()
perm_spec = {
"users": {
"bobby": [
"view_resourcebase",
"download_resourcebase",
]
},
"groups": {},
}
_p = PermSpec(perm_spec, test_document)
self.assertDictEqual(
json.loads(str(_p)),
{
"users": {
"bobby": [
"view_resourcebase",
"download_resourcebase",
]
},
"groups": {},
},
json.loads(str(_p)),
)
self.assertDictEqual(
_p.compact,
{
"users": [
{
"id": standard_user.id,
"username": standard_user.username,
"first_name": standard_user.first_name,
"last_name": standard_user.last_name,
"avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240",
"permissions": "download",
"is_staff": False,
"is_superuser": False,
},
{
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"first_name": "admin",
"id": 1,
"last_name": "",
"permissions": "owner",
"username": "admin",
"is_staff": True,
"is_superuser": True,
},
],
"organizations": [],
"groups": [
{"id": 3, "title": "anonymous", "name": "anonymous", "permissions": "none"},
{"id": 2, "name": "registered-members", "permissions": "none", "title": "Registered Members"},
],
},
_p.compact,
)
# 2. "download" permissions are NOT allowed on "Maps"
test_map = Map.objects.first()
perm_spec = {
"users": {
"bobby": [
"view_resourcebase",
"download_resourcebase",
]
},
"groups": {},
}
_p = PermSpec(perm_spec, test_map)
self.assertDictEqual(
json.loads(str(_p)),
{
"users": {
"bobby": [
"view_resourcebase",
"download_resourcebase",
]
},
"groups": {},
},
json.loads(str(_p)),
)
self.assertDictEqual(
_p.compact,
{
"users": [
{
"id": standard_user.id,
"username": standard_user.username,
"first_name": standard_user.first_name,
"last_name": standard_user.last_name,
"avatar": "https://www.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e/?s=240",
"permissions": "view",
"is_staff": False,
"is_superuser": False,
},
{
"avatar": "https://www.gravatar.com/avatar/7a68c67c8d409ff07e42aa5d5ab7b765/?s=240",
"first_name": "admin",
"id": 1,
"last_name": "",
"permissions": "owner",
"username": "admin",
"is_staff": True,
"is_superuser": True,
},
],
"organizations": [],
"groups": [
{"id": 3, "title": "anonymous", "name": "anonymous", "permissions": "none"},
{"id": 2, "name": "registered-members", "permissions": "none", "title": "Registered Members"},
],
},
_p.compact,
)
[docs]
def test_admin_whitelisted_access_backend(self):
from geonode.security.backends import AdminRestrictedAccessBackend
from django.core.exceptions import PermissionDenied
backend = AdminRestrictedAccessBackend()
with self.settings(ADMIN_IP_WHITELIST=["88.88.88.88"]):
with self.assertRaises(PermissionDenied):
backend.authenticate(HttpRequest(), username="admin", password="admin")
with self.settings(ADMIN_IP_WHITELIST=[]):
request = HttpRequest()
request.META["REMOTE_ADDR"] = "127.0.0.1"
user = backend.authenticate(request, username="admin", password="admin")
self.assertIsNone(user)
[docs]
def test_admin_whitelisted_access_middleware(self):
from geonode.security.middleware import AdminAllowedMiddleware
get_response = mock.MagicMock()
middleware = AdminAllowedMiddleware(get_response)
admin = get_user_model().objects.filter(is_superuser=True).first()
# Test invalid IP
with self.settings(ADMIN_IP_WHITELIST=["88.88.88.88"]):
request = HttpRequest()
request.user = admin
request.path = reverse("home")
request.META["REMOTE_ADDR"] = "127.0.0.1"
middleware.process_request(request)
self.assertEqual(request.user, AnonymousUser())
request = HttpRequest()
basic_auth = base64.b64encode(b"admin:admin").decode()
request.META["HTTP_AUTHORIZATION"] = f"Basic {basic_auth}"
request.path = reverse("home")
request.META["REMOTE_ADDR"] = "127.0.0.1"
middleware.process_request(request)
self.assertIsNone(request.META.get("HTTP_AUTHORIZATION"))
token = create_auth_token(admin)
request.META["HTTP_AUTHORIZATION"] = f"Bearer {token}"
middleware.process_request(request)
self.assertIsNone(request.META.get("HTTP_AUTHORIZATION"))
with self.settings(ADMIN_IP_WHITELIST=[]):
request = HttpRequest()
request.user = admin
request.path = reverse("home")
request.META["REMOTE_ADDR"] = "127.0.0.1"
middleware.process_request(request)
self.assertTrue(request.user.is_superuser)
# Test valid IP
with self.settings(ADMIN_IP_WHITELIST=["127.0.0.1"]):
request = HttpRequest()
request.user = admin
request.path = reverse("home")
request.META["REMOTE_ADDR"] = "127.0.0.1"
middleware.process_request(request)
self.assertTrue(request.user.is_superuser)
# Test range of whitelisted IPs
with self.settings(ADMIN_IP_WHITELIST=["127.0.0.0/24"]):
request = HttpRequest()
request.user = admin
request.path = reverse("home")
request.META["REMOTE_ADDR"] = "127.0.0.1"
middleware.process_request(request)
self.assertTrue(request.user.is_superuser)
# Test valid IP in second element
with self.settings(ADMIN_IP_WHITELIST=["88.88.88.88", "127.0.0.1"]):
request = HttpRequest()
request.user = admin
request.path = reverse("home")
request.META["REMOTE_ADDR"] = "127.0.0.1"
middleware.process_request(request)
self.assertTrue(request.user.is_superuser)
[docs]
class SecurityRulesTests(TestCase):
"""
Test resources synchronization with Guardian and dirty states cleaning
"""
[docs]
def setUp(self):
self.maxDiff = None
self._l = create_single_dataset("test_dataset")
[docs]
def test_sync_resources_with_guardian_delay_false(self):
with self.settings(DELAYED_SECURITY_SIGNALS=False, ACL_SECURITY_ENABLED=True):
delete_acl_rules_for_layer(self._l)
# Set geofence (and so the dirty state)
allow_layer_to_all(self._l)
# Retrieve the same layer
dirty_dataset = Dataset.objects.get(pk=self._l.id)
# Check dirty state (True)
self.assertFalse(dirty_dataset.dirty_state)
# Call sync resources
sync_resources_with_guardian()
clean_dataset = Dataset.objects.get(pk=self._l.id)
# Check dirty state
self.assertFalse(clean_dataset.dirty_state)
# TODO: DELAYED SECURITY MUST BE REVISED
[docs]
def test_sync_resources_with_guardian_delay_true(self):
with self.settings(DELAYED_SECURITY_SIGNALS=True, ACL_SECURITY_ENABLED=True):
delete_acl_rules_for_layer(self._l)
# Set geofence (and so the dirty state)
allow_layer_to_all(self._l)
# Retrieve the same layer
dirty_dataset = Dataset.objects.get(pk=self._l.id)
# Check dirty state (True)
self.assertTrue(dirty_dataset.dirty_state)
# Call sync resources
sync_resources_with_guardian()
# clean_dataset = Dataset.objects.get(pk=self._l.id)
# Check dirty state
# TODO: DELAYED SECURITY MUST BE REVISED
# self.assertFalse(clean_dataset.dirty_state)
[docs]
class TestGetUserGeolimits(TestCase):
[docs]
def setUp(self):
self.maxDiff = None
self.layer = create_single_dataset("main-layer")
self.owner = get_user_model().objects.get(username="admin")
self.perms = {"*": ""}
self.gf_services = _get_gf_services(self.layer, self.perms)
[docs]
def test_should_not_disable_cache_for_user_without_geolimits(self):
_disable_dataset_cache = has_geolimits(self.layer, self.owner, None)
self.assertFalse(_disable_dataset_cache)
[docs]
def test_should_disable_cache_for_user_with_geolimits(self):
geo_limit, _ = UserGeoLimit.objects.get_or_create(user=self.owner, resource=self.layer)
self.layer.users_geolimits.set([geo_limit])
self.layer.refresh_from_db()
_disable_dataset_cache = has_geolimits(self.layer, self.owner, None)
self.assertTrue(_disable_dataset_cache)
[docs]
def test_should_not_disable_cache_for_anonymous_without_geolimits(self):
_disable_dataset_cache = has_geolimits(self.layer, None, None)
self.assertFalse(_disable_dataset_cache)
[docs]
def test_should_disable_cache_for_anonymous_with_geolimits(self):
geo_limit, _ = UserGeoLimit.objects.get_or_create(user=get_anonymous_user(), resource=self.layer)
self.layer.users_geolimits.set([geo_limit])
self.layer.refresh_from_db()
_disable_dataset_cache = has_geolimits(self.layer, None, None)
self.assertTrue(_disable_dataset_cache)
[docs]
class SetPermissionsTestCase(GeoNodeBaseTestSupport):
[docs]
def setUp(self):
# Creating groups and asign also to the anonymous_group
self.author, created = get_user_model().objects.get_or_create(username="author")
self.group_manager, created = get_user_model().objects.get_or_create(username="group_manager")
self.group_member, created = get_user_model().objects.get_or_create(username="group_member")
self.not_group_member, created = get_user_model().objects.get_or_create(username="not_group_member")
# Defining group profiles and members
self.group_profile, created = GroupProfile.objects.get_or_create(slug="custom_group")
self.second_custom_group, created = GroupProfile.objects.get_or_create(slug="second_custom_group")
# defining group members
GroupMember.objects.get_or_create(group=self.group_profile, user=self.author, role="member")
GroupMember.objects.get_or_create(group=self.group_profile, user=self.group_manager, role="manager")
GroupMember.objects.get_or_create(group=self.group_profile, user=self.group_member, role="member")
GroupMember.objects.get_or_create(group=self.second_custom_group, user=self.not_group_member, role="member")
# Creating he default resource
self.resource = create_single_dataset(name="test_layer", owner=self.author, group=self.group_profile.group)
self.anonymous_user = get_anonymous_user()
@override_settings(RESOURCE_PUBLISHING=False)
@override_settings(ADMIN_MODERATE_UPLOADS=False)
[docs]
def test_set_compact_permissions(self):
"""
**AUTO PUBLISHING** - test_set_compact_permissions
- `RESOURCE_PUBLISHING = False`
- `ADMIN_MODERATE_UPLOADS = False`
"""
use_cases = [
(
PermSpec({"users": {}, "groups": {}}, self.resource).compact,
{
self.author: [
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
"delete_resourcebase",
"download_resourcebase",
"publish_resourcebase",
"view_resourcebase",
],
self.group_manager: [],
self.group_member: [],
self.not_group_member: [],
self.anonymous_user: [],
},
),
(
PermSpec(
{
"users": {"AnonymousUser": ["view_resourcebase"]},
"groups": {"second_custom_group": ["change_resourcebase"]},
},
self.resource,
).compact,
{
self.author: [
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
"delete_resourcebase",
"download_resourcebase",
"publish_resourcebase",
"view_resourcebase",
],
self.group_manager: ["view_resourcebase"],
self.group_member: ["view_resourcebase"],
self.not_group_member: [
"change_resourcebase",
"view_resourcebase",
"download_resourcebase",
"change_resourcebase_metadata",
],
self.anonymous_user: ["view_resourcebase"],
},
),
]
for counter, item in enumerate(use_cases):
permissions, expected = item
self.resource.set_permissions(permissions)
for authorized_subject, expected_perms in expected.items():
perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
self.assertSetEqual(
set(expected_perms),
set(perms_got),
msg=f"use case #{counter} - user: {authorized_subject.username}",
)
@override_settings(RESOURCE_PUBLISHING=True)
[docs]
def test_permissions_are_set_as_expected_resource_publishing_True(self):
"""
**SIMPLE PUBLISHING** - test_permissions_are_set_as_expected_resource_publishing_True
- `RESOURCE_PUBLISHING = True` (Autopublishing is disabled)
- `ADMIN_MODERATE_UPLOADS = False`
"""
use_cases = [
(
{"users": {}, "groups": {}},
{
self.author: [
"delete_resourcebase",
"download_resourcebase",
"view_resourcebase",
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
],
self.group_manager: [
"change_resourcebase",
"change_resourcebase_metadata",
"delete_resourcebase",
"download_resourcebase",
"change_resourcebase_permissions",
"view_resourcebase",
"publish_resourcebase",
],
self.group_member: ["download_resourcebase", "view_resourcebase"],
self.not_group_member: [],
self.anonymous_user: [],
},
),
(
{"users": [], "groups": {"second_custom_group": ["view_resourcebase"]}},
{
self.author: [
"delete_resourcebase",
"download_resourcebase",
"view_resourcebase",
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
],
self.group_manager: [
"change_resourcebase",
"change_resourcebase_metadata",
"delete_resourcebase",
"download_resourcebase",
"view_resourcebase",
"change_resourcebase_permissions",
"publish_resourcebase",
],
self.group_member: ["download_resourcebase", "view_resourcebase"],
self.not_group_member: ["view_resourcebase"],
self.anonymous_user: [],
},
),
]
for counter, item in enumerate(use_cases):
permissions, expected = item
self.resource.set_permissions(permissions)
for authorized_subject, expected_perms in expected.items():
perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
self.assertSetEqual(
set(expected_perms),
set(perms_got),
msg=f"use case #{counter} - user: {authorized_subject.username}",
)
@override_settings(RESOURCE_PUBLISHING=True)
@override_settings(ADMIN_MODERATE_UPLOADS=True)
[docs]
def test_permissions_are_set_as_expected_admin_upload_resource_publishing_True(self):
"""
**ADVANCED WORKFLOW** - test_permissions_are_set_as_expected_admin_upload_resource_publishing_True
- `RESOURCE_PUBLISHING = True`
- `ADMIN_MODERATE_UPLOADS = True`
"""
use_cases = [
(
{"users": {}, "groups": {}},
{
self.author: [
"download_resourcebase",
"view_resourcebase",
],
self.group_manager: [
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
"publish_resourcebase",
"delete_resourcebase",
"download_resourcebase",
"view_resourcebase",
],
self.group_member: ["download_resourcebase", "view_resourcebase"],
self.not_group_member: [],
self.anonymous_user: [],
},
),
(
{"users": {}, "groups": {"second_custom_group": ["view_resourcebase"]}},
{
self.author: [
"download_resourcebase",
"view_resourcebase",
],
self.group_manager: [
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
"publish_resourcebase",
"delete_resourcebase",
"download_resourcebase",
"view_resourcebase",
],
self.group_member: ["download_resourcebase", "view_resourcebase"],
self.not_group_member: ["view_resourcebase"],
self.anonymous_user: [],
},
),
]
try:
self.resource.is_approved = True
self.resource.is_published = False
self.resource.save()
for counter, item in enumerate(use_cases):
permissions, expected = item
self.resource.set_permissions(permissions)
for authorized_subject, expected_perms in expected.items():
perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
self.assertSetEqual(
set(expected_perms),
set(perms_got),
msg=f"use case #{counter} - user: {authorized_subject.username}",
)
finally:
self.resource.is_approved = True
self.resource.is_published = True
self.resource.save()
@override_settings(RESOURCE_PUBLISHING=False)
@override_settings(ADMIN_MODERATE_UPLOADS=False)
[docs]
def test_permissions_are_set_as_expected_admin_upload_resource_publishing_False(self):
"""
**AUTO PUBLISHING** - test_permissions_are_set_as_expected_admin_upload_resource_publishing_False
- `RESOURCE_PUBLISHING = False`
- `ADMIN_MODERATE_UPLOADS = False`
"""
use_cases = [
(
{"users": {}, "groups": {}},
{
self.author: [
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
"delete_resourcebase",
"download_resourcebase",
"publish_resourcebase",
"view_resourcebase",
],
self.group_manager: [],
self.group_member: [],
self.not_group_member: [],
self.anonymous_user: [],
},
),
(
{
"users": {"AnonymousUser": ["view_resourcebase"]},
"groups": {"second_custom_group": ["change_resourcebase"]},
},
{
self.author: [
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
"delete_resourcebase",
"download_resourcebase",
"publish_resourcebase",
"view_resourcebase",
],
self.group_manager: ["view_resourcebase"],
self.group_member: ["view_resourcebase"],
self.not_group_member: ["view_resourcebase", "change_resourcebase"],
self.anonymous_user: ["view_resourcebase"],
},
),
]
for counter, item in enumerate(use_cases):
permissions, expected = item
self.resource.set_permissions(permissions)
for authorized_subject, expected_perms in expected.items():
perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
self.assertSetEqual(
set(expected_perms),
set(perms_got),
msg=f"use case #{counter} - user: {authorized_subject.username}",
)
@override_settings(RESOURCE_PUBLISHING=True)
@override_settings(ADMIN_MODERATE_UPLOADS=True)
@override_settings(RESOURCE_PUBLISHING=True)
@override_settings(ADMIN_MODERATE_UPLOADS=True)
[docs]
def test_permissions_on_user_role_demote_to_member(self):
"""
**ADVANCED WORKFLOW** - test_permissions_on_user_role_demote_to_member
- `RESOURCE_PUBLISHING = True`
- `ADMIN_MODERATE_UPLOADS = True`
"""
sut = GroupMember.objects.filter(user=self.group_manager).exclude(group__title="Registered Members").first()
self.assertEqual(sut.role, "manager")
sut.demote()
sut.refresh_from_db()
self.assertEqual(sut.role, "member")
expected = {
self.author: [
"download_resourcebase",
"view_resourcebase",
],
self.group_manager: ["download_resourcebase", "view_resourcebase"],
self.group_member: ["download_resourcebase", "view_resourcebase"],
}
for authorized_subject, expected_perms in expected.items():
perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
self.assertSetEqual(
set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}"
)
@override_settings(RESOURCE_PUBLISHING=True)
[docs]
def test_permissions_on_user_role_demote_to_member_only_RESOURCE_PUBLISHING_active(self):
"""
**SIMPLE PUBLISHING** - test_permissions_on_user_role_demote_to_member_only_RESOURCE_PUBLISHING_active
- `RESOURCE_PUBLISHING = True` (Autopublishing is disabled)
- `ADMIN_MODERATE_UPLOADS = False`
"""
sut = GroupMember.objects.filter(user=self.group_manager).exclude(group__title="Registered Members").first()
self.assertEqual(sut.role, "manager")
sut.demote()
sut.refresh_from_db()
self.assertEqual(sut.role, "member")
expected = {
self.author: [
"delete_resourcebase",
"download_resourcebase",
"view_resourcebase",
"change_resourcebase",
"change_resourcebase_metadata",
"change_resourcebase_permissions",
],
self.group_manager: ["download_resourcebase", "view_resourcebase"],
self.group_member: ["download_resourcebase", "view_resourcebase"],
}
for authorized_subject, expected_perms in expected.items():
perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
self.assertSetEqual(
set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}"
)
@override_settings(RESOURCE_PUBLISHING=True)
@override_settings(RESOURCE_PUBLISHING=True)
@override_settings(ADMIN_MODERATE_UPLOADS=True)
[docs]
class TestPermissionChanges(GeoNodeBaseTestSupport):
[docs]
def setUp(self):
# Creating groups
self.author, _ = get_user_model().objects.get_or_create(username="author")
self.group_manager, _ = get_user_model().objects.get_or_create(username="group_manager")
self.resource_group_manager, _ = get_user_model().objects.get_or_create(username="resource_group_manager")
self.group_member, _ = get_user_model().objects.get_or_create(username="group_member")
self.member_with_perms, _ = get_user_model().objects.get_or_create(username="member_with_perms")
# Defining group profiles and members
self.owner_group, _ = GroupProfile.objects.get_or_create(slug="owner_group")
self.resource_group, _ = GroupProfile.objects.get_or_create(slug="resource_group")
# defining group members
GroupMember.objects.get_or_create(group=self.owner_group, user=self.author, role="member")
GroupMember.objects.get_or_create(group=self.owner_group, user=self.group_member, role="member")
GroupMember.objects.get_or_create(group=self.owner_group, user=self.group_manager, role="manager")
GroupMember.objects.get_or_create(group=self.resource_group, user=self.resource_group_manager, role="manager")
# Creating the default resource
self.resource = create_single_dataset(
name="test_layer_adv",
owner=self.author,
is_approved=False,
is_published=False,
was_approved=False,
was_published=False,
group=self.resource_group.group,
)
self.owner_perms = ["view_resourcebase", "download_resourcebase"]
self.edit_perms = ["change_resourcebase", "change_resourcebase_metadata"]
self.dataset_perms = ["change_dataset_style", "change_dataset_data"]
self.adv_owner_limit = ["delete_resourcebase", "change_resourcebase_permissions", "publish_resourcebase"]
self.safe_perms = ["download_resourcebase", "view_resourcebase"]
self.data = {
"resource-title": self.resource.title,
"resource-owner": self.author.id,
"resource-date": "2021-10-27 05:59 am",
"resource-date_type": "publication",
"resource-language": self.resource.language,
"resource-is_approved": "on",
"resource-group": self.resource_group.group.id,
"dataset_attribute_set-TOTAL_FORMS": 0,
"dataset_attribute_set-INITIAL_FORMS": 0,
}
self.url = reverse("dataset_metadata", args=(self.resource.alternate,))
# Assign manage perms to user member_with_perms
for perm in self.dataset_perms:
assign_perm(perm, self.member_with_perms, self.resource)
for perm in self.owner_perms:
assign_perm(perm, self.member_with_perms, self.resource.get_self_resource())
# Assert inital assignment of permissions to groups and users
resource_perm_specs = self.resource.get_all_level_info()
self.assertSetEqual(
set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms)
)
self.assertSetEqual(
set(resource_perm_specs["users"][self.member_with_perms]), set(self.owner_perms + self.dataset_perms)
)
self.assertSetEqual(
set(resource_perm_specs["users"][self.group_manager]),
set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit),
)
self.assertSetEqual(
set(resource_perm_specs["users"][self.resource_group_manager]),
set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit),
)
self.assertSetEqual(set(resource_perm_specs["groups"][self.owner_group.group]), set(self.safe_perms))
self.assertSetEqual(set(resource_perm_specs["groups"][self.resource_group.group]), set(self.safe_perms))
[docs]
def test_permissions_on_approve_and_publish_changes(self):
# Group manager approves a resource
self.group_manager.set_password("group_manager")
self.group_manager.save()
self.assertTrue(self.client.login(username="group_manager", password="group_manager"))
response = self.client.post(self.url, data=self.data)
self.assertEqual(response.status_code, 200)
self.assertions_for_approved_or_published_is_true()
# Un approve resource
self.data.pop("resource-is_approved")
response = self.client.post(self.url, data=self.data)
self.assertEqual(response.status_code, 200)
self.assertions_for_approved_and_published_is_false()
# Admin publishes and approves resource
response = self.admin_approve_and_publish_resource()
self.assertEqual(response.status_code, 200)
self.assertions_for_approved_or_published_is_true()
# Admin Un approves and un publishes resource
response = self.admin_unapprove_and_unpublish_resource()
self.assertEqual(response.status_code, 200)
self.assertions_for_approved_and_published_is_false()
[docs]
def test_owner_is_group_manager(self):
try:
GroupMember.objects.get(group=self.owner_group, user=self.author).promote()
# Admin publishes and approves the resource
response = self.admin_approve_and_publish_resource()
self.assertEqual(response.status_code, 200)
resource_perm_specs = self.resource.get_all_level_info()
# Once a resource has been published, the 'publish_resourcebase' permission should be removed anyway
self.assertSetEqual(
set(resource_perm_specs["users"][self.author]),
set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit),
)
# Admin un-approves and un-publishes the resource
response = self.admin_unapprove_and_unpublish_resource()
self.assertEqual(response.status_code, 200)
resource_perm_specs = self.resource.get_all_level_info()
self.assertSetEqual(
set(resource_perm_specs["users"][self.author]),
set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit),
)
finally:
GroupMember.objects.get(group=self.owner_group, user=self.author).demote()
[docs]
def assertions_for_approved_or_published_is_true(self):
resource_perm_specs = self.resource.get_all_level_info()
self.assertSetEqual(set(resource_perm_specs["users"][self.author]), set(self.owner_perms))
self.assertSetEqual(
set(resource_perm_specs["users"][self.member_with_perms]), set(self.owner_perms + self.dataset_perms)
)
self.assertSetEqual(
set(resource_perm_specs["users"][self.group_manager]),
set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit),
)
self.assertSetEqual(
set(resource_perm_specs["users"][self.resource_group_manager]),
set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit),
)
self.assertSetEqual(set(resource_perm_specs["groups"][self.owner_group.group]), set(self.safe_perms))
self.assertSetEqual(set(resource_perm_specs["groups"][self.resource_group.group]), set(self.safe_perms))
[docs]
def assertions_for_approved_and_published_is_false(self):
resource_perm_specs = self.resource.get_all_level_info()
self.assertSetEqual(
set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms)
)
self.assertSetEqual(
set(resource_perm_specs["users"][self.member_with_perms]), set(self.owner_perms + self.dataset_perms)
)
self.assertSetEqual(
set(resource_perm_specs["users"][self.group_manager]),
set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit),
)
self.assertSetEqual(
set(resource_perm_specs["users"][self.resource_group_manager]),
set(self.owner_perms + self.edit_perms + self.dataset_perms + self.adv_owner_limit),
)
self.assertSetEqual(set(resource_perm_specs["groups"][self.owner_group.group]), set(self.safe_perms))
self.assertSetEqual(set(resource_perm_specs["groups"][self.resource_group.group]), set(self.safe_perms))
[docs]
def admin_approve_and_publish_resource(self):
self.assertTrue(self.client.login(username="admin", password="admin"))
self.data["resource-is_approved"] = "on"
self.data["resource-is_published"] = "on"
response = self.client.post(self.url, data=self.data)
self.resource.refresh_from_db()
return response
[docs]
def admin_unapprove_and_unpublish_resource(self):
self.assertTrue(self.client.login(username="admin", password="admin"))
self.data.pop("resource-is_approved")
self.data.pop("resource-is_published")
response = self.client.post(self.url, data=self.data)
self.resource.refresh_from_db()
return response
[docs]
class TestUserHasPerms(GeoNodeBaseTestSupport):
"""
Ensure that the Permission classes behaves as expected
"""
@classmethod
[docs]
def setUpClass(cls) -> None:
super().setUpClass()
cls.dataset = create_single_dataset(name="test_permission_dataset")
cls.document = create_single_doc(name="test_permission_doc")
cls.map = create_single_map(name="test_permission_map")
@classmethod
[docs]
def tearDownClass(self) -> None:
Dataset.objects.filter(name="test_permission_dataset").delete()
Document.objects.filter(title="test_permission_doc").delete()
Map.objects.filter(title="test_permission_map").delete()
[docs]
def setUp(self):
self.marty, _ = get_user_model().objects.get_or_create(username="marty", password="mcfly")
[docs]
def test_user_with_view_perms(self):
use_cases = [
{"resource": self.dataset, "url": "base-resources-detail"},
{"resource": self.dataset, "url": "datasets-detail"},
{"resource": self.document, "url": "documents-detail"},
{"resource": self.map, "url": "maps-detail"},
]
for _case in use_cases:
# setting the view permissions
url = reverse(_case["url"], kwargs={"pk": _case["resource"].pk})
_case["resource"].set_permissions(
{"users": {self.marty.username: ["base.view_resourcebase", "base.download_resourcebase"]}}
)
# calling the api
self.client.force_login(self.marty)
result = self.client.get(url)
# checking that the user can call the url in get
self.assertEqual(200, result.status_code, _case)
# the user cannot patch the resource
result = self.client.patch(url)
# checking that the user cannot call the url in patch due the lack of permissions
self.assertEqual(403, result.status_code, _case)
# after update the permissions list, the user can modify the resource
_case["resource"].set_permissions(
{"users": {self.marty.username: ["base.view_resourcebase", "base.change_resourcebase"]}}
)
# the user can patch the resource
result = self.client.patch(url)
# checking that the user can call the url in patch since now it has the permissions
self.assertEqual(200, result.status_code, _case)
[docs]
def test_user_with_view_listing(self):
use_cases = [
{"resource": self.dataset, "url": "base-resources-list"},
{"resource": self.dataset, "url": "datasets-list"},
{"resource": self.document, "url": "documents-list"},
{"resource": self.map, "url": "maps-list"},
]
for _case in use_cases:
# setting the view permissions
url = reverse(_case["url"])
_case["resource"].set_permissions(
{"users": {self.marty.username: ["base.view_resourcebase", "base.download_resourcebase"]}}
)
# calling the api
self.client.force_login(self.marty)
result = self.client.get(url)
# checking that the user can call the url in get
self.assertEqual(200, result.status_code, _case)
# the user cannot patch the resource
result = self.client.patch(url)
# checking that the user cannot call the url in patch due the lack of permissions
self.assertEqual(403, result.status_code, _case)
[docs]
def test_anonymous_user_is_stripped_off(self):
from geonode.base.models import ResourceBase
perms = ["base.view_resourcebase", "base.download_resourcebase"]
resource = ResourceBase.objects.get(id=self.dataset.id)
for perm in perms:
assign_perm(perm, get_anonymous_user(), resource)
assign_perm(perm, Group.objects.get(name="anonymous"), resource)
perm_spec = resource.get_all_level_info()
anonymous_user_perm = perm_spec["users"].get(get_anonymous_user())
self.assertEqual(anonymous_user_perm, None, "Anynmous user wasn't removed")